p2p/simulations: various stability fixes (#15198)
p2p/simulations: introduce dialBan - Refactor simulations/network connection getters to support avoiding simultaneous dials between two peers If two peers dial simultaneously, the connection will be dropped to help avoid that, we essentially lock the connection object with a timestamp which serves as a ban on dialing for a period of time (dialBanTimeout). - The connection getter InitConn can be wrapped and passed to the nodes via adapters.NodeConfig#Reachable field and then used by the respective services when they initiate connections. This massively stablise the emerging connectivity when running with hundreds of nodes bootstrapping a network. p2p: add Inbound public method to p2p.Peer p2p/simulations: Add server id to logs to support debugging in-memory network simulations when multiple peers are logging. p2p: SetupConn now returns error. The dialer checks the error and only calls resolve if the actual TCP dial fails.
This commit is contained in:
parent
73067fd24f
commit
54aeb8e4c0
@ -135,6 +135,9 @@ type Config struct {
|
|||||||
// *WARNING* Only set this if the node is running in a trusted network, exposing
|
// *WARNING* Only set this if the node is running in a trusted network, exposing
|
||||||
// private APIs to untrusted users is a major security risk.
|
// private APIs to untrusted users is a major security risk.
|
||||||
WSExposeAll bool `toml:",omitempty"`
|
WSExposeAll bool `toml:",omitempty"`
|
||||||
|
|
||||||
|
// Logger is a custom logger to use with the p2p.Server.
|
||||||
|
Logger log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// IPCEndpoint resolves an IPC endpoint based on a configured value, taking into
|
// IPCEndpoint resolves an IPC endpoint based on a configured value, taking into
|
||||||
|
33
node/node.go
33
node/node.go
@ -69,6 +69,8 @@ type Node struct {
|
|||||||
|
|
||||||
stop chan struct{} // Channel to wait for termination notifications
|
stop chan struct{} // Channel to wait for termination notifications
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
|
|
||||||
|
log log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates a new P2P node, ready for protocol registration.
|
// New creates a new P2P node, ready for protocol registration.
|
||||||
@ -101,6 +103,9 @@ func New(conf *Config) (*Node, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if conf.Logger == nil {
|
||||||
|
conf.Logger = log.New()
|
||||||
|
}
|
||||||
// Note: any interaction with Config that would create/touch files
|
// Note: any interaction with Config that would create/touch files
|
||||||
// in the data directory or instance directory is delayed until Start.
|
// in the data directory or instance directory is delayed until Start.
|
||||||
return &Node{
|
return &Node{
|
||||||
@ -112,6 +117,7 @@ func New(conf *Config) (*Node, error) {
|
|||||||
httpEndpoint: conf.HTTPEndpoint(),
|
httpEndpoint: conf.HTTPEndpoint(),
|
||||||
wsEndpoint: conf.WSEndpoint(),
|
wsEndpoint: conf.WSEndpoint(),
|
||||||
eventmux: new(event.TypeMux),
|
eventmux: new(event.TypeMux),
|
||||||
|
log: conf.Logger,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -146,6 +152,7 @@ func (n *Node) Start() error {
|
|||||||
n.serverConfig = n.config.P2P
|
n.serverConfig = n.config.P2P
|
||||||
n.serverConfig.PrivateKey = n.config.NodeKey()
|
n.serverConfig.PrivateKey = n.config.NodeKey()
|
||||||
n.serverConfig.Name = n.config.NodeName()
|
n.serverConfig.Name = n.config.NodeName()
|
||||||
|
n.serverConfig.Logger = n.log
|
||||||
if n.serverConfig.StaticNodes == nil {
|
if n.serverConfig.StaticNodes == nil {
|
||||||
n.serverConfig.StaticNodes = n.config.StaticNodes()
|
n.serverConfig.StaticNodes = n.config.StaticNodes()
|
||||||
}
|
}
|
||||||
@ -156,7 +163,7 @@ func (n *Node) Start() error {
|
|||||||
n.serverConfig.NodeDatabase = n.config.NodeDB()
|
n.serverConfig.NodeDatabase = n.config.NodeDB()
|
||||||
}
|
}
|
||||||
running := &p2p.Server{Config: n.serverConfig}
|
running := &p2p.Server{Config: n.serverConfig}
|
||||||
log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)
|
n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)
|
||||||
|
|
||||||
// Otherwise copy and specialize the P2P configuration
|
// Otherwise copy and specialize the P2P configuration
|
||||||
services := make(map[reflect.Type]Service)
|
services := make(map[reflect.Type]Service)
|
||||||
@ -280,7 +287,7 @@ func (n *Node) startInProc(apis []rpc.API) error {
|
|||||||
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
|
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Debug(fmt.Sprintf("InProc registered %T under '%s'", api.Service, api.Namespace))
|
n.log.Debug(fmt.Sprintf("InProc registered %T under '%s'", api.Service, api.Namespace))
|
||||||
}
|
}
|
||||||
n.inprocHandler = handler
|
n.inprocHandler = handler
|
||||||
return nil
|
return nil
|
||||||
@ -306,7 +313,7 @@ func (n *Node) startIPC(apis []rpc.API) error {
|
|||||||
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
|
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Debug(fmt.Sprintf("IPC registered %T under '%s'", api.Service, api.Namespace))
|
n.log.Debug(fmt.Sprintf("IPC registered %T under '%s'", api.Service, api.Namespace))
|
||||||
}
|
}
|
||||||
// All APIs registered, start the IPC listener
|
// All APIs registered, start the IPC listener
|
||||||
var (
|
var (
|
||||||
@ -317,7 +324,7 @@ func (n *Node) startIPC(apis []rpc.API) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
log.Info(fmt.Sprintf("IPC endpoint opened: %s", n.ipcEndpoint))
|
n.log.Info(fmt.Sprintf("IPC endpoint opened: %s", n.ipcEndpoint))
|
||||||
|
|
||||||
for {
|
for {
|
||||||
conn, err := listener.Accept()
|
conn, err := listener.Accept()
|
||||||
@ -330,7 +337,7 @@ func (n *Node) startIPC(apis []rpc.API) error {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Not closed, just some error; report and continue
|
// Not closed, just some error; report and continue
|
||||||
log.Error(fmt.Sprintf("IPC accept failed: %v", err))
|
n.log.Error(fmt.Sprintf("IPC accept failed: %v", err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
go handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation|rpc.OptionSubscriptions)
|
go handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation|rpc.OptionSubscriptions)
|
||||||
@ -349,7 +356,7 @@ func (n *Node) stopIPC() {
|
|||||||
n.ipcListener.Close()
|
n.ipcListener.Close()
|
||||||
n.ipcListener = nil
|
n.ipcListener = nil
|
||||||
|
|
||||||
log.Info(fmt.Sprintf("IPC endpoint closed: %s", n.ipcEndpoint))
|
n.log.Info(fmt.Sprintf("IPC endpoint closed: %s", n.ipcEndpoint))
|
||||||
}
|
}
|
||||||
if n.ipcHandler != nil {
|
if n.ipcHandler != nil {
|
||||||
n.ipcHandler.Stop()
|
n.ipcHandler.Stop()
|
||||||
@ -375,7 +382,7 @@ func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors
|
|||||||
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
|
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Debug(fmt.Sprintf("HTTP registered %T under '%s'", api.Service, api.Namespace))
|
n.log.Debug(fmt.Sprintf("HTTP registered %T under '%s'", api.Service, api.Namespace))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// All APIs registered, start the HTTP listener
|
// All APIs registered, start the HTTP listener
|
||||||
@ -387,7 +394,7 @@ func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
go rpc.NewHTTPServer(cors, handler).Serve(listener)
|
go rpc.NewHTTPServer(cors, handler).Serve(listener)
|
||||||
log.Info(fmt.Sprintf("HTTP endpoint opened: http://%s", endpoint))
|
n.log.Info(fmt.Sprintf("HTTP endpoint opened: http://%s", endpoint))
|
||||||
|
|
||||||
// All listeners booted successfully
|
// All listeners booted successfully
|
||||||
n.httpEndpoint = endpoint
|
n.httpEndpoint = endpoint
|
||||||
@ -403,7 +410,7 @@ func (n *Node) stopHTTP() {
|
|||||||
n.httpListener.Close()
|
n.httpListener.Close()
|
||||||
n.httpListener = nil
|
n.httpListener = nil
|
||||||
|
|
||||||
log.Info(fmt.Sprintf("HTTP endpoint closed: http://%s", n.httpEndpoint))
|
n.log.Info(fmt.Sprintf("HTTP endpoint closed: http://%s", n.httpEndpoint))
|
||||||
}
|
}
|
||||||
if n.httpHandler != nil {
|
if n.httpHandler != nil {
|
||||||
n.httpHandler.Stop()
|
n.httpHandler.Stop()
|
||||||
@ -429,7 +436,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
|
|||||||
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
|
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Debug(fmt.Sprintf("WebSocket registered %T under '%s'", api.Service, api.Namespace))
|
n.log.Debug(fmt.Sprintf("WebSocket registered %T under '%s'", api.Service, api.Namespace))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// All APIs registered, start the HTTP listener
|
// All APIs registered, start the HTTP listener
|
||||||
@ -441,7 +448,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
go rpc.NewWSServer(wsOrigins, handler).Serve(listener)
|
go rpc.NewWSServer(wsOrigins, handler).Serve(listener)
|
||||||
log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr()))
|
n.log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr()))
|
||||||
|
|
||||||
// All listeners booted successfully
|
// All listeners booted successfully
|
||||||
n.wsEndpoint = endpoint
|
n.wsEndpoint = endpoint
|
||||||
@ -457,7 +464,7 @@ func (n *Node) stopWS() {
|
|||||||
n.wsListener.Close()
|
n.wsListener.Close()
|
||||||
n.wsListener = nil
|
n.wsListener = nil
|
||||||
|
|
||||||
log.Info(fmt.Sprintf("WebSocket endpoint closed: ws://%s", n.wsEndpoint))
|
n.log.Info(fmt.Sprintf("WebSocket endpoint closed: ws://%s", n.wsEndpoint))
|
||||||
}
|
}
|
||||||
if n.wsHandler != nil {
|
if n.wsHandler != nil {
|
||||||
n.wsHandler.Stop()
|
n.wsHandler.Stop()
|
||||||
@ -496,7 +503,7 @@ func (n *Node) Stop() error {
|
|||||||
// Release instance directory lock.
|
// Release instance directory lock.
|
||||||
if n.instanceDirLock != nil {
|
if n.instanceDirLock != nil {
|
||||||
if err := n.instanceDirLock.Release(); err != nil {
|
if err := n.instanceDirLock.Release(); err != nil {
|
||||||
log.Error("Can't release datadir lock", "err", err)
|
n.log.Error("Can't release datadir lock", "err", err)
|
||||||
}
|
}
|
||||||
n.instanceDirLock = nil
|
n.instanceDirLock = nil
|
||||||
}
|
}
|
||||||
|
25
p2p/dial.go
25
p2p/dial.go
@ -291,11 +291,14 @@ func (t *dialTask) Do(srv *Server) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
success := t.dial(srv, t.dest)
|
err := t.dial(srv, t.dest)
|
||||||
// Try resolving the ID of static nodes if dialing failed.
|
if err != nil {
|
||||||
if !success && t.flags&staticDialedConn != 0 {
|
log.Trace("Dial error", "task", t, "err", err)
|
||||||
if t.resolve(srv) {
|
// Try resolving the ID of static nodes if dialing failed.
|
||||||
t.dial(srv, t.dest)
|
if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
|
||||||
|
if t.resolve(srv) {
|
||||||
|
t.dial(srv, t.dest)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -334,16 +337,18 @@ func (t *dialTask) resolve(srv *Server) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type dialError struct {
|
||||||
|
error
|
||||||
|
}
|
||||||
|
|
||||||
// dial performs the actual connection attempt.
|
// dial performs the actual connection attempt.
|
||||||
func (t *dialTask) dial(srv *Server, dest *discover.Node) bool {
|
func (t *dialTask) dial(srv *Server, dest *discover.Node) error {
|
||||||
fd, err := srv.Dialer.Dial(dest)
|
fd, err := srv.Dialer.Dial(dest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Trace("Dial error", "task", t, "err", err)
|
return &dialError{err}
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
mfd := newMeteredConn(fd, false)
|
mfd := newMeteredConn(fd, false)
|
||||||
srv.SetupConn(mfd, t.flags, dest)
|
return srv.SetupConn(mfd, t.flags, dest)
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *dialTask) String() string {
|
func (t *dialTask) String() string {
|
||||||
|
@ -160,6 +160,11 @@ func (p *Peer) String() string {
|
|||||||
return fmt.Sprintf("Peer %x %v", p.rw.id[:8], p.RemoteAddr())
|
return fmt.Sprintf("Peer %x %v", p.rw.id[:8], p.RemoteAddr())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Inbound returns true if the peer is an inbound connection
|
||||||
|
func (p *Peer) Inbound() bool {
|
||||||
|
return p.rw.flags&inboundConn != 0
|
||||||
|
}
|
||||||
|
|
||||||
func newPeer(conn *conn, protocols []Protocol) *Peer {
|
func newPeer(conn *conn, protocols []Protocol) *Peer {
|
||||||
protomap := matchProtocols(protocols, conn.caps, conn)
|
protomap := matchProtocols(protocols, conn.caps, conn)
|
||||||
p := &Peer{
|
p := &Peer{
|
||||||
|
@ -139,6 +139,9 @@ type Config struct {
|
|||||||
// If EnableMsgEvents is set then the server will emit PeerEvents
|
// If EnableMsgEvents is set then the server will emit PeerEvents
|
||||||
// whenever a message is sent to or received from a peer
|
// whenever a message is sent to or received from a peer
|
||||||
EnableMsgEvents bool
|
EnableMsgEvents bool
|
||||||
|
|
||||||
|
// Logger is a custom logger to use with the p2p.Server.
|
||||||
|
Logger log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// Server manages all peer connections.
|
// Server manages all peer connections.
|
||||||
@ -172,6 +175,7 @@ type Server struct {
|
|||||||
delpeer chan peerDrop
|
delpeer chan peerDrop
|
||||||
loopWG sync.WaitGroup // loop, listenLoop
|
loopWG sync.WaitGroup // loop, listenLoop
|
||||||
peerFeed event.Feed
|
peerFeed event.Feed
|
||||||
|
log log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
type peerOpFunc func(map[discover.NodeID]*Peer)
|
type peerOpFunc func(map[discover.NodeID]*Peer)
|
||||||
@ -359,7 +363,11 @@ func (srv *Server) Start() (err error) {
|
|||||||
return errors.New("server already running")
|
return errors.New("server already running")
|
||||||
}
|
}
|
||||||
srv.running = true
|
srv.running = true
|
||||||
log.Info("Starting P2P networking")
|
srv.log = srv.Config.Logger
|
||||||
|
if srv.log == nil {
|
||||||
|
srv.log = log.New()
|
||||||
|
}
|
||||||
|
srv.log.Info("Starting P2P networking")
|
||||||
|
|
||||||
// static fields
|
// static fields
|
||||||
if srv.PrivateKey == nil {
|
if srv.PrivateKey == nil {
|
||||||
@ -421,7 +429,7 @@ func (srv *Server) Start() (err error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if srv.NoDial && srv.ListenAddr == "" {
|
if srv.NoDial && srv.ListenAddr == "" {
|
||||||
log.Warn("P2P server will be useless, neither dialing nor listening")
|
srv.log.Warn("P2P server will be useless, neither dialing nor listening")
|
||||||
}
|
}
|
||||||
|
|
||||||
srv.loopWG.Add(1)
|
srv.loopWG.Add(1)
|
||||||
@ -489,7 +497,7 @@ func (srv *Server) run(dialstate dialer) {
|
|||||||
i := 0
|
i := 0
|
||||||
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
|
for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
|
||||||
t := ts[i]
|
t := ts[i]
|
||||||
log.Trace("New dial task", "task", t)
|
srv.log.Trace("New dial task", "task", t)
|
||||||
go func() { t.Do(srv); taskdone <- t }()
|
go func() { t.Do(srv); taskdone <- t }()
|
||||||
runningTasks = append(runningTasks, t)
|
runningTasks = append(runningTasks, t)
|
||||||
}
|
}
|
||||||
@ -517,13 +525,13 @@ running:
|
|||||||
// This channel is used by AddPeer to add to the
|
// This channel is used by AddPeer to add to the
|
||||||
// ephemeral static peer list. Add it to the dialer,
|
// ephemeral static peer list. Add it to the dialer,
|
||||||
// it will keep the node connected.
|
// it will keep the node connected.
|
||||||
log.Debug("Adding static node", "node", n)
|
srv.log.Debug("Adding static node", "node", n)
|
||||||
dialstate.addStatic(n)
|
dialstate.addStatic(n)
|
||||||
case n := <-srv.removestatic:
|
case n := <-srv.removestatic:
|
||||||
// This channel is used by RemovePeer to send a
|
// This channel is used by RemovePeer to send a
|
||||||
// disconnect request to a peer and begin the
|
// disconnect request to a peer and begin the
|
||||||
// stop keeping the node connected
|
// stop keeping the node connected
|
||||||
log.Debug("Removing static node", "node", n)
|
srv.log.Debug("Removing static node", "node", n)
|
||||||
dialstate.removeStatic(n)
|
dialstate.removeStatic(n)
|
||||||
if p, ok := peers[n.ID]; ok {
|
if p, ok := peers[n.ID]; ok {
|
||||||
p.Disconnect(DiscRequested)
|
p.Disconnect(DiscRequested)
|
||||||
@ -536,7 +544,7 @@ running:
|
|||||||
// A task got done. Tell dialstate about it so it
|
// A task got done. Tell dialstate about it so it
|
||||||
// can update its state and remove it from the active
|
// can update its state and remove it from the active
|
||||||
// tasks list.
|
// tasks list.
|
||||||
log.Trace("Dial task done", "task", t)
|
srv.log.Trace("Dial task done", "task", t)
|
||||||
dialstate.taskDone(t, time.Now())
|
dialstate.taskDone(t, time.Now())
|
||||||
delTask(t)
|
delTask(t)
|
||||||
case c := <-srv.posthandshake:
|
case c := <-srv.posthandshake:
|
||||||
@ -565,7 +573,7 @@ running:
|
|||||||
p.events = &srv.peerFeed
|
p.events = &srv.peerFeed
|
||||||
}
|
}
|
||||||
name := truncateName(c.name)
|
name := truncateName(c.name)
|
||||||
log.Debug("Adding p2p peer", "id", c.id, "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
|
srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
|
||||||
peers[c.id] = p
|
peers[c.id] = p
|
||||||
go srv.runPeer(p)
|
go srv.runPeer(p)
|
||||||
}
|
}
|
||||||
@ -585,7 +593,7 @@ running:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Trace("P2P networking is spinning down")
|
srv.log.Trace("P2P networking is spinning down")
|
||||||
|
|
||||||
// Terminate discovery. If there is a running lookup it will terminate soon.
|
// Terminate discovery. If there is a running lookup it will terminate soon.
|
||||||
if srv.ntab != nil {
|
if srv.ntab != nil {
|
||||||
@ -639,7 +647,7 @@ type tempError interface {
|
|||||||
// inbound connections.
|
// inbound connections.
|
||||||
func (srv *Server) listenLoop() {
|
func (srv *Server) listenLoop() {
|
||||||
defer srv.loopWG.Done()
|
defer srv.loopWG.Done()
|
||||||
log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab))
|
srv.log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab))
|
||||||
|
|
||||||
// This channel acts as a semaphore limiting
|
// This channel acts as a semaphore limiting
|
||||||
// active inbound connections that are lingering pre-handshake.
|
// active inbound connections that are lingering pre-handshake.
|
||||||
@ -664,10 +672,10 @@ func (srv *Server) listenLoop() {
|
|||||||
for {
|
for {
|
||||||
fd, err = srv.listener.Accept()
|
fd, err = srv.listener.Accept()
|
||||||
if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
|
if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
|
||||||
log.Debug("Temporary read error", "err", err)
|
srv.log.Debug("Temporary read error", "err", err)
|
||||||
continue
|
continue
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
log.Debug("Read error", "err", err)
|
srv.log.Debug("Read error", "err", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
@ -676,7 +684,7 @@ func (srv *Server) listenLoop() {
|
|||||||
// Reject connections that do not match NetRestrict.
|
// Reject connections that do not match NetRestrict.
|
||||||
if srv.NetRestrict != nil {
|
if srv.NetRestrict != nil {
|
||||||
if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) {
|
if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) {
|
||||||
log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
|
srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
|
||||||
fd.Close()
|
fd.Close()
|
||||||
slots <- struct{}{}
|
slots <- struct{}{}
|
||||||
continue
|
continue
|
||||||
@ -684,7 +692,7 @@ func (srv *Server) listenLoop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fd = newMeteredConn(fd, true)
|
fd = newMeteredConn(fd, true)
|
||||||
log.Trace("Accepted connection", "addr", fd.RemoteAddr())
|
srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
|
||||||
|
|
||||||
// Spawn the handler. It will give the slot back when the connection
|
// Spawn the handler. It will give the slot back when the connection
|
||||||
// has been established.
|
// has been established.
|
||||||
@ -698,55 +706,65 @@ func (srv *Server) listenLoop() {
|
|||||||
// SetupConn runs the handshakes and attempts to add the connection
|
// SetupConn runs the handshakes and attempts to add the connection
|
||||||
// as a peer. It returns when the connection has been added as a peer
|
// as a peer. It returns when the connection has been added as a peer
|
||||||
// or the handshakes have failed.
|
// or the handshakes have failed.
|
||||||
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) {
|
func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error {
|
||||||
|
self := srv.Self()
|
||||||
|
if self == nil {
|
||||||
|
return errors.New("shutdown")
|
||||||
|
}
|
||||||
|
c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
|
||||||
|
err := srv.setupConn(c, flags, dialDest)
|
||||||
|
if err != nil {
|
||||||
|
c.close(err)
|
||||||
|
srv.log.Trace("Setting up connection failed", "id", c.id, "err", err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {
|
||||||
// Prevent leftover pending conns from entering the handshake.
|
// Prevent leftover pending conns from entering the handshake.
|
||||||
srv.lock.Lock()
|
srv.lock.Lock()
|
||||||
running := srv.running
|
running := srv.running
|
||||||
srv.lock.Unlock()
|
srv.lock.Unlock()
|
||||||
c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
|
|
||||||
if !running {
|
if !running {
|
||||||
c.close(errServerStopped)
|
return errServerStopped
|
||||||
return
|
|
||||||
}
|
}
|
||||||
// Run the encryption handshake.
|
// Run the encryption handshake.
|
||||||
var err error
|
var err error
|
||||||
if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
|
if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
|
||||||
log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
|
srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
|
||||||
c.close(err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
clog := log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
|
clog := srv.log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
|
||||||
// For dialed connections, check that the remote public key matches.
|
// For dialed connections, check that the remote public key matches.
|
||||||
if dialDest != nil && c.id != dialDest.ID {
|
if dialDest != nil && c.id != dialDest.ID {
|
||||||
c.close(DiscUnexpectedIdentity)
|
|
||||||
clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID)
|
clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID)
|
||||||
return
|
return DiscUnexpectedIdentity
|
||||||
}
|
}
|
||||||
if err := srv.checkpoint(c, srv.posthandshake); err != nil {
|
err = srv.checkpoint(c, srv.posthandshake)
|
||||||
|
if err != nil {
|
||||||
clog.Trace("Rejected peer before protocol handshake", "err", err)
|
clog.Trace("Rejected peer before protocol handshake", "err", err)
|
||||||
c.close(err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
// Run the protocol handshake
|
// Run the protocol handshake
|
||||||
phs, err := c.doProtoHandshake(srv.ourHandshake)
|
phs, err := c.doProtoHandshake(srv.ourHandshake)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
clog.Trace("Failed proto handshake", "err", err)
|
clog.Trace("Failed proto handshake", "err", err)
|
||||||
c.close(err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
if phs.ID != c.id {
|
if phs.ID != c.id {
|
||||||
clog.Trace("Wrong devp2p handshake identity", "err", phs.ID)
|
clog.Trace("Wrong devp2p handshake identity", "err", phs.ID)
|
||||||
c.close(DiscUnexpectedIdentity)
|
return DiscUnexpectedIdentity
|
||||||
return
|
|
||||||
}
|
}
|
||||||
c.caps, c.name = phs.Caps, phs.Name
|
c.caps, c.name = phs.Caps, phs.Name
|
||||||
if err := srv.checkpoint(c, srv.addpeer); err != nil {
|
err = srv.checkpoint(c, srv.addpeer)
|
||||||
|
if err != nil {
|
||||||
clog.Trace("Rejected peer", "err", err)
|
clog.Trace("Rejected peer", "err", err)
|
||||||
c.close(err)
|
return err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
// If the checks completed successfully, runPeer has now been
|
// If the checks completed successfully, runPeer has now been
|
||||||
// launched by run.
|
// launched by run.
|
||||||
|
clog.Trace("connection set up", "inbound", dialDest == nil)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func truncateName(s string) string {
|
func truncateName(s string) string {
|
||||||
|
@ -27,6 +27,7 @@ import (
|
|||||||
|
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/ethereum/go-ethereum/crypto/sha3"
|
"github.com/ethereum/go-ethereum/crypto/sha3"
|
||||||
|
"github.com/ethereum/go-ethereum/log"
|
||||||
"github.com/ethereum/go-ethereum/p2p/discover"
|
"github.com/ethereum/go-ethereum/p2p/discover"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -206,6 +207,7 @@ func TestServerTaskScheduling(t *testing.T) {
|
|||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
ntab: fakeTable{},
|
ntab: fakeTable{},
|
||||||
running: true,
|
running: true,
|
||||||
|
log: log.New(),
|
||||||
}
|
}
|
||||||
srv.loopWG.Add(1)
|
srv.loopWG.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
@ -246,7 +248,12 @@ func TestServerManyTasks(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
srv = &Server{quit: make(chan struct{}), ntab: fakeTable{}, running: true}
|
srv = &Server{
|
||||||
|
quit: make(chan struct{}),
|
||||||
|
ntab: fakeTable{},
|
||||||
|
running: true,
|
||||||
|
log: log.New(),
|
||||||
|
}
|
||||||
done = make(chan *testTask)
|
done = make(chan *testTask)
|
||||||
start, end = 0, 0
|
start, end = 0, 0
|
||||||
)
|
)
|
||||||
@ -428,6 +435,7 @@ func TestServerSetupConn(t *testing.T) {
|
|||||||
Protocols: []Protocol{discard},
|
Protocols: []Protocol{discard},
|
||||||
},
|
},
|
||||||
newTransport: func(fd net.Conn) transport { return test.tt },
|
newTransport: func(fd net.Conn) transport { return test.tt },
|
||||||
|
log: log.New(),
|
||||||
}
|
}
|
||||||
if !test.dontstart {
|
if !test.dontstart {
|
||||||
if err := srv.Start(); err != nil {
|
if err := srv.Start(); err != nil {
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/docker/docker/pkg/reexec"
|
"github.com/docker/docker/pkg/reexec"
|
||||||
|
"github.com/ethereum/go-ethereum/log"
|
||||||
"github.com/ethereum/go-ethereum/node"
|
"github.com/ethereum/go-ethereum/node"
|
||||||
"github.com/ethereum/go-ethereum/p2p/discover"
|
"github.com/ethereum/go-ethereum/p2p/discover"
|
||||||
)
|
)
|
||||||
@ -94,6 +95,7 @@ func (d *DockerAdapter) NewNode(config *NodeConfig) (Node, error) {
|
|||||||
conf.Stack.P2P.NoDiscovery = true
|
conf.Stack.P2P.NoDiscovery = true
|
||||||
conf.Stack.P2P.NAT = nil
|
conf.Stack.P2P.NAT = nil
|
||||||
conf.Stack.NoUSB = true
|
conf.Stack.NoUSB = true
|
||||||
|
conf.Stack.Logger = log.New("node.id", config.ID.String())
|
||||||
|
|
||||||
node := &DockerNode{
|
node := &DockerNode{
|
||||||
ExecNode: ExecNode{
|
ExecNode: ExecNode{
|
||||||
|
@ -359,6 +359,7 @@ func execP2PNode() {
|
|||||||
log.Crit("error decoding _P2P_NODE_CONFIG", "err", err)
|
log.Crit("error decoding _P2P_NODE_CONFIG", "err", err)
|
||||||
}
|
}
|
||||||
conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey
|
conf.Stack.P2P.PrivateKey = conf.Node.PrivateKey
|
||||||
|
conf.Stack.Logger = log.New("node.id", conf.Node.ID.String())
|
||||||
|
|
||||||
// use explicit IP address in ListenAddr so that Enode URL is usable
|
// use explicit IP address in ListenAddr so that Enode URL is usable
|
||||||
externalIP := func() string {
|
externalIP := func() string {
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
|
"github.com/ethereum/go-ethereum/log"
|
||||||
"github.com/ethereum/go-ethereum/node"
|
"github.com/ethereum/go-ethereum/node"
|
||||||
"github.com/ethereum/go-ethereum/p2p"
|
"github.com/ethereum/go-ethereum/p2p"
|
||||||
"github.com/ethereum/go-ethereum/p2p/discover"
|
"github.com/ethereum/go-ethereum/p2p/discover"
|
||||||
@ -82,7 +83,8 @@ func (s *SimAdapter) NewNode(config *NodeConfig) (Node, error) {
|
|||||||
Dialer: s,
|
Dialer: s,
|
||||||
EnableMsgEvents: true,
|
EnableMsgEvents: true,
|
||||||
},
|
},
|
||||||
NoUSB: true,
|
NoUSB: true,
|
||||||
|
Logger: log.New("node.id", id.String()),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -83,6 +83,9 @@ type NodeConfig struct {
|
|||||||
// stack to encrypt communications
|
// stack to encrypt communications
|
||||||
PrivateKey *ecdsa.PrivateKey
|
PrivateKey *ecdsa.PrivateKey
|
||||||
|
|
||||||
|
// Enable peer events for Msgs
|
||||||
|
EnableMsgEvents bool
|
||||||
|
|
||||||
// Name is a human friendly name for the node like "node01"
|
// Name is a human friendly name for the node like "node01"
|
||||||
Name string
|
Name string
|
||||||
|
|
||||||
@ -91,6 +94,9 @@ type NodeConfig struct {
|
|||||||
// contained in SimAdapter.services, for other nodes it should be
|
// contained in SimAdapter.services, for other nodes it should be
|
||||||
// services registered by calling the RegisterService function)
|
// services registered by calling the RegisterService function)
|
||||||
Services []string
|
Services []string
|
||||||
|
|
||||||
|
// function to sanction or prevent suggesting a peer
|
||||||
|
Reachable func(id discover.NodeID) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding
|
// nodeConfigJSON is used to encode and decode NodeConfig as JSON by encoding
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
@ -30,6 +31,8 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
|
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var dialBanTimeout = 200 * time.Millisecond
|
||||||
|
|
||||||
// NetworkConfig defines configuration options for starting a Network
|
// NetworkConfig defines configuration options for starting a Network
|
||||||
type NetworkConfig struct {
|
type NetworkConfig struct {
|
||||||
ID string `json:"id"`
|
ID string `json:"id"`
|
||||||
@ -95,6 +98,12 @@ func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error)
|
|||||||
conf.PrivateKey = c.PrivateKey
|
conf.PrivateKey = c.PrivateKey
|
||||||
}
|
}
|
||||||
id := conf.ID
|
id := conf.ID
|
||||||
|
if conf.Reachable == nil {
|
||||||
|
conf.Reachable = func(otherID discover.NodeID) bool {
|
||||||
|
_, err := self.InitConn(conf.ID, otherID)
|
||||||
|
return err == nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// assign a name to the node if not set
|
// assign a name to the node if not set
|
||||||
if conf.Name == "" {
|
if conf.Name == "" {
|
||||||
@ -271,16 +280,10 @@ func (self *Network) Stop(id discover.NodeID) error {
|
|||||||
// method on the "one" node so that it connects to the "other" node
|
// method on the "one" node so that it connects to the "other" node
|
||||||
func (self *Network) Connect(oneID, otherID discover.NodeID) error {
|
func (self *Network) Connect(oneID, otherID discover.NodeID) error {
|
||||||
log.Debug(fmt.Sprintf("connecting %s to %s", oneID, otherID))
|
log.Debug(fmt.Sprintf("connecting %s to %s", oneID, otherID))
|
||||||
conn, err := self.GetOrCreateConn(oneID, otherID)
|
conn, err := self.InitConn(oneID, otherID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if conn.Up {
|
|
||||||
return fmt.Errorf("%v and %v already connected", oneID, otherID)
|
|
||||||
}
|
|
||||||
if err := conn.nodesUp(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
client, err := conn.one.Client()
|
client, err := conn.one.Client()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -324,14 +327,15 @@ func (self *Network) DidConnect(one, other discover.NodeID) error {
|
|||||||
// DidDisconnect tracks the fact that the "one" node disconnected from the
|
// DidDisconnect tracks the fact that the "one" node disconnected from the
|
||||||
// "other" node
|
// "other" node
|
||||||
func (self *Network) DidDisconnect(one, other discover.NodeID) error {
|
func (self *Network) DidDisconnect(one, other discover.NodeID) error {
|
||||||
conn, err := self.GetOrCreateConn(one, other)
|
conn := self.GetConn(one, other)
|
||||||
if err != nil {
|
if conn == nil {
|
||||||
return fmt.Errorf("connection between %v and %v does not exist", one, other)
|
return fmt.Errorf("connection between %v and %v does not exist", one, other)
|
||||||
}
|
}
|
||||||
if !conn.Up {
|
if !conn.Up {
|
||||||
return fmt.Errorf("%v and %v already disconnected", one, other)
|
return fmt.Errorf("%v and %v already disconnected", one, other)
|
||||||
}
|
}
|
||||||
conn.Up = false
|
conn.Up = false
|
||||||
|
conn.initiated = time.Now().Add(-dialBanTimeout)
|
||||||
self.events.Send(NewEvent(conn))
|
self.events.Send(NewEvent(conn))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -396,10 +400,13 @@ func (self *Network) getNodeByName(name string) *Node {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetNodes returns the existing nodes
|
// GetNodes returns the existing nodes
|
||||||
func (self *Network) GetNodes() []*Node {
|
func (self *Network) GetNodes() (nodes []*Node) {
|
||||||
self.lock.Lock()
|
self.lock.Lock()
|
||||||
defer self.lock.Unlock()
|
defer self.lock.Unlock()
|
||||||
return self.Nodes
|
for _, node := range self.Nodes {
|
||||||
|
nodes = append(nodes, node)
|
||||||
|
}
|
||||||
|
return nodes
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetConn returns the connection which exists between "one" and "other"
|
// GetConn returns the connection which exists between "one" and "other"
|
||||||
@ -415,6 +422,10 @@ func (self *Network) GetConn(oneID, otherID discover.NodeID) *Conn {
|
|||||||
func (self *Network) GetOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) {
|
func (self *Network) GetOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) {
|
||||||
self.lock.Lock()
|
self.lock.Lock()
|
||||||
defer self.lock.Unlock()
|
defer self.lock.Unlock()
|
||||||
|
return self.getOrCreateConn(oneID, otherID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *Network) getOrCreateConn(oneID, otherID discover.NodeID) (*Conn, error) {
|
||||||
if conn := self.getConn(oneID, otherID); conn != nil {
|
if conn := self.getConn(oneID, otherID); conn != nil {
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
@ -448,6 +459,38 @@ func (self *Network) getConn(oneID, otherID discover.NodeID) *Conn {
|
|||||||
return self.Conns[i]
|
return self.Conns[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InitConn(one, other) retrieves the connectiton model for the connection between
|
||||||
|
// peers one and other, or creates a new one if it does not exist
|
||||||
|
// the order of nodes does not matter, i.e., Conn(i,j) == Conn(j, i)
|
||||||
|
// it checks if the connection is already up, and if the nodes are running
|
||||||
|
// NOTE:
|
||||||
|
// it also checks whether there has been recent attempt to connect the peers
|
||||||
|
// this is cheating as the simulation is used as an oracle and know about
|
||||||
|
// remote peers attempt to connect to a node which will then not initiate the connection
|
||||||
|
func (self *Network) InitConn(oneID, otherID discover.NodeID) (*Conn, error) {
|
||||||
|
self.lock.Lock()
|
||||||
|
defer self.lock.Unlock()
|
||||||
|
if oneID == otherID {
|
||||||
|
return nil, fmt.Errorf("refusing to connect to self %v", oneID)
|
||||||
|
}
|
||||||
|
conn, err := self.getOrCreateConn(oneID, otherID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if time.Now().Sub(conn.initiated) < dialBanTimeout {
|
||||||
|
return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
|
||||||
|
}
|
||||||
|
if conn.Up {
|
||||||
|
return nil, fmt.Errorf("%v and %v already connected", oneID, otherID)
|
||||||
|
}
|
||||||
|
err = conn.nodesUp()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("nodes not up: %v", err)
|
||||||
|
}
|
||||||
|
conn.initiated = time.Now()
|
||||||
|
return conn, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Shutdown stops all nodes in the network and closes the quit channel
|
// Shutdown stops all nodes in the network and closes the quit channel
|
||||||
func (self *Network) Shutdown() {
|
func (self *Network) Shutdown() {
|
||||||
for _, node := range self.Nodes {
|
for _, node := range self.Nodes {
|
||||||
@ -516,6 +559,8 @@ type Conn struct {
|
|||||||
|
|
||||||
// Up tracks whether or not the connection is active
|
// Up tracks whether or not the connection is active
|
||||||
Up bool `json:"up"`
|
Up bool `json:"up"`
|
||||||
|
// Registers when the connection was grabbed to dial
|
||||||
|
initiated time.Time
|
||||||
|
|
||||||
one *Node
|
one *Node
|
||||||
other *Node
|
other *Node
|
||||||
|
Loading…
Reference in New Issue
Block a user