common/prque: generic priority queue (#26290)
* common, core, eth, les, trie: make prque generic * les/vflux/server: fixed issues in priorityPool * common, core, eth, les, trie: make priority also generic in prque * les/flowcontrol: add test case for priority accumulator overflow * les/flowcontrol: avoid priority value overflow * common/prque: use int priority in some tests No need to convert to int64 when we can just change the type used by the queue. * common/prque: remove comment about int64 range --------- Co-authored-by: Zsolt Felfoldi <zsfelfoldi@gmail.com> Co-authored-by: Felix Lange <fjl@twurst.com>
This commit is contained in:
parent
6a148dd5c3
commit
bf1798e04e
@ -21,6 +21,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common/mclock"
|
"github.com/ethereum/go-ethereum/common/mclock"
|
||||||
|
"golang.org/x/exp/constraints"
|
||||||
)
|
)
|
||||||
|
|
||||||
// LazyQueue is a priority queue data structure where priorities can change over
|
// LazyQueue is a priority queue data structure where priorities can change over
|
||||||
@ -32,31 +33,31 @@ import (
|
|||||||
//
|
//
|
||||||
// If the upper estimate is exceeded then Update should be called for that item.
|
// If the upper estimate is exceeded then Update should be called for that item.
|
||||||
// A global Refresh function should also be called periodically.
|
// A global Refresh function should also be called periodically.
|
||||||
type LazyQueue struct {
|
type LazyQueue[P constraints.Ordered, V any] struct {
|
||||||
clock mclock.Clock
|
clock mclock.Clock
|
||||||
// Items are stored in one of two internal queues ordered by estimated max
|
// Items are stored in one of two internal queues ordered by estimated max
|
||||||
// priority until the next and the next-after-next refresh. Update and Refresh
|
// priority until the next and the next-after-next refresh. Update and Refresh
|
||||||
// always places items in queue[1].
|
// always places items in queue[1].
|
||||||
queue [2]*sstack
|
queue [2]*sstack[P, V]
|
||||||
popQueue *sstack
|
popQueue *sstack[P, V]
|
||||||
period time.Duration
|
period time.Duration
|
||||||
maxUntil mclock.AbsTime
|
maxUntil mclock.AbsTime
|
||||||
indexOffset int
|
indexOffset int
|
||||||
setIndex SetIndexCallback
|
setIndex SetIndexCallback[V]
|
||||||
priority PriorityCallback
|
priority PriorityCallback[P, V]
|
||||||
maxPriority MaxPriorityCallback
|
maxPriority MaxPriorityCallback[P, V]
|
||||||
lastRefresh1, lastRefresh2 mclock.AbsTime
|
lastRefresh1, lastRefresh2 mclock.AbsTime
|
||||||
}
|
}
|
||||||
|
|
||||||
type (
|
type (
|
||||||
PriorityCallback func(data interface{}) int64 // actual priority callback
|
PriorityCallback[P constraints.Ordered, V any] func(data V) P // actual priority callback
|
||||||
MaxPriorityCallback func(data interface{}, until mclock.AbsTime) int64 // estimated maximum priority callback
|
MaxPriorityCallback[P constraints.Ordered, V any] func(data V, until mclock.AbsTime) P // estimated maximum priority callback
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewLazyQueue creates a new lazy queue
|
// NewLazyQueue creates a new lazy queue
|
||||||
func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue {
|
func NewLazyQueue[P constraints.Ordered, V any](setIndex SetIndexCallback[V], priority PriorityCallback[P, V], maxPriority MaxPriorityCallback[P, V], clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue[P, V] {
|
||||||
q := &LazyQueue{
|
q := &LazyQueue[P, V]{
|
||||||
popQueue: newSstack(nil, false),
|
popQueue: newSstack[P, V](nil),
|
||||||
setIndex: setIndex,
|
setIndex: setIndex,
|
||||||
priority: priority,
|
priority: priority,
|
||||||
maxPriority: maxPriority,
|
maxPriority: maxPriority,
|
||||||
@ -71,13 +72,13 @@ func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPrior
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Reset clears the contents of the queue
|
// Reset clears the contents of the queue
|
||||||
func (q *LazyQueue) Reset() {
|
func (q *LazyQueue[P, V]) Reset() {
|
||||||
q.queue[0] = newSstack(q.setIndex0, false)
|
q.queue[0] = newSstack[P, V](q.setIndex0)
|
||||||
q.queue[1] = newSstack(q.setIndex1, false)
|
q.queue[1] = newSstack[P, V](q.setIndex1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Refresh performs queue re-evaluation if necessary
|
// Refresh performs queue re-evaluation if necessary
|
||||||
func (q *LazyQueue) Refresh() {
|
func (q *LazyQueue[P, V]) Refresh() {
|
||||||
now := q.clock.Now()
|
now := q.clock.Now()
|
||||||
for time.Duration(now-q.lastRefresh2) >= q.period*2 {
|
for time.Duration(now-q.lastRefresh2) >= q.period*2 {
|
||||||
q.refresh(now)
|
q.refresh(now)
|
||||||
@ -87,10 +88,10 @@ func (q *LazyQueue) Refresh() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// refresh re-evaluates items in the older queue and swaps the two queues
|
// refresh re-evaluates items in the older queue and swaps the two queues
|
||||||
func (q *LazyQueue) refresh(now mclock.AbsTime) {
|
func (q *LazyQueue[P, V]) refresh(now mclock.AbsTime) {
|
||||||
q.maxUntil = now.Add(q.period)
|
q.maxUntil = now.Add(q.period)
|
||||||
for q.queue[0].Len() != 0 {
|
for q.queue[0].Len() != 0 {
|
||||||
q.Push(heap.Pop(q.queue[0]).(*item).value)
|
q.Push(heap.Pop(q.queue[0]).(*item[P, V]).value)
|
||||||
}
|
}
|
||||||
q.queue[0], q.queue[1] = q.queue[1], q.queue[0]
|
q.queue[0], q.queue[1] = q.queue[1], q.queue[0]
|
||||||
q.indexOffset = 1 - q.indexOffset
|
q.indexOffset = 1 - q.indexOffset
|
||||||
@ -98,22 +99,22 @@ func (q *LazyQueue) refresh(now mclock.AbsTime) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Push adds an item to the queue
|
// Push adds an item to the queue
|
||||||
func (q *LazyQueue) Push(data interface{}) {
|
func (q *LazyQueue[P, V]) Push(data V) {
|
||||||
heap.Push(q.queue[1], &item{data, q.maxPriority(data, q.maxUntil)})
|
heap.Push(q.queue[1], &item[P, V]{data, q.maxPriority(data, q.maxUntil)})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update updates the upper priority estimate for the item with the given queue index
|
// Update updates the upper priority estimate for the item with the given queue index
|
||||||
func (q *LazyQueue) Update(index int) {
|
func (q *LazyQueue[P, V]) Update(index int) {
|
||||||
q.Push(q.Remove(index))
|
q.Push(q.Remove(index))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pop removes and returns the item with the greatest actual priority
|
// Pop removes and returns the item with the greatest actual priority
|
||||||
func (q *LazyQueue) Pop() (interface{}, int64) {
|
func (q *LazyQueue[P, V]) Pop() (V, P) {
|
||||||
var (
|
var (
|
||||||
resData interface{}
|
resData V
|
||||||
resPri int64
|
resPri P
|
||||||
)
|
)
|
||||||
q.MultiPop(func(data interface{}, priority int64) bool {
|
q.MultiPop(func(data V, priority P) bool {
|
||||||
resData = data
|
resData = data
|
||||||
resPri = priority
|
resPri = priority
|
||||||
return false
|
return false
|
||||||
@ -123,7 +124,7 @@ func (q *LazyQueue) Pop() (interface{}, int64) {
|
|||||||
|
|
||||||
// peekIndex returns the index of the internal queue where the item with the
|
// peekIndex returns the index of the internal queue where the item with the
|
||||||
// highest estimated priority is or -1 if both are empty
|
// highest estimated priority is or -1 if both are empty
|
||||||
func (q *LazyQueue) peekIndex() int {
|
func (q *LazyQueue[P, V]) peekIndex() int {
|
||||||
if q.queue[0].Len() != 0 {
|
if q.queue[0].Len() != 0 {
|
||||||
if q.queue[1].Len() != 0 && q.queue[1].blocks[0][0].priority > q.queue[0].blocks[0][0].priority {
|
if q.queue[1].Len() != 0 && q.queue[1].blocks[0][0].priority > q.queue[0].blocks[0][0].priority {
|
||||||
return 1
|
return 1
|
||||||
@ -139,17 +140,17 @@ func (q *LazyQueue) peekIndex() int {
|
|||||||
// MultiPop pops multiple items from the queue and is more efficient than calling
|
// MultiPop pops multiple items from the queue and is more efficient than calling
|
||||||
// Pop multiple times. Popped items are passed to the callback. MultiPop returns
|
// Pop multiple times. Popped items are passed to the callback. MultiPop returns
|
||||||
// when the callback returns false or there are no more items to pop.
|
// when the callback returns false or there are no more items to pop.
|
||||||
func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) bool) {
|
func (q *LazyQueue[P, V]) MultiPop(callback func(data V, priority P) bool) {
|
||||||
nextIndex := q.peekIndex()
|
nextIndex := q.peekIndex()
|
||||||
for nextIndex != -1 {
|
for nextIndex != -1 {
|
||||||
data := heap.Pop(q.queue[nextIndex]).(*item).value
|
data := heap.Pop(q.queue[nextIndex]).(*item[P, V]).value
|
||||||
heap.Push(q.popQueue, &item{data, q.priority(data)})
|
heap.Push(q.popQueue, &item[P, V]{data, q.priority(data)})
|
||||||
nextIndex = q.peekIndex()
|
nextIndex = q.peekIndex()
|
||||||
for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) {
|
for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) {
|
||||||
i := heap.Pop(q.popQueue).(*item)
|
i := heap.Pop(q.popQueue).(*item[P, V])
|
||||||
if !callback(i.value, i.priority) {
|
if !callback(i.value, i.priority) {
|
||||||
for q.popQueue.Len() != 0 {
|
for q.popQueue.Len() != 0 {
|
||||||
q.Push(heap.Pop(q.popQueue).(*item).value)
|
q.Push(heap.Pop(q.popQueue).(*item[P, V]).value)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -159,31 +160,28 @@ func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) boo
|
|||||||
}
|
}
|
||||||
|
|
||||||
// PopItem pops the item from the queue only, dropping the associated priority value.
|
// PopItem pops the item from the queue only, dropping the associated priority value.
|
||||||
func (q *LazyQueue) PopItem() interface{} {
|
func (q *LazyQueue[P, V]) PopItem() V {
|
||||||
i, _ := q.Pop()
|
i, _ := q.Pop()
|
||||||
return i
|
return i
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove removes the item with the given index.
|
// Remove removes the item with the given index.
|
||||||
func (q *LazyQueue) Remove(index int) interface{} {
|
func (q *LazyQueue[P, V]) Remove(index int) V {
|
||||||
if index < 0 {
|
return heap.Remove(q.queue[index&1^q.indexOffset], index>>1).(*item[P, V]).value
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return heap.Remove(q.queue[index&1^q.indexOffset], index>>1).(*item).value
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Empty checks whether the priority queue is empty.
|
// Empty checks whether the priority queue is empty.
|
||||||
func (q *LazyQueue) Empty() bool {
|
func (q *LazyQueue[P, V]) Empty() bool {
|
||||||
return q.queue[0].Len() == 0 && q.queue[1].Len() == 0
|
return q.queue[0].Len() == 0 && q.queue[1].Len() == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// Size returns the number of items in the priority queue.
|
// Size returns the number of items in the priority queue.
|
||||||
func (q *LazyQueue) Size() int {
|
func (q *LazyQueue[P, V]) Size() int {
|
||||||
return q.queue[0].Len() + q.queue[1].Len()
|
return q.queue[0].Len() + q.queue[1].Len()
|
||||||
}
|
}
|
||||||
|
|
||||||
// setIndex0 translates internal queue item index to the virtual index space of LazyQueue
|
// setIndex0 translates internal queue item index to the virtual index space of LazyQueue
|
||||||
func (q *LazyQueue) setIndex0(data interface{}, index int) {
|
func (q *LazyQueue[P, V]) setIndex0(data V, index int) {
|
||||||
if index == -1 {
|
if index == -1 {
|
||||||
q.setIndex(data, -1)
|
q.setIndex(data, -1)
|
||||||
} else {
|
} else {
|
||||||
@ -192,6 +190,6 @@ func (q *LazyQueue) setIndex0(data interface{}, index int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// setIndex1 translates internal queue item index to the virtual index space of LazyQueue
|
// setIndex1 translates internal queue item index to the virtual index space of LazyQueue
|
||||||
func (q *LazyQueue) setIndex1(data interface{}, index int) {
|
func (q *LazyQueue[P, V]) setIndex1(data V, index int) {
|
||||||
q.setIndex(data, index+index+1)
|
q.setIndex(data, index+index+1)
|
||||||
}
|
}
|
||||||
|
@ -19,65 +19,59 @@ package prque
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"container/heap"
|
"container/heap"
|
||||||
|
|
||||||
|
"golang.org/x/exp/constraints"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Priority queue data structure.
|
// Priority queue data structure.
|
||||||
type Prque struct {
|
type Prque[P constraints.Ordered, V any] struct {
|
||||||
cont *sstack
|
cont *sstack[P, V]
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new priority queue.
|
// New creates a new priority queue.
|
||||||
func New(setIndex SetIndexCallback) *Prque {
|
func New[P constraints.Ordered, V any](setIndex SetIndexCallback[V]) *Prque[P, V] {
|
||||||
return &Prque{newSstack(setIndex, false)}
|
return &Prque[P, V]{newSstack[P, V](setIndex)}
|
||||||
}
|
|
||||||
|
|
||||||
// NewWrapAround creates a new priority queue with wrap-around priority handling.
|
|
||||||
func NewWrapAround(setIndex SetIndexCallback) *Prque {
|
|
||||||
return &Prque{newSstack(setIndex, true)}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pushes a value with a given priority into the queue, expanding if necessary.
|
// Pushes a value with a given priority into the queue, expanding if necessary.
|
||||||
func (p *Prque) Push(data interface{}, priority int64) {
|
func (p *Prque[P, V]) Push(data V, priority P) {
|
||||||
heap.Push(p.cont, &item{data, priority})
|
heap.Push(p.cont, &item[P, V]{data, priority})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Peek returns the value with the greatest priority but does not pop it off.
|
// Peek returns the value with the greatest priority but does not pop it off.
|
||||||
func (p *Prque) Peek() (interface{}, int64) {
|
func (p *Prque[P, V]) Peek() (V, P) {
|
||||||
item := p.cont.blocks[0][0]
|
item := p.cont.blocks[0][0]
|
||||||
return item.value, item.priority
|
return item.value, item.priority
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pops the value with the greatest priority off the stack and returns it.
|
// Pops the value with the greatest priority off the stack and returns it.
|
||||||
// Currently no shrinking is done.
|
// Currently no shrinking is done.
|
||||||
func (p *Prque) Pop() (interface{}, int64) {
|
func (p *Prque[P, V]) Pop() (V, P) {
|
||||||
item := heap.Pop(p.cont).(*item)
|
item := heap.Pop(p.cont).(*item[P, V])
|
||||||
return item.value, item.priority
|
return item.value, item.priority
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pops only the item from the queue, dropping the associated priority value.
|
// Pops only the item from the queue, dropping the associated priority value.
|
||||||
func (p *Prque) PopItem() interface{} {
|
func (p *Prque[P, V]) PopItem() V {
|
||||||
return heap.Pop(p.cont).(*item).value
|
return heap.Pop(p.cont).(*item[P, V]).value
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove removes the element with the given index.
|
// Remove removes the element with the given index.
|
||||||
func (p *Prque) Remove(i int) interface{} {
|
func (p *Prque[P, V]) Remove(i int) V {
|
||||||
if i < 0 {
|
return heap.Remove(p.cont, i).(*item[P, V]).value
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return heap.Remove(p.cont, i)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Checks whether the priority queue is empty.
|
// Checks whether the priority queue is empty.
|
||||||
func (p *Prque) Empty() bool {
|
func (p *Prque[P, V]) Empty() bool {
|
||||||
return p.cont.Len() == 0
|
return p.cont.Len() == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns the number of element in the priority queue.
|
// Returns the number of element in the priority queue.
|
||||||
func (p *Prque) Size() int {
|
func (p *Prque[P, V]) Size() int {
|
||||||
return p.cont.Len()
|
return p.cont.Len()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clears the contents of the priority queue.
|
// Clears the contents of the priority queue.
|
||||||
func (p *Prque) Reset() {
|
func (p *Prque[P, V]) Reset() {
|
||||||
*p = *New(p.cont.setIndex)
|
*p = *New[P, V](p.cont.setIndex)
|
||||||
}
|
}
|
||||||
|
@ -21,22 +21,24 @@ func TestPrque(t *testing.T) {
|
|||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
data[i] = rand.Int()
|
data[i] = rand.Int()
|
||||||
}
|
}
|
||||||
queue := New(nil)
|
queue := New[int, int](nil)
|
||||||
|
|
||||||
for rep := 0; rep < 2; rep++ {
|
for rep := 0; rep < 2; rep++ {
|
||||||
// Fill a priority queue with the above data
|
// Fill a priority queue with the above data
|
||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
queue.Push(data[i], int64(prio[i]))
|
queue.Push(data[i], prio[i])
|
||||||
if queue.Size() != i+1 {
|
if queue.Size() != i+1 {
|
||||||
t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1)
|
t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Create a map the values to the priorities for easier verification
|
// Create a map the values to the priorities for easier verification
|
||||||
dict := make(map[int64]int)
|
dict := make(map[int]int)
|
||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
dict[int64(prio[i])] = data[i]
|
dict[prio[i]] = data[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pop out the elements in priority order and verify them
|
// Pop out the elements in priority order and verify them
|
||||||
prevPrio := int64(size + 1)
|
prevPrio := size + 1
|
||||||
for !queue.Empty() {
|
for !queue.Empty() {
|
||||||
val, prio := queue.Pop()
|
val, prio := queue.Pop()
|
||||||
if prio > prevPrio {
|
if prio > prevPrio {
|
||||||
@ -59,22 +61,23 @@ func TestReset(t *testing.T) {
|
|||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
data[i] = rand.Int()
|
data[i] = rand.Int()
|
||||||
}
|
}
|
||||||
queue := New(nil)
|
queue := New[int, int](nil)
|
||||||
|
|
||||||
for rep := 0; rep < 2; rep++ {
|
for rep := 0; rep < 2; rep++ {
|
||||||
// Fill a priority queue with the above data
|
// Fill a priority queue with the above data
|
||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
queue.Push(data[i], int64(prio[i]))
|
queue.Push(data[i], prio[i])
|
||||||
if queue.Size() != i+1 {
|
if queue.Size() != i+1 {
|
||||||
t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1)
|
t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Create a map the values to the priorities for easier verification
|
// Create a map the values to the priorities for easier verification
|
||||||
dict := make(map[int64]int)
|
dict := make(map[int]int)
|
||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
dict[int64(prio[i])] = data[i]
|
dict[prio[i]] = data[i]
|
||||||
}
|
}
|
||||||
// Pop out half the elements in priority order and verify them
|
// Pop out half the elements in priority order and verify them
|
||||||
prevPrio := int64(size + 1)
|
prevPrio := size + 1
|
||||||
for i := 0; i < size/2; i++ {
|
for i := 0; i < size/2; i++ {
|
||||||
val, prio := queue.Pop()
|
val, prio := queue.Pop()
|
||||||
if prio > prevPrio {
|
if prio > prevPrio {
|
||||||
@ -104,7 +107,7 @@ func BenchmarkPush(b *testing.B) {
|
|||||||
}
|
}
|
||||||
// Execute the benchmark
|
// Execute the benchmark
|
||||||
b.ResetTimer()
|
b.ResetTimer()
|
||||||
queue := New(nil)
|
queue := New[int64, int](nil)
|
||||||
for i := 0; i < len(data); i++ {
|
for i := 0; i < len(data); i++ {
|
||||||
queue.Push(data[i], prio[i])
|
queue.Push(data[i], prio[i])
|
||||||
}
|
}
|
||||||
@ -118,7 +121,7 @@ func BenchmarkPop(b *testing.B) {
|
|||||||
data[i] = rand.Int()
|
data[i] = rand.Int()
|
||||||
prio[i] = rand.Int63()
|
prio[i] = rand.Int63()
|
||||||
}
|
}
|
||||||
queue := New(nil)
|
queue := New[int64, int](nil)
|
||||||
for i := 0; i < len(data); i++ {
|
for i := 0; i < len(data); i++ {
|
||||||
queue.Push(data[i], prio[i])
|
queue.Push(data[i], prio[i])
|
||||||
}
|
}
|
||||||
|
@ -10,53 +10,50 @@
|
|||||||
|
|
||||||
package prque
|
package prque
|
||||||
|
|
||||||
|
import "golang.org/x/exp/constraints"
|
||||||
|
|
||||||
// The size of a block of data
|
// The size of a block of data
|
||||||
const blockSize = 4096
|
const blockSize = 4096
|
||||||
|
|
||||||
// A prioritized item in the sorted stack.
|
// A prioritized item in the sorted stack.
|
||||||
//
|
type item[P constraints.Ordered, V any] struct {
|
||||||
// Note: priorities can "wrap around" the int64 range, a comes before b if (a.priority - b.priority) > 0.
|
value V
|
||||||
// The difference between the lowest and highest priorities in the queue at any point should be less than 2^63.
|
priority P
|
||||||
type item struct {
|
|
||||||
value interface{}
|
|
||||||
priority int64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetIndexCallback is called when the element is moved to a new index.
|
// SetIndexCallback is called when the element is moved to a new index.
|
||||||
// Providing SetIndexCallback is optional, it is needed only if the application needs
|
// Providing SetIndexCallback is optional, it is needed only if the application needs
|
||||||
// to delete elements other than the top one.
|
// to delete elements other than the top one.
|
||||||
type SetIndexCallback func(data interface{}, index int)
|
type SetIndexCallback[V any] func(data V, index int)
|
||||||
|
|
||||||
// Internal sortable stack data structure. Implements the Push and Pop ops for
|
// Internal sortable stack data structure. Implements the Push and Pop ops for
|
||||||
// the stack (heap) functionality and the Len, Less and Swap methods for the
|
// the stack (heap) functionality and the Len, Less and Swap methods for the
|
||||||
// sortability requirements of the heaps.
|
// sortability requirements of the heaps.
|
||||||
type sstack struct {
|
type sstack[P constraints.Ordered, V any] struct {
|
||||||
setIndex SetIndexCallback
|
setIndex SetIndexCallback[V]
|
||||||
size int
|
size int
|
||||||
capacity int
|
capacity int
|
||||||
offset int
|
offset int
|
||||||
wrapAround bool
|
|
||||||
|
|
||||||
blocks [][]*item
|
blocks [][]*item[P, V]
|
||||||
active []*item
|
active []*item[P, V]
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates a new, empty stack.
|
// Creates a new, empty stack.
|
||||||
func newSstack(setIndex SetIndexCallback, wrapAround bool) *sstack {
|
func newSstack[P constraints.Ordered, V any](setIndex SetIndexCallback[V]) *sstack[P, V] {
|
||||||
result := new(sstack)
|
result := new(sstack[P, V])
|
||||||
result.setIndex = setIndex
|
result.setIndex = setIndex
|
||||||
result.active = make([]*item, blockSize)
|
result.active = make([]*item[P, V], blockSize)
|
||||||
result.blocks = [][]*item{result.active}
|
result.blocks = [][]*item[P, V]{result.active}
|
||||||
result.capacity = blockSize
|
result.capacity = blockSize
|
||||||
result.wrapAround = wrapAround
|
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pushes a value onto the stack, expanding it if necessary. Required by
|
// Pushes a value onto the stack, expanding it if necessary. Required by
|
||||||
// heap.Interface.
|
// heap.Interface.
|
||||||
func (s *sstack) Push(data interface{}) {
|
func (s *sstack[P, V]) Push(data any) {
|
||||||
if s.size == s.capacity {
|
if s.size == s.capacity {
|
||||||
s.active = make([]*item, blockSize)
|
s.active = make([]*item[P, V], blockSize)
|
||||||
s.blocks = append(s.blocks, s.active)
|
s.blocks = append(s.blocks, s.active)
|
||||||
s.capacity += blockSize
|
s.capacity += blockSize
|
||||||
s.offset = 0
|
s.offset = 0
|
||||||
@ -65,16 +62,16 @@ func (s *sstack) Push(data interface{}) {
|
|||||||
s.offset = 0
|
s.offset = 0
|
||||||
}
|
}
|
||||||
if s.setIndex != nil {
|
if s.setIndex != nil {
|
||||||
s.setIndex(data.(*item).value, s.size)
|
s.setIndex(data.(*item[P, V]).value, s.size)
|
||||||
}
|
}
|
||||||
s.active[s.offset] = data.(*item)
|
s.active[s.offset] = data.(*item[P, V])
|
||||||
s.offset++
|
s.offset++
|
||||||
s.size++
|
s.size++
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pops a value off the stack and returns it. Currently no shrinking is done.
|
// Pops a value off the stack and returns it. Currently no shrinking is done.
|
||||||
// Required by heap.Interface.
|
// Required by heap.Interface.
|
||||||
func (s *sstack) Pop() (res interface{}) {
|
func (s *sstack[P, V]) Pop() (res any) {
|
||||||
s.size--
|
s.size--
|
||||||
s.offset--
|
s.offset--
|
||||||
if s.offset < 0 {
|
if s.offset < 0 {
|
||||||
@ -83,28 +80,24 @@ func (s *sstack) Pop() (res interface{}) {
|
|||||||
}
|
}
|
||||||
res, s.active[s.offset] = s.active[s.offset], nil
|
res, s.active[s.offset] = s.active[s.offset], nil
|
||||||
if s.setIndex != nil {
|
if s.setIndex != nil {
|
||||||
s.setIndex(res.(*item).value, -1)
|
s.setIndex(res.(*item[P, V]).value, -1)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns the length of the stack. Required by sort.Interface.
|
// Returns the length of the stack. Required by sort.Interface.
|
||||||
func (s *sstack) Len() int {
|
func (s *sstack[P, V]) Len() int {
|
||||||
return s.size
|
return s.size
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compares the priority of two elements of the stack (higher is first).
|
// Compares the priority of two elements of the stack (higher is first).
|
||||||
// Required by sort.Interface.
|
// Required by sort.Interface.
|
||||||
func (s *sstack) Less(i, j int) bool {
|
func (s *sstack[P, V]) Less(i, j int) bool {
|
||||||
a, b := s.blocks[i/blockSize][i%blockSize].priority, s.blocks[j/blockSize][j%blockSize].priority
|
return s.blocks[i/blockSize][i%blockSize].priority > s.blocks[j/blockSize][j%blockSize].priority
|
||||||
if s.wrapAround {
|
|
||||||
return a-b > 0
|
|
||||||
}
|
|
||||||
return a > b
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Swaps two elements in the stack. Required by sort.Interface.
|
// Swaps two elements in the stack. Required by sort.Interface.
|
||||||
func (s *sstack) Swap(i, j int) {
|
func (s *sstack[P, V]) Swap(i, j int) {
|
||||||
ib, io, jb, jo := i/blockSize, i%blockSize, j/blockSize, j%blockSize
|
ib, io, jb, jo := i/blockSize, i%blockSize, j/blockSize, j%blockSize
|
||||||
a, b := s.blocks[jb][jo], s.blocks[ib][io]
|
a, b := s.blocks[jb][jo], s.blocks[ib][io]
|
||||||
if s.setIndex != nil {
|
if s.setIndex != nil {
|
||||||
@ -115,6 +108,6 @@ func (s *sstack) Swap(i, j int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Resets the stack, effectively clearing its contents.
|
// Resets the stack, effectively clearing its contents.
|
||||||
func (s *sstack) Reset() {
|
func (s *sstack[P, V]) Reset() {
|
||||||
*s = *newSstack(s.setIndex, false)
|
*s = *newSstack[P, V](s.setIndex)
|
||||||
}
|
}
|
||||||
|
@ -17,23 +17,23 @@ import (
|
|||||||
func TestSstack(t *testing.T) {
|
func TestSstack(t *testing.T) {
|
||||||
// Create some initial data
|
// Create some initial data
|
||||||
size := 16 * blockSize
|
size := 16 * blockSize
|
||||||
data := make([]*item, size)
|
data := make([]*item[int64, int], size)
|
||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
data[i] = &item{rand.Int(), rand.Int63()}
|
data[i] = &item[int64, int]{rand.Int(), rand.Int63()}
|
||||||
}
|
}
|
||||||
stack := newSstack(nil, false)
|
stack := newSstack[int64, int](nil)
|
||||||
for rep := 0; rep < 2; rep++ {
|
for rep := 0; rep < 2; rep++ {
|
||||||
// Push all the data into the stack, pop out every second
|
// Push all the data into the stack, pop out every second
|
||||||
secs := []*item{}
|
secs := []*item[int64, int]{}
|
||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
stack.Push(data[i])
|
stack.Push(data[i])
|
||||||
if i%2 == 0 {
|
if i%2 == 0 {
|
||||||
secs = append(secs, stack.Pop().(*item))
|
secs = append(secs, stack.Pop().(*item[int64, int]))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rest := []*item{}
|
rest := []*item[int64, int]{}
|
||||||
for stack.Len() > 0 {
|
for stack.Len() > 0 {
|
||||||
rest = append(rest, stack.Pop().(*item))
|
rest = append(rest, stack.Pop().(*item[int64, int]))
|
||||||
}
|
}
|
||||||
// Make sure the contents of the resulting slices are ok
|
// Make sure the contents of the resulting slices are ok
|
||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
@ -50,12 +50,12 @@ func TestSstack(t *testing.T) {
|
|||||||
func TestSstackSort(t *testing.T) {
|
func TestSstackSort(t *testing.T) {
|
||||||
// Create some initial data
|
// Create some initial data
|
||||||
size := 16 * blockSize
|
size := 16 * blockSize
|
||||||
data := make([]*item, size)
|
data := make([]*item[int64, int], size)
|
||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
data[i] = &item{rand.Int(), int64(i)}
|
data[i] = &item[int64, int]{rand.Int(), int64(i)}
|
||||||
}
|
}
|
||||||
// Push all the data into the stack
|
// Push all the data into the stack
|
||||||
stack := newSstack(nil, false)
|
stack := newSstack[int64, int](nil)
|
||||||
for _, val := range data {
|
for _, val := range data {
|
||||||
stack.Push(val)
|
stack.Push(val)
|
||||||
}
|
}
|
||||||
@ -72,18 +72,18 @@ func TestSstackSort(t *testing.T) {
|
|||||||
func TestSstackReset(t *testing.T) {
|
func TestSstackReset(t *testing.T) {
|
||||||
// Create some initial data
|
// Create some initial data
|
||||||
size := 16 * blockSize
|
size := 16 * blockSize
|
||||||
data := make([]*item, size)
|
data := make([]*item[int64, int], size)
|
||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
data[i] = &item{rand.Int(), rand.Int63()}
|
data[i] = &item[int64, int]{rand.Int(), rand.Int63()}
|
||||||
}
|
}
|
||||||
stack := newSstack(nil, false)
|
stack := newSstack[int64, int](nil)
|
||||||
for rep := 0; rep < 2; rep++ {
|
for rep := 0; rep < 2; rep++ {
|
||||||
// Push all the data into the stack, pop out every second
|
// Push all the data into the stack, pop out every second
|
||||||
secs := []*item{}
|
secs := []*item[int64, int]{}
|
||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
stack.Push(data[i])
|
stack.Push(data[i])
|
||||||
if i%2 == 0 {
|
if i%2 == 0 {
|
||||||
secs = append(secs, stack.Pop().(*item))
|
secs = append(secs, stack.Pop().(*item[int64, int]))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Reset and verify both pulled and stack contents
|
// Reset and verify both pulled and stack contents
|
||||||
|
@ -171,7 +171,7 @@ type BlockChain struct {
|
|||||||
|
|
||||||
db ethdb.Database // Low level persistent database to store final content in
|
db ethdb.Database // Low level persistent database to store final content in
|
||||||
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
|
snaps *snapshot.Tree // Snapshot tree for fast trie leaf access
|
||||||
triegc *prque.Prque // Priority queue mapping block numbers to tries to gc
|
triegc *prque.Prque[int64, common.Hash] // Priority queue mapping block numbers to tries to gc
|
||||||
gcproc time.Duration // Accumulates canonical block processing for trie dumping
|
gcproc time.Duration // Accumulates canonical block processing for trie dumping
|
||||||
lastWrite uint64 // Last block when the state was flushed
|
lastWrite uint64 // Last block when the state was flushed
|
||||||
flushInterval int64 // Time interval (processing time) after which to flush a state
|
flushInterval int64 // Time interval (processing time) after which to flush a state
|
||||||
@ -261,7 +261,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
|
|||||||
db: db,
|
db: db,
|
||||||
triedb: triedb,
|
triedb: triedb,
|
||||||
flushInterval: int64(cacheConfig.TrieTimeLimit),
|
flushInterval: int64(cacheConfig.TrieTimeLimit),
|
||||||
triegc: prque.New(nil),
|
triegc: prque.New[int64, common.Hash](nil),
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
chainmu: syncx.NewClosableMutex(),
|
chainmu: syncx.NewClosableMutex(),
|
||||||
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
|
bodyCache: lru.NewCache[common.Hash, *types.Body](bodyCacheLimit),
|
||||||
@ -957,7 +957,7 @@ func (bc *BlockChain) Stop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
for !bc.triegc.Empty() {
|
for !bc.triegc.Empty() {
|
||||||
triedb.Dereference(bc.triegc.PopItem().(common.Hash))
|
triedb.Dereference(bc.triegc.PopItem())
|
||||||
}
|
}
|
||||||
if size, _ := triedb.Size(); size != 0 {
|
if size, _ := triedb.Size(); size != 0 {
|
||||||
log.Error("Dangling trie nodes after full cleanup")
|
log.Error("Dangling trie nodes after full cleanup")
|
||||||
@ -1391,7 +1391,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
|
|||||||
bc.triegc.Push(root, number)
|
bc.triegc.Push(root, number)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
bc.triedb.Dereference(root.(common.Hash))
|
bc.triedb.Dereference(root)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -191,7 +191,7 @@ func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan
|
|||||||
// in to be [to-1]. Therefore, setting lastNum to means that the
|
// in to be [to-1]. Therefore, setting lastNum to means that the
|
||||||
// prqueue gap-evaluation will work correctly
|
// prqueue gap-evaluation will work correctly
|
||||||
lastNum = to
|
lastNum = to
|
||||||
queue = prque.New(nil)
|
queue = prque.New[int64, *blockTxHashes](nil)
|
||||||
// for stats reporting
|
// for stats reporting
|
||||||
blocks, txs = 0, 0
|
blocks, txs = 0, 0
|
||||||
)
|
)
|
||||||
@ -210,7 +210,7 @@ func indexTransactions(db ethdb.Database, from uint64, to uint64, interrupt chan
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
// Next block available, pop it off and index it
|
// Next block available, pop it off and index it
|
||||||
delivery := queue.PopItem().(*blockTxHashes)
|
delivery := queue.PopItem()
|
||||||
lastNum = delivery.number
|
lastNum = delivery.number
|
||||||
WriteTxLookupEntries(batch, delivery.number, delivery.hashes)
|
WriteTxLookupEntries(batch, delivery.number, delivery.hashes)
|
||||||
blocks++
|
blocks++
|
||||||
@ -282,7 +282,7 @@ func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt ch
|
|||||||
// we expect the first number to come in to be [from]. Therefore, setting
|
// we expect the first number to come in to be [from]. Therefore, setting
|
||||||
// nextNum to from means that the prqueue gap-evaluation will work correctly
|
// nextNum to from means that the prqueue gap-evaluation will work correctly
|
||||||
nextNum = from
|
nextNum = from
|
||||||
queue = prque.New(nil)
|
queue = prque.New[int64, *blockTxHashes](nil)
|
||||||
// for stats reporting
|
// for stats reporting
|
||||||
blocks, txs = 0, 0
|
blocks, txs = 0, 0
|
||||||
)
|
)
|
||||||
@ -299,7 +299,7 @@ func unindexTransactions(db ethdb.Database, from uint64, to uint64, interrupt ch
|
|||||||
if hook != nil && !hook(nextNum) {
|
if hook != nil && !hook(nextNum) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
delivery := queue.PopItem().(*blockTxHashes)
|
delivery := queue.PopItem()
|
||||||
nextNum = delivery.number + 1
|
nextNum = delivery.number + 1
|
||||||
DeleteTxLookupEntries(batch, delivery.hashes)
|
DeleteTxLookupEntries(batch, delivery.hashes)
|
||||||
txs += len(delivery.hashes)
|
txs += len(delivery.hashes)
|
||||||
|
@ -1395,7 +1395,7 @@ func (pool *TxPool) truncatePending() {
|
|||||||
|
|
||||||
pendingBeforeCap := pending
|
pendingBeforeCap := pending
|
||||||
// Assemble a spam order to penalize large transactors first
|
// Assemble a spam order to penalize large transactors first
|
||||||
spammers := prque.New(nil)
|
spammers := prque.New[int64, common.Address](nil)
|
||||||
for addr, list := range pool.pending {
|
for addr, list := range pool.pending {
|
||||||
// Only evict transactions from high rollers
|
// Only evict transactions from high rollers
|
||||||
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
|
if !pool.locals.contains(addr) && uint64(list.Len()) > pool.config.AccountSlots {
|
||||||
@ -1407,12 +1407,12 @@ func (pool *TxPool) truncatePending() {
|
|||||||
for pending > pool.config.GlobalSlots && !spammers.Empty() {
|
for pending > pool.config.GlobalSlots && !spammers.Empty() {
|
||||||
// Retrieve the next offender if not local address
|
// Retrieve the next offender if not local address
|
||||||
offender, _ := spammers.Pop()
|
offender, _ := spammers.Pop()
|
||||||
offenders = append(offenders, offender.(common.Address))
|
offenders = append(offenders, offender)
|
||||||
|
|
||||||
// Equalize balances until all the same or below threshold
|
// Equalize balances until all the same or below threshold
|
||||||
if len(offenders) > 1 {
|
if len(offenders) > 1 {
|
||||||
// Calculate the equalization threshold for all current offenders
|
// Calculate the equalization threshold for all current offenders
|
||||||
threshold := pool.pending[offender.(common.Address)].Len()
|
threshold := pool.pending[offender].Len()
|
||||||
|
|
||||||
// Iteratively reduce all offenders until below limit or threshold reached
|
// Iteratively reduce all offenders until below limit or threshold reached
|
||||||
for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
|
for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
|
||||||
|
@ -91,8 +91,8 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
ordering := make(map[*eth.Request]int)
|
ordering := make(map[*eth.Request]int)
|
||||||
timeouts := prque.New(func(data interface{}, index int) {
|
timeouts := prque.New[int64, *eth.Request](func(data *eth.Request, index int) {
|
||||||
ordering[data.(*eth.Request)] = index
|
ordering[data] = index
|
||||||
})
|
})
|
||||||
|
|
||||||
timeout := time.NewTimer(0)
|
timeout := time.NewTimer(0)
|
||||||
@ -268,14 +268,12 @@ func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error {
|
|||||||
// below is purely for to catch programming errors, given the correct
|
// below is purely for to catch programming errors, given the correct
|
||||||
// code, there's no possible order of events that should result in a
|
// code, there's no possible order of events that should result in a
|
||||||
// timeout firing for a non-existent event.
|
// timeout firing for a non-existent event.
|
||||||
item, exp := timeouts.Peek()
|
req, exp := timeouts.Peek()
|
||||||
if now, at := time.Now(), time.Unix(0, -exp); now.Before(at) {
|
if now, at := time.Now(), time.Unix(0, -exp); now.Before(at) {
|
||||||
log.Error("Timeout triggered but not reached", "left", at.Sub(now))
|
log.Error("Timeout triggered but not reached", "left", at.Sub(now))
|
||||||
timeout.Reset(at.Sub(now))
|
timeout.Reset(at.Sub(now))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
req := item.(*eth.Request)
|
|
||||||
|
|
||||||
// Stop tracking the timed out request from a timing perspective,
|
// Stop tracking the timed out request from a timing perspective,
|
||||||
// cancel it, so it's not considered in-flight anymore, but keep
|
// cancel it, so it's not considered in-flight anymore, but keep
|
||||||
// the peer marked busy to prevent assigning a second request and
|
// the peer marked busy to prevent assigning a second request and
|
||||||
|
@ -115,7 +115,7 @@ type queue struct {
|
|||||||
// Headers are "special", they download in batches, supported by a skeleton chain
|
// Headers are "special", they download in batches, supported by a skeleton chain
|
||||||
headerHead common.Hash // Hash of the last queued header to verify order
|
headerHead common.Hash // Hash of the last queued header to verify order
|
||||||
headerTaskPool map[uint64]*types.Header // Pending header retrieval tasks, mapping starting indexes to skeleton headers
|
headerTaskPool map[uint64]*types.Header // Pending header retrieval tasks, mapping starting indexes to skeleton headers
|
||||||
headerTaskQueue *prque.Prque // Priority queue of the skeleton indexes to fetch the filling headers for
|
headerTaskQueue *prque.Prque[int64, uint64] // Priority queue of the skeleton indexes to fetch the filling headers for
|
||||||
headerPeerMiss map[string]map[uint64]struct{} // Set of per-peer header batches known to be unavailable
|
headerPeerMiss map[string]map[uint64]struct{} // Set of per-peer header batches known to be unavailable
|
||||||
headerPendPool map[string]*fetchRequest // Currently pending header retrieval operations
|
headerPendPool map[string]*fetchRequest // Currently pending header retrieval operations
|
||||||
headerResults []*types.Header // Result cache accumulating the completed headers
|
headerResults []*types.Header // Result cache accumulating the completed headers
|
||||||
@ -126,12 +126,12 @@ type queue struct {
|
|||||||
|
|
||||||
// All data retrievals below are based on an already assembles header chain
|
// All data retrievals below are based on an already assembles header chain
|
||||||
blockTaskPool map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers
|
blockTaskPool map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers
|
||||||
blockTaskQueue *prque.Prque // Priority queue of the headers to fetch the blocks (bodies) for
|
blockTaskQueue *prque.Prque[int64, *types.Header] // Priority queue of the headers to fetch the blocks (bodies) for
|
||||||
blockPendPool map[string]*fetchRequest // Currently pending block (body) retrieval operations
|
blockPendPool map[string]*fetchRequest // Currently pending block (body) retrieval operations
|
||||||
blockWakeCh chan bool // Channel to notify the block fetcher of new tasks
|
blockWakeCh chan bool // Channel to notify the block fetcher of new tasks
|
||||||
|
|
||||||
receiptTaskPool map[common.Hash]*types.Header // Pending receipt retrieval tasks, mapping hashes to headers
|
receiptTaskPool map[common.Hash]*types.Header // Pending receipt retrieval tasks, mapping hashes to headers
|
||||||
receiptTaskQueue *prque.Prque // Priority queue of the headers to fetch the receipts for
|
receiptTaskQueue *prque.Prque[int64, *types.Header] // Priority queue of the headers to fetch the receipts for
|
||||||
receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations
|
receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations
|
||||||
receiptWakeCh chan bool // Channel to notify when receipt fetcher of new tasks
|
receiptWakeCh chan bool // Channel to notify when receipt fetcher of new tasks
|
||||||
|
|
||||||
@ -150,9 +150,9 @@ func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue {
|
|||||||
lock := new(sync.RWMutex)
|
lock := new(sync.RWMutex)
|
||||||
q := &queue{
|
q := &queue{
|
||||||
headerContCh: make(chan bool, 1),
|
headerContCh: make(chan bool, 1),
|
||||||
blockTaskQueue: prque.New(nil),
|
blockTaskQueue: prque.New[int64, *types.Header](nil),
|
||||||
blockWakeCh: make(chan bool, 1),
|
blockWakeCh: make(chan bool, 1),
|
||||||
receiptTaskQueue: prque.New(nil),
|
receiptTaskQueue: prque.New[int64, *types.Header](nil),
|
||||||
receiptWakeCh: make(chan bool, 1),
|
receiptWakeCh: make(chan bool, 1),
|
||||||
active: sync.NewCond(lock),
|
active: sync.NewCond(lock),
|
||||||
lock: lock,
|
lock: lock,
|
||||||
@ -258,7 +258,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
|
|||||||
}
|
}
|
||||||
// Schedule all the header retrieval tasks for the skeleton assembly
|
// Schedule all the header retrieval tasks for the skeleton assembly
|
||||||
q.headerTaskPool = make(map[uint64]*types.Header)
|
q.headerTaskPool = make(map[uint64]*types.Header)
|
||||||
q.headerTaskQueue = prque.New(nil)
|
q.headerTaskQueue = prque.New[int64, uint64](nil)
|
||||||
q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
|
q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
|
||||||
q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
|
q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
|
||||||
q.headerHashes = make([]common.Hash, len(skeleton)*MaxHeaderFetch)
|
q.headerHashes = make([]common.Hash, len(skeleton)*MaxHeaderFetch)
|
||||||
@ -428,12 +428,12 @@ func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
|
|||||||
for send == 0 && !q.headerTaskQueue.Empty() {
|
for send == 0 && !q.headerTaskQueue.Empty() {
|
||||||
from, _ := q.headerTaskQueue.Pop()
|
from, _ := q.headerTaskQueue.Pop()
|
||||||
if q.headerPeerMiss[p.id] != nil {
|
if q.headerPeerMiss[p.id] != nil {
|
||||||
if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {
|
if _, ok := q.headerPeerMiss[p.id][from]; ok {
|
||||||
skip = append(skip, from.(uint64))
|
skip = append(skip, from)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
send = from.(uint64)
|
send = from
|
||||||
}
|
}
|
||||||
// Merge all the skipped batches back
|
// Merge all the skipped batches back
|
||||||
for _, from := range skip {
|
for _, from := range skip {
|
||||||
@ -485,7 +485,7 @@ func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bo
|
|||||||
// item - the fetchRequest
|
// item - the fetchRequest
|
||||||
// progress - whether any progress was made
|
// progress - whether any progress was made
|
||||||
// throttle - if the caller should throttle for a while
|
// throttle - if the caller should throttle for a while
|
||||||
func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
|
func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque[int64, *types.Header],
|
||||||
pendPool map[string]*fetchRequest, kind uint) (*fetchRequest, bool, bool) {
|
pendPool map[string]*fetchRequest, kind uint) (*fetchRequest, bool, bool) {
|
||||||
// Short circuit if the pool has been depleted, or if the peer's already
|
// Short circuit if the pool has been depleted, or if the peer's already
|
||||||
// downloading something (sanity check not to corrupt state)
|
// downloading something (sanity check not to corrupt state)
|
||||||
@ -503,8 +503,8 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
|
|||||||
for proc := 0; len(send) < count && !taskQueue.Empty(); proc++ {
|
for proc := 0; len(send) < count && !taskQueue.Empty(); proc++ {
|
||||||
// the task queue will pop items in order, so the highest prio block
|
// the task queue will pop items in order, so the highest prio block
|
||||||
// is also the lowest block number.
|
// is also the lowest block number.
|
||||||
h, _ := taskQueue.Peek()
|
header, _ := taskQueue.Peek()
|
||||||
header := h.(*types.Header)
|
|
||||||
// we can ask the resultcache if this header is within the
|
// we can ask the resultcache if this header is within the
|
||||||
// "prioritized" segment of blocks. If it is not, we need to throttle
|
// "prioritized" segment of blocks. If it is not, we need to throttle
|
||||||
|
|
||||||
@ -627,12 +627,14 @@ func (q *queue) ExpireReceipts(peer string) int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// expire is the generic check that moves a specific expired task from a pending
|
// expire is the generic check that moves a specific expired task from a pending
|
||||||
// pool back into a task pool.
|
// pool back into a task pool. The syntax on the passed taskQueue is a bit weird
|
||||||
|
// as we would need a generic expire method to handle both types, but that is not
|
||||||
|
// supported at the moment at least (Go 1.19).
|
||||||
//
|
//
|
||||||
// Note, this method expects the queue lock to be already held. The reason the
|
// Note, this method expects the queue lock to be already held. The reason the
|
||||||
// lock is not obtained in here is that the parameters already need to access
|
// lock is not obtained in here is that the parameters already need to access
|
||||||
// the queue, so they already need a lock anyway.
|
// the queue, so they already need a lock anyway.
|
||||||
func (q *queue) expire(peer string, pendPool map[string]*fetchRequest, taskQueue *prque.Prque) int {
|
func (q *queue) expire(peer string, pendPool map[string]*fetchRequest, taskQueue interface{}) int {
|
||||||
// Retrieve the request being expired and log an error if it's non-existent,
|
// Retrieve the request being expired and log an error if it's non-existent,
|
||||||
// as there's no order of events that should lead to such expirations.
|
// as there's no order of events that should lead to such expirations.
|
||||||
req := pendPool[peer]
|
req := pendPool[peer]
|
||||||
@ -644,10 +646,10 @@ func (q *queue) expire(peer string, pendPool map[string]*fetchRequest, taskQueue
|
|||||||
|
|
||||||
// Return any non-satisfied requests to the pool
|
// Return any non-satisfied requests to the pool
|
||||||
if req.From > 0 {
|
if req.From > 0 {
|
||||||
taskQueue.Push(req.From, -int64(req.From))
|
taskQueue.(*prque.Prque[int64, uint64]).Push(req.From, -int64(req.From))
|
||||||
}
|
}
|
||||||
for _, header := range req.Headers {
|
for _, header := range req.Headers {
|
||||||
taskQueue.Push(header, -int64(header.Number.Uint64()))
|
taskQueue.(*prque.Prque[int64, *types.Header]).Push(header, -int64(header.Number.Uint64()))
|
||||||
}
|
}
|
||||||
return len(req.Headers)
|
return len(req.Headers)
|
||||||
}
|
}
|
||||||
@ -824,7 +826,7 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt, recei
|
|||||||
// reason this lock is not obtained in here is because the parameters already need
|
// reason this lock is not obtained in here is because the parameters already need
|
||||||
// to access the queue, so they already need a lock anyway.
|
// to access the queue, so they already need a lock anyway.
|
||||||
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
|
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
|
||||||
taskQueue *prque.Prque, pendPool map[string]*fetchRequest,
|
taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest,
|
||||||
reqTimer metrics.Timer, resInMeter metrics.Meter, resDropMeter metrics.Meter,
|
reqTimer metrics.Timer, resInMeter metrics.Meter, resDropMeter metrics.Meter,
|
||||||
results int, validate func(index int, header *types.Header) error,
|
results int, validate func(index int, header *types.Header) error,
|
||||||
reconstruct func(index int, result *fetchResult)) (int, error) {
|
reconstruct func(index int, result *fetchResult)) (int, error) {
|
||||||
|
@ -175,7 +175,7 @@ type BlockFetcher struct {
|
|||||||
completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing
|
completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing
|
||||||
|
|
||||||
// Block cache
|
// Block cache
|
||||||
queue *prque.Prque // Queue containing the import operations (block number sorted)
|
queue *prque.Prque[int64, *blockOrHeaderInject] // Queue containing the import operations (block number sorted)
|
||||||
queues map[string]int // Per peer block counts to prevent memory exhaustion
|
queues map[string]int // Per peer block counts to prevent memory exhaustion
|
||||||
queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)
|
queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)
|
||||||
|
|
||||||
@ -212,7 +212,7 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr
|
|||||||
fetching: make(map[common.Hash]*blockAnnounce),
|
fetching: make(map[common.Hash]*blockAnnounce),
|
||||||
fetched: make(map[common.Hash][]*blockAnnounce),
|
fetched: make(map[common.Hash][]*blockAnnounce),
|
||||||
completing: make(map[common.Hash]*blockAnnounce),
|
completing: make(map[common.Hash]*blockAnnounce),
|
||||||
queue: prque.New(nil),
|
queue: prque.New[int64, *blockOrHeaderInject](nil),
|
||||||
queues: make(map[string]int),
|
queues: make(map[string]int),
|
||||||
queued: make(map[common.Hash]*blockOrHeaderInject),
|
queued: make(map[common.Hash]*blockOrHeaderInject),
|
||||||
getHeader: getHeader,
|
getHeader: getHeader,
|
||||||
@ -351,7 +351,7 @@ func (f *BlockFetcher) loop() {
|
|||||||
// Import any queued blocks that could potentially fit
|
// Import any queued blocks that could potentially fit
|
||||||
height := f.chainHeight()
|
height := f.chainHeight()
|
||||||
for !f.queue.Empty() {
|
for !f.queue.Empty() {
|
||||||
op := f.queue.PopItem().(*blockOrHeaderInject)
|
op := f.queue.PopItem()
|
||||||
hash := op.hash()
|
hash := op.hash()
|
||||||
if f.queueChangeHook != nil {
|
if f.queueChangeHook != nil {
|
||||||
f.queueChangeHook(hash, false)
|
f.queueChangeHook(hash, false)
|
||||||
|
6
go.mod
6
go.mod
@ -59,11 +59,12 @@ require (
|
|||||||
github.com/tyler-smith/go-bip39 v1.1.0
|
github.com/tyler-smith/go-bip39 v1.1.0
|
||||||
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa
|
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa
|
||||||
golang.org/x/crypto v0.1.0
|
golang.org/x/crypto v0.1.0
|
||||||
|
golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9
|
||||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
|
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
|
||||||
golang.org/x/sys v0.3.0
|
golang.org/x/sys v0.3.0
|
||||||
golang.org/x/text v0.4.0
|
golang.org/x/text v0.4.0
|
||||||
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
|
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
|
||||||
golang.org/x/tools v0.1.12
|
golang.org/x/tools v0.2.0
|
||||||
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce
|
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -116,8 +117,7 @@ require (
|
|||||||
github.com/tklauser/go-sysconf v0.3.5 // indirect
|
github.com/tklauser/go-sysconf v0.3.5 // indirect
|
||||||
github.com/tklauser/numcpus v0.2.2 // indirect
|
github.com/tklauser/numcpus v0.2.2 // indirect
|
||||||
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
|
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
|
||||||
golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5 // indirect
|
golang.org/x/mod v0.6.0 // indirect
|
||||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
|
|
||||||
golang.org/x/net v0.1.0 // indirect
|
golang.org/x/net v0.1.0 // indirect
|
||||||
golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect
|
golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect
|
||||||
google.golang.org/protobuf v1.27.1 // indirect
|
google.golang.org/protobuf v1.27.1 // indirect
|
||||||
|
14
go.sum
14
go.sum
@ -298,7 +298,7 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
|||||||
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||||
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
|
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
|
||||||
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
|
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
|
||||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||||
github.com/google/gofuzz v1.1.1-0.20200604201612-c04b05f3adfa h1:Q75Upo5UN4JbPFURXZ8nLKYUvF85dyFRop/vQ0Rv+64=
|
github.com/google/gofuzz v1.1.1-0.20200604201612-c04b05f3adfa h1:Q75Upo5UN4JbPFURXZ8nLKYUvF85dyFRop/vQ0Rv+64=
|
||||||
@ -657,8 +657,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
|
|||||||
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
|
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
|
||||||
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
|
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
|
||||||
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
|
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
|
||||||
golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5 h1:rxKZ2gOnYxjfmakvUUqh9Gyb6KXfrj7JWTxORTYqb0E=
|
golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9 h1:yZNXmy+j/JpX19vZkVktWqAo7Gny4PBWYYK3zskGpx4=
|
||||||
golang.org/x/exp v0.0.0-20220426173459-3bcf042a4bf5/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
|
golang.org/x/exp v0.0.0-20221126150942-6ab00d035af9/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
|
||||||
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
|
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
|
||||||
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
|
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
|
||||||
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
|
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
|
||||||
@ -680,8 +680,8 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB
|
|||||||
golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
|
golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
|
||||||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s=
|
golang.org/x/mod v0.6.0 h1:b9gGHsz9/HhJ3HF5DHQytPpuwocVTChQJK3AvoLRD5I=
|
||||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
|
golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI=
|
||||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||||
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||||
@ -872,8 +872,8 @@ golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc
|
|||||||
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
|
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
|
||||||
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
|
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
|
||||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||||
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
|
golang.org/x/tools v0.2.0 h1:G6AHpWxTMGY1KyEYoAQ5WTtIekUUvDNjan3ugu60JvE=
|
||||||
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
|
golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA=
|
||||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
@ -115,7 +115,7 @@ type queue struct {
|
|||||||
// Headers are "special", they download in batches, supported by a skeleton chain
|
// Headers are "special", they download in batches, supported by a skeleton chain
|
||||||
headerHead common.Hash // Hash of the last queued header to verify order
|
headerHead common.Hash // Hash of the last queued header to verify order
|
||||||
headerTaskPool map[uint64]*types.Header // Pending header retrieval tasks, mapping starting indexes to skeleton headers
|
headerTaskPool map[uint64]*types.Header // Pending header retrieval tasks, mapping starting indexes to skeleton headers
|
||||||
headerTaskQueue *prque.Prque // Priority queue of the skeleton indexes to fetch the filling headers for
|
headerTaskQueue *prque.Prque[int64, uint64] // Priority queue of the skeleton indexes to fetch the filling headers for
|
||||||
headerPeerMiss map[string]map[uint64]struct{} // Set of per-peer header batches known to be unavailable
|
headerPeerMiss map[string]map[uint64]struct{} // Set of per-peer header batches known to be unavailable
|
||||||
headerPendPool map[string]*fetchRequest // Currently pending header retrieval operations
|
headerPendPool map[string]*fetchRequest // Currently pending header retrieval operations
|
||||||
headerResults []*types.Header // Result cache accumulating the completed headers
|
headerResults []*types.Header // Result cache accumulating the completed headers
|
||||||
@ -125,11 +125,11 @@ type queue struct {
|
|||||||
|
|
||||||
// All data retrievals below are based on an already assembles header chain
|
// All data retrievals below are based on an already assembles header chain
|
||||||
blockTaskPool map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers
|
blockTaskPool map[common.Hash]*types.Header // Pending block (body) retrieval tasks, mapping hashes to headers
|
||||||
blockTaskQueue *prque.Prque // Priority queue of the headers to fetch the blocks (bodies) for
|
blockTaskQueue *prque.Prque[int64, *types.Header] // Priority queue of the headers to fetch the blocks (bodies) for
|
||||||
blockPendPool map[string]*fetchRequest // Currently pending block (body) retrieval operations
|
blockPendPool map[string]*fetchRequest // Currently pending block (body) retrieval operations
|
||||||
|
|
||||||
receiptTaskPool map[common.Hash]*types.Header // Pending receipt retrieval tasks, mapping hashes to headers
|
receiptTaskPool map[common.Hash]*types.Header // Pending receipt retrieval tasks, mapping hashes to headers
|
||||||
receiptTaskQueue *prque.Prque // Priority queue of the headers to fetch the receipts for
|
receiptTaskQueue *prque.Prque[int64, *types.Header] // Priority queue of the headers to fetch the receipts for
|
||||||
receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations
|
receiptPendPool map[string]*fetchRequest // Currently pending receipt retrieval operations
|
||||||
|
|
||||||
resultCache *resultStore // Downloaded but not yet delivered fetch results
|
resultCache *resultStore // Downloaded but not yet delivered fetch results
|
||||||
@ -147,8 +147,8 @@ func newQueue(blockCacheLimit int, thresholdInitialSize int) *queue {
|
|||||||
lock := new(sync.RWMutex)
|
lock := new(sync.RWMutex)
|
||||||
q := &queue{
|
q := &queue{
|
||||||
headerContCh: make(chan bool),
|
headerContCh: make(chan bool),
|
||||||
blockTaskQueue: prque.New(nil),
|
blockTaskQueue: prque.New[int64, *types.Header](nil),
|
||||||
receiptTaskQueue: prque.New(nil),
|
receiptTaskQueue: prque.New[int64, *types.Header](nil),
|
||||||
active: sync.NewCond(lock),
|
active: sync.NewCond(lock),
|
||||||
lock: lock,
|
lock: lock,
|
||||||
}
|
}
|
||||||
@ -262,7 +262,7 @@ func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
|
|||||||
}
|
}
|
||||||
// Schedule all the header retrieval tasks for the skeleton assembly
|
// Schedule all the header retrieval tasks for the skeleton assembly
|
||||||
q.headerTaskPool = make(map[uint64]*types.Header)
|
q.headerTaskPool = make(map[uint64]*types.Header)
|
||||||
q.headerTaskQueue = prque.New(nil)
|
q.headerTaskQueue = prque.New[int64, uint64](nil)
|
||||||
q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
|
q.headerPeerMiss = make(map[string]map[uint64]struct{}) // Reset availability to correct invalid chains
|
||||||
q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
|
q.headerResults = make([]*types.Header, len(skeleton)*MaxHeaderFetch)
|
||||||
q.headerProced = 0
|
q.headerProced = 0
|
||||||
@ -424,12 +424,12 @@ func (q *queue) ReserveHeaders(p *peerConnection, count int) *fetchRequest {
|
|||||||
for send == 0 && !q.headerTaskQueue.Empty() {
|
for send == 0 && !q.headerTaskQueue.Empty() {
|
||||||
from, _ := q.headerTaskQueue.Pop()
|
from, _ := q.headerTaskQueue.Pop()
|
||||||
if q.headerPeerMiss[p.id] != nil {
|
if q.headerPeerMiss[p.id] != nil {
|
||||||
if _, ok := q.headerPeerMiss[p.id][from.(uint64)]; ok {
|
if _, ok := q.headerPeerMiss[p.id][from]; ok {
|
||||||
skip = append(skip, from.(uint64))
|
skip = append(skip, from)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
send = from.(uint64)
|
send = from
|
||||||
}
|
}
|
||||||
// Merge all the skipped batches back
|
// Merge all the skipped batches back
|
||||||
for _, from := range skip {
|
for _, from := range skip {
|
||||||
@ -481,7 +481,7 @@ func (q *queue) ReserveReceipts(p *peerConnection, count int) (*fetchRequest, bo
|
|||||||
// item - the fetchRequest
|
// item - the fetchRequest
|
||||||
// progress - whether any progress was made
|
// progress - whether any progress was made
|
||||||
// throttle - if the caller should throttle for a while
|
// throttle - if the caller should throttle for a while
|
||||||
func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque,
|
func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common.Hash]*types.Header, taskQueue *prque.Prque[int64, *types.Header],
|
||||||
pendPool map[string]*fetchRequest, kind uint) (*fetchRequest, bool, bool) {
|
pendPool map[string]*fetchRequest, kind uint) (*fetchRequest, bool, bool) {
|
||||||
// Short circuit if the pool has been depleted, or if the peer's already
|
// Short circuit if the pool has been depleted, or if the peer's already
|
||||||
// downloading something (sanity check not to corrupt state)
|
// downloading something (sanity check not to corrupt state)
|
||||||
@ -499,8 +499,8 @@ func (q *queue) reserveHeaders(p *peerConnection, count int, taskPool map[common
|
|||||||
for proc := 0; len(send) < count && !taskQueue.Empty(); proc++ {
|
for proc := 0; len(send) < count && !taskQueue.Empty(); proc++ {
|
||||||
// the task queue will pop items in order, so the highest prio block
|
// the task queue will pop items in order, so the highest prio block
|
||||||
// is also the lowest block number.
|
// is also the lowest block number.
|
||||||
h, _ := taskQueue.Peek()
|
header, _ := taskQueue.Peek()
|
||||||
header := h.(*types.Header)
|
|
||||||
// we can ask the resultcache if this header is within the
|
// we can ask the resultcache if this header is within the
|
||||||
// "prioritized" segment of blocks. If it is not, we need to throttle
|
// "prioritized" segment of blocks. If it is not, we need to throttle
|
||||||
|
|
||||||
@ -591,12 +591,12 @@ func (q *queue) CancelReceipts(request *fetchRequest) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Cancel aborts a fetch request, returning all pending hashes to the task queue.
|
// Cancel aborts a fetch request, returning all pending hashes to the task queue.
|
||||||
func (q *queue) cancel(request *fetchRequest, taskQueue *prque.Prque, pendPool map[string]*fetchRequest) {
|
func (q *queue) cancel(request *fetchRequest, taskQueue interface{}, pendPool map[string]*fetchRequest) {
|
||||||
if request.From > 0 {
|
if request.From > 0 {
|
||||||
taskQueue.Push(request.From, -int64(request.From))
|
taskQueue.(*prque.Prque[int64, uint64]).Push(request.From, -int64(request.From))
|
||||||
}
|
}
|
||||||
for _, header := range request.Headers {
|
for _, header := range request.Headers {
|
||||||
taskQueue.Push(header, -int64(header.Number.Uint64()))
|
taskQueue.(*prque.Prque[int64, *types.Header]).Push(header, -int64(header.Number.Uint64()))
|
||||||
}
|
}
|
||||||
delete(pendPool, request.Peer.id)
|
delete(pendPool, request.Peer.id)
|
||||||
}
|
}
|
||||||
@ -655,7 +655,7 @@ func (q *queue) ExpireReceipts(timeout time.Duration) map[string]int {
|
|||||||
// Note, this method expects the queue lock to be already held. The
|
// Note, this method expects the queue lock to be already held. The
|
||||||
// reason the lock is not obtained in here is because the parameters already need
|
// reason the lock is not obtained in here is because the parameters already need
|
||||||
// to access the queue, so they already need a lock anyway.
|
// to access the queue, so they already need a lock anyway.
|
||||||
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue *prque.Prque, timeoutMeter metrics.Meter) map[string]int {
|
func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest, taskQueue interface{}, timeoutMeter metrics.Meter) map[string]int {
|
||||||
// Iterate over the expired requests and return each to the queue
|
// Iterate over the expired requests and return each to the queue
|
||||||
expiries := make(map[string]int)
|
expiries := make(map[string]int)
|
||||||
for id, request := range pendPool {
|
for id, request := range pendPool {
|
||||||
@ -665,10 +665,10 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
|
|||||||
|
|
||||||
// Return any non satisfied requests to the pool
|
// Return any non satisfied requests to the pool
|
||||||
if request.From > 0 {
|
if request.From > 0 {
|
||||||
taskQueue.Push(request.From, -int64(request.From))
|
taskQueue.(*prque.Prque[int64, uint64]).Push(request.From, -int64(request.From))
|
||||||
}
|
}
|
||||||
for _, header := range request.Headers {
|
for _, header := range request.Headers {
|
||||||
taskQueue.Push(header, -int64(header.Number.Uint64()))
|
taskQueue.(*prque.Prque[int64, *types.Header]).Push(header, -int64(header.Number.Uint64()))
|
||||||
}
|
}
|
||||||
// Add the peer to the expiry report along the number of failed requests
|
// Add the peer to the expiry report along the number of failed requests
|
||||||
expiries[id] = len(request.Headers)
|
expiries[id] = len(request.Headers)
|
||||||
@ -831,7 +831,7 @@ func (q *queue) DeliverReceipts(id string, receiptList [][]*types.Receipt) (int,
|
|||||||
// reason this lock is not obtained in here is because the parameters already need
|
// reason this lock is not obtained in here is because the parameters already need
|
||||||
// to access the queue, so they already need a lock anyway.
|
// to access the queue, so they already need a lock anyway.
|
||||||
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
|
func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header,
|
||||||
taskQueue *prque.Prque, pendPool map[string]*fetchRequest, reqTimer metrics.Timer,
|
taskQueue *prque.Prque[int64, *types.Header], pendPool map[string]*fetchRequest, reqTimer metrics.Timer,
|
||||||
results int, validate func(index int, header *types.Header) error,
|
results int, validate func(index int, header *types.Header) error,
|
||||||
reconstruct func(index int, result *fetchResult)) (int, error) {
|
reconstruct func(index int, result *fetchResult)) (int, error) {
|
||||||
// Short circuit if the data was never requested
|
// Short circuit if the data was never requested
|
||||||
|
@ -177,7 +177,7 @@ type BlockFetcher struct {
|
|||||||
completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing
|
completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing
|
||||||
|
|
||||||
// Block cache
|
// Block cache
|
||||||
queue *prque.Prque // Queue containing the import operations (block number sorted)
|
queue *prque.Prque[int64, *blockOrHeaderInject] // Queue containing the import operations (block number sorted)
|
||||||
queues map[string]int // Per peer block counts to prevent memory exhaustion
|
queues map[string]int // Per peer block counts to prevent memory exhaustion
|
||||||
queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)
|
queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)
|
||||||
|
|
||||||
@ -214,7 +214,7 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr
|
|||||||
fetching: make(map[common.Hash]*blockAnnounce),
|
fetching: make(map[common.Hash]*blockAnnounce),
|
||||||
fetched: make(map[common.Hash][]*blockAnnounce),
|
fetched: make(map[common.Hash][]*blockAnnounce),
|
||||||
completing: make(map[common.Hash]*blockAnnounce),
|
completing: make(map[common.Hash]*blockAnnounce),
|
||||||
queue: prque.New(nil),
|
queue: prque.New[int64, *blockOrHeaderInject](nil),
|
||||||
queues: make(map[string]int),
|
queues: make(map[string]int),
|
||||||
queued: make(map[common.Hash]*blockOrHeaderInject),
|
queued: make(map[common.Hash]*blockOrHeaderInject),
|
||||||
getHeader: getHeader,
|
getHeader: getHeader,
|
||||||
@ -353,7 +353,7 @@ func (f *BlockFetcher) loop() {
|
|||||||
// Import any queued blocks that could potentially fit
|
// Import any queued blocks that could potentially fit
|
||||||
height := f.chainHeight()
|
height := f.chainHeight()
|
||||||
for !f.queue.Empty() {
|
for !f.queue.Empty() {
|
||||||
op := f.queue.PopItem().(*blockOrHeaderInject)
|
op := f.queue.PopItem()
|
||||||
hash := op.hash()
|
hash := op.hash()
|
||||||
if f.queueChangeHook != nil {
|
if f.queueChangeHook != nil {
|
||||||
f.queueChangeHook(hash, false)
|
f.queueChangeHook(hash, false)
|
||||||
|
@ -75,10 +75,11 @@ type ClientManager struct {
|
|||||||
// (totalRecharge / sumRecharge)*FixedPointMultiplier or 0 if sumRecharge==0
|
// (totalRecharge / sumRecharge)*FixedPointMultiplier or 0 if sumRecharge==0
|
||||||
rcLastUpdate mclock.AbsTime // last time the recharge integrator was updated
|
rcLastUpdate mclock.AbsTime // last time the recharge integrator was updated
|
||||||
rcLastIntValue int64 // last updated value of the recharge integrator
|
rcLastIntValue int64 // last updated value of the recharge integrator
|
||||||
|
priorityOffset int64 // offset for prque priority values ensures that all priorities stay in the int64 range
|
||||||
// recharge queue is a priority queue with currently recharging client nodes
|
// recharge queue is a priority queue with currently recharging client nodes
|
||||||
// as elements. The priority value is rcFullIntValue which allows to quickly
|
// as elements. The priority value is rcFullIntValue which allows to quickly
|
||||||
// determine which client will first finish recharge.
|
// determine which client will first finish recharge.
|
||||||
rcQueue *prque.Prque
|
rcQueue *prque.Prque[int64, *ClientNode]
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClientManager returns a new client manager.
|
// NewClientManager returns a new client manager.
|
||||||
@ -107,7 +108,7 @@ type ClientManager struct {
|
|||||||
func NewClientManager(curve PieceWiseLinear, clock mclock.Clock) *ClientManager {
|
func NewClientManager(curve PieceWiseLinear, clock mclock.Clock) *ClientManager {
|
||||||
cm := &ClientManager{
|
cm := &ClientManager{
|
||||||
clock: clock,
|
clock: clock,
|
||||||
rcQueue: prque.NewWrapAround(func(a interface{}, i int) { a.(*ClientNode).queueIndex = i }),
|
rcQueue: prque.New[int64, *ClientNode](func(a *ClientNode, i int) { a.queueIndex = i }),
|
||||||
capLastUpdate: clock.Now(),
|
capLastUpdate: clock.Now(),
|
||||||
stop: make(chan chan struct{}),
|
stop: make(chan chan struct{}),
|
||||||
}
|
}
|
||||||
@ -288,13 +289,13 @@ func (cm *ClientManager) updateRecharge(now mclock.AbsTime) {
|
|||||||
}
|
}
|
||||||
dt := now - lastUpdate
|
dt := now - lastUpdate
|
||||||
// fetch the client that finishes first
|
// fetch the client that finishes first
|
||||||
rcqNode := cm.rcQueue.PopItem().(*ClientNode) // if sumRecharge > 0 then the queue cannot be empty
|
rcqNode := cm.rcQueue.PopItem() // if sumRecharge > 0 then the queue cannot be empty
|
||||||
// check whether it has already finished
|
// check whether it has already finished
|
||||||
dtNext := mclock.AbsTime(float64(rcqNode.rcFullIntValue-cm.rcLastIntValue) / bonusRatio)
|
dtNext := mclock.AbsTime(float64(rcqNode.rcFullIntValue-cm.rcLastIntValue) / bonusRatio)
|
||||||
if dt < dtNext {
|
if dt < dtNext {
|
||||||
// not finished yet, put it back, update integrator according
|
// not finished yet, put it back, update integrator according
|
||||||
// to current bonusRatio and return
|
// to current bonusRatio and return
|
||||||
cm.rcQueue.Push(rcqNode, -rcqNode.rcFullIntValue)
|
cm.addToQueue(rcqNode)
|
||||||
cm.rcLastIntValue += int64(bonusRatio * float64(dt))
|
cm.rcLastIntValue += int64(bonusRatio * float64(dt))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -308,6 +309,20 @@ func (cm *ClientManager) updateRecharge(now mclock.AbsTime) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cm *ClientManager) addToQueue(node *ClientNode) {
|
||||||
|
if cm.priorityOffset-node.rcFullIntValue < -0x4000000000000000 {
|
||||||
|
cm.priorityOffset += 0x4000000000000000
|
||||||
|
// recreate priority queue with new offset to avoid overflow; should happen very rarely
|
||||||
|
newRcQueue := prque.New[int64, *ClientNode](func(a *ClientNode, i int) { a.queueIndex = i })
|
||||||
|
for cm.rcQueue.Size() > 0 {
|
||||||
|
n := cm.rcQueue.PopItem()
|
||||||
|
newRcQueue.Push(n, cm.priorityOffset-n.rcFullIntValue)
|
||||||
|
}
|
||||||
|
cm.rcQueue = newRcQueue
|
||||||
|
}
|
||||||
|
cm.rcQueue.Push(node, cm.priorityOffset-node.rcFullIntValue)
|
||||||
|
}
|
||||||
|
|
||||||
// updateNodeRc updates a node's corrBufValue and adds an external correction value.
|
// updateNodeRc updates a node's corrBufValue and adds an external correction value.
|
||||||
// It also adds or removes the rcQueue entry and updates ServerParams and sumRecharge if necessary.
|
// It also adds or removes the rcQueue entry and updates ServerParams and sumRecharge if necessary.
|
||||||
func (cm *ClientManager) updateNodeRc(node *ClientNode, bvc int64, params *ServerParams, now mclock.AbsTime) {
|
func (cm *ClientManager) updateNodeRc(node *ClientNode, bvc int64, params *ServerParams, now mclock.AbsTime) {
|
||||||
@ -344,7 +359,7 @@ func (cm *ClientManager) updateNodeRc(node *ClientNode, bvc int64, params *Serve
|
|||||||
}
|
}
|
||||||
node.rcLastIntValue = cm.rcLastIntValue
|
node.rcLastIntValue = cm.rcLastIntValue
|
||||||
node.rcFullIntValue = cm.rcLastIntValue + (int64(node.params.BufLimit)-node.corrBufValue)*FixedPointMultiplier/int64(node.params.MinRecharge)
|
node.rcFullIntValue = cm.rcLastIntValue + (int64(node.params.BufLimit)-node.corrBufValue)*FixedPointMultiplier/int64(node.params.MinRecharge)
|
||||||
cm.rcQueue.Push(node, -node.rcFullIntValue)
|
cm.addToQueue(node)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
package flowcontrol
|
package flowcontrol
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -44,16 +45,17 @@ const (
|
|||||||
// maximum permitted rate. The max capacity nodes are changed multiple times during
|
// maximum permitted rate. The max capacity nodes are changed multiple times during
|
||||||
// a single test.
|
// a single test.
|
||||||
func TestConstantTotalCapacity(t *testing.T) {
|
func TestConstantTotalCapacity(t *testing.T) {
|
||||||
testConstantTotalCapacity(t, 10, 1, 0)
|
testConstantTotalCapacity(t, 10, 1, 0, false)
|
||||||
testConstantTotalCapacity(t, 10, 1, 1)
|
testConstantTotalCapacity(t, 10, 1, 1, false)
|
||||||
testConstantTotalCapacity(t, 30, 1, 0)
|
testConstantTotalCapacity(t, 30, 1, 0, false)
|
||||||
testConstantTotalCapacity(t, 30, 2, 3)
|
testConstantTotalCapacity(t, 30, 2, 3, false)
|
||||||
testConstantTotalCapacity(t, 100, 1, 0)
|
testConstantTotalCapacity(t, 100, 1, 0, false)
|
||||||
testConstantTotalCapacity(t, 100, 3, 5)
|
testConstantTotalCapacity(t, 100, 3, 5, false)
|
||||||
testConstantTotalCapacity(t, 100, 5, 10)
|
testConstantTotalCapacity(t, 100, 5, 10, false)
|
||||||
|
testConstantTotalCapacity(t, 100, 3, 5, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testConstantTotalCapacity(t *testing.T, nodeCount, maxCapacityNodes, randomSend int) {
|
func testConstantTotalCapacity(t *testing.T, nodeCount, maxCapacityNodes, randomSend int, priorityOverflow bool) {
|
||||||
clock := &mclock.Simulated{}
|
clock := &mclock.Simulated{}
|
||||||
nodes := make([]*testNode, nodeCount)
|
nodes := make([]*testNode, nodeCount)
|
||||||
var totalCapacity uint64
|
var totalCapacity uint64
|
||||||
@ -62,6 +64,10 @@ func testConstantTotalCapacity(t *testing.T, nodeCount, maxCapacityNodes, random
|
|||||||
totalCapacity += nodes[i].capacity
|
totalCapacity += nodes[i].capacity
|
||||||
}
|
}
|
||||||
m := NewClientManager(PieceWiseLinear{{0, totalCapacity}}, clock)
|
m := NewClientManager(PieceWiseLinear{{0, totalCapacity}}, clock)
|
||||||
|
if priorityOverflow {
|
||||||
|
// provoke a situation where rcLastUpdate overflow needs to be handled
|
||||||
|
m.rcLastIntValue = math.MaxInt64 - 10000000000
|
||||||
|
}
|
||||||
for _, n := range nodes {
|
for _, n := range nodes {
|
||||||
n.bufLimit = n.capacity * 6000
|
n.bufLimit = n.capacity * 6000
|
||||||
n.node = NewClientNode(m, ServerParams{BufLimit: n.bufLimit, MinRecharge: n.capacity})
|
n.node = NewClientNode(m, ServerParams{BufLimit: n.bufLimit, MinRecharge: n.capacity})
|
||||||
|
@ -39,7 +39,7 @@ type servingQueue struct {
|
|||||||
|
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
threadCount int // number of currently running threads
|
threadCount int // number of currently running threads
|
||||||
queue *prque.Prque // priority queue for waiting or suspended tasks
|
queue *prque.Prque[int64, *servingTask] // priority queue for waiting or suspended tasks
|
||||||
best *servingTask // the highest priority task (not included in the queue)
|
best *servingTask // the highest priority task (not included in the queue)
|
||||||
suspendBias int64 // priority bias against suspending an already running task
|
suspendBias int64 // priority bias against suspending an already running task
|
||||||
}
|
}
|
||||||
@ -123,7 +123,7 @@ func (t *servingTask) waitOrStop() bool {
|
|||||||
// newServingQueue returns a new servingQueue
|
// newServingQueue returns a new servingQueue
|
||||||
func newServingQueue(suspendBias int64, utilTarget float64) *servingQueue {
|
func newServingQueue(suspendBias int64, utilTarget float64) *servingQueue {
|
||||||
sq := &servingQueue{
|
sq := &servingQueue{
|
||||||
queue: prque.NewWrapAround(nil),
|
queue: prque.New[int64, *servingTask](nil),
|
||||||
suspendBias: suspendBias,
|
suspendBias: suspendBias,
|
||||||
queueAddCh: make(chan *servingTask, 100),
|
queueAddCh: make(chan *servingTask, 100),
|
||||||
queueBestCh: make(chan *servingTask),
|
queueBestCh: make(chan *servingTask),
|
||||||
@ -214,7 +214,7 @@ func (sq *servingQueue) freezePeers() {
|
|||||||
}
|
}
|
||||||
sq.best = nil
|
sq.best = nil
|
||||||
for sq.queue.Size() > 0 {
|
for sq.queue.Size() > 0 {
|
||||||
task := sq.queue.PopItem().(*servingTask)
|
task := sq.queue.PopItem()
|
||||||
tasks := peerMap[task.peer]
|
tasks := peerMap[task.peer]
|
||||||
if tasks == nil {
|
if tasks == nil {
|
||||||
bufValue, bufLimit := task.peer.fcClient.BufferStatus()
|
bufValue, bufLimit := task.peer.fcClient.BufferStatus()
|
||||||
@ -251,7 +251,7 @@ func (sq *servingQueue) freezePeers() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if sq.queue.Size() > 0 {
|
if sq.queue.Size() > 0 {
|
||||||
sq.best = sq.queue.PopItem().(*servingTask)
|
sq.best = sq.queue.PopItem()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -310,7 +310,7 @@ func (sq *servingQueue) queueLoop() {
|
|||||||
if sq.queue.Size() == 0 {
|
if sq.queue.Size() == 0 {
|
||||||
sq.best = nil
|
sq.best = nil
|
||||||
} else {
|
} else {
|
||||||
sq.best, _ = sq.queue.PopItem().(*servingTask)
|
sq.best = sq.queue.PopItem()
|
||||||
}
|
}
|
||||||
case <-sq.quit:
|
case <-sq.quit:
|
||||||
return
|
return
|
||||||
|
@ -77,8 +77,8 @@ type priorityPool struct {
|
|||||||
// temporary state if tempState is not empty
|
// temporary state if tempState is not empty
|
||||||
tempState []*ppNodeInfo
|
tempState []*ppNodeInfo
|
||||||
activeCount, activeCap uint64
|
activeCount, activeCap uint64
|
||||||
activeQueue *prque.LazyQueue
|
activeQueue *prque.LazyQueue[int64, *ppNodeInfo]
|
||||||
inactiveQueue *prque.Prque
|
inactiveQueue *prque.Prque[int64, *ppNodeInfo]
|
||||||
}
|
}
|
||||||
|
|
||||||
// ppNodeInfo is the internal node descriptor of priorityPool
|
// ppNodeInfo is the internal node descriptor of priorityPool
|
||||||
@ -104,7 +104,7 @@ func newPriorityPool(ns *nodestate.NodeStateMachine, setup *serverSetup, clock m
|
|||||||
setup: setup,
|
setup: setup,
|
||||||
ns: ns,
|
ns: ns,
|
||||||
clock: clock,
|
clock: clock,
|
||||||
inactiveQueue: prque.New(inactiveSetIndex),
|
inactiveQueue: prque.New[int64, *ppNodeInfo](inactiveSetIndex),
|
||||||
minCap: minCap,
|
minCap: minCap,
|
||||||
activeBias: activeBias,
|
activeBias: activeBias,
|
||||||
capacityStepDiv: capacityStepDiv,
|
capacityStepDiv: capacityStepDiv,
|
||||||
@ -183,8 +183,7 @@ func (pp *priorityPool) requestCapacity(node *enode.Node, minTarget, maxTarget u
|
|||||||
}
|
}
|
||||||
pp.setTempCapacity(c, maxTarget)
|
pp.setTempCapacity(c, maxTarget)
|
||||||
c.minTarget = minTarget
|
c.minTarget = minTarget
|
||||||
pp.activeQueue.Remove(c.activeIndex)
|
pp.removeFromQueues(c)
|
||||||
pp.inactiveQueue.Remove(c.inactiveIndex)
|
|
||||||
pp.activeQueue.Push(c)
|
pp.activeQueue.Push(c)
|
||||||
pp.enforceLimits()
|
pp.enforceLimits()
|
||||||
updates := pp.finalizeChanges(c.tempCapacity >= minTarget && c.tempCapacity <= maxTarget && c.tempCapacity != c.capacity)
|
updates := pp.finalizeChanges(c.tempCapacity >= minTarget && c.tempCapacity <= maxTarget && c.tempCapacity != c.capacity)
|
||||||
@ -250,13 +249,13 @@ func (pp *priorityPool) Limits() (uint64, uint64) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// inactiveSetIndex callback updates ppNodeInfo item index in inactiveQueue
|
// inactiveSetIndex callback updates ppNodeInfo item index in inactiveQueue
|
||||||
func inactiveSetIndex(a interface{}, index int) {
|
func inactiveSetIndex(a *ppNodeInfo, index int) {
|
||||||
a.(*ppNodeInfo).inactiveIndex = index
|
a.inactiveIndex = index
|
||||||
}
|
}
|
||||||
|
|
||||||
// activeSetIndex callback updates ppNodeInfo item index in activeQueue
|
// activeSetIndex callback updates ppNodeInfo item index in activeQueue
|
||||||
func activeSetIndex(a interface{}, index int) {
|
func activeSetIndex(a *ppNodeInfo, index int) {
|
||||||
a.(*ppNodeInfo).activeIndex = index
|
a.activeIndex = index
|
||||||
}
|
}
|
||||||
|
|
||||||
// invertPriority inverts a priority value. The active queue uses inverted priorities
|
// invertPriority inverts a priority value. The active queue uses inverted priorities
|
||||||
@ -269,8 +268,7 @@ func invertPriority(p int64) int64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// activePriority callback returns actual priority of ppNodeInfo item in activeQueue
|
// activePriority callback returns actual priority of ppNodeInfo item in activeQueue
|
||||||
func activePriority(a interface{}) int64 {
|
func activePriority(c *ppNodeInfo) int64 {
|
||||||
c := a.(*ppNodeInfo)
|
|
||||||
if c.bias == 0 {
|
if c.bias == 0 {
|
||||||
return invertPriority(c.nodePriority.priority(c.tempCapacity))
|
return invertPriority(c.nodePriority.priority(c.tempCapacity))
|
||||||
} else {
|
} else {
|
||||||
@ -279,8 +277,7 @@ func activePriority(a interface{}) int64 {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// activeMaxPriority callback returns estimated maximum priority of ppNodeInfo item in activeQueue
|
// activeMaxPriority callback returns estimated maximum priority of ppNodeInfo item in activeQueue
|
||||||
func (pp *priorityPool) activeMaxPriority(a interface{}, until mclock.AbsTime) int64 {
|
func (pp *priorityPool) activeMaxPriority(c *ppNodeInfo, until mclock.AbsTime) int64 {
|
||||||
c := a.(*ppNodeInfo)
|
|
||||||
future := time.Duration(until - pp.clock.Now())
|
future := time.Duration(until - pp.clock.Now())
|
||||||
if future < 0 {
|
if future < 0 {
|
||||||
future = 0
|
future = 0
|
||||||
@ -293,6 +290,16 @@ func (pp *priorityPool) inactivePriority(p *ppNodeInfo) int64 {
|
|||||||
return p.nodePriority.priority(pp.minCap)
|
return p.nodePriority.priority(pp.minCap)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// removeFromQueues removes the node from the active/inactive queues
|
||||||
|
func (pp *priorityPool) removeFromQueues(c *ppNodeInfo) {
|
||||||
|
if c.activeIndex >= 0 {
|
||||||
|
pp.activeQueue.Remove(c.activeIndex)
|
||||||
|
}
|
||||||
|
if c.inactiveIndex >= 0 {
|
||||||
|
pp.inactiveQueue.Remove(c.inactiveIndex)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// connectNode is called when a new node has been added to the pool (inactiveFlag set)
|
// connectNode is called when a new node has been added to the pool (inactiveFlag set)
|
||||||
// Note: this function should run inside a NodeStateMachine operation
|
// Note: this function should run inside a NodeStateMachine operation
|
||||||
func (pp *priorityPool) connectNode(c *ppNodeInfo) {
|
func (pp *priorityPool) connectNode(c *ppNodeInfo) {
|
||||||
@ -320,8 +327,7 @@ func (pp *priorityPool) disconnectNode(c *ppNodeInfo) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.connected = false
|
c.connected = false
|
||||||
pp.activeQueue.Remove(c.activeIndex)
|
pp.removeFromQueues(c)
|
||||||
pp.inactiveQueue.Remove(c.inactiveIndex)
|
|
||||||
|
|
||||||
var updates []capUpdate
|
var updates []capUpdate
|
||||||
if c.capacity != 0 {
|
if c.capacity != 0 {
|
||||||
@ -411,11 +417,11 @@ func (pp *priorityPool) enforceLimits() (*ppNodeInfo, int64) {
|
|||||||
return nil, math.MinInt64
|
return nil, math.MinInt64
|
||||||
}
|
}
|
||||||
var (
|
var (
|
||||||
c *ppNodeInfo
|
lastNode *ppNodeInfo
|
||||||
maxActivePriority int64
|
maxActivePriority int64
|
||||||
)
|
)
|
||||||
pp.activeQueue.MultiPop(func(data interface{}, priority int64) bool {
|
pp.activeQueue.MultiPop(func(c *ppNodeInfo, priority int64) bool {
|
||||||
c = data.(*ppNodeInfo)
|
lastNode = c
|
||||||
pp.setTempState(c)
|
pp.setTempState(c)
|
||||||
maxActivePriority = priority
|
maxActivePriority = priority
|
||||||
if c.tempCapacity == c.minTarget || pp.activeCount > pp.maxCount {
|
if c.tempCapacity == c.minTarget || pp.activeCount > pp.maxCount {
|
||||||
@ -433,7 +439,7 @@ func (pp *priorityPool) enforceLimits() (*ppNodeInfo, int64) {
|
|||||||
}
|
}
|
||||||
return pp.activeCap > pp.maxCap || pp.activeCount > pp.maxCount
|
return pp.activeCap > pp.maxCap || pp.activeCount > pp.maxCount
|
||||||
})
|
})
|
||||||
return c, invertPriority(maxActivePriority)
|
return lastNode, invertPriority(maxActivePriority)
|
||||||
}
|
}
|
||||||
|
|
||||||
// finalizeChanges either commits or reverts temporary changes. The necessary capacity
|
// finalizeChanges either commits or reverts temporary changes. The necessary capacity
|
||||||
@ -442,8 +448,7 @@ func (pp *priorityPool) enforceLimits() (*ppNodeInfo, int64) {
|
|||||||
func (pp *priorityPool) finalizeChanges(commit bool) (updates []capUpdate) {
|
func (pp *priorityPool) finalizeChanges(commit bool) (updates []capUpdate) {
|
||||||
for _, c := range pp.tempState {
|
for _, c := range pp.tempState {
|
||||||
// always remove and push back in order to update biased priority
|
// always remove and push back in order to update biased priority
|
||||||
pp.activeQueue.Remove(c.activeIndex)
|
pp.removeFromQueues(c)
|
||||||
pp.inactiveQueue.Remove(c.inactiveIndex)
|
|
||||||
oldCapacity := c.capacity
|
oldCapacity := c.capacity
|
||||||
if commit {
|
if commit {
|
||||||
c.capacity = c.tempCapacity
|
c.capacity = c.tempCapacity
|
||||||
@ -496,7 +501,7 @@ func (pp *priorityPool) updateFlags(updates []capUpdate) {
|
|||||||
// tryActivate tries to activate inactive nodes if possible
|
// tryActivate tries to activate inactive nodes if possible
|
||||||
func (pp *priorityPool) tryActivate(commit bool) []capUpdate {
|
func (pp *priorityPool) tryActivate(commit bool) []capUpdate {
|
||||||
for pp.inactiveQueue.Size() > 0 {
|
for pp.inactiveQueue.Size() > 0 {
|
||||||
c := pp.inactiveQueue.PopItem().(*ppNodeInfo)
|
c := pp.inactiveQueue.PopItem()
|
||||||
pp.setTempState(c)
|
pp.setTempState(c)
|
||||||
pp.setTempBias(c, pp.activeBias)
|
pp.setTempBias(c, pp.activeBias)
|
||||||
pp.setTempCapacity(c, pp.minCap)
|
pp.setTempCapacity(c, pp.minCap)
|
||||||
@ -524,8 +529,7 @@ func (pp *priorityPool) updatePriority(node *enode.Node) {
|
|||||||
pp.lock.Unlock()
|
pp.lock.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
pp.activeQueue.Remove(c.activeIndex)
|
pp.removeFromQueues(c)
|
||||||
pp.inactiveQueue.Remove(c.inactiveIndex)
|
|
||||||
if c.capacity != 0 {
|
if c.capacity != 0 {
|
||||||
pp.activeQueue.Push(c)
|
pp.activeQueue.Push(c)
|
||||||
} else {
|
} else {
|
||||||
|
@ -160,7 +160,7 @@ type Sync struct {
|
|||||||
membatch *syncMemBatch // Memory buffer to avoid frequent database writes
|
membatch *syncMemBatch // Memory buffer to avoid frequent database writes
|
||||||
nodeReqs map[string]*nodeRequest // Pending requests pertaining to a trie node path
|
nodeReqs map[string]*nodeRequest // Pending requests pertaining to a trie node path
|
||||||
codeReqs map[common.Hash]*codeRequest // Pending requests pertaining to a code hash
|
codeReqs map[common.Hash]*codeRequest // Pending requests pertaining to a code hash
|
||||||
queue *prque.Prque // Priority queue with the pending requests
|
queue *prque.Prque[int64, any] // Priority queue with the pending requests
|
||||||
fetches map[int]int // Number of active fetches per trie node depth
|
fetches map[int]int // Number of active fetches per trie node depth
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -172,7 +172,7 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb
|
|||||||
membatch: newSyncMemBatch(),
|
membatch: newSyncMemBatch(),
|
||||||
nodeReqs: make(map[string]*nodeRequest),
|
nodeReqs: make(map[string]*nodeRequest),
|
||||||
codeReqs: make(map[common.Hash]*codeRequest),
|
codeReqs: make(map[common.Hash]*codeRequest),
|
||||||
queue: prque.New(nil),
|
queue: prque.New[int64, any](nil), // Ugh, can contain both string and hash, whyyy
|
||||||
fetches: make(map[int]int),
|
fetches: make(map[int]int),
|
||||||
}
|
}
|
||||||
ts.AddSubTrie(root, nil, common.Hash{}, nil, callback)
|
ts.AddSubTrie(root, nil, common.Hash{}, nil, callback)
|
||||||
|
Loading…
Reference in New Issue
Block a user