P2P: try to limit the connection number per IP address (#1623)

** by default, MaxPeersPerIp is same as MaxPeers
 ** no restriction on TrustedNode
 ** add test case: TestOptionMaxPeersPerIp
This commit is contained in:
lx 2023-05-25 08:57:02 +08:00 committed by GitHub
parent 11d16dff3c
commit eaea77a21c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 204 additions and 13 deletions

@ -132,6 +132,7 @@ var (
utils.PruneAncientDataFlag,
utils.ListenPortFlag,
utils.MaxPeersFlag,
utils.MaxPeersPerIPFlag,
utils.MaxPendingPeersFlag,
utils.MiningEnabledFlag,
utils.MinerThreadsFlag,

@ -176,6 +176,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.DNSDiscoveryFlag,
utils.ListenPortFlag,
utils.MaxPeersFlag,
utils.MaxPeersPerIPFlag,
utils.MaxPendingPeersFlag,
utils.NATFlag,
utils.NoDiscoverFlag,

@ -702,6 +702,13 @@ var (
Usage: "Maximum number of network peers (network disabled if set to 0)",
Value: node.DefaultConfig.P2P.MaxPeers,
}
MaxPeersPerIPFlag = cli.IntFlag{
Name: "maxpeersperip",
Usage: "Maximum number of network peers from a single IP address, (default used if set to <= 0, which is same as MaxPeers)",
Value: node.DefaultConfig.P2P.MaxPeersPerIP,
}
MaxPendingPeersFlag = cli.IntFlag{
Name: "maxpendpeers",
Usage: "Maximum number of pending connection attempts (defaults used if set to 0)",
@ -1282,6 +1289,15 @@ func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) {
cfg.MaxPeers = lightPeers
}
}
// if max peers per ip is not set, use max peers
if cfg.MaxPeersPerIP <= 0 {
cfg.MaxPeersPerIP = cfg.MaxPeers
}
// flag like: `--maxpeersperip 10` could override the setting in config.toml
if ctx.GlobalIsSet(MaxPeersPerIPFlag.Name) {
cfg.MaxPeersPerIP = ctx.GlobalInt(MaxPeersPerIPFlag.Name)
}
if !(lightClient || lightServer) {
lightPeers = 0
}

@ -656,7 +656,7 @@ func (s *Ethereum) Start() error {
maxPeers -= s.config.LightPeers
}
// Start the networking layer and the light server if requested
s.handler.Start(maxPeers)
s.handler.Start(maxPeers, s.p2pServer.MaxPeersPerIP)
return nil
}

@ -20,6 +20,7 @@ import (
"errors"
"math"
"math/big"
"strings"
"sync"
"sync/atomic"
"time"
@ -140,6 +141,9 @@ type handler struct {
maliciousVoteMonitor *monitor.MaliciousVoteMonitor
chain *core.BlockChain
maxPeers int
maxPeersPerIP int
peersPerIP map[string]int
peerPerIPLock sync.Mutex
downloader *downloader.Downloader
blockFetcher *fetcher.BlockFetcher
@ -186,6 +190,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
chain: config.Chain,
peers: config.PeerSet,
merger: config.Merger,
peersPerIP: make(map[string]int),
whitelist: config.Whitelist,
directBroadcast: config.DirectBroadcast,
diffSync: config.DiffSync,
@ -387,11 +392,30 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
}
}
// Ignore maxPeers if this is a trusted peer
if !peer.Peer.Info().Network.Trusted {
peerInfo := peer.Peer.Info()
if !peerInfo.Network.Trusted {
if reject || h.peers.len() >= h.maxPeers {
return p2p.DiscTooManyPeers
}
}
remoteAddr := peerInfo.Network.RemoteAddress
indexIP := strings.LastIndex(remoteAddr, ":")
if indexIP == -1 {
// there could be no IP address, such as a pipe
peer.Log().Debug("runEthPeer", "no ip address, remoteAddress", remoteAddr)
} else if !peerInfo.Network.Trusted {
remoteIP := remoteAddr[:indexIP]
h.peerPerIPLock.Lock()
if num, ok := h.peersPerIP[remoteIP]; ok && num >= h.maxPeersPerIP {
h.peerPerIPLock.Unlock()
peer.Log().Info("The IP has too many peers", "ip", remoteIP, "maxPeersPerIP", h.maxPeersPerIP,
"name", peerInfo.Name, "Enode", peerInfo.Enode)
return p2p.DiscTooManyPeers
}
h.peersPerIP[remoteIP] = h.peersPerIP[remoteIP] + 1
h.peerPerIPLock.Unlock()
}
peer.Log().Debug("Ethereum peer connected", "name", peer.Name())
// Register the peer locally
@ -626,11 +650,32 @@ func (h *handler) unregisterPeer(id string) {
if err := h.peers.unregisterPeer(id); err != nil {
logger.Error("Ethereum peer removal failed", "err", err)
}
peerInfo := peer.Peer.Info()
remoteAddr := peerInfo.Network.RemoteAddress
indexIP := strings.LastIndex(remoteAddr, ":")
if indexIP == -1 {
// there could be no IP address, such as a pipe
peer.Log().Debug("unregisterPeer", "name", peerInfo.Name, "no ip address, remoteAddress", remoteAddr)
} else if !peerInfo.Network.Trusted {
remoteIP := remoteAddr[:indexIP]
h.peerPerIPLock.Lock()
if h.peersPerIP[remoteIP] <= 0 {
peer.Log().Error("unregisterPeer without record", "name", peerInfo.Name, "remoteAddress", remoteAddr)
} else {
h.peersPerIP[remoteIP] = h.peersPerIP[remoteIP] - 1
logger.Debug("unregisterPeer", "name", peerInfo.Name, "connectNum", h.peersPerIP[remoteIP])
if h.peersPerIP[remoteIP] == 0 {
delete(h.peersPerIP, remoteIP)
}
}
h.peerPerIPLock.Unlock()
}
}
func (h *handler) Start(maxPeers int) {
func (h *handler) Start(maxPeers int, maxPeersPerIP int) {
h.maxPeers = maxPeers
h.maxPeersPerIP = maxPeersPerIP
// broadcast transactions
h.wg.Add(1)
h.txsCh = make(chan core.NewTxsEvent, txChanSize)

@ -96,7 +96,7 @@ func newTestBackendWithGenerator(blocks int) *testBackend {
BloomCache: 1,
Merger: consensus.NewMerger(rawdb.NewMemoryDatabase()),
})
handler.Start(100)
handler.Start(100, 100)
txconfig := core.DefaultTxPoolConfig
txconfig.Journal = "" // Don't litter the disk with test journals

@ -20,6 +20,8 @@ import (
"fmt"
"math/big"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
@ -148,8 +150,8 @@ func testForkIDSplit(t *testing.T, protocol uint) {
BloomCache: 1,
})
)
ethNoFork.Start(1000)
ethProFork.Start(1000)
ethNoFork.Start(1000, 1000)
ethProFork.Start(1000, 1000)
// Clean up everything after ourselves
defer chainNoFork.Stop()
@ -928,3 +930,109 @@ func testBroadcastMalformedBlock(t *testing.T, protocol uint) {
}
}
}
func TestOptionMaxPeersPerIP(t *testing.T) {
t.Parallel()
handler := newTestHandler()
defer handler.close()
var (
genesis = handler.chain.Genesis()
head = handler.chain.CurrentBlock()
td = handler.chain.GetTd(head.Hash(), head.NumberU64())
wg = sync.WaitGroup{}
maxPeersPerIP = handler.handler.maxPeersPerIP
uniPort = 1000
)
tryFunc := func(tryNum int, ip1 string, ip2 string, trust bool, doneCh chan struct{}) {
// Create a source peer to send messages through and a sink handler to receive them
p2pSrc, p2pSink := p2p.MsgPipe()
defer p2pSrc.Close()
defer p2pSink.Close()
peer1 := p2p.NewPeerPipe(enode.ID{0}, "", nil, p2pSrc)
peer1.UpdateTestRemoteAddr(ip1 + strconv.Itoa(uniPort))
peer2 := p2p.NewPeerPipe(enode.ID{byte(uniPort)}, "", nil, p2pSink)
peer2.UpdateTestRemoteAddr(ip2 + strconv.Itoa(uniPort))
if trust {
peer2.UpdateTrustFlagTest()
}
uniPort++
src := eth.NewPeer(eth.ETH66, peer1, p2pSrc, handler.txpool)
sink := eth.NewPeer(eth.ETH66, peer2, p2pSink, handler.txpool)
defer src.Close()
defer sink.Close()
wg.Add(1)
go func(num int) {
err := handler.handler.runEthPeer(sink, func(peer *eth.Peer) error {
wg.Done()
<-doneCh
return nil
})
// err is nil, connection ok and it is closed by the doneCh
if err == nil {
if trust || num <= maxPeersPerIP {
return
}
// if num > maxPeersPerIP and not trust, should report: p2p.DiscTooManyPeers
t.Errorf("current num is %d, maxPeersPerIP is %d, should failed", num, maxPeersPerIP)
return
}
wg.Done()
if trust {
t.Errorf("trust node should not failed, num is %d, maxPeersPerIP is %d, but failed:%s", num, maxPeersPerIP, err)
}
// err should be p2p.DiscTooManyPeers and num > maxPeersPerIP
if err == p2p.DiscTooManyPeers && num > maxPeersPerIP {
return
}
t.Errorf("current num is %d, maxPeersPerIP is %d, but failed:%s", num, maxPeersPerIP, err)
}(tryNum)
if err := src.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain), nil); err != nil {
t.Fatalf("failed to run protocol handshake")
}
// make sure runEthPeer execute one by one.
wg.Wait()
}
// case 1: normal case
doneCh1 := make(chan struct{})
for tryNum := 1; tryNum <= maxPeersPerIP+2; tryNum++ {
tryFunc(tryNum, "1.2.3.11:", "1.2.3.22:", false, doneCh1)
}
close(doneCh1)
// case 2: once the previous connection was unregisterred, new connections with same IP can be accepted.
doneCh2 := make(chan struct{})
for tryNum := 1; tryNum <= maxPeersPerIP+2; tryNum++ {
tryFunc(tryNum, "1.2.3.11:", "1.2.3.22:", false, doneCh2)
}
close(doneCh2)
// case 3: ipv6 address, like: [2001:db8::1]:80
doneCh3 := make(chan struct{})
for tryNum := 1; tryNum <= maxPeersPerIP+2; tryNum++ {
tryFunc(tryNum, "[2001:db8::11]:", "[2001:db8::22]:", false, doneCh3)
}
close(doneCh3)
// case 4: same as case 2, but for ipv6
doneCh4 := make(chan struct{})
for tryNum := 1; tryNum <= maxPeersPerIP+2; tryNum++ {
tryFunc(tryNum, "[2001:db8::11]:", "[2001:db8::22]:", false, doneCh4)
}
close(doneCh4)
// case 5: test trust node
doneCh5 := make(chan struct{})
for tryNum := 1; tryNum <= maxPeersPerIP+2; tryNum++ {
tryFunc(tryNum, "[2001:db8::11]:", "[2001:db8::22]:", true, doneCh5)
}
close(doneCh5)
}

@ -177,7 +177,7 @@ func newTestHandlerWithBlocks(blocks int) *testHandler {
Sync: downloader.SnapSync,
BloomCache: 1,
})
handler.Start(1000)
handler.Start(1000, 3)
return &testHandler{
db: db,

@ -47,9 +47,10 @@ var DefaultConfig = Config{
WSModules: []string{"net", "web3"},
GraphQLVirtualHosts: []string{"localhost"},
P2P: p2p.Config{
ListenAddr: ":30303",
MaxPeers: 50,
NAT: nat.Any(),
ListenAddr: ":30303",
MaxPeers: 50,
MaxPeersPerIP: 0, // by default, it will be same as MaxPeers
NAT: nat.Any(),
},
}

@ -116,8 +116,9 @@ type Peer struct {
disc chan DiscReason
// events receives message send / receive events if set
events *event.Feed
testPipe *MsgPipeRW // for testing
events *event.Feed
testPipe *MsgPipeRW // for testing
testRemoteAddr string // for testing
}
// NewPeer returns a peer for testing purposes.
@ -203,9 +204,23 @@ func (p *Peer) RunningCap(protocol string, versions []uint) bool {
// RemoteAddr returns the remote address of the network connection.
func (p *Peer) RemoteAddr() net.Addr {
if len(p.testRemoteAddr) > 0 {
if addr, err := net.ResolveTCPAddr("tcp", p.testRemoteAddr); err == nil {
return addr
}
log.Warn("RemoteAddr", "invalid testRemoteAddr", p.testRemoteAddr)
}
return p.rw.fd.RemoteAddr()
}
func (p *Peer) UpdateTestRemoteAddr(addr string) { // test purpose only
p.testRemoteAddr = addr
}
func (p *Peer) UpdateTrustFlagTest() { // test purpose only
p.rw.set(trustedConn, true)
}
// LocalAddr returns the local address of the network connection.
func (p *Peer) LocalAddr() net.Addr {
return p.rw.fd.LocalAddr()

@ -81,6 +81,10 @@ type Config struct {
// connected. It must be greater than zero.
MaxPeers int
// MaxPeersPerIP is the maximum number of peers that can be
// connected from a single IP. It must be greater than zero.
MaxPeersPerIP int `toml:",omitempty"`
// MaxPendingPeers is the maximum number of peers that can be pending in the
// handshake phase, counted separately for inbound and outbound connections.
// Zero defaults to preset values.