Unverified Commit f458b376 authored by Nick Jüttner's avatar Nick Jüttner Committed by GitHub
Browse files

Merge pull request #589 from jessfraz/cache-results-azure

add cache to limit calls to providers
parents 933b7ff9 4759789a
......@@ -160,7 +160,7 @@ func main() {
case "noop":
r, err = registry.NewNoopRegistry(p)
case "txt":
r, err = registry.NewTXTRegistry(p, cfg.TXTPrefix, cfg.TXTOwnerID)
r, err = registry.NewTXTRegistry(p, cfg.TXTPrefix, cfg.TXTOwnerID, cfg.TXTCacheInterval)
case "aws-sd":
r, err = registry.NewAWSSDRegistry(p.(*provider.AWSSDProvider), cfg.TXTOwnerID)
default:
......
......@@ -78,6 +78,7 @@ type Config struct {
LogFormat string
MetricsAddress string
LogLevel string
TXTCacheInterval time.Duration
}
var defaultConfig = &Config{
......@@ -112,6 +113,7 @@ var defaultConfig = &Config{
Registry: "txt",
TXTOwnerID: "default",
TXTPrefix: "",
TXTCacheInterval: time.Hour,
Interval: time.Minute,
Once: false,
DryRun: false,
......@@ -201,6 +203,7 @@ func (cfg *Config) ParseFlags(args []string) error {
app.Flag("txt-prefix", "When using the TXT registry, a custom string that's prefixed to each ownership DNS record (optional)").Default(defaultConfig.TXTPrefix).StringVar(&cfg.TXTPrefix)
// Flags related to the main control loop
app.Flag("txt-cache-interval", "The interval between cache synchronizations in duration format (default: 1h)").Default(defaultConfig.TXTCacheInterval.String()).DurationVar(&cfg.TXTCacheInterval)
app.Flag("interval", "The interval between two consecutive synchronizations in duration format (default: 1m)").Default(defaultConfig.Interval.String()).DurationVar(&cfg.Interval)
app.Flag("once", "When enabled, exits the synchronization loop after the first iteration (default: disabled)").BoolVar(&cfg.Once)
app.Flag("dry-run", "When enabled, prints DNS record changes rather than actually performing them (default: disabled)").BoolVar(&cfg.DryRun)
......
......@@ -57,6 +57,7 @@ var (
Registry: "txt",
TXTOwnerID: "default",
TXTPrefix: "",
TXTCacheInterval: time.Hour,
Interval: time.Minute,
Once: false,
DryRun: false,
......@@ -95,6 +96,7 @@ var (
Registry: "noop",
TXTOwnerID: "owner-1",
TXTPrefix: "associated-txt-record",
TXTCacheInterval: 12 * time.Hour,
Interval: 10 * time.Minute,
Once: true,
DryRun: true,
......@@ -157,6 +159,7 @@ func TestParseFlags(t *testing.T) {
"--registry=noop",
"--txt-owner-id=owner-1",
"--txt-prefix=associated-txt-record",
"--txt-cache-interval=12h",
"--interval=10m",
"--once",
"--dry-run",
......@@ -200,6 +203,7 @@ func TestParseFlags(t *testing.T) {
"EXTERNAL_DNS_REGISTRY": "noop",
"EXTERNAL_DNS_TXT_OWNER_ID": "owner-1",
"EXTERNAL_DNS_TXT_PREFIX": "associated-txt-record",
"EXTERNAL_DNS_TXT_CACHE_INTERVAL": "12h",
"EXTERNAL_DNS_INTERVAL": "10m",
"EXTERNAL_DNS_ONCE": "1",
"EXTERNAL_DNS_DRY_RUN": "1",
......
......@@ -18,12 +18,14 @@ package registry
import (
"errors"
"time"
"strings"
"github.com/kubernetes-incubator/external-dns/endpoint"
"github.com/kubernetes-incubator/external-dns/plan"
"github.com/kubernetes-incubator/external-dns/provider"
log "github.com/sirupsen/logrus"
)
// TXTRegistry implements registry interface with ownership implemented via associated TXT records
......@@ -31,10 +33,15 @@ type TXTRegistry struct {
provider provider.Provider
ownerID string //refers to the owner id of the current instance
mapper nameMapper
// cache the records in memory and update on an interval instead.
recordsCache []*endpoint.Endpoint
recordsCacheRefreshTime time.Time
cacheInterval time.Duration
}
// NewTXTRegistry returns new TXTRegistry object
func NewTXTRegistry(provider provider.Provider, txtPrefix, ownerID string) (*TXTRegistry, error) {
func NewTXTRegistry(provider provider.Provider, txtPrefix, ownerID string, cacheInterval time.Duration) (*TXTRegistry, error) {
if ownerID == "" {
return nil, errors.New("owner id cannot be empty")
}
......@@ -42,9 +49,10 @@ func NewTXTRegistry(provider provider.Provider, txtPrefix, ownerID string) (*TXT
mapper := newPrefixNameMapper(txtPrefix)
return &TXTRegistry{
provider: provider,
ownerID: ownerID,
mapper: mapper,
provider: provider,
ownerID: ownerID,
mapper: mapper,
cacheInterval: cacheInterval,
}, nil
}
......@@ -52,6 +60,13 @@ func NewTXTRegistry(provider provider.Provider, txtPrefix, ownerID string) (*TXT
// If TXT records was created previously to indicate ownership its corresponding value
// will be added to the endpoints Labels map
func (im *TXTRegistry) Records() ([]*endpoint.Endpoint, error) {
// If we have the zones cached AND we have refreshed the cache since the
// last given interval, then just use the cached results.
if im.recordsCache != nil && time.Since(im.recordsCacheRefreshTime) < im.cacheInterval {
log.Debug("Using cached records.")
return im.recordsCache, nil
}
records, err := im.provider.Records()
if err != nil {
return nil, err
......@@ -91,6 +106,10 @@ func (im *TXTRegistry) Records() ([]*endpoint.Endpoint, error) {
}
}
// Update the cache.
im.recordsCache = endpoints
im.recordsCacheRefreshTime = time.Now()
return endpoints, nil
}
......@@ -107,6 +126,11 @@ func (im *TXTRegistry) ApplyChanges(changes *plan.Changes) error {
r.Labels[endpoint.OwnerLabelKey] = im.ownerID
txt := endpoint.NewEndpoint(im.mapper.toTXTName(r.DNSName), endpoint.RecordTypeTXT, r.Labels.Serialize(true))
filteredChanges.Create = append(filteredChanges.Create, txt)
// Add to the cache.
if im.recordsCache != nil {
im.recordsCache = append(im.recordsCache, txt)
}
}
for _, r := range filteredChanges.Delete {
......@@ -115,12 +139,18 @@ func (im *TXTRegistry) ApplyChanges(changes *plan.Changes) error {
// when we delete TXT records for which value has changed (due to new label) this would still work because
// !!! TXT record value is uniquely generated from the Labels of the endpoint. Hence old TXT record can be uniquely reconstructed
filteredChanges.Delete = append(filteredChanges.Delete, txt)
// Remove from the cache.
im.removeFromCache(txt)
}
// make sure TXT records are consistently updated as well
for _, r := range filteredChanges.UpdateNew {
txt := endpoint.NewEndpoint(im.mapper.toTXTName(r.DNSName), endpoint.RecordTypeTXT, r.Labels.Serialize(true))
filteredChanges.UpdateNew = append(filteredChanges.UpdateNew, txt)
// Update the cache.
im.updateCache(txt)
}
// make sure TXT records are consistently updated as well
for _, r := range filteredChanges.UpdateOld {
......@@ -128,6 +158,9 @@ func (im *TXTRegistry) ApplyChanges(changes *plan.Changes) error {
// when we updateOld TXT records for which value has changed (due to new label) this would still work because
// !!! TXT record value is uniquely generated from the Labels of the endpoint. Hence old TXT record can be uniquely reconstructed
filteredChanges.UpdateOld = append(filteredChanges.UpdateOld, txt)
// Update the cache.
im.updateCache(txt)
}
return im.provider.ApplyChanges(filteredChanges)
......@@ -167,3 +200,36 @@ func (pr prefixNameMapper) toEndpointName(txtDNSName string) string {
func (pr prefixNameMapper) toTXTName(endpointDNSName string) string {
return pr.prefix + endpointDNSName
}
func (im *TXTRegistry) removeFromCache(txt *endpoint.Endpoint) {
if im.recordsCache == nil || txt == nil {
// return early.
return
}
for i, e := range im.recordsCache {
if e.DNSName == txt.DNSName && e.RecordType == txt.RecordType {
// We found a match delete the endpoint from the cache.
im.recordsCache = append(im.recordsCache[:i], im.recordsCache[i+1:]...)
return
}
}
}
func (im *TXTRegistry) updateCache(txt *endpoint.Endpoint) {
if im.recordsCache == nil || txt == nil {
// return early.
return
}
for i, e := range im.recordsCache {
if e.DNSName == txt.DNSName && e.RecordType == txt.RecordType {
// We found a match update the endpoint in the cache.
im.recordsCache[i] = txt
return
}
}
// We couldn't find a match so let's just add it to the cache.
im.recordsCache = append(im.recordsCache, txt)
}
......@@ -18,6 +18,7 @@ package registry
import (
"testing"
"time"
"github.com/kubernetes-incubator/external-dns/endpoint"
"github.com/kubernetes-incubator/external-dns/internal/testutils"
......@@ -40,10 +41,10 @@ func TestTXTRegistry(t *testing.T) {
func testTXTRegistryNew(t *testing.T) {
p := provider.NewInMemoryProvider()
_, err := NewTXTRegistry(p, "txt", "")
_, err := NewTXTRegistry(p, "txt", "", time.Hour)
require.Error(t, err)
r, err := NewTXTRegistry(p, "txt", "owner")
r, err := NewTXTRegistry(p, "txt", "owner", time.Hour)
require.NoError(t, err)
_, ok := r.mapper.(prefixNameMapper)
......@@ -51,7 +52,7 @@ func testTXTRegistryNew(t *testing.T) {
assert.Equal(t, "owner", r.ownerID)
assert.Equal(t, p, r.provider)
r, err = NewTXTRegistry(p, "", "owner")
r, err = NewTXTRegistry(p, "", "owner", time.Hour)
require.NoError(t, err)
_, ok = r.mapper.(prefixNameMapper)
......@@ -130,7 +131,7 @@ func testTXTRegistryRecordsPrefixed(t *testing.T) {
},
}
r, _ := NewTXTRegistry(p, "txt.", "owner")
r, _ := NewTXTRegistry(p, "txt.", "owner", time.Hour)
records, _ := r.Records()
assert.True(t, testutils.SameEndpoints(records, expectedRecords))
......@@ -204,7 +205,7 @@ func testTXTRegistryRecordsNoPrefix(t *testing.T) {
},
}
r, _ := NewTXTRegistry(p, "", "owner")
r, _ := NewTXTRegistry(p, "", "owner", time.Hour)
records, _ := r.Records()
assert.True(t, testutils.SameEndpoints(records, expectedRecords))
......@@ -231,7 +232,7 @@ func testTXTRegistryApplyChangesWithPrefix(t *testing.T) {
newEndpointWithOwner("txt.foobar.test-zone.example.org", "\"heritage=external-dns,external-dns/owner=owner\"", endpoint.RecordTypeTXT, ""),
},
})
r, _ := NewTXTRegistry(p, "txt.", "owner")
r, _ := NewTXTRegistry(p, "txt.", "owner", time.Hour)
changes := &plan.Changes{
Create: []*endpoint.Endpoint{
......@@ -300,7 +301,7 @@ func testTXTRegistryApplyChangesNoPrefix(t *testing.T) {
newEndpointWithOwner("foobar.test-zone.example.org", "\"heritage=external-dns,external-dns/owner=owner\"", endpoint.RecordTypeTXT, ""),
},
})
r, _ := NewTXTRegistry(p, "", "owner")
r, _ := NewTXTRegistry(p, "", "owner", time.Hour)
changes := &plan.Changes{
Create: []*endpoint.Endpoint{
......@@ -347,6 +348,55 @@ func testTXTRegistryApplyChangesNoPrefix(t *testing.T) {
require.NoError(t, err)
}
func TestCacheMethods(t *testing.T) {
cache := []*endpoint.Endpoint{
newEndpointWithOwner("thing.com", "1.2.3.4", "A", "owner"),
newEndpointWithOwner("thing1.com", "1.2.3.6", "A", "owner"),
newEndpointWithOwner("thing2.com", "1.2.3.4", "CNAME", "owner"),
newEndpointWithOwner("thing3.com", "1.2.3.4", "A", "owner"),
newEndpointWithOwner("thing4.com", "1.2.3.4", "A", "owner"),
}
registry := &TXTRegistry{
recordsCache: cache,
cacheInterval: time.Hour,
}
// test updating a record.
registry.updateCache(newEndpointWithOwner("thing.com", "1.2.3.6", "A", "owner2"))
found := false
// ensure it was updated
for _, e := range registry.recordsCache {
if e.DNSName == "thing.com" && e.RecordType == "A" {
t.Logf("targets: %#v", e.Targets)
if e.Targets.Same([]string{"1.2.3.6"}) {
found = true
break
}
}
}
if !found {
t.Fatal("could not find updated record in cache")
}
// test deleting a record
registry.removeFromCache(newEndpointWithOwner("thing.com", "1.2.3.6", "A", "owner2"))
// ensure it was deleted
found = false
for _, e := range registry.recordsCache {
if e.DNSName == "thing.com" && e.RecordType == "A" {
if e.Targets.Same([]string{"1.2.3.6"}) {
found = true
break
}
}
}
if found {
t.Fatal("should not have been able to find record after deleting")
}
}
/**
helper methods
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment