Black listing of bad peers

This commit is contained in:
obscuren 2014-10-02 17:03:48 +02:00
parent b55e017e62
commit a75c92000f
3 changed files with 123 additions and 30 deletions

@ -11,6 +11,7 @@ import (
"github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire"
)
var poollogger = ethlog.NewLogger("BPOOL")
@ -38,6 +39,8 @@ type BlockPool struct {
downloadStartedAt time.Time
ChainLength, BlocksProcessed int
peer *Peer
}
func NewBlockPool(eth *Ethereum) *BlockPool {
@ -74,6 +77,27 @@ func (self *BlockPool) Blocks() (blocks ethchain.Blocks) {
return
}
func (self *BlockPool) FetchHashes(peer *Peer) {
highestTd := self.eth.HighestTDPeer()
if (self.peer == nil && peer.td.Cmp(highestTd) >= 0) || (self.peer != nil && peer.td.Cmp(self.peer.td) >= 0) || self.peer == peer {
if self.peer != peer {
poollogger.Debugf("Found better suitable peer (%v vs %v)\n", self.td, peer.td)
}
self.peer = peer
self.td = peer.td
if !self.HasLatestHash() {
peer.doneFetchingHashes = false
const amount = 256
peerlogger.Debugf("Fetching hashes (%d)\n", amount)
peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{peer.lastReceivedHash, uint32(amount)}))
}
}
}
func (self *BlockPool) AddHash(hash []byte, peer *Peer) {
self.mut.Lock()
defer self.mut.Unlock()
@ -99,7 +123,7 @@ func (self *BlockPool) Add(b *ethchain.Block, peer *Peer) {
if !self.eth.BlockChain().HasBlock(b.PrevHash) && self.pool[string(b.PrevHash)] == nil && !self.fetchingHashes {
poollogger.Infof("Unknown block, requesting parent (%x...)\n", b.PrevHash[0:4])
//peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)}))
peer.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{b.Hash(), uint32(256)}))
}
} else if self.pool[hash] != nil {
self.pool[hash].block = b
@ -227,7 +251,7 @@ out:
}
func (self *BlockPool) chainThread() {
procTimer := time.NewTicker(500 * time.Millisecond)
procTimer := time.NewTicker(1000 * time.Millisecond)
out:
for {
select {
@ -237,6 +261,14 @@ out:
blocks := self.Blocks()
ethchain.BlockBy(ethchain.Number).Sort(blocks)
// Find common block
for i, block := range blocks {
if self.eth.BlockChain().HasBlock(block.PrevHash) {
blocks = blocks[i:]
break
}
}
if len(blocks) > 0 {
if self.eth.BlockChain().HasBlock(blocks[0].PrevHash) {
for i, block := range blocks[1:] {
@ -253,13 +285,51 @@ out:
}
}
// Handle in batches of 4k
max := int(math.Min(4000, float64(len(blocks))))
for _, block := range blocks[:max] {
self.eth.Eventer().Post("block", block)
if len(blocks) > 0 {
self.eth.Eventer().Post("blocks", blocks)
}
var err error
for i, block := range blocks {
//self.eth.Eventer().Post("block", block)
err = self.eth.StateManager().Process(block, false)
if err != nil {
poollogger.Infoln(err)
poollogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4])
poollogger.Debugln(block)
blocks = blocks[i:]
break
}
self.Remove(block.Hash())
}
if err != nil {
// Remove this bad chain
for _, block := range blocks {
self.Remove(block.Hash())
}
poollogger.Debugf("Punishing peer for supplying bad chain (%v)\n", self.peer.conn.RemoteAddr())
// This peer gave us bad hashes and made us fetch a bad chain, therefor he shall be punished.
self.eth.BlacklistPeer(self.peer)
self.peer.StopWithReason(DiscBadPeer)
self.td = ethutil.Big0
self.peer = nil
}
/*
// Handle in batches of 4k
//max := int(math.Min(4000, float64(len(blocks))))
for _, block := range blocks {
self.eth.Eventer().Post("block", block)
self.Remove(block.Hash())
}
*/
}
}
}

@ -69,6 +69,8 @@ type Ethereum struct {
Addr net.Addr
Port string
blacklist [][]byte
peerMut sync.Mutex
// Capabilities for outgoing peers
@ -211,6 +213,10 @@ func (s *Ethereum) HighestTDPeer() (td *big.Int) {
return
}
func (self *Ethereum) BlacklistPeer(peer *Peer) {
self.blacklist = append(self.blacklist, peer.pubkey)
}
func (s *Ethereum) AddPeer(conn net.Conn) {
peer := NewPeer(conn, s, true)

65
peer.go

@ -39,15 +39,15 @@ const (
// Values are given explicitly instead of by iota because these values are
// defined by the wire protocol spec; it is easier for humans to ensure
// correctness when values are explicit.
DiscReRequested = 0x00
DiscReTcpSysErr = 0x01
DiscBadProto = 0x02
DiscBadPeer = 0x03
DiscTooManyPeers = 0x04
DiscConnDup = 0x05
DiscGenesisErr = 0x06
DiscProtoErr = 0x07
DiscQuitting = 0x08
DiscRequested DiscReason = iota
DiscReTcpSysErr
DiscBadProto
DiscBadPeer
DiscTooManyPeers
DiscConnDup
DiscGenesisErr
DiscProtoErr
DiscQuitting
)
var discReasonToString = []string{
@ -554,19 +554,22 @@ func (self *Peer) FetchBlocks(hashes [][]byte) {
}
func (self *Peer) FetchHashes() {
self.doneFetchingHashes = false
blockPool := self.ethereum.blockPool
blockPool.FetchHashes(self)
if self.td.Cmp(self.ethereum.HighestTDPeer()) >= 0 {
blockPool.td = self.td
/*
if self.td.Cmp(self.ethereum.HighestTDPeer()) >= 0 {
blockPool.td = self.td
if !blockPool.HasLatestHash() {
const amount = 256
peerlogger.Debugf("Fetching hashes (%d)\n", amount)
self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(amount)}))
if !blockPool.HasLatestHash() {
self.doneFetchingHashes = false
const amount = 256
peerlogger.Debugf("Fetching hashes (%d)\n", amount)
self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(amount)}))
}
}
}
*/
}
func (self *Peer) FetchingHashes() bool {
@ -631,18 +634,22 @@ func (p *Peer) Start() {
}
func (p *Peer) Stop() {
p.StopWithReason(DiscRequested)
}
func (p *Peer) StopWithReason(reason DiscReason) {
if atomic.AddInt32(&p.disconnect, 1) != 1 {
return
}
close(p.quit)
if atomic.LoadInt32(&p.connected) != 0 {
p.writeMessage(ethwire.NewMessage(ethwire.MsgDiscTy, ""))
p.conn.Close()
}
// Pre-emptively remove the peer; don't wait for reaping. We already know it's dead if we are here
p.ethereum.RemovePeer(p)
close(p.quit)
if atomic.LoadInt32(&p.connected) != 0 {
p.writeMessage(ethwire.NewMessage(ethwire.MsgDiscTy, reason))
p.conn.Close()
}
}
func (p *Peer) peersMessage() *ethwire.Msg {
@ -764,6 +771,16 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
return
}
// Check for blacklisting
for _, pk := range p.ethereum.blacklist {
if bytes.Compare(pk, pub) == 0 {
peerlogger.Debugf("Blacklisted peer tried to connect (%x...)\n", pubkey[0:4])
p.StopWithReason(DiscBadPeer)
return
}
}
usedPub := 0
// This peer is already added to the peerlist so we expect to find a double pubkey at least once
eachPeer(p.ethereum.Peers(), func(peer *Peer, e *list.Element) {