From 3a36604b646cadd8b7d6c4407dfa49b95259220d Mon Sep 17 00:00:00 2001 From: Matus Kysel Date: Fri, 6 Sep 2024 16:06:37 +0200 Subject: [PATCH] eth: initial implementation of peer blacklist --- eth/blacklist.go | 114 ++++++++++++++++++++++++++++++ eth/blacklist_test.go | 161 ++++++++++++++++++++++++++++++++++++++++++ eth/handler.go | 3 + 3 files changed, 278 insertions(+) create mode 100644 eth/blacklist.go create mode 100644 eth/blacklist_test.go diff --git a/eth/blacklist.go b/eth/blacklist.go new file mode 100644 index 000000000..c8b0bd5db --- /dev/null +++ b/eth/blacklist.go @@ -0,0 +1,114 @@ +package eth + +import ( + "container/heap" + "sync" + "time" +) + +// Implements the heap.Interface for *BlackListPeer based on LastSeen +type PeerHeap []*BlackListPeer + +func (h PeerHeap) Len() int { return len(h) } +func (h PeerHeap) Less(i, j int) bool { return h[i].LastSeen.Before(h[j].LastSeen) } +func (h PeerHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i]; h[i].index = i; h[j].index = j } + +func (h *PeerHeap) Push(x interface{}) { + n := len(*h) + item := x.(*BlackListPeer) + item.index = n + *h = append(*h, item) +} + +func (h *PeerHeap) Pop() interface{} { + old := *h + n := len(old) + item := old[n-1] + item.index = -1 // for safety + *h = old[0 : n-1] + return item +} + +// Peer represents the state of a peer in the network. +type BlackListPeer struct { + ID string // Unique identifier for the peer + HeadBlock int64 // Current head block of the peer + LastSeen time.Time // Timestamp of the last head block update + BlacklistCount int // Counter for failed head block updates + index int // Index of the peer in the heap +} + +// blackList manages peers, both active and blacklisted. +type blackList struct { + mu sync.Mutex // To handle concurrent access + peers map[string]*BlackListPeer + peerHeap PeerHeap + maxPeers int + expiryTime time.Duration + blacklistedCount int +} + +// NewBlackList creates a new instance of blackList. +func NewBlackList(maxPeers int, expiryTime time.Duration, blacklistedCount int) *blackList { + bl := &blackList{ + peers: make(map[string]*BlackListPeer), + peerHeap: make(PeerHeap, 0, maxPeers), + maxPeers: maxPeers, + expiryTime: expiryTime, + blacklistedCount: blacklistedCount, + } + heap.Init(&bl.peerHeap) + return bl +} + +// AddOrUpdatePeer adds or updates a peer in the map, rejecting invalid IDs. +func (bl *blackList) AddOrUpdatePeer(id string, headBlock int64) { + if id == "" { + return // Reject empty ID + } + + bl.mu.Lock() + defer bl.mu.Unlock() + + peer, exists := bl.peers[id] + if exists { + if peer.HeadBlock != headBlock { + peer.HeadBlock = headBlock + peer.LastSeen = time.Now() + peer.BlacklistCount = 0 + heap.Fix(&bl.peerHeap, peer.index) + } + } else { + if len(bl.peers) >= bl.maxPeers { + oldest := heap.Pop(&bl.peerHeap).(*BlackListPeer) + delete(bl.peers, oldest.ID) // Corrected to use ID + } + newPeer := &BlackListPeer{ + ID: id, HeadBlock: headBlock, LastSeen: time.Now(), BlacklistCount: 0, + } + bl.peers[id] = newPeer + heap.Push(&bl.peerHeap, newPeer) + } +} + +// BlacklistStalePeers updates the blacklist count of stale peers. +func (bl *blackList) BlacklistStalePeers() { + bl.mu.Lock() + defer bl.mu.Unlock() + + now := time.Now() + for _, peer := range bl.peers { + if now.Sub(peer.LastSeen) > bl.expiryTime { + peer.BlacklistCount++ + } + } +} + +// IsBlacklisted checks if a peer is blacklisted. +func (bl *blackList) IsBlacklisted(id string) bool { + bl.mu.Lock() + defer bl.mu.Unlock() + + peer, exists := bl.peers[id] + return exists && peer.BlacklistCount >= bl.blacklistedCount +} diff --git a/eth/blacklist_test.go b/eth/blacklist_test.go new file mode 100644 index 000000000..c0ade4c3e --- /dev/null +++ b/eth/blacklist_test.go @@ -0,0 +1,161 @@ +package eth + +import ( + "sync" + "testing" + "time" +) + +// TestAddOrUpdatePeer tests adding new peers and updating existing ones. +func TestAddOrUpdatePeer(t *testing.T) { + bl := NewBlackList(2, 10*time.Minute, 3) + bl.AddOrUpdatePeer("peer1", 100) + if len(bl.peers) != 1 { + t.Errorf("Expected 1 peer, got %d", len(bl.peers)) + } + + // Test updating the same peer + bl.AddOrUpdatePeer("peer1", 101) + if bl.peers["peer1"].HeadBlock != 101 { + t.Errorf("Expected head block 101, got %d", bl.peers["peer1"].HeadBlock) + } + + // Test adding another peer and triggering the maxPeers limit + bl.AddOrUpdatePeer("peer2", 102) + bl.AddOrUpdatePeer("peer3", 103) // This should remove the oldest (peer1) + if len(bl.peers) != 2 { + t.Errorf("Expected 2 peers, got %d", len(bl.peers)) + } + if _, exists := bl.peers["peer1"]; exists { + t.Errorf("Expected peer1 to be removed") + } +} + +// TestBlacklistStalePeers tests the automatic blacklisting of stale peers. +func TestBlacklistStalePeers(t *testing.T) { + bl := NewBlackList(2, 1*time.Minute, 1) + bl.AddOrUpdatePeer("peer1", 100) + time.Sleep(2 * time.Minute) // simulate time passing + bl.BlacklistStalePeers() + + if bl.peers["peer1"].BlacklistCount != 1 { + t.Errorf("Expected BlacklistCount of 1, got %d", bl.peers["peer1"].BlacklistCount) + } +} + +// TestIsBlacklisted tests checking if a peer is blacklisted. +func TestIsBlacklisted(t *testing.T) { + bl := NewBlackList(2, 1*time.Minute, 1) + bl.AddOrUpdatePeer("peer1", 100) + bl.peers["peer1"].LastSeen = time.Now().Add(-2 * time.Minute) // make peer stale + bl.BlacklistStalePeers() + + if !bl.IsBlacklisted("peer1") { + t.Errorf("Expected peer1 to be blacklisted") + } +} + +// TestEdgeCases tests handling of edge cases such as invalid IDs. +func TestEdgeCases(t *testing.T) { + bl := NewBlackList(2, 1*time.Minute, 1) + bl.AddOrUpdatePeer("", 100) // testing with empty ID + if len(bl.peers) != 0 { + t.Errorf("Expected 0 peers, got %d", len(bl.peers)) + } +} + +// TestAddOrUpdatePeer_MaxPeers tests behavior when adding peers up to and beyond the maximum limit. +func TestAddOrUpdatePeer_MaxPeers(t *testing.T) { + bl := NewBlackList(3, 10*time.Minute, 3) + + bl.AddOrUpdatePeer("peer1", 100) + bl.AddOrUpdatePeer("peer2", 101) + bl.AddOrUpdatePeer("peer3", 102) + bl.AddOrUpdatePeer("peer4", 103) // This should remove peer1 + + if len(bl.peers) != 3 { + t.Errorf("Expected 3 peers, got %d", len(bl.peers)) + } + + if _, exists := bl.peers["peer1"]; exists { + t.Errorf("Expected peer1 to be removed") + } +} + +// TestBlacklistCountOverflow checks how the system handles when a peer's BlacklistCount exceeds the threshold. +func TestBlacklistCountOverflow(t *testing.T) { + bl := NewBlackList(2, 1*time.Second, 2) + bl.AddOrUpdatePeer("peer1", 100) + + // Simulate multiple blacklist increments + time.Sleep(2 * time.Second) + bl.BlacklistStalePeers() + bl.BlacklistStalePeers() + + if !bl.IsBlacklisted("peer1") { + t.Errorf("Expected peer1 to be blacklisted after exceeding blacklist count") + } +} + +// TestExpiryTimeBoundary tests behavior when a peer's LastSeen is exactly at the expiry boundary. +func TestExpiryTimeBoundary(t *testing.T) { + bl := NewBlackList(2, 1*time.Second, 2) + bl.AddOrUpdatePeer("peer1", 100) + time.Sleep(1 * time.Second) + + bl.BlacklistStalePeers() // Should increment blacklist count but not blacklisted yet + + if bl.peers["peer1"].BlacklistCount != 1 { + t.Errorf("Expected BlacklistCount of 1, got %d", bl.peers["peer1"].BlacklistCount) + } + + if bl.IsBlacklisted("peer1") { + t.Errorf("Peer1 should not be blacklisted yet") + } +} + +// TestConcurrentAccess tests concurrent access to the blackList. +func TestConcurrentAccess(t *testing.T) { + bl := NewBlackList(100, 10*time.Minute, 3) + var wg sync.WaitGroup + + for i := 0; i < 50; i++ { + wg.Add(1) + go func(id string) { + defer wg.Done() + bl.AddOrUpdatePeer(id, int64(i)) + }(string(rune('a' + i))) + } + + wg.Wait() + + if len(bl.peers) != 50 { + t.Errorf("Expected 50 peers, got %d", len(bl.peers)) + } +} + +// TestReaddingBlacklistedPeer tests the behavior when a blacklisted peer is re-added. +func TestReaddingBlacklistedPeer(t *testing.T) { + bl := NewBlackList(2, 1*time.Second, 1) + bl.AddOrUpdatePeer("peer1", 100) + + // Simulate time passing to trigger blacklist + time.Sleep(2 * time.Second) + bl.BlacklistStalePeers() + + // Ensure peer1 is blacklisted + if !bl.IsBlacklisted("peer1") { + t.Errorf("Expected peer1 to be blacklisted") + } + + // Re-add the same peer + bl.AddOrUpdatePeer("peer1", 101) + + if bl.peers["peer1"].BlacklistCount != 0 { + t.Errorf("Expected BlacklistCount to be reset, got %d", bl.peers["peer1"].BlacklistCount) + } + + if bl.IsBlacklisted("peer1") { + t.Errorf("Peer1 should not be blacklisted after re-adding with updated information") + } +} diff --git a/eth/handler.go b/eth/handler.go index cc3bf382b..3bb56ef46 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -172,6 +172,8 @@ type handler struct { handlerStartCh chan struct{} handlerDoneCh chan struct{} + + blackList *blackList } // newHandler returns a handler for all Ethereum chain management protocol. @@ -702,6 +704,7 @@ func (h *handler) unregisterPeer(id string) { func (h *handler) Start(maxPeers int, maxPeersPerIP int) { h.maxPeers = maxPeers h.maxPeersPerIP = maxPeersPerIP + h.blackList = NewBlackList(maxPeers, 1*time.Hour, 3) // broadcast and announce transactions (only new ones, not resurrected ones) h.wg.Add(1) h.txsCh = make(chan core.NewTxsEvent, txChanSize)