Merge pull request #16333 from shazow/addremovetrustedpeer

rpc: Add admin_addTrustedPeer and admin_removeTrustedPeer.
This commit is contained in:
Felföldi Zsolt 2018-08-06 13:30:04 +02:00 committed by GitHub
commit c4df67461f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 208 additions and 9 deletions

@ -152,6 +152,16 @@ web3._extend({
call: 'admin_removePeer', call: 'admin_removePeer',
params: 1 params: 1
}), }),
new web3._extend.Method({
name: 'addTrustedPeer',
call: 'admin_addTrustedPeer',
params: 1
}),
new web3._extend.Method({
name: 'removeTrustedPeer',
call: 'admin_removeTrustedPeer',
params: 1
}),
new web3._extend.Method({ new web3._extend.Method({
name: 'exportChain', name: 'exportChain',
call: 'admin_exportChain', call: 'admin_exportChain',

@ -59,7 +59,7 @@ func (api *PrivateAdminAPI) AddPeer(url string) (bool, error) {
return true, nil return true, nil
} }
// RemovePeer disconnects from a a remote node if the connection exists // RemovePeer disconnects from a remote node if the connection exists
func (api *PrivateAdminAPI) RemovePeer(url string) (bool, error) { func (api *PrivateAdminAPI) RemovePeer(url string) (bool, error) {
// Make sure the server is running, fail otherwise // Make sure the server is running, fail otherwise
server := api.node.Server() server := api.node.Server()
@ -75,6 +75,37 @@ func (api *PrivateAdminAPI) RemovePeer(url string) (bool, error) {
return true, nil return true, nil
} }
// AddTrustedPeer allows a remote node to always connect, even if slots are full
func (api *PrivateAdminAPI) AddTrustedPeer(url string) (bool, error) {
// Make sure the server is running, fail otherwise
server := api.node.Server()
if server == nil {
return false, ErrNodeStopped
}
node, err := discover.ParseNode(url)
if err != nil {
return false, fmt.Errorf("invalid enode: %v", err)
}
server.AddTrustedPeer(node)
return true, nil
}
// RemoveTrustedPeer removes a remote node from the trusted peer set, but it
// does not disconnect it automatically.
func (api *PrivateAdminAPI) RemoveTrustedPeer(url string) (bool, error) {
// Make sure the server is running, fail otherwise
server := api.node.Server()
if server == nil {
return false, ErrNodeStopped
}
node, err := discover.ParseNode(url)
if err != nil {
return false, fmt.Errorf("invalid enode: %v", err)
}
server.RemoveTrustedPeer(node)
return true, nil
}
// PeerEvents creates an RPC subscription which receives peer events from the // PeerEvents creates an RPC subscription which receives peer events from the
// node's p2p.Server // node's p2p.Server
func (api *PrivateAdminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) { func (api *PrivateAdminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) {

@ -165,7 +165,7 @@ func (p *Peer) String() string {
// Inbound returns true if the peer is an inbound connection // Inbound returns true if the peer is an inbound connection
func (p *Peer) Inbound() bool { func (p *Peer) Inbound() bool {
return p.rw.flags&inboundConn != 0 return p.rw.is(inboundConn)
} }
func newPeer(conn *conn, protocols []Protocol) *Peer { func newPeer(conn *conn, protocols []Protocol) *Peer {

@ -23,6 +23,7 @@ import (
"fmt" "fmt"
"net" "net"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -169,6 +170,8 @@ type Server struct {
quit chan struct{} quit chan struct{}
addstatic chan *discover.Node addstatic chan *discover.Node
removestatic chan *discover.Node removestatic chan *discover.Node
addtrusted chan *discover.Node
removetrusted chan *discover.Node
posthandshake chan *conn posthandshake chan *conn
addpeer chan *conn addpeer chan *conn
delpeer chan peerDrop delpeer chan peerDrop
@ -185,7 +188,7 @@ type peerDrop struct {
requested bool // true if signaled by the peer requested bool // true if signaled by the peer
} }
type connFlag int type connFlag int32
const ( const (
dynDialedConn connFlag = 1 << iota dynDialedConn connFlag = 1 << iota
@ -250,7 +253,18 @@ func (f connFlag) String() string {
} }
func (c *conn) is(f connFlag) bool { func (c *conn) is(f connFlag) bool {
return c.flags&f != 0 flags := connFlag(atomic.LoadInt32((*int32)(&c.flags)))
return flags&f != 0
}
func (c *conn) set(f connFlag, val bool) {
flags := connFlag(atomic.LoadInt32((*int32)(&c.flags)))
if val {
flags |= f
} else {
flags &= ^f
}
atomic.StoreInt32((*int32)(&c.flags), int32(flags))
} }
// Peers returns all connected peers. // Peers returns all connected peers.
@ -300,6 +314,23 @@ func (srv *Server) RemovePeer(node *discover.Node) {
} }
} }
// AddTrustedPeer adds the given node to a reserved whitelist which allows the
// node to always connect, even if the slot are full.
func (srv *Server) AddTrustedPeer(node *discover.Node) {
select {
case srv.addtrusted <- node:
case <-srv.quit:
}
}
// RemoveTrustedPeer removes the given node from the trusted peer set.
func (srv *Server) RemoveTrustedPeer(node *discover.Node) {
select {
case srv.removetrusted <- node:
case <-srv.quit:
}
}
// SubscribePeers subscribes the given channel to peer events // SubscribePeers subscribes the given channel to peer events
func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription { func (srv *Server) SubscribeEvents(ch chan *PeerEvent) event.Subscription {
return srv.peerFeed.Subscribe(ch) return srv.peerFeed.Subscribe(ch)
@ -411,6 +442,8 @@ func (srv *Server) Start() (err error) {
srv.posthandshake = make(chan *conn) srv.posthandshake = make(chan *conn)
srv.addstatic = make(chan *discover.Node) srv.addstatic = make(chan *discover.Node)
srv.removestatic = make(chan *discover.Node) srv.removestatic = make(chan *discover.Node)
srv.addtrusted = make(chan *discover.Node)
srv.removetrusted = make(chan *discover.Node)
srv.peerOp = make(chan peerOpFunc) srv.peerOp = make(chan peerOpFunc)
srv.peerOpDone = make(chan struct{}) srv.peerOpDone = make(chan struct{})
@ -547,8 +580,7 @@ func (srv *Server) run(dialstate dialer) {
queuedTasks []task // tasks that can't run yet queuedTasks []task // tasks that can't run yet
) )
// Put trusted nodes into a map to speed up checks. // Put trusted nodes into a map to speed up checks.
// Trusted peers are loaded on startup and cannot be // Trusted peers are loaded on startup or added via AddTrustedPeer RPC.
// modified while the server is running.
for _, n := range srv.TrustedNodes { for _, n := range srv.TrustedNodes {
trusted[n.ID] = true trusted[n.ID] = true
} }
@ -600,12 +632,32 @@ running:
case n := <-srv.removestatic: case n := <-srv.removestatic:
// This channel is used by RemovePeer to send a // This channel is used by RemovePeer to send a
// disconnect request to a peer and begin the // disconnect request to a peer and begin the
// stop keeping the node connected // stop keeping the node connected.
srv.log.Trace("Removing static node", "node", n) srv.log.Trace("Removing static node", "node", n)
dialstate.removeStatic(n) dialstate.removeStatic(n)
if p, ok := peers[n.ID]; ok { if p, ok := peers[n.ID]; ok {
p.Disconnect(DiscRequested) p.Disconnect(DiscRequested)
} }
case n := <-srv.addtrusted:
// This channel is used by AddTrustedPeer to add an enode
// to the trusted node set.
srv.log.Trace("Adding trusted node", "node", n)
trusted[n.ID] = true
// Mark any already-connected peer as trusted
if p, ok := peers[n.ID]; ok {
p.rw.set(trustedConn, true)
}
case n := <-srv.removetrusted:
// This channel is used by RemoveTrustedPeer to remove an enode
// from the trusted node set.
srv.log.Trace("Removing trusted node", "node", n)
if _, ok := trusted[n.ID]; ok {
delete(trusted, n.ID)
}
// Unmark any already-connected peer as trusted
if p, ok := peers[n.ID]; ok {
p.rw.set(trustedConn, false)
}
case op := <-srv.peerOp: case op := <-srv.peerOp:
// This channel is used by Peers and PeerCount. // This channel is used by Peers and PeerCount.
op(peers) op(peers)

@ -148,7 +148,8 @@ func TestServerDial(t *testing.T) {
// tell the server to connect // tell the server to connect
tcpAddr := listener.Addr().(*net.TCPAddr) tcpAddr := listener.Addr().(*net.TCPAddr)
srv.AddPeer(&discover.Node{ID: remid, IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)}) node := &discover.Node{ID: remid, IP: tcpAddr.IP, TCP: uint16(tcpAddr.Port)}
srv.AddPeer(node)
select { select {
case conn := <-accepted: case conn := <-accepted:
@ -170,6 +171,29 @@ func TestServerDial(t *testing.T) {
if !reflect.DeepEqual(peers, []*Peer{peer}) { if !reflect.DeepEqual(peers, []*Peer{peer}) {
t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer}) t.Errorf("Peers mismatch: got %v, want %v", peers, []*Peer{peer})
} }
// Test AddTrustedPeer/RemoveTrustedPeer and changing Trusted flags
// Particularly for race conditions on changing the flag state.
if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
t.Errorf("peer is trusted prematurely: %v", peer)
}
done := make(chan bool)
go func() {
srv.AddTrustedPeer(node)
if peer := srv.Peers()[0]; !peer.Info().Network.Trusted {
t.Errorf("peer is not trusted after AddTrustedPeer: %v", peer)
}
srv.RemoveTrustedPeer(node)
if peer := srv.Peers()[0]; peer.Info().Network.Trusted {
t.Errorf("peer is trusted after RemoveTrustedPeer: %v", peer)
}
done <- true
}()
// Trigger potential race conditions
peer = srv.Peers()[0]
_ = peer.Inbound()
_ = peer.Info()
<-done
case <-time.After(1 * time.Second): case <-time.After(1 * time.Second):
t.Error("server did not launch peer within one second") t.Error("server did not launch peer within one second")
} }
@ -351,7 +375,8 @@ func TestServerAtCap(t *testing.T) {
} }
} }
// Try inserting a non-trusted connection. // Try inserting a non-trusted connection.
c := newconn(randomID()) anotherID := randomID()
c := newconn(anotherID)
if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers { if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
t.Error("wrong error for insert:", err) t.Error("wrong error for insert:", err)
} }
@ -364,6 +389,87 @@ func TestServerAtCap(t *testing.T) {
t.Error("Server did not set trusted flag") t.Error("Server did not set trusted flag")
} }
// Remove from trusted set and try again
srv.RemoveTrustedPeer(&discover.Node{ID: trustedID})
c = newconn(trustedID)
if err := srv.checkpoint(c, srv.posthandshake); err != DiscTooManyPeers {
t.Error("wrong error for insert:", err)
}
// Add anotherID to trusted set and try again
srv.AddTrustedPeer(&discover.Node{ID: anotherID})
c = newconn(anotherID)
if err := srv.checkpoint(c, srv.posthandshake); err != nil {
t.Error("unexpected error for trusted conn @posthandshake:", err)
}
if !c.is(trustedConn) {
t.Error("Server did not set trusted flag")
}
}
func TestServerPeerLimits(t *testing.T) {
srvkey := newkey()
clientid := randomID()
clientnode := &discover.Node{ID: clientid}
var tp *setupTransport = &setupTransport{
id: clientid,
phs: &protoHandshake{
ID: clientid,
// Force "DiscUselessPeer" due to unmatching caps
// Caps: []Cap{discard.cap()},
},
}
var flags connFlag = dynDialedConn
var dialDest *discover.Node = &discover.Node{ID: clientid}
srv := &Server{
Config: Config{
PrivateKey: srvkey,
MaxPeers: 0,
NoDial: true,
Protocols: []Protocol{discard},
},
newTransport: func(fd net.Conn) transport { return tp },
log: log.New(),
}
if err := srv.Start(); err != nil {
t.Fatalf("couldn't start server: %v", err)
}
defer srv.Stop()
// Check that server is full (MaxPeers=0)
conn, _ := net.Pipe()
srv.SetupConn(conn, flags, dialDest)
if tp.closeErr != DiscTooManyPeers {
t.Errorf("unexpected close error: %q", tp.closeErr)
}
conn.Close()
srv.AddTrustedPeer(clientnode)
// Check that server allows a trusted peer despite being full.
conn, _ = net.Pipe()
srv.SetupConn(conn, flags, dialDest)
if tp.closeErr == DiscTooManyPeers {
t.Errorf("failed to bypass MaxPeers with trusted node: %q", tp.closeErr)
}
if tp.closeErr != DiscUselessPeer {
t.Errorf("unexpected close error: %q", tp.closeErr)
}
conn.Close()
srv.RemoveTrustedPeer(clientnode)
// Check that server is full again.
conn, _ = net.Pipe()
srv.SetupConn(conn, flags, dialDest)
if tp.closeErr != DiscTooManyPeers {
t.Errorf("unexpected close error: %q", tp.closeErr)
}
conn.Close()
} }
func TestServerSetupConn(t *testing.T) { func TestServerSetupConn(t *testing.T) {