Refactored a lot of the chain catchup/reorg.

This commit is contained in:
Maran 2014-05-20 11:50:34 +02:00
parent b8034f4d9e
commit 12f30e6220
3 changed files with 79 additions and 58 deletions

@ -127,7 +127,6 @@ func (bc *BlockChain) FindCanonicalChain(blocks []*Block, commonBlockHash []byte
log.Println("[CHAIN] We have found the common parent block, breaking")
break
}
log.Println("Checking incoming blocks:")
chainDifficulty.Add(chainDifficulty, bc.CalculateBlockTD(block))
}
@ -182,6 +181,7 @@ func (bc *BlockChain) ResetTillBlockHash(hash []byte) error {
// XXX Why are we resetting? This is the block chain, it has nothing to do with states
//bc.Ethereum.StateManager().PrepareDefault(returnTo)
// Manually reset the last sync block
err := ethutil.Config.Db.Delete(lastBlock.Hash())
if err != nil {
return err

@ -222,7 +222,7 @@ func (s *Ethereum) ConnectToPeer(addr string) error {
if phost == chost {
alreadyConnected = true
ethutil.Config.Log.Debugf("[SERV] Peer %s already added.\n", chost)
//ethutil.Config.Log.Debugf("[SERV] Peer %s already added.\n", chost)
return
}
})
@ -235,7 +235,7 @@ func (s *Ethereum) ConnectToPeer(addr string) error {
s.peers.PushBack(peer)
ethutil.Config.Log.Infof("[SERV] Adding peer %d / %d\n", s.peers.Len(), s.MaxPeers)
ethutil.Config.Log.Infof("[SERV] Adding peer (%s) %d / %d\n", addr, s.peers.Len(), s.MaxPeers)
}
return nil

131
peer.go

@ -127,6 +127,7 @@ type Peer struct {
// Indicated whether the node is catching up or not
catchingUp bool
diverted bool
blocksRequested int
Version string
@ -190,7 +191,6 @@ func (p *Peer) QueueMessage(msg *ethwire.Msg) {
if atomic.LoadInt32(&p.connected) != 1 {
return
}
p.outputQueue <- msg
}
@ -268,7 +268,6 @@ func (p *Peer) HandleInbound() {
for atomic.LoadInt32(&p.disconnect) == 0 {
// HMM?
time.Sleep(500 * time.Millisecond)
// Wait for a message from the peer
msgs, err := ethwire.ReadMessages(p.conn)
if err != nil {
@ -300,32 +299,36 @@ func (p *Peer) HandleInbound() {
var err error
// Make sure we are actually receiving anything
if msg.Data.Len()-1 > 1 && p.catchingUp {
if msg.Data.Len()-1 > 1 && p.diverted {
// We requested blocks and now we need to make sure we have a common ancestor somewhere in these blocks so we can find
// common ground to start syncing from
lastBlock = ethchain.NewBlockFromRlpValue(msg.Data.Get(msg.Data.Len() - 1))
if !p.ethereum.StateManager().BlockChain().HasBlock(lastBlock.Hash()) {
// If we can't find a common ancenstor we need to request more blocks.
// FIXME: At one point this won't scale anymore since we are not asking for an offset
// we just keep increasing the amount of blocks.
//fmt.Println("[PEER] No common ancestor found, requesting more blocks.")
p.blocksRequested = p.blocksRequested * 2
p.catchingUp = false
p.SyncWithBlocks()
}
ethutil.Config.Log.Infof("[PEER] Last block: %x. Checking if we have it locally.\n", lastBlock.Hash())
for i := msg.Data.Len() - 1; i >= 0; i-- {
block = ethchain.NewBlockFromRlpValue(msg.Data.Get(i))
// Do we have this block on our chain? If so we can continue
if !p.ethereum.StateManager().BlockChain().HasBlock(block.Hash()) {
// We don't have this block, but we do have a block with the same prevHash, diversion time!
if p.ethereum.StateManager().BlockChain().HasBlockWithPrevHash(block.PrevHash) {
if p.ethereum.StateManager().BlockChain().FindCanonicalChainFromMsg(msg, block.PrevHash) {
return
p.diverted = false
if !p.ethereum.StateManager().BlockChain().FindCanonicalChainFromMsg(msg, block.PrevHash) {
p.SyncWithPeerToLastKnown()
}
break
}
}
}
if !p.ethereum.StateManager().BlockChain().HasBlock(lastBlock.Hash()) {
// If we can't find a common ancenstor we need to request more blocks.
// FIXME: At one point this won't scale anymore since we are not asking for an offset
// we just keep increasing the amount of blocks.
p.blocksRequested = p.blocksRequested * 2
ethutil.Config.Log.Infof("[PEER] No common ancestor found, requesting %d more blocks.\n", p.blocksRequested)
p.catchingUp = false
p.FindCommonParentBlock()
break
}
}
for i := msg.Data.Len() - 1; i >= 0; i-- {
@ -346,23 +349,28 @@ func (p *Peer) HandleInbound() {
}
}
if msg.Data.Len() == 0 {
// Set catching up to false if
// the peer has nothing left to give
p.catchingUp = false
}
if err != nil {
// If the parent is unknown try to catch up with this peer
if ethchain.IsParentErr(err) {
ethutil.Config.Log.Infoln("Attempting to catch up")
ethutil.Config.Log.Infoln("Attempting to catch up since we don't know the parent")
p.catchingUp = false
p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash())
} else if ethchain.IsValidationErr(err) {
fmt.Println(err)
fmt.Println("Err:", err)
p.catchingUp = false
}
} else {
// XXX Do we want to catch up if there were errors?
// If we're catching up, try to catch up further.
if p.catchingUp && msg.Data.Len() > 1 {
if ethutil.Config.Debug && lastBlock != nil {
if lastBlock != nil {
blockInfo := lastBlock.BlockInfo()
ethutil.Config.Log.Infof("Synced to block height #%d %x %x\n", blockInfo.Number, lastBlock.Hash(), blockInfo.Hash)
ethutil.Config.Log.Debugf("Synced to block height #%d %x %x\n", blockInfo.Number, lastBlock.Hash(), blockInfo.Hash)
}
p.catchingUp = false
@ -372,11 +380,6 @@ func (p *Peer) HandleInbound() {
}
}
if msg.Data.Len() == 0 {
// Set catching up to false if
// the peer has nothing left to give
p.catchingUp = false
}
case ethwire.MsgTxTy:
// If the message was a transaction queue the transaction
// in the TxPool where it will undergo validation and
@ -444,7 +447,7 @@ func (p *Peer) HandleInbound() {
}
} else {
ethutil.Config.Log.Debugf("[PEER] Could not find a similar block")
//ethutil.Config.Log.Debugf("[PEER] Could not find a similar block")
// If no blocks are found we send back a reply with msg not in chain
// and the last hash from get chain
lastHash := msg.Data.Get(l - 1)
@ -452,8 +455,14 @@ func (p *Peer) HandleInbound() {
p.QueueMessage(ethwire.NewMessage(ethwire.MsgNotInChainTy, []interface{}{lastHash.Raw()}))
}
case ethwire.MsgNotInChainTy:
ethutil.Config.Log.Debugf("Not in chain %x\n", msg.Data)
// TODO
ethutil.Config.Log.Debugf("Not in chain: %x\n", msg.Data.Get(0).Bytes())
if p.diverted == true {
// If were already looking for a common parent and we get here again we need to go deeper
p.blocksRequested = p.blocksRequested * 2
}
p.diverted = true
p.catchingUp = false
p.FindCommonParentBlock()
case ethwire.MsgGetTxsTy:
// Get the current transactions of the pool
txs := p.ethereum.TxPool().CurrentTransactions()
@ -471,7 +480,6 @@ func (p *Peer) HandleInbound() {
}
}
}
p.Stop()
}
@ -581,14 +589,18 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
}
// Catch up with the connected peer
p.SyncWithBlocks()
// Set the peer's caps
p.caps = Caps(c.Get(3).Byte())
// Get a reference to the peers version
p.Version = c.Get(2).Str()
// Catch up with the connected peer
if !p.ethereum.IsUpToDate() {
ethutil.Config.Log.Debugln("Already syncing up with a peer; sleeping")
time.Sleep(10 * time.Second)
}
p.SyncWithPeerToLastKnown()
ethutil.Config.Log.Debugln("[PEER]", p)
}
@ -609,38 +621,47 @@ func (p *Peer) String() string {
return fmt.Sprintf("[%s] (%s) %v %s [%s]", strConnectType, strBoundType, p.conn.RemoteAddr(), p.Version, p.caps)
}
func (p *Peer) SyncWithBlocks() {
if !p.catchingUp {
p.catchingUp = true
// FIXME: THIS SHOULD NOT BE NEEDED
if p.blocksRequested == 0 {
p.blocksRequested = 10
}
blocks := p.ethereum.BlockChain().GetChain(p.ethereum.BlockChain().CurrentBlock.Hash(), p.blocksRequested)
var hashes []interface{}
for _, block := range blocks {
hashes = append(hashes, block.Hash())
}
msgInfo := append(hashes, uint64(50))
msg := ethwire.NewMessage(ethwire.MsgGetChainTy, msgInfo)
p.QueueMessage(msg)
}
func (p *Peer) SyncWithPeerToLastKnown() {
p.catchingUp = false
p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash())
}
func (p *Peer) FindCommonParentBlock() {
if p.catchingUp {
return
}
p.catchingUp = true
if p.blocksRequested == 0 {
p.blocksRequested = 20
}
blocks := p.ethereum.BlockChain().GetChain(p.ethereum.BlockChain().CurrentBlock.Hash(), p.blocksRequested)
var hashes []interface{}
for _, block := range blocks {
hashes = append(hashes, block.Hash())
}
msgInfo := append(hashes, uint64(len(hashes)))
ethutil.Config.Log.Infof("Asking for block from %x (%d total) from %s\n", p.ethereum.BlockChain().CurrentBlock.Hash(), len(hashes), p.conn.RemoteAddr().String())
msg := ethwire.NewMessage(ethwire.MsgGetChainTy, msgInfo)
p.QueueMessage(msg)
}
func (p *Peer) CatchupWithPeer(blockHash []byte) {
if !p.catchingUp {
// Make sure nobody else is catching up when you want to do this
p.catchingUp = true
msg := ethwire.NewMessage(ethwire.MsgGetChainTy, []interface{}{blockHash, uint64(50)})
p.QueueMessage(msg)
ethutil.Config.Log.Debugf("Requesting blockchain %x...\n", p.ethereum.BlockChain().CurrentBlock.Hash()[:4])
ethutil.Config.Log.Debugf("Requesting blockchain %x... from peer %s\n", p.ethereum.BlockChain().CurrentBlock.Hash()[:4], p.conn.RemoteAddr())
msg = ethwire.NewMessage(ethwire.MsgGetTxsTy, []interface{}{})
p.QueueMessage(msg)
ethutil.Config.Log.Debugln("Requested transactions")
/*
msg = ethwire.NewMessage(ethwire.MsgGetTxsTy, []interface{}{})
p.QueueMessage(msg)
*/
}
}