Merge pull request #704 from fjl/p2p-concurrency-fixups

p2p: more concurrency fixups
This commit is contained in:
Jeffrey Wilcke 2015-04-14 00:38:47 +02:00
commit 2ea98d9b74
6 changed files with 14 additions and 41 deletions

@ -14,8 +14,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/secp256k1" "github.com/ethereum/go-ethereum/crypto/secp256k1"
@ -31,9 +29,6 @@ type Node struct {
DiscPort int // UDP listening port for discovery protocol DiscPort int // UDP listening port for discovery protocol
TCPPort int // TCP listening port for RLPx TCPPort int // TCP listening port for RLPx
// this must be set/read using atomic load and store.
activeStamp int64
} }
func newNode(id NodeID, addr *net.UDPAddr) *Node { func newNode(id NodeID, addr *net.UDPAddr) *Node {
@ -50,16 +45,6 @@ func (n *Node) isValid() bool {
return !n.IP.IsMulticast() && !n.IP.IsUnspecified() && n.TCPPort != 0 && n.DiscPort != 0 return !n.IP.IsMulticast() && !n.IP.IsUnspecified() && n.TCPPort != 0 && n.DiscPort != 0
} }
func (n *Node) bumpActive() {
stamp := time.Now().Unix()
atomic.StoreInt64(&n.activeStamp, stamp)
}
func (n *Node) active() time.Time {
stamp := atomic.LoadInt64(&n.activeStamp)
return time.Unix(stamp, 0)
}
func (n *Node) addr() *net.UDPAddr { func (n *Node) addr() *net.UDPAddr {
return &net.UDPAddr{IP: n.IP, Port: n.DiscPort} return &net.UDPAddr{IP: n.IP, Port: n.DiscPort}
} }

@ -326,7 +326,6 @@ outer:
func (b *bucket) bump(n *Node) bool { func (b *bucket) bump(n *Node) bool {
for i := range b.entries { for i := range b.entries {
if b.entries[i].ID == n.ID { if b.entries[i].ID == n.ID {
n.bumpActive()
// move it to the front // move it to the front
copy(b.entries[1:], b.entries[:i]) copy(b.entries[1:], b.entries[:i])
b.entries[0] = n b.entries[0] = n

@ -267,11 +267,12 @@ func (t *udp) loop() {
defer timeout.Stop() defer timeout.Stop()
rearmTimeout := func() { rearmTimeout := func() {
if len(pending) == 0 || nextDeadline == pending[0].deadline { now := time.Now()
if len(pending) == 0 || now.Before(nextDeadline) {
return return
} }
nextDeadline = pending[0].deadline nextDeadline = pending[0].deadline
timeout.Reset(nextDeadline.Sub(time.Now())) timeout.Reset(nextDeadline.Sub(now))
} }
for { for {

@ -115,7 +115,7 @@ func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake,
// returning the handshake read error. If the remote side // returning the handshake read error. If the remote side
// disconnects us early with a valid reason, we should return it // disconnects us early with a valid reason, we should return it
// as the error so it can be tracked elsewhere. // as the error so it can be tracked elsewhere.
werr := make(chan error) werr := make(chan error, 1)
go func() { werr <- Send(rw, handshakeMsg, our) }() go func() { werr <- Send(rw, handshakeMsg, our) }()
rhs, err := readProtocolHandshake(rw, secrets.RemoteID, our) rhs, err := readProtocolHandshake(rw, secrets.RemoteID, our)
if err != nil { if err != nil {

@ -4,7 +4,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net" "net"
"sort" "sort"
"sync" "sync"
@ -20,8 +19,7 @@ const (
baseProtocolLength = uint64(16) baseProtocolLength = uint64(16)
baseProtocolMaxMsgSize = 10 * 1024 * 1024 baseProtocolMaxMsgSize = 10 * 1024 * 1024
pingInterval = 15 * time.Second pingInterval = 15 * time.Second
disconnectGracePeriod = 2 * time.Second
) )
const ( const (
@ -129,39 +127,27 @@ func (p *Peer) run() DiscReason {
case err := <-readErr: case err := <-readErr:
if r, ok := err.(DiscReason); ok { if r, ok := err.(DiscReason); ok {
reason = r reason = r
break } else {
// Note: We rely on protocols to abort if there is a write
// error. It might be more robust to handle them here as well.
p.DebugDetailf("Read error: %v\n", err)
reason = DiscNetworkError
} }
// Note: We rely on protocols to abort if there is a write
// error. It might be more robust to handle them here as well.
p.DebugDetailf("Read error: %v\n", err)
p.conn.Close()
reason = DiscNetworkError
case err := <-p.protoErr: case err := <-p.protoErr:
reason = discReasonForError(err) reason = discReasonForError(err)
case reason = <-p.disc: case reason = <-p.disc:
} }
close(p.closed) close(p.closed)
p.politeDisconnect(reason)
p.wg.Wait() p.wg.Wait()
if reason != DiscNetworkError {
p.politeDisconnect(reason)
}
p.Debugf("Disconnected: %v\n", reason) p.Debugf("Disconnected: %v\n", reason)
return reason return reason
} }
func (p *Peer) politeDisconnect(reason DiscReason) { func (p *Peer) politeDisconnect(reason DiscReason) {
done := make(chan struct{}) if reason != DiscNetworkError {
go func() {
SendItems(p.rw, discMsg, uint(reason)) SendItems(p.rw, discMsg, uint(reason))
// Wait for the other side to close the connection.
// Discard any data that they send until then.
io.Copy(ioutil.Discard, p.conn)
close(done)
}()
select {
case <-done:
case <-time.After(disconnectGracePeriod):
} }
p.conn.Close() p.conn.Close()
} }

@ -260,9 +260,11 @@ func (srv *Server) Stop() {
// No new peers can be added at this point because dialLoop and // No new peers can be added at this point because dialLoop and
// listenLoop are down. It is safe to call peerWG.Wait because // listenLoop are down. It is safe to call peerWG.Wait because
// peerWG.Add is not called outside of those loops. // peerWG.Add is not called outside of those loops.
srv.lock.Lock()
for _, peer := range srv.peers { for _, peer := range srv.peers {
peer.Disconnect(DiscQuitting) peer.Disconnect(DiscQuitting)
} }
srv.lock.Unlock()
srv.peerWG.Wait() srv.peerWG.Wait()
} }