les, p2p/simulations/adapters: fix issues found while simulating les (#21761)
This adds a few tiny fixes for les and the p2p simulation framework: LES Parts - Keep the LES-SERVER connection even it's non-synced We had this idea to reject the connections in LES protocol if the les-server itself is not synced. However, in LES protocol we will also receive the connection from another les-server. In this case even the local node is not synced yet, we should keep the tcp connection for other protocols(e.g. eth protocol). - Don't count "invalid message" for non-existing GetBlockHeadersMsg request In the eth syncing mechanism (full sync, fast sync, light sync), it will try to fetch some non-existent blocks or headers(to ensure we indeed download all the missing chain). In this case, it's possible that the les-server will receive the request for non-existent headers. So don't count it as the "invalid message" for scheduling dropping. - Copy the announce object in the closure Before the les-server pushes the latest headers to all connected clients, it will create a closure and queue it in the underlying request scheduler. In some scenarios it's problematic. E.g, in private networks, the block can be mined very fast. So before the first closure is executed, we may already update the latest_announce object. So actually the "announce" object we want to send is replaced. The downsize is the client will receive two announces with the same td and then drop the server. P2P Simulation Framework - Don't double register the protocol services in p2p-simulation "Start". The protocols upon the devp2p are registered in the "New node stage". So don't reigster them again when starting a node in the p2p simulation framework - Add one more new config field "ExternalSigner", in order to use clef service in the framework.
This commit is contained in:
parent
b63e3c37a6
commit
b63bffe820
@ -126,11 +126,7 @@ func (h *serverHandler) handle(p *clientPeer) error {
|
|||||||
p.Log().Debug("Light Ethereum handshake failed", "err", err)
|
p.Log().Debug("Light Ethereum handshake failed", "err", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Reject light clients if server is not synced.
|
// Reject the duplicated peer, otherwise register it to peerset.
|
||||||
if !h.synced() {
|
|
||||||
p.Log().Debug("Light server not synced, rejecting peer")
|
|
||||||
return p2p.DiscRequested
|
|
||||||
}
|
|
||||||
var registered bool
|
var registered bool
|
||||||
if err := h.server.ns.Operation(func() {
|
if err := h.server.ns.Operation(func() {
|
||||||
if h.server.ns.GetField(p.Node(), clientPeerField) != nil {
|
if h.server.ns.GetField(p.Node(), clientPeerField) != nil {
|
||||||
@ -156,7 +152,14 @@ func (h *serverHandler) handle(p *clientPeer) error {
|
|||||||
_, err := p.rw.ReadMsg()
|
_, err := p.rw.ReadMsg()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// Reject light clients if server is not synced.
|
||||||
|
//
|
||||||
|
// Put this checking here, so that "non-synced" les-server peers are still allowed
|
||||||
|
// to keep the connection.
|
||||||
|
if !h.synced() {
|
||||||
|
p.Log().Debug("Light server not synced, rejecting peer")
|
||||||
|
return p2p.DiscRequested
|
||||||
|
}
|
||||||
// Disconnect the inbound peer if it's rejected by clientPool
|
// Disconnect the inbound peer if it's rejected by clientPool
|
||||||
if cap, err := h.server.clientPool.connect(p); cap != p.fcParams.MinRecharge || err != nil {
|
if cap, err := h.server.clientPool.connect(p); cap != p.fcParams.MinRecharge || err != nil {
|
||||||
p.Log().Debug("Light Ethereum peer rejected", "err", errFullClientPool)
|
p.Log().Debug("Light Ethereum peer rejected", "err", errFullClientPool)
|
||||||
@ -348,7 +351,6 @@ func (h *serverHandler) handleMsg(p *clientPeer, wg *sync.WaitGroup) error {
|
|||||||
origin = h.blockchain.GetHeaderByNumber(query.Origin.Number)
|
origin = h.blockchain.GetHeaderByNumber(query.Origin.Number)
|
||||||
}
|
}
|
||||||
if origin == nil {
|
if origin == nil {
|
||||||
p.bumpInvalid()
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
headers = append(headers, origin)
|
headers = append(headers, origin)
|
||||||
@ -1006,18 +1008,24 @@ func (b *broadcaster) sendTo(node *enode.Node) {
|
|||||||
}
|
}
|
||||||
if p, _ := b.ns.GetField(node, clientPeerField).(*clientPeer); p != nil {
|
if p, _ := b.ns.GetField(node, clientPeerField).(*clientPeer); p != nil {
|
||||||
if p.headInfo.Td == nil || b.lastAnnounce.Td.Cmp(p.headInfo.Td) > 0 {
|
if p.headInfo.Td == nil || b.lastAnnounce.Td.Cmp(p.headInfo.Td) > 0 {
|
||||||
|
announce := b.lastAnnounce
|
||||||
switch p.announceType {
|
switch p.announceType {
|
||||||
case announceTypeSimple:
|
case announceTypeSimple:
|
||||||
if !p.queueSend(func() { p.sendAnnounce(b.lastAnnounce) }) {
|
if !p.queueSend(func() { p.sendAnnounce(announce) }) {
|
||||||
log.Debug("Drop announcement because queue is full", "number", b.lastAnnounce.Number, "hash", b.lastAnnounce.Hash)
|
log.Debug("Drop announcement because queue is full", "number", announce.Number, "hash", announce.Hash)
|
||||||
|
} else {
|
||||||
|
log.Debug("Sent announcement", "number", announce.Number, "hash", announce.Hash)
|
||||||
}
|
}
|
||||||
case announceTypeSigned:
|
case announceTypeSigned:
|
||||||
if b.signedAnnounce.Hash != b.lastAnnounce.Hash {
|
if b.signedAnnounce.Hash != b.lastAnnounce.Hash {
|
||||||
b.signedAnnounce = b.lastAnnounce
|
b.signedAnnounce = b.lastAnnounce
|
||||||
b.signedAnnounce.sign(b.privateKey)
|
b.signedAnnounce.sign(b.privateKey)
|
||||||
}
|
}
|
||||||
if !p.queueSend(func() { p.sendAnnounce(b.signedAnnounce) }) {
|
announce := b.signedAnnounce
|
||||||
log.Debug("Drop announcement because queue is full", "number", b.lastAnnounce.Number, "hash", b.lastAnnounce.Hash)
|
if !p.queueSend(func() { p.sendAnnounce(announce) }) {
|
||||||
|
log.Debug("Drop announcement because queue is full", "number", announce.Number, "hash", announce.Hash)
|
||||||
|
} else {
|
||||||
|
log.Debug("Sent announcement", "number", announce.Number, "hash", announce.Hash)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
p.headInfo = blockInfo{b.lastAnnounce.Hash, b.lastAnnounce.Number, b.lastAnnounce.Td}
|
p.headInfo = blockInfo{b.lastAnnounce.Hash, b.lastAnnounce.Number, b.lastAnnounce.Td}
|
||||||
|
@ -99,6 +99,7 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
|
|||||||
Dialer: s,
|
Dialer: s,
|
||||||
EnableMsgEvents: config.EnableMsgEvents,
|
EnableMsgEvents: config.EnableMsgEvents,
|
||||||
},
|
},
|
||||||
|
ExternalSigner: config.ExternalSigner,
|
||||||
NoUSB: true,
|
NoUSB: true,
|
||||||
Logger: log.New("node.id", id.String()),
|
Logger: log.New("node.id", id.String()),
|
||||||
})
|
})
|
||||||
@ -263,7 +264,6 @@ func (sn *SimNode) Start(snapshots map[string][]byte) error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
sn.running[name] = service
|
sn.running[name] = service
|
||||||
sn.node.RegisterLifecycle(service)
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
if regErr != nil {
|
if regErr != nil {
|
||||||
|
@ -107,6 +107,9 @@ type NodeConfig struct {
|
|||||||
// These values need to be checked and acted upon by node Services
|
// These values need to be checked and acted upon by node Services
|
||||||
Properties []string
|
Properties []string
|
||||||
|
|
||||||
|
// ExternalSigner specifies an external URI for a clef-type signer
|
||||||
|
ExternalSigner string
|
||||||
|
|
||||||
// Enode
|
// Enode
|
||||||
node *enode.Node
|
node *enode.Node
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user