cmd/devp2p: be very correct about route53 change splitting (#20820)

Turns out the way RDATA limits work is documented after all,
I just didn't search right. The trick to make it work is to
count UPSERTs twice.

This also adds a check to ensure TTL changes are applied to
existing records.
Felix Lange, 2020-03-26 23:55:33 +01:00 (committed by GitHub)
parent 87a411b839
commit d3c1e654f0
2 changed files with 43 additions and 14 deletions
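
To make the "UPSERTs count double" rule concrete before the diff, here is a minimal, hypothetical sketch (not part of the commit): an UPSERT is charged twice against both Route53 change-set limits, the 32,000-byte RDATA size limit and the 1,000-item count limit, so it consumes double the RDATA bytes and two item slots.

// Hypothetical illustration of the rule above; only the two limits and the
// "UPSERTs count double" factor come from the commit, the record size is made up.
package main

import "fmt"

func main() {
	const (
		sizeLimit  = 32000 // Route53 per-change-set RDATA size limit
		countLimit = 1000  // Route53 per-change-set item count limit
	)
	const rdataSize = 300 // assumed RDATA size of one TXT record, in bytes

	// A CREATE (or DELETE) is charged once, an UPSERT twice.
	fmt.Println("CREATE:", rdataSize, "bytes,", 1, "item")
	fmt.Println("UPSERT:", 2*rdataSize, "bytes,", 2, "items")

	// So at most countLimit/2 = 500 UPSERTs fit into one change set,
	// fewer if their doubled RDATA hits sizeLimit first.
	fmt.Println("max UPSERTs per change set by count:", countLimit/2)
}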

cmd/devp2p/dns_route53.go

@@ -32,11 +32,13 @@ import (
 	"gopkg.in/urfave/cli.v1"
 )
 
-// Route53 limits change sets to 32k of 'RDATA size'. DNS changes need to be split up into
-// multiple batches to work around the limit. Unfortunately I cannot find any
-// documentation explaining how the RDATA size of a change set is computed and the best we
-// can do is estimate it. For this reason, our internal limit is much lower than 32k.
-const route53ChangeLimit = 20000
+const (
+	// Route53 limits change sets to 32k of 'RDATA size'. Change sets are also limited to
+	// 1000 items. UPSERTs count double.
+	// https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/DNSLimitations.html#limits-api-requests-changeresourcerecordsets
+	route53ChangeSizeLimit  = 32000
+	route53ChangeCountLimit = 1000
+)
 
 var (
 	route53AccessKeyFlag = cli.StringFlag{
@@ -104,7 +106,7 @@ func (c *route53Client) deploy(name string, t *dnsdisc.Tree) error {
 	}
 
 	// Submit change batches.
-	batches := splitChanges(changes, route53ChangeLimit)
+	batches := splitChanges(changes, route53ChangeSizeLimit, route53ChangeCountLimit)
 	for i, changes := range batches {
 		log.Info(fmt.Sprintf("Submitting %d changes to Route53", len(changes)))
 		batch := new(route53.ChangeBatch)
@@ -178,7 +180,7 @@ func (c *route53Client) computeChanges(name string, records map[string]string, e
 			// Entry is unknown, push a new one
 			log.Info(fmt.Sprintf("Creating %s = %q", path, val))
 			changes = append(changes, newTXTChange("CREATE", path, ttl, splitTXT(val)))
-		} else if prevValue != val {
+		} else if prevValue != val || prevRecords.ttl != ttl {
 			// Entry already exists, only change its content.
 			log.Info(fmt.Sprintf("Updating %s from %q to %q", path, prevValue, val))
 			changes = append(changes, newTXTChange("UPSERT", path, ttl, splitTXT(val)))
@@ -214,18 +216,26 @@ func sortChanges(changes []*route53.Change) {
 
 // splitChanges splits up DNS changes such that each change batch
 // is smaller than the given RDATA limit.
-func splitChanges(changes []*route53.Change, limit int) [][]*route53.Change {
-	var batches [][]*route53.Change
-	var batchSize int
+func splitChanges(changes []*route53.Change, sizeLimit, countLimit int) [][]*route53.Change {
+	var (
+		batches    [][]*route53.Change
+		batchSize  int
+		batchCount int
+	)
 	for _, ch := range changes {
 		// Start new batch if this change pushes the current one over the limit.
-		size := changeSize(ch)
-		if len(batches) == 0 || batchSize+size > limit {
+		count := changeCount(ch)
+		size := changeSize(ch) * count
+		overSize := batchSize+size > sizeLimit
+		overCount := batchCount+count > countLimit
+		if len(batches) == 0 || overSize || overCount {
 			batches = append(batches, nil)
 			batchSize = 0
+			batchCount = 0
 		}
 		batches[len(batches)-1] = append(batches[len(batches)-1], ch)
 		batchSize += size
+		batchCount += count
 	}
 	return batches
 }
@@ -241,6 +251,13 @@ func changeSize(ch *route53.Change) int {
 	return size
 }
 
+func changeCount(ch *route53.Change) int {
+	if *ch.Action == "UPSERT" {
+		return 2
+	}
+	return 1
+}
+
 // collectRecords collects all TXT records below the given name.
 func (c *route53Client) collectRecords(name string) (map[string]recordSet, error) {
 	log.Info(fmt.Sprintf("Retrieving existing TXT records on %s (%s)", name, c.zoneID))

cmd/devp2p/dns_route53_test.go

@@ -140,11 +140,23 @@ func TestRoute53ChangeSort(t *testing.T) {
 		t.Fatalf("wrong changes (got %d, want %d)", len(changes), len(wantChanges))
 	}
 
+	// Check splitting according to size.
 	wantSplit := [][]*route53.Change{
 		wantChanges[:4],
-		wantChanges[4:8],
+		wantChanges[4:6],
+		wantChanges[6:],
 	}
-	split := splitChanges(changes, 600)
+	split := splitChanges(changes, 600, 4000)
+	if !reflect.DeepEqual(split, wantSplit) {
+		t.Fatalf("wrong split batches: got %d, want %d", len(split), len(wantSplit))
+	}
+
+	// Check splitting according to count.
+	wantSplit = [][]*route53.Change{
+		wantChanges[:5],
+		wantChanges[5:],
+	}
+	split = splitChanges(changes, 10000, 6)
 	if !reflect.DeepEqual(split, wantSplit) {
 		t.Fatalf("wrong split batches: got %d, want %d", len(split), len(wantSplit))
 	}