les: UDP pre-negotiation of available server capacity (#22183)

This PR implements the first one of the "lespay" UDP queries which
is already useful in itself: the capacity query. The server pool is making
use of this query by doing a cheap UDP query to determine whether it is
worth starting the more expensive TCP connection process.
This commit is contained in:
Felföldi Zsolt 2021-03-01 10:24:20 +01:00 committed by GitHub
parent 498458b410
commit d96870428f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 915 additions and 89 deletions

@ -48,7 +48,7 @@ type LazyQueue struct {
}
type (
PriorityCallback func(data interface{}, now mclock.AbsTime) int64 // actual priority callback
PriorityCallback func(data interface{}) int64 // actual priority callback
MaxPriorityCallback func(data interface{}, until mclock.AbsTime) int64 // estimated maximum priority callback
)
@ -139,11 +139,10 @@ func (q *LazyQueue) peekIndex() int {
// Pop multiple times. Popped items are passed to the callback. MultiPop returns
// when the callback returns false or there are no more items to pop.
func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) bool) {
now := q.clock.Now()
nextIndex := q.peekIndex()
for nextIndex != -1 {
data := heap.Pop(q.queue[nextIndex]).(*item).value
heap.Push(q.popQueue, &item{data, q.priority(data, now)})
heap.Push(q.popQueue, &item{data, q.priority(data)})
nextIndex = q.peekIndex()
for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) {
i := heap.Pop(q.popQueue).(*item)

@ -40,7 +40,7 @@ type lazyItem struct {
index int
}
func testPriority(a interface{}, now mclock.AbsTime) int64 {
func testPriority(a interface{}) int64 {
return a.(*lazyItem).p
}

@ -36,13 +36,16 @@ import (
"github.com/ethereum/go-ethereum/eth/gasprice"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/les/vflux"
vfc "github.com/ethereum/go-ethereum/les/vflux/client"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
)
@ -58,7 +61,7 @@ type LightEthereum struct {
txPool *light.TxPool
blockchain *light.LightChain
serverPool *vfc.ServerPool
dialCandidates enode.Iterator
serverPoolIterator enode.Iterator
pruner *pruner
bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
@ -112,7 +115,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
p2pConfig: &stack.Config().P2P,
}
leth.serverPool, leth.dialCandidates = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, nil, &mclock.System{}, config.UltraLightServers, requestList)
leth.serverPool, leth.serverPoolIterator = vfc.NewServerPool(lesDb, []byte("serverpool:"), time.Second, leth.prenegQuery, &mclock.System{}, config.UltraLightServers, requestList)
leth.serverPool.AddMetrics(suggestedTimeoutGauge, totalValueGauge, serverSelectableGauge, serverConnectedGauge, sessionValueMeter, serverDialedMeter)
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.GetTimeout)
@ -189,6 +192,62 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
return leth, nil
}
// VfluxRequest sends a batch of requests to the given node through discv5 UDP TalkRequest and returns the responses
func (s *LightEthereum) VfluxRequest(n *enode.Node, reqs vflux.Requests) vflux.Replies {
reqsEnc, _ := rlp.EncodeToBytes(&reqs)
repliesEnc, _ := s.p2pServer.DiscV5.TalkRequest(s.serverPool.DialNode(n), "vfx", reqsEnc)
var replies vflux.Replies
if len(repliesEnc) == 0 || rlp.DecodeBytes(repliesEnc, &replies) != nil {
return nil
}
return replies
}
// vfxVersion returns the version number of the "les" service subdomain of the vflux UDP
// service, as advertised in the ENR record
func (s *LightEthereum) vfxVersion(n *enode.Node) uint {
if n.Seq() == 0 {
var err error
if n, err = s.p2pServer.DiscV5.RequestENR(n); n != nil && err == nil && n.Seq() != 0 {
s.serverPool.Persist(n)
} else {
return 0
}
}
var les []rlp.RawValue
if err := n.Load(enr.WithEntry("les", &les)); err != nil || len(les) < 1 {
return 0
}
var version uint
rlp.DecodeBytes(les[0], &version) // Ignore additional fields (for forward compatibility).
return version
}
// prenegQuery sends a capacity query to the given server node to determine whether
// a connection slot is immediately available
func (s *LightEthereum) prenegQuery(n *enode.Node) int {
if s.vfxVersion(n) < 1 {
// UDP query not supported, always try TCP connection
return 1
}
var requests vflux.Requests
requests.Add("les", vflux.CapacityQueryName, vflux.CapacityQueryReq{
Bias: 180,
AddTokens: []vflux.IntOrInf{{}},
})
replies := s.VfluxRequest(n, requests)
var cqr vflux.CapacityQueryReply
if replies.Get(0, &cqr) != nil || len(cqr) != 1 { // Note: Get returns an error if replies is nil
return -1
}
if cqr[0] > 0 {
return 1
}
return 0
}
type LightDummyAPI struct{}
// Etherbase is the address that mining rewards will be send to
@ -269,7 +328,7 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {
return p.Info()
}
return nil
}, s.dialCandidates)
}, s.serverPoolIterator)
}
// Start implements node.Lifecycle, starting all internal goroutines needed by the

@ -24,11 +24,13 @@ import (
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/les/vflux"
vfs "github.com/ethereum/go-ethereum/les/vflux/server"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/ethereum/go-ethereum/p2p/nodestate"
"github.com/ethereum/go-ethereum/rlp"
)
const (
@ -382,3 +384,56 @@ func (f *clientPool) forClients(ids []enode.ID, cb func(client *clientInfo)) {
}
}
}
// serveCapQuery serves a vflux capacity query. It receives multiple token amount values
// and a bias time value. For each given token amount it calculates the maximum achievable
// capacity in case the amount is added to the balance.
func (f *clientPool) serveCapQuery(id enode.ID, freeID string, data []byte) []byte {
var req vflux.CapacityQueryReq
if rlp.DecodeBytes(data, &req) != nil {
return nil
}
if l := len(req.AddTokens); l == 0 || l > vflux.CapacityQueryMaxLen {
return nil
}
node := f.ns.GetNode(id)
if node == nil {
node = enode.SignNull(&enr.Record{}, id)
}
c, _ := f.ns.GetField(node, clientInfoField).(*clientInfo)
if c == nil {
c = &clientInfo{node: node}
f.ns.SetField(node, clientInfoField, c)
f.ns.SetField(node, connAddressField, freeID)
defer func() {
f.ns.SetField(node, connAddressField, nil)
f.ns.SetField(node, clientInfoField, nil)
}()
if c.balance, _ = f.ns.GetField(node, f.BalanceField).(*vfs.NodeBalance); c.balance == nil {
log.Error("BalanceField is missing", "node", node.ID())
return nil
}
}
// use vfs.CapacityCurve to answer request for multiple newly bought token amounts
curve := f.pp.GetCapacityCurve().Exclude(id)
result := make(vflux.CapacityQueryReply, len(req.AddTokens))
bias := time.Second * time.Duration(req.Bias)
if f.connectedBias > bias {
bias = f.connectedBias
}
pb, _ := c.balance.GetBalance()
for i, addTokens := range req.AddTokens {
add := addTokens.Int64()
result[i] = curve.MaxCapacity(func(capacity uint64) int64 {
return c.balance.EstimatePriority(capacity, add, 0, bias, false) / int64(capacity)
})
if add <= 0 && uint64(-add) >= pb && result[i] > f.minCap {
result[i] = f.minCap
}
if result[i] < f.minCap {
result[i] = 0
}
}
reply, _ := rlp.EncodeToBytes(&result)
return reply
}

@ -508,8 +508,10 @@ func TestNegativeBalanceCalculation(t *testing.T) {
for i := 0; i < 10; i++ {
pool.disconnect(newPoolTestPeer(i, nil))
_, nb := getBalance(pool, newPoolTestPeer(i, nil))
if checkDiff(nb, uint64(time.Minute)/1000) {
t.Fatalf("Negative balance mismatch, want %v, got %v", uint64(time.Minute)/1000, nb)
exp := uint64(time.Minute) / 1000
exp -= exp / 120 // correct for negative balance expiration
if checkDiff(nb, exp) {
t.Fatalf("Negative balance mismatch, want %v, got %v", exp, nb)
}
}
}

@ -27,7 +27,8 @@ import (
// lesEntry is the "les" ENR entry. This is set for LES servers only.
type lesEntry struct {
// Ignore additional fields (for forward compatibility).
_ []rlp.RawValue `rlp:"tail"`
VfxVersion uint
Rest []rlp.RawValue `rlp:"tail"`
}
func (lesEntry) ENRKey() string { return "les" }

@ -26,6 +26,7 @@ import (
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/les/flowcontrol"
"github.com/ethereum/go-ethereum/les/vflux"
vfs "github.com/ethereum/go-ethereum/les/vflux/server"
"github.com/ethereum/go-ethereum/light"
"github.com/ethereum/go-ethereum/log"
@ -68,6 +69,7 @@ type LesServer struct {
archiveMode bool // Flag whether the ethereum node runs in archive mode.
handler *serverHandler
broadcaster *broadcaster
vfluxServer *vfs.Server
privateKey *ecdsa.PrivateKey
// Flow control and capacity management
@ -112,12 +114,14 @@ func NewLesServer(node *node.Node, e ethBackend, config *ethconfig.Config) (*Les
ns: ns,
archiveMode: e.ArchiveMode(),
broadcaster: newBroadcaster(ns),
vfluxServer: vfs.NewServer(time.Millisecond * 10),
fcManager: flowcontrol.NewClientManager(nil, &mclock.System{}),
servingQueue: newServingQueue(int64(time.Millisecond*10), float64(config.LightServ)/100),
threadsBusy: config.LightServ/100 + 1,
threadsIdle: threads,
p2pSrv: node.Server(),
}
srv.vfluxServer.Register(srv)
issync := e.Synced
if config.LightNoSyncServe {
issync = func() bool { return true }
@ -201,7 +205,9 @@ func (s *LesServer) Protocols() []p2p.Protocol {
}, nil)
// Add "les" ENR entries.
for i := range ps {
ps[i].Attributes = []enr.Entry{&lesEntry{}}
ps[i].Attributes = []enr.Entry{&lesEntry{
VfxVersion: 1,
}}
}
return ps
}
@ -211,10 +217,11 @@ func (s *LesServer) Start() error {
s.privateKey = s.p2pSrv.PrivateKey
s.broadcaster.setSignerKey(s.privateKey)
s.handler.start()
s.wg.Add(1)
go s.capacityManagement()
if s.p2pSrv.DiscV5 != nil {
s.p2pSrv.DiscV5.RegisterTalkHandler("vfx", s.vfluxServer.ServeEncoded)
}
return nil
}
@ -228,6 +235,7 @@ func (s *LesServer) Stop() error {
s.costTracker.stop()
s.handler.stop()
s.servingQueue.stop()
s.vfluxServer.Stop()
// Note, bloom trie indexer is closed by parent bloombits indexer.
s.chtIndexer.Close()
@ -311,3 +319,18 @@ func (s *LesServer) dropClient(id enode.ID) {
p.Peer.Disconnect(p2p.DiscRequested)
}
}
// ServiceInfo implements vfs.Service
func (s *LesServer) ServiceInfo() (string, string) {
return "les", "Ethereum light client service"
}
// Handle implements vfs.Service
func (s *LesServer) Handle(id enode.ID, address string, name string, data []byte) []byte {
switch name {
case vflux.CapacityQueryName:
return s.clientPool.serveCapQuery(id, address, data)
default:
return nil
}
}

@ -94,7 +94,7 @@ type nodeHistoryEnc struct {
type queryFunc func(*enode.Node) int
var (
clientSetup = &nodestate.Setup{Version: 1}
clientSetup = &nodestate.Setup{Version: 2}
sfHasValue = clientSetup.NewPersistentFlag("hasValue")
sfQueried = clientSetup.NewFlag("queried")
sfCanDial = clientSetup.NewFlag("canDial")
@ -131,9 +131,25 @@ var (
)
sfiNodeWeight = clientSetup.NewField("nodeWeight", reflect.TypeOf(uint64(0)))
sfiConnectedStats = clientSetup.NewField("connectedStats", reflect.TypeOf(ResponseTimeStats{}))
sfiLocalAddress = clientSetup.NewPersistentField("localAddress", reflect.TypeOf(&enr.Record{}),
func(field interface{}) ([]byte, error) {
if enr, ok := field.(*enr.Record); ok {
enc, err := rlp.EncodeToBytes(enr)
return enc, err
}
return nil, errors.New("invalid field type")
},
func(enc []byte) (interface{}, error) {
var enr enr.Record
if err := rlp.DecodeBytes(enc, &enr); err != nil {
return nil, err
}
return &enr, nil
},
)
)
// newServerPool creates a new server pool
// NewServerPool creates a new server pool
func NewServerPool(db ethdb.KeyValueStore, dbKey []byte, mixTimeout time.Duration, query queryFunc, clock mclock.Clock, trustedURLs []string, requestList []RequestInfo) (*ServerPool, enode.Iterator) {
s := &ServerPool{
db: db,
@ -151,15 +167,10 @@ func NewServerPool(db ethdb.KeyValueStore, dbKey []byte, mixTimeout time.Duratio
s.mixSources = append(s.mixSources, knownSelector)
s.mixSources = append(s.mixSources, alwaysConnect)
iter := enode.Iterator(s.mixer)
s.dialIterator = s.mixer
if query != nil {
iter = s.addPreNegFilter(iter, query)
s.dialIterator = s.addPreNegFilter(s.dialIterator, query)
}
s.dialIterator = enode.Filter(iter, func(node *enode.Node) bool {
s.ns.SetState(node, sfDialing, sfCanDial, 0)
s.ns.SetState(node, sfWaitDialTimeout, nodestate.Flags{}, time.Second*10)
return true
})
s.ns.SubscribeState(nodestate.MergeFlags(sfWaitDialTimeout, sfConnected), func(n *enode.Node, oldState, newState nodestate.Flags) {
if oldState.Equals(sfWaitDialTimeout) && newState.IsEmpty() {
@ -169,7 +180,41 @@ func NewServerPool(db ethdb.KeyValueStore, dbKey []byte, mixTimeout time.Duratio
}
})
return s, s.dialIterator
return s, &serverPoolIterator{
dialIterator: s.dialIterator,
nextFn: func(node *enode.Node) {
s.ns.Operation(func() {
s.ns.SetStateSub(node, sfDialing, sfCanDial, 0)
s.ns.SetStateSub(node, sfWaitDialTimeout, nodestate.Flags{}, time.Second*10)
})
},
nodeFn: s.DialNode,
}
}
type serverPoolIterator struct {
dialIterator enode.Iterator
nextFn func(*enode.Node)
nodeFn func(*enode.Node) *enode.Node
}
// Next implements enode.Iterator
func (s *serverPoolIterator) Next() bool {
if s.dialIterator.Next() {
s.nextFn(s.dialIterator.Node())
return true
}
return false
}
// Node implements enode.Iterator
func (s *serverPoolIterator) Node() *enode.Node {
return s.nodeFn(s.dialIterator.Node())
}
// Close implements enode.Iterator
func (s *serverPoolIterator) Close() {
s.dialIterator.Close()
}
// AddMetrics adds metrics to the server pool. Should be called before Start().
@ -285,7 +330,6 @@ func (s *ServerPool) Start() {
// stop stops the server pool
func (s *ServerPool) Stop() {
s.dialIterator.Close()
if s.fillSet != nil {
s.fillSet.Close()
}
@ -299,18 +343,23 @@ func (s *ServerPool) Stop() {
s.vt.Stop()
}
// registerPeer implements serverPeerSubscriber
// RegisterNode implements serverPeerSubscriber
func (s *ServerPool) RegisterNode(node *enode.Node) (*NodeValueTracker, error) {
if atomic.LoadUint32(&s.started) == 0 {
return nil, errors.New("server pool not started yet")
}
s.ns.SetState(node, sfConnected, sfDialing.Or(sfWaitDialTimeout), 0)
nvt := s.vt.Register(node.ID())
s.ns.SetField(node, sfiConnectedStats, nvt.RtStats())
s.ns.Operation(func() {
s.ns.SetStateSub(node, sfConnected, sfDialing.Or(sfWaitDialTimeout), 0)
s.ns.SetFieldSub(node, sfiConnectedStats, nvt.RtStats())
if node.IP().IsLoopback() {
s.ns.SetFieldSub(node, sfiLocalAddress, node.Record())
}
})
return nvt, nil
}
// unregisterPeer implements serverPeerSubscriber
// UnregisterNode implements serverPeerSubscriber
func (s *ServerPool) UnregisterNode(node *enode.Node) {
s.ns.Operation(func() {
s.setRedialWait(node, dialCost, dialWaitStep)
@ -430,6 +479,7 @@ func (s *ServerPool) updateWeight(node *enode.Node, totalValue float64, totalDia
s.ns.SetStateSub(node, nodestate.Flags{}, sfHasValue, 0)
s.ns.SetFieldSub(node, sfiNodeWeight, nil)
s.ns.SetFieldSub(node, sfiNodeHistory, nil)
s.ns.SetFieldSub(node, sfiLocalAddress, nil)
}
s.ns.Persist(node) // saved if node history or hasValue changed
}
@ -520,3 +570,28 @@ func (s *ServerPool) calculateWeight(node *enode.Node) {
func (s *ServerPool) API() *PrivateClientAPI {
return NewPrivateClientAPI(s.vt)
}
type dummyIdentity enode.ID
func (id dummyIdentity) Verify(r *enr.Record, sig []byte) error { return nil }
func (id dummyIdentity) NodeAddr(r *enr.Record) []byte { return id[:] }
// DialNode replaces the given enode with a locally generated one containing the ENR
// stored in the sfiLocalAddress field if present. This workaround ensures that nodes
// on the local network can be dialed at the local address if a connection has been
// successfully established previously.
// Note that NodeStateMachine always remembers the enode with the latest version of
// the remote signed ENR. ENR filtering should be performed on that version while
// dialNode should be used for dialing the node over TCP or UDP.
func (s *ServerPool) DialNode(n *enode.Node) *enode.Node {
if enr, ok := s.ns.GetField(n, sfiLocalAddress).(*enr.Record); ok {
n, _ := enode.New(dummyIdentity(n.ID()), enr)
return n
}
return n
}
// Persist immediately stores the state of a node in the node database
func (s *ServerPool) Persist(n *enode.Node) {
s.ns.Persist(n)
}

@ -56,6 +56,7 @@ type ServerPoolTest struct {
preNeg, preNegFail bool
vt *ValueTracker
sp *ServerPool
spi enode.Iterator
input enode.Iterator
testNodes []spTestNode
trusted []string
@ -148,7 +149,7 @@ func (s *ServerPoolTest) start() {
requestList[i] = RequestInfo{Name: "testreq" + strconv.Itoa(i), InitAmount: 1, InitValue: 1}
}
s.sp, _ = NewServerPool(s.db, []byte("sp:"), 0, testQuery, s.clock, s.trusted, requestList)
s.sp, s.spi = NewServerPool(s.db, []byte("sp:"), 0, testQuery, s.clock, s.trusted, requestList)
s.sp.AddSource(s.input)
s.sp.validSchemes = enode.ValidSchemesForTesting
s.sp.unixTime = func() int64 { return int64(s.clock.Now()) / int64(time.Second) }
@ -176,6 +177,7 @@ func (s *ServerPoolTest) start() {
func (s *ServerPoolTest) stop() {
close(s.quit)
s.sp.Stop()
s.spi.Close()
for i := range s.testNodes {
n := &s.testNodes[i]
if n.connected {
@ -208,9 +210,9 @@ func (s *ServerPoolTest) run() {
if s.conn < spTestTarget {
s.dialCount++
s.beginWait()
s.sp.dialIterator.Next()
s.spi.Next()
s.endWait()
dial := s.sp.dialIterator.Node()
dial := s.spi.Node()
id := dial.ID()
idx := testNodeIndex(id)
n := &s.testNodes[idx]

180
les/vflux/requests.go Normal file

@ -0,0 +1,180 @@
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package vflux
import (
"errors"
"math"
"math/big"
"github.com/ethereum/go-ethereum/rlp"
)
var ErrNoReply = errors.New("no reply for given request")
const (
MaxRequestLength = 16 // max number of individual requests in a batch
CapacityQueryName = "cq"
CapacityQueryMaxLen = 16
)
type (
// Request describes a single vflux request inside a batch. Service and request
// type are identified by strings, parameters are RLP encoded.
Request struct {
Service, Name string
Params []byte
}
// Requests are a batch of vflux requests
Requests []Request
// Replies are the replies to a batch of requests
Replies [][]byte
// CapacityQueryReq is the encoding format of the capacity query
CapacityQueryReq struct {
Bias uint64 // seconds
AddTokens []IntOrInf
}
// CapacityQueryReq is the encoding format of the response to the capacity query
CapacityQueryReply []uint64
)
// Add encodes and adds a new request to the batch
func (r *Requests) Add(service, name string, val interface{}) (int, error) {
enc, err := rlp.EncodeToBytes(val)
if err != nil {
return -1, err
}
*r = append(*r, Request{
Service: service,
Name: name,
Params: enc,
})
return len(*r) - 1, nil
}
// Get decodes the reply to the i-th request in the batch
func (r Replies) Get(i int, val interface{}) error {
if i < 0 || i >= len(r) {
return ErrNoReply
}
return rlp.DecodeBytes(r[i], val)
}
const (
IntNonNegative = iota
IntNegative
IntPlusInf
IntMinusInf
)
// IntOrInf is the encoding format for arbitrary length signed integers that can also
// hold the values of +Inf or -Inf
type IntOrInf struct {
Type uint8
Value big.Int
}
// BigInt returns the value as a big.Int or panics if the value is infinity
func (i *IntOrInf) BigInt() *big.Int {
switch i.Type {
case IntNonNegative:
return new(big.Int).Set(&i.Value)
case IntNegative:
return new(big.Int).Neg(&i.Value)
case IntPlusInf:
panic(nil) // caller should check Inf() before trying to convert to big.Int
case IntMinusInf:
panic(nil)
}
return &big.Int{} // invalid type decodes to 0 value
}
// Inf returns 1 if the value is +Inf, -1 if it is -Inf, 0 otherwise
func (i *IntOrInf) Inf() int {
switch i.Type {
case IntPlusInf:
return 1
case IntMinusInf:
return -1
}
return 0 // invalid type decodes to 0 value
}
// Int64 limits the value between MinInt64 and MaxInt64 (even if it is +-Inf) and returns an int64 type
func (i *IntOrInf) Int64() int64 {
switch i.Type {
case IntNonNegative:
if i.Value.IsInt64() {
return i.Value.Int64()
} else {
return math.MaxInt64
}
case IntNegative:
if i.Value.IsInt64() {
return -i.Value.Int64()
} else {
return math.MinInt64
}
case IntPlusInf:
return math.MaxInt64
case IntMinusInf:
return math.MinInt64
}
return 0 // invalid type decodes to 0 value
}
// SetBigInt sets the value to the given big.Int
func (i *IntOrInf) SetBigInt(v *big.Int) {
if v.Sign() >= 0 {
i.Type = IntNonNegative
i.Value.Set(v)
} else {
i.Type = IntNegative
i.Value.Neg(v)
}
}
// SetInt64 sets the value to the given int64. Note that MaxInt64 translates to +Inf
// while MinInt64 translates to -Inf.
func (i *IntOrInf) SetInt64(v int64) {
if v >= 0 {
if v == math.MaxInt64 {
i.Type = IntPlusInf
} else {
i.Type = IntNonNegative
i.Value.SetInt64(v)
}
} else {
if v == math.MinInt64 {
i.Type = IntMinusInf
} else {
i.Type = IntNegative
i.Value.SetInt64(-v)
}
}
}
// SetInf sets the value to +Inf or -Inf
func (i *IntOrInf) SetInf(sign int) {
if sign == 1 {
i.Type = IntPlusInf
} else {
i.Type = IntMinusInf
}
}

@ -243,11 +243,11 @@ func (n *NodeBalance) RequestServed(cost uint64) uint64 {
}
// Priority returns the actual priority based on the current balance
func (n *NodeBalance) Priority(now mclock.AbsTime, capacity uint64) int64 {
func (n *NodeBalance) Priority(capacity uint64) int64 {
n.lock.Lock()
defer n.lock.Unlock()
n.updateBalance(now)
n.updateBalance(n.bt.clock.Now())
return n.balanceToPriority(n.balance, capacity)
}
@ -256,16 +256,35 @@ func (n *NodeBalance) Priority(now mclock.AbsTime, capacity uint64) int64 {
// in the current session.
// If update is true then a priority callback is added that turns UpdateFlag on and off
// in case the priority goes below the estimated minimum.
func (n *NodeBalance) EstMinPriority(at mclock.AbsTime, capacity uint64, update bool) int64 {
func (n *NodeBalance) EstimatePriority(capacity uint64, addBalance int64, future, bias time.Duration, update bool) int64 {
n.lock.Lock()
defer n.lock.Unlock()
now := n.bt.clock.Now()
n.updateBalance(now)
b := n.balance
if addBalance != 0 {
offset := n.bt.posExp.LogOffset(now)
old := n.balance.pos.Value(offset)
if addBalance > 0 && (addBalance > maxBalance || old > maxBalance-uint64(addBalance)) {
b.pos = utils.ExpiredValue{}
b.pos.Add(maxBalance, offset)
} else {
b.pos.Add(addBalance, offset)
}
}
if future > 0 {
var avgReqCost float64
dt := time.Duration(n.lastUpdate - n.initTime)
if dt > time.Second {
avgReqCost = float64(n.sumReqCost) * 2 / float64(dt)
}
pri := n.balanceToPriority(n.reducedBalance(at, capacity, avgReqCost), capacity)
b = n.reducedBalance(b, now, future, capacity, avgReqCost)
}
if bias > 0 {
b = n.reducedBalance(b, now+mclock.AbsTime(future), bias, capacity, 0)
}
pri := n.balanceToPriority(b, capacity)
if update {
n.addCallback(balanceCallbackUpdate, pri, n.signalPriorityUpdate)
}
@ -366,7 +385,7 @@ func (n *NodeBalance) deactivate() {
// updateBalance updates balance based on the time factor
func (n *NodeBalance) updateBalance(now mclock.AbsTime) {
if n.active && now > n.lastUpdate {
n.balance = n.reducedBalance(now, n.capacity, 0)
n.balance = n.reducedBalance(n.balance, n.lastUpdate, time.Duration(now-n.lastUpdate), n.capacity, 0)
n.lastUpdate = now
}
}
@ -546,23 +565,25 @@ func (n *NodeBalance) balanceToPriority(b balance, capacity uint64) int64 {
}
// reducedBalance estimates the reduced balance at a given time in the fututre based
// on the current balance, the time factor and an estimated average request cost per time ratio
func (n *NodeBalance) reducedBalance(at mclock.AbsTime, capacity uint64, avgReqCost float64) balance {
dt := float64(at - n.lastUpdate)
b := n.balance
// on the given balance, the time factor and an estimated average request cost per time ratio
func (n *NodeBalance) reducedBalance(b balance, start mclock.AbsTime, dt time.Duration, capacity uint64, avgReqCost float64) balance {
// since the costs are applied continuously during the dt time period we calculate
// the expiration offset at the middle of the period
at := start + mclock.AbsTime(dt/2)
dtf := float64(dt)
if !b.pos.IsZero() {
factor := n.posFactor.timePrice(capacity) + n.posFactor.RequestFactor*avgReqCost
diff := -int64(dt * factor)
diff := -int64(dtf * factor)
dd := b.pos.Add(diff, n.bt.posExp.LogOffset(at))
if dd == diff {
dt = 0
dtf = 0
} else {
dt += float64(dd) / factor
dtf += float64(dd) / factor
}
}
if dt > 0 {
factor := n.negFactor.timePrice(capacity) + n.negFactor.RequestFactor*avgReqCost
b.neg.Add(int64(dt*factor), n.bt.negExp.LogOffset(at))
b.neg.Add(int64(dtf*factor), n.bt.negExp.LogOffset(at))
}
return b
}
@ -588,8 +609,9 @@ func (n *NodeBalance) timeUntil(priority int64) (time.Duration, bool) {
}
dt = float64(posBalance-newBalance) / timePrice
return time.Duration(dt), true
}
} else {
dt = float64(posBalance) / timePrice
}
} else {
if priority > 0 {
return 0, false

@ -231,7 +231,7 @@ func TestBalanceToPriority(t *testing.T) {
}
for _, i := range inputs {
node.SetBalance(i.pos, i.neg)
priority := node.Priority(b.clock.Now(), 1000)
priority := node.Priority(1000)
if priority != i.priority {
t.Fatalf("Priority mismatch, want %v, got %v", i.priority, priority)
}
@ -272,7 +272,7 @@ func TestEstimatedPriority(t *testing.T) {
for _, i := range inputs {
b.clock.Run(i.runTime)
node.RequestServed(i.reqCost)
priority := node.EstMinPriority(b.clock.Now()+mclock.AbsTime(i.futureTime), 1000000000, false)
priority := node.EstimatePriority(1000000000, 0, i.futureTime, 0, false)
if priority != i.priority {
t.Fatalf("Estimated priority mismatch, want %v, got %v", i.priority, priority)
}

@ -101,17 +101,21 @@ type PriorityPool struct {
minCap uint64
activeBias time.Duration
capacityStepDiv uint64
cachedCurve *CapacityCurve
ccUpdatedAt mclock.AbsTime
ccUpdateForced bool
}
// nodePriority interface provides current and estimated future priorities on demand
type nodePriority interface {
// Priority should return the current priority of the node (higher is better)
Priority(now mclock.AbsTime, cap uint64) int64
Priority(cap uint64) int64
// EstMinPriority should return a lower estimate for the minimum of the node priority
// value starting from the current moment until the given time. If the priority goes
// under the returned estimate before the specified moment then it is the caller's
// responsibility to signal with updateFlag.
EstMinPriority(until mclock.AbsTime, cap uint64, update bool) int64
EstimatePriority(cap uint64, addBalance int64, future, bias time.Duration, update bool) int64
}
// ppNodeInfo is the internal node descriptor of PriorityPool
@ -131,12 +135,12 @@ func NewPriorityPool(ns *nodestate.NodeStateMachine, setup PriorityPoolSetup, cl
ns: ns,
PriorityPoolSetup: setup,
clock: clock,
activeQueue: prque.NewLazyQueue(activeSetIndex, activePriority, activeMaxPriority, clock, lazyQueueRefresh),
inactiveQueue: prque.New(inactiveSetIndex),
minCap: minCap,
activeBias: activeBias,
capacityStepDiv: capacityStepDiv,
}
pp.activeQueue = prque.NewLazyQueue(activeSetIndex, activePriority, pp.activeMaxPriority, clock, lazyQueueRefresh)
ns.SubscribeField(pp.priorityField, func(node *enode.Node, state nodestate.Flags, oldValue, newValue interface{}) {
if newValue != nil {
@ -197,6 +201,9 @@ func (pp *PriorityPool) RequestCapacity(node *enode.Node, targetCap uint64, bias
if targetCap < pp.minCap {
targetCap = pp.minCap
}
if bias < pp.activeBias {
bias = pp.activeBias
}
c, _ := pp.ns.GetField(node, pp.ppNodeInfoField).(*ppNodeInfo)
if c == nil {
log.Error("RequestCapacity called for unknown node", "id", node.ID())
@ -204,9 +211,9 @@ func (pp *PriorityPool) RequestCapacity(node *enode.Node, targetCap uint64, bias
}
var priority int64
if targetCap > c.capacity {
priority = c.nodePriority.EstMinPriority(pp.clock.Now()+mclock.AbsTime(bias), targetCap, false)
priority = c.nodePriority.EstimatePriority(targetCap, 0, 0, bias, false)
} else {
priority = c.nodePriority.Priority(pp.clock.Now(), targetCap)
priority = c.nodePriority.Priority(targetCap)
}
pp.markForChange(c)
pp.setCapacity(c, targetCap)
@ -214,7 +221,7 @@ func (pp *PriorityPool) RequestCapacity(node *enode.Node, targetCap uint64, bias
pp.activeQueue.Remove(c.activeIndex)
pp.inactiveQueue.Remove(c.inactiveIndex)
pp.activeQueue.Push(c)
minPriority = pp.enforceLimits()
_, minPriority = pp.enforceLimits()
// if capacity update is possible now then minPriority == math.MinInt64
// if it is not possible at all then minPriority == math.MaxInt64
allowed = priority > minPriority
@ -281,29 +288,34 @@ func invertPriority(p int64) int64 {
}
// activePriority callback returns actual priority of ppNodeInfo item in activeQueue
func activePriority(a interface{}, now mclock.AbsTime) int64 {
func activePriority(a interface{}) int64 {
c := a.(*ppNodeInfo)
if c.forced {
return math.MinInt64
}
if c.bias == 0 {
return invertPriority(c.nodePriority.Priority(now, c.capacity))
return invertPriority(c.nodePriority.Priority(c.capacity))
} else {
return invertPriority(c.nodePriority.EstimatePriority(c.capacity, 0, 0, c.bias, true))
}
return invertPriority(c.nodePriority.EstMinPriority(now+mclock.AbsTime(c.bias), c.capacity, true))
}
// activeMaxPriority callback returns estimated maximum priority of ppNodeInfo item in activeQueue
func activeMaxPriority(a interface{}, until mclock.AbsTime) int64 {
func (pp *PriorityPool) activeMaxPriority(a interface{}, until mclock.AbsTime) int64 {
c := a.(*ppNodeInfo)
if c.forced {
return math.MinInt64
}
return invertPriority(c.nodePriority.EstMinPriority(until+mclock.AbsTime(c.bias), c.capacity, false))
future := time.Duration(until - pp.clock.Now())
if future < 0 {
future = 0
}
return invertPriority(c.nodePriority.EstimatePriority(c.capacity, 0, future, c.bias, false))
}
// inactivePriority callback returns actual priority of ppNodeInfo item in inactiveQueue
func (pp *PriorityPool) inactivePriority(p *ppNodeInfo) int64 {
return p.nodePriority.Priority(pp.clock.Now(), pp.minCap)
return p.nodePriority.Priority(pp.minCap)
}
// connectedNode is called when a new node has been added to the pool (InactiveFlag set)
@ -379,16 +391,19 @@ func (pp *PriorityPool) setCapacity(n *ppNodeInfo, cap uint64) {
// enforceLimits enforces active node count and total capacity limits. It returns the
// lowest active node priority. Note that this function is performed on the temporary
// internal state.
func (pp *PriorityPool) enforceLimits() int64 {
func (pp *PriorityPool) enforceLimits() (*ppNodeInfo, int64) {
if pp.activeCap <= pp.maxCap && pp.activeCount <= pp.maxCount {
return math.MinInt64
return nil, math.MinInt64
}
var maxActivePriority int64
var (
c *ppNodeInfo
maxActivePriority int64
)
pp.activeQueue.MultiPop(func(data interface{}, priority int64) bool {
c := data.(*ppNodeInfo)
c = data.(*ppNodeInfo)
pp.markForChange(c)
maxActivePriority = priority
if c.capacity == pp.minCap {
if c.capacity == pp.minCap || pp.activeCount > pp.maxCount {
pp.setCapacity(c, 0)
} else {
sub := c.capacity / pp.capacityStepDiv
@ -400,7 +415,7 @@ func (pp *PriorityPool) enforceLimits() int64 {
}
return pp.activeCap > pp.maxCap || pp.activeCount > pp.maxCount
})
return invertPriority(maxActivePriority)
return c, invertPriority(maxActivePriority)
}
// finalizeChanges either commits or reverts temporary changes. The necessary capacity
@ -430,6 +445,9 @@ func (pp *PriorityPool) finalizeChanges(commit bool) (updates []capUpdate) {
c.origCap = 0
}
pp.changed = nil
if commit {
pp.ccUpdateForced = true
}
return
}
@ -472,6 +490,7 @@ func (pp *PriorityPool) tryActivate() []capUpdate {
break
}
}
pp.ccUpdateForced = true
return pp.finalizeChanges(commit)
}
@ -500,3 +519,150 @@ func (pp *PriorityPool) updatePriority(node *enode.Node) {
}
updates = pp.tryActivate()
}
// CapacityCurve is a snapshot of the priority pool contents in a format that can efficiently
// estimate how much capacity could be granted to a given node at a given priority level.
type CapacityCurve struct {
points []curvePoint // curve points sorted in descending order of priority
index map[enode.ID][]int // curve point indexes belonging to each node
exclude []int // curve point indexes of excluded node
excludeFirst bool // true if activeCount == maxCount
}
type curvePoint struct {
freeCap uint64 // available capacity and node count at the current priority level
nextPri int64 // next priority level where more capacity will be available
}
// GetCapacityCurve returns a new or recently cached CapacityCurve based on the contents of the pool
func (pp *PriorityPool) GetCapacityCurve() *CapacityCurve {
pp.lock.Lock()
defer pp.lock.Unlock()
now := pp.clock.Now()
dt := time.Duration(now - pp.ccUpdatedAt)
if !pp.ccUpdateForced && pp.cachedCurve != nil && dt < time.Second*10 {
return pp.cachedCurve
}
pp.ccUpdateForced = false
pp.ccUpdatedAt = now
curve := &CapacityCurve{
index: make(map[enode.ID][]int),
}
pp.cachedCurve = curve
var excludeID enode.ID
excludeFirst := pp.maxCount == pp.activeCount
// reduce node capacities or remove nodes until nothing is left in the queue;
// record the available capacity and the necessary priority after each step
for pp.activeCap > 0 {
cp := curvePoint{}
if pp.activeCap > pp.maxCap {
log.Error("Active capacity is greater than allowed maximum", "active", pp.activeCap, "maximum", pp.maxCap)
} else {
cp.freeCap = pp.maxCap - pp.activeCap
}
// temporarily increase activeCap to enforce reducing or removing a node capacity
tempCap := cp.freeCap + 1
pp.activeCap += tempCap
var next *ppNodeInfo
// enforceLimits removes the lowest priority node if it has minimal capacity,
// otherwise reduces its capacity
next, cp.nextPri = pp.enforceLimits()
pp.activeCap -= tempCap
if next == nil {
log.Error("GetCapacityCurve: cannot remove next element from the priority queue")
break
}
id := next.node.ID()
if excludeFirst {
// if the node count limit is already reached then mark the node with the
// lowest priority for exclusion
curve.excludeFirst = true
excludeID = id
excludeFirst = false
}
// multiple curve points and therefore multiple indexes may belong to a node
// if it was removed in multiple steps (if its capacity was more than the minimum)
curve.index[id] = append(curve.index[id], len(curve.points))
curve.points = append(curve.points, cp)
}
// restore original state of the queue
pp.finalizeChanges(false)
curve.points = append(curve.points, curvePoint{
freeCap: pp.maxCap,
nextPri: math.MaxInt64,
})
if curve.excludeFirst {
curve.exclude = curve.index[excludeID]
}
return curve
}
// Exclude returns a CapacityCurve with the given node excluded from the original curve
func (cc *CapacityCurve) Exclude(id enode.ID) *CapacityCurve {
if exclude, ok := cc.index[id]; ok {
// return a new version of the curve (only one excluded node can be selected)
// Note: if the first node was excluded by default (excludeFirst == true) then
// we can forget about that and exclude the node with the given id instead.
return &CapacityCurve{
points: cc.points,
index: cc.index,
exclude: exclude,
}
}
return cc
}
func (cc *CapacityCurve) getPoint(i int) curvePoint {
cp := cc.points[i]
if i == 0 && cc.excludeFirst {
cp.freeCap = 0
return cp
}
for ii := len(cc.exclude) - 1; ii >= 0; ii-- {
ei := cc.exclude[ii]
if ei < i {
break
}
e1, e2 := cc.points[ei], cc.points[ei+1]
cp.freeCap += e2.freeCap - e1.freeCap
}
return cp
}
// MaxCapacity calculates the maximum capacity available for a node with a given
// (monotonically decreasing) priority vs. capacity function. Note that if the requesting
// node is already in the pool then it should be excluded from the curve in order to get
// the correct result.
func (cc *CapacityCurve) MaxCapacity(priority func(cap uint64) int64) uint64 {
min, max := 0, len(cc.points)-1 // the curve always has at least one point
for min < max {
mid := (min + max) / 2
cp := cc.getPoint(mid)
if cp.freeCap == 0 || priority(cp.freeCap) > cp.nextPri {
min = mid + 1
} else {
max = mid
}
}
cp2 := cc.getPoint(min)
if cp2.freeCap == 0 || min == 0 {
return cp2.freeCap
}
cp1 := cc.getPoint(min - 1)
if priority(cp2.freeCap) > cp1.nextPri {
return cp2.freeCap
}
minc, maxc := cp1.freeCap, cp2.freeCap-1
for minc < maxc {
midc := (minc + maxc + 1) / 2
if midc == 0 || priority(midc) > cp1.nextPri {
minc = midc
} else {
maxc = midc - 1
}
}
return maxc
}

@ -20,6 +20,7 @@ import (
"math/rand"
"reflect"
"testing"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/p2p/enode"
@ -42,6 +43,7 @@ func init() {
const (
testCapacityStepDiv = 100
testCapacityToleranceDiv = 10
testMinCap = 100
)
type ppTestClient struct {
@ -49,11 +51,11 @@ type ppTestClient struct {
balance, cap uint64
}
func (c *ppTestClient) Priority(now mclock.AbsTime, cap uint64) int64 {
func (c *ppTestClient) Priority(cap uint64) int64 {
return int64(c.balance / cap)
}
func (c *ppTestClient) EstMinPriority(until mclock.AbsTime, cap uint64, update bool) int64 {
func (c *ppTestClient) EstimatePriority(cap uint64, addBalance int64, future, bias time.Duration, update bool) int64 {
return int64(c.balance / cap)
}
@ -67,7 +69,7 @@ func TestPriorityPool(t *testing.T) {
c.cap = newValue.(uint64)
}
})
pp := NewPriorityPool(ns, ppTestSetup, clock, 100, 0, testCapacityStepDiv)
pp := NewPriorityPool(ns, ppTestSetup, clock, testMinCap, 0, testCapacityStepDiv)
ns.Start()
pp.SetLimits(100, 1000000)
clients := make([]*ppTestClient, 100)
@ -94,7 +96,7 @@ func TestPriorityPool(t *testing.T) {
for i := range clients {
c := &ppTestClient{
node: enode.SignNull(&enr.Record{}, enode.ID{byte(i)}),
balance: 1000000000,
balance: 100000000000,
cap: 1000,
}
sumBalance += c.balance
@ -109,7 +111,7 @@ func TestPriorityPool(t *testing.T) {
for count := 0; count < 100; count++ {
c := clients[rand.Intn(len(clients))]
oldBalance := c.balance
c.balance = uint64(rand.Int63n(1000000000) + 1000000000)
c.balance = uint64(rand.Int63n(100000000000) + 100000000000)
sumBalance += c.balance - oldBalance
pp.ns.SetState(c.node, ppUpdateFlag, nodestate.Flags{}, 0)
pp.ns.SetState(c.node, nodestate.Flags{}, ppUpdateFlag, 0)
@ -120,10 +122,124 @@ func TestPriorityPool(t *testing.T) {
raise(c)
}
}
// check whether capacities are proportional to balances
for _, c := range clients {
check(c)
}
if count%10 == 0 {
// test available capacity calculation with capacity curve
c = clients[rand.Intn(len(clients))]
curve := pp.GetCapacityCurve().Exclude(c.node.ID())
add := uint64(rand.Int63n(10000000000000))
c.balance += add
sumBalance += add
expCap := curve.MaxCapacity(func(cap uint64) int64 {
return int64(c.balance / cap)
})
//fmt.Println(expCap, c.balance, sumBalance)
/*for i, cp := range curve.points {
fmt.Println("cp", i, cp, "ex", curve.getPoint(i))
}*/
var ok bool
expFail := expCap + 1
if expFail < testMinCap {
expFail = testMinCap
}
ns.Operation(func() {
_, ok = pp.RequestCapacity(c.node, expFail, 0, true)
})
if ok {
t.Errorf("Request for more than expected available capacity succeeded")
}
if expCap >= testMinCap {
ns.Operation(func() {
_, ok = pp.RequestCapacity(c.node, expCap, 0, true)
})
if !ok {
t.Errorf("Request for expected available capacity failed")
}
}
c.balance -= add
sumBalance -= add
pp.ns.SetState(c.node, ppUpdateFlag, nodestate.Flags{}, 0)
pp.ns.SetState(c.node, nodestate.Flags{}, ppUpdateFlag, 0)
for _, c := range clients {
raise(c)
}
}
}
ns.Stop()
}
func TestCapacityCurve(t *testing.T) {
clock := &mclock.Simulated{}
ns := nodestate.NewNodeStateMachine(nil, nil, clock, testSetup)
pp := NewPriorityPool(ns, ppTestSetup, clock, 400000, 0, 2)
ns.Start()
pp.SetLimits(10, 10000000)
clients := make([]*ppTestClient, 10)
for i := range clients {
c := &ppTestClient{
node: enode.SignNull(&enr.Record{}, enode.ID{byte(i)}),
balance: 100000000000 * uint64(i+1),
cap: 1000000,
}
clients[i] = c
ns.SetState(c.node, ppTestClientFlag, nodestate.Flags{}, 0)
ns.SetField(c.node, ppTestSetup.priorityField, c)
ns.SetState(c.node, ppTestSetup.InactiveFlag, nodestate.Flags{}, 0)
ns.Operation(func() {
pp.RequestCapacity(c.node, c.cap, 0, true)
})
}
curve := pp.GetCapacityCurve()
check := func(balance, expCap uint64) {
cap := curve.MaxCapacity(func(cap uint64) int64 {
return int64(balance / cap)
})
var fail bool
if cap == 0 || expCap == 0 {
fail = cap != expCap
} else {
pri := balance / cap
expPri := balance / expCap
fail = pri != expPri && pri != expPri+1
}
if fail {
t.Errorf("Incorrect capacity for %d balance (got %d, expected %d)", balance, cap, expCap)
}
}
check(0, 0)
check(10000000000, 100000)
check(50000000000, 500000)
check(100000000000, 1000000)
check(200000000000, 1000000)
check(300000000000, 1500000)
check(450000000000, 1500000)
check(600000000000, 2000000)
check(800000000000, 2000000)
check(1000000000000, 2500000)
pp.SetLimits(11, 10000000)
curve = pp.GetCapacityCurve()
check(0, 0)
check(10000000000, 100000)
check(50000000000, 500000)
check(150000000000, 750000)
check(200000000000, 1000000)
check(220000000000, 1100000)
check(275000000000, 1100000)
check(375000000000, 1500000)
check(450000000000, 1500000)
check(600000000000, 2000000)
check(800000000000, 2000000)
check(1000000000000, 2500000)
ns.Stop()
}

122
les/vflux/server/service.go Normal file

@ -0,0 +1,122 @@
// Copyright 2020 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package server
import (
"net"
"strings"
"sync"
"time"
"github.com/ethereum/go-ethereum/les/utils"
"github.com/ethereum/go-ethereum/les/vflux"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
)
type (
// Server serves vflux requests
Server struct {
limiter *utils.Limiter
lock sync.Mutex
services map[string]*serviceEntry
delayPerRequest time.Duration
}
// Service is a service registered at the Server and identified by a string id
Service interface {
ServiceInfo() (id, desc string) // only called during registration
Handle(id enode.ID, address string, name string, data []byte) []byte // never called concurrently
}
serviceEntry struct {
id, desc string
backend Service
}
)
// NewServer creates a new Server
func NewServer(delayPerRequest time.Duration) *Server {
return &Server{
limiter: utils.NewLimiter(1000),
delayPerRequest: delayPerRequest,
services: make(map[string]*serviceEntry),
}
}
// Register registers a Service
func (s *Server) Register(b Service) {
srv := &serviceEntry{backend: b}
srv.id, srv.desc = b.ServiceInfo()
if strings.Contains(srv.id, ":") {
// srv.id + ":" will be used as a service database prefix
log.Error("Service ID contains ':'", "id", srv.id)
return
}
s.lock.Lock()
s.services[srv.id] = srv
s.lock.Unlock()
}
// Serve serves a vflux request batch
// Note: requests are served by the Handle functions of the registered services. Serve
// may be called concurrently but the Handle functions are called sequentially and
// therefore thread safety is guaranteed.
func (s *Server) Serve(id enode.ID, address string, requests vflux.Requests) vflux.Replies {
reqLen := uint(len(requests))
if reqLen == 0 || reqLen > vflux.MaxRequestLength {
return nil
}
// Note: the value parameter will be supplied by the token sale module (total amount paid)
ch := <-s.limiter.Add(id, address, 0, reqLen)
if ch == nil {
return nil
}
// Note: the limiter ensures that the following section is not running concurrently,
// the lock only protects against contention caused by new service registration
s.lock.Lock()
results := make(vflux.Replies, len(requests))
for i, req := range requests {
if service := s.services[req.Service]; service != nil {
results[i] = service.backend.Handle(id, address, req.Name, req.Params)
}
}
s.lock.Unlock()
time.Sleep(s.delayPerRequest * time.Duration(reqLen))
close(ch)
return results
}
// ServeEncoded serves an encoded vflux request batch and returns the encoded replies
func (s *Server) ServeEncoded(id enode.ID, addr *net.UDPAddr, req []byte) []byte {
var requests vflux.Requests
if err := rlp.DecodeBytes(req, &requests); err != nil {
return nil
}
results := s.Serve(id, addr.String(), requests)
if results == nil {
return nil
}
res, _ := rlp.EncodeToBytes(&results)
return res
}
// Stop shuts down the server
func (s *Server) Stop() {
s.limiter.Stop()
}

@ -74,7 +74,7 @@ type UDPv5 struct {
// talkreq handler registry
trlock sync.Mutex
trhandlers map[string]func([]byte) []byte
trhandlers map[string]TalkRequestHandler
// channels into dispatch
packetInCh chan ReadPacket
@ -96,6 +96,9 @@ type UDPv5 struct {
wg sync.WaitGroup
}
// TalkRequestHandler callback processes a talk request and optionally returns a reply
type TalkRequestHandler func(enode.ID, *net.UDPAddr, []byte) []byte
// callV5 represents a remote procedure call against another node.
type callV5 struct {
node *enode.Node
@ -145,7 +148,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
log: cfg.Log,
validSchemes: cfg.ValidSchemes,
clock: cfg.Clock,
trhandlers: make(map[string]func([]byte) []byte),
trhandlers: make(map[string]TalkRequestHandler),
// channels into dispatch
packetInCh: make(chan ReadPacket, 1),
readNextCh: make(chan struct{}, 1),
@ -233,7 +236,7 @@ func (t *UDPv5) LocalNode() *enode.LocalNode {
// RegisterTalkHandler adds a handler for 'talk requests'. The handler function is called
// whenever a request for the given protocol is received and should return the response
// data or nil.
func (t *UDPv5) RegisterTalkHandler(protocol string, handler func([]byte) []byte) {
func (t *UDPv5) RegisterTalkHandler(protocol string, handler TalkRequestHandler) {
t.trlock.Lock()
defer t.trlock.Unlock()
t.trhandlers[protocol] = handler
@ -841,7 +844,7 @@ func (t *UDPv5) handleTalkRequest(p *v5wire.TalkRequest, fromID enode.ID, fromAd
var response []byte
if handler != nil {
response = handler(p.Message)
response = handler(fromID, fromAddr, p.Message)
}
resp := &v5wire.TalkResponse{ReqID: p.ReqID, Message: response}
t.sendResponse(fromID, fromAddr, resp)

@ -435,7 +435,7 @@ func TestUDPv5_talkHandling(t *testing.T) {
defer test.close()
var recvMessage []byte
test.udp.RegisterTalkHandler("test", func(message []byte) []byte {
test.udp.RegisterTalkHandler("test", func(id enode.ID, addr *net.UDPAddr, message []byte) []byte {
recvMessage = message
return []byte("test response")
})

@ -599,6 +599,7 @@ func (ns *NodeStateMachine) updateEnode(n *enode.Node) (enode.ID, *nodeInfo) {
node := ns.nodes[id]
if node != nil && n.Seq() > node.node.Seq() {
node.node = n
node.dirty = true
}
return id, node
}