go-ethereum/p2p/simulations/network.go
Bin 0bbd88bda0
all: use timer instead of time.After in loops, to avoid memleaks (#29241)
time.After is equivalent to NewTimer(d).C, and does not call Stop if the timer is no longer needed. This can cause memory leaks. This change replaces many such occurrences with NewTimer, calling Stop once the timer is no longer needed.
2024-04-09 08:51:54 +02:00

1094 lines
29 KiB
Go

// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package simulations
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"sync"
"time"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
)
// DialBanTimeout is the minimum time that must pass after a connection attempt
// was initiated before initConn accepts another attempt for the same pair of
// nodes. DidDisconnect back-dates the attempt time by this amount so that a
// dropped connection can be re-initiated immediately.
var DialBanTimeout = 200 * time.Millisecond
// NetworkConfig defines configuration options for starting a Network
type NetworkConfig struct {
// ID is the identifier of the network (encoded as "id" in JSON).
ID string `json:"id"`
// DefaultService is the lifecycle name given to nodes that are created
// without any lifecycles configured (see NewNodeWithConfig).
DefaultService string `json:"default_service,omitempty"`
}
// Network models a p2p simulation network which consists of a collection of
// simulated nodes and the connections which exist between them.
//
// The Network has a single NodeAdapter which is responsible for actually
// starting nodes and connecting them together.
//
// The Network emits events when nodes are started and stopped, when they are
// connected and disconnected, and also when messages are sent between nodes.
type Network struct {
// NetworkConfig is embedded, so its fields (ID, DefaultService) are
// accessible directly on the Network.
NetworkConfig
// Nodes holds every node added to the network, in the order they were added.
Nodes []*Node `json:"nodes"`
// nodeMap maps a node ID to the index of that node in Nodes.
nodeMap map[enode.ID]int
// Maps a node property string to node indexes of all nodes that hold this property
propertyMap map[string][]int
// Conns holds every connection record created so far, up or down.
Conns []*Conn `json:"conns"`
// connMap maps a connection label (see ConnLabel) to the index of that
// connection in Conns.
connMap map[string]int
// nodeAdapter creates the underlying nodes (see NewNodeWithConfig).
nodeAdapter adapters.NodeAdapter
// events is the feed on which node, connection and message events are sent.
events event.Feed
// lock guards the node and connection slices and their lookup maps.
lock sync.RWMutex
// quitc is closed by Shutdown; Subscribe loops return when it is closed.
quitc chan struct{}
}
// NewNetwork builds an empty Network around the given NodeAdapter. The
// configuration is copied by value, and all lookup maps as well as the quit
// channel are allocated up front.
func NewNetwork(nodeAdapter adapters.NodeAdapter, conf *NetworkConfig) *Network {
	network := new(Network)
	network.NetworkConfig = *conf
	network.nodeAdapter = nodeAdapter
	network.nodeMap = make(map[enode.ID]int)
	network.propertyMap = make(map[string][]int)
	network.connMap = make(map[string]int)
	network.quitc = make(chan struct{})
	return network
}
// Events exposes the feed on which the Network publishes its events.
func (net *Network) Events() *event.Feed {
	feed := &net.events
	return feed
}
// NewNodeWithConfig creates a node from conf via the NodeAdapter and registers
// it with the network. It fails if a node with the same ID or name is already
// registered, or if the adapter cannot create the node.
//
// Side effects on conf: a default Reachable callback is installed when none is
// set, and Lifecycles defaults to the network's DefaultService when empty.
// A control event for the new node is emitted on success.
func (net *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) {
	net.lock.Lock()
	defer net.lock.Unlock()

	if conf.Reachable == nil {
		// The default callback asks the network to initiate the connection.
		// It calls the locking InitConn, so it must not be invoked while
		// net.lock is held.
		conf.Reachable = func(otherID enode.ID) bool {
			if _, err := net.InitConn(conf.ID, otherID); err == nil {
				return true
			}
			// On failure only the node with the smaller ID reports "unreachable".
			return bytes.Compare(conf.ID.Bytes(), otherID.Bytes()) >= 0
		}
	}

	// Reject duplicates by ID and by name.
	if existing := net.getNode(conf.ID); existing != nil {
		return nil, fmt.Errorf("node with ID %q already exists", conf.ID)
	}
	if existing := net.getNodeByName(conf.Name); existing != nil {
		return nil, fmt.Errorf("node with name %q already exists", conf.Name)
	}

	// Fall back to the default service when nothing is configured.
	if len(conf.Lifecycles) == 0 {
		conf.Lifecycles = []string{net.DefaultService}
	}

	// Let the adapter build the actual node; it starts out marked as down.
	adapterNode, err := net.nodeAdapter.NewNode(conf)
	if err != nil {
		return nil, err
	}
	created := newNode(adapterNode, conf, false)
	log.Trace("Node created", "id", conf.ID)

	// Record the node and index it by ID and by each of its properties.
	index := len(net.Nodes)
	net.nodeMap[conf.ID] = index
	net.Nodes = append(net.Nodes, created)
	for _, prop := range conf.Properties {
		net.propertyMap[prop] = append(net.propertyMap[prop], index)
	}

	// Announce the new node with a "control" event.
	net.events.Send(ControlEvent(created))
	return created, nil
}
// Config returns a pointer to the network's embedded configuration.
func (net *Network) Config() *NetworkConfig {
	conf := &net.NetworkConfig
	return conf
}
// StartAll starts every node in the network that is not already up. It stops
// at, and returns, the first error reported by Start.
//
// The node list is read through GetNodes so that the slice header is obtained
// under the network lock; the lock is not held while the nodes are started,
// because Start acquires it itself.
func (net *Network) StartAll() error {
	for _, node := range net.GetNodes() {
		if node.Up() {
			continue
		}
		if err := net.Start(node.ID()); err != nil {
			return err
		}
	}
	return nil
}
// StopAll stops every node in the network that is currently up. It stops at,
// and returns, the first error reported by Stop.
//
// The node list is read through GetNodes so that the slice header is obtained
// under the network lock; the lock is not held while the nodes are stopped,
// because Stop acquires it itself.
func (net *Network) StopAll() error {
	for _, node := range net.GetNodes() {
		if !node.Up() {
			continue
		}
		if err := net.Stop(node.ID()); err != nil {
			return err
		}
	}
	return nil
}
// Start brings up the node with the given ID without any snapshot data.
func (net *Network) Start(id enode.ID) error {
	var noSnapshots map[string][]byte
	return net.startWithSnapshots(id, noSnapshots)
}
// startWithSnapshots starts the node with the given ID using the given
// snapshots. On success the node is marked up, a node event is emitted and a
// goroutine is spawned that forwards the node's peer events to the network.
//
// It returns an error if the node is unknown, already up, fails to start, or
// if the peer event subscription cannot be set up. In the last case the node
// has already been started and marked up.
func (net *Network) startWithSnapshots(id enode.ID, snapshots map[string][]byte) error {
	net.lock.Lock()
	defer net.lock.Unlock()

	node := net.getNode(id)
	switch {
	case node == nil:
		return fmt.Errorf("node %v does not exist", id)
	case node.Up():
		return fmt.Errorf("node %v already up", id)
	}

	log.Trace("Starting node", "id", id, "adapter", net.nodeAdapter.Name())
	if err := node.Start(snapshots); err != nil {
		log.Warn("Node startup failed", "id", id, "err", err)
		return err
	}
	node.SetUp(true)
	log.Info("Started node", "id", id)
	net.events.Send(NewEvent(node))

	// Subscribe to the node's peer events over its RPC client.
	rpcClient, err := node.Client()
	if err != nil {
		return fmt.Errorf("error getting rpc client for node %v: %s", id, err)
	}
	peerEvents := make(chan *p2p.PeerEvent)
	subscription, err := rpcClient.Subscribe(context.Background(), "admin", peerEvents, "peerEvents")
	if err != nil {
		return fmt.Errorf("error getting peer events for node %v: %s", id, err)
	}
	go net.watchPeerEvents(id, peerEvents, subscription)
	return nil
}
// watchPeerEvents translates the peer events of node id into network events
// until the events channel is closed or the subscription reports on its error
// channel. On exit it unsubscribes, marks the node as down (if it still
// exists) and emits a node event.
func (net *Network) watchPeerEvents(id enode.ID, events chan *p2p.PeerEvent, sub event.Subscription) {
	markDown := func() {
		sub.Unsubscribe()

		// The event stream ended, so the node is assumed to be down.
		net.lock.Lock()
		defer net.lock.Unlock()

		if node := net.getNode(id); node != nil {
			node.SetUp(false)
			net.events.Send(NewEvent(node))
		}
	}
	defer markDown()

	for {
		select {
		case err := <-sub.Err():
			if err != nil {
				log.Error("Error in peer event subscription", "id", id, "err", err)
			}
			return

		case peerEvent, open := <-events:
			if !open {
				return
			}
			remote := peerEvent.Peer
			switch peerEvent.Type {
			case p2p.PeerEventTypeMsgRecv:
				// The remote peer is the sender, this node the receiver.
				net.DidReceive(remote, id, peerEvent.Protocol, *peerEvent.MsgCode)
			case p2p.PeerEventTypeMsgSend:
				net.DidSend(id, remote, peerEvent.Protocol, *peerEvent.MsgCode)
			case p2p.PeerEventTypeDrop:
				net.DidDisconnect(id, remote)
			case p2p.PeerEventTypeAdd:
				net.DidConnect(id, remote)
			}
		}
	}
}
// Stop stops the node with the given ID. The node is marked down before it is
// stopped; if stopping fails it is marked up again and the error is returned.
// A control event is emitted on success.
func (net *Network) Stop(id enode.ID) error {
	// IMPORTANT: node.Stop() must NOT run while net.lock is held. The node's
	// Reachable closure refers back to the network and calls net.InitConn(),
	// which takes the same lock => DEADLOCK. Hence the lock is taken in two
	// separate phases, before and after node.Stop().
	markDown := func() (*Node, error) {
		net.lock.Lock()
		defer net.lock.Unlock()

		n := net.getNode(id)
		if n == nil {
			return nil, fmt.Errorf("node %v does not exist", id)
		}
		if !n.Up() {
			return nil, fmt.Errorf("node %v already down", id)
		}
		n.SetUp(false)
		return n, nil
	}

	node, err := markDown()
	if err != nil {
		return err
	}

	stopErr := node.Stop() // must be called without net.lock

	net.lock.Lock()
	defer net.lock.Unlock()

	if stopErr != nil {
		// Stopping failed, so restore the up flag.
		node.SetUp(true)
		return stopErr
	}
	log.Info("Stopped node", "id", id, "err", stopErr)
	net.events.Send(ControlEvent(node))
	return nil
}
// Connect asks the "one" node to dial the "other" node through the
// "admin_addPeer" RPC method. The network lock is held for the whole call.
func (net *Network) Connect(oneID, otherID enode.ID) error {
	net.lock.Lock()
	defer net.lock.Unlock()

	err := net.connect(oneID, otherID)
	return err
}
// connect is the lock-free body of Connect; the caller must hold net.lock.
// It initiates the connection record, emits a control event for it and then
// issues the "admin_addPeer" RPC call on the "one" node.
func (net *Network) connect(oneID, otherID enode.ID) error {
	log.Debug("Connecting nodes with addPeer", "id", oneID, "other", otherID)

	conn, err := net.initConn(oneID, otherID)
	if err != nil {
		return err
	}
	rpcClient, err := conn.one.Client()
	if err != nil {
		return err
	}

	net.events.Send(ControlEvent(conn))
	peerAddr := string(conn.other.Addr())
	return rpcClient.Call(nil, "admin_addPeer", peerAddr)
}
// Disconnect disconnects two nodes by calling the "admin_removePeer" RPC
// method on the "one" node so that it disconnects from the "other" node.
//
// It returns an error if no connection record exists for the pair, if the
// connection is already down, or if the RPC client or call fails. A control
// event for the connection is emitted before the RPC call is made.
func (net *Network) Disconnect(oneID, otherID enode.ID) error {
	// conn.Up is written under net.lock by DidConnect and DidDisconnect, so
	// it is read under the same lock here. The lock is released before the
	// event is sent and the RPC call is made.
	net.lock.RLock()
	conn := net.getConn(oneID, otherID)
	up := conn != nil && conn.Up
	net.lock.RUnlock()

	if conn == nil {
		return fmt.Errorf("connection between %v and %v does not exist", oneID, otherID)
	}
	if !up {
		return fmt.Errorf("%v and %v already disconnected", oneID, otherID)
	}
	client, err := conn.one.Client()
	if err != nil {
		return err
	}
	net.events.Send(ControlEvent(conn))
	return client.Call(nil, "admin_removePeer", string(conn.other.Addr()))
}
// DidConnect records that node "one" connected to node "other". The
// connection record is created if needed, marked up, and a connection event
// is emitted. An error is returned when the record cannot be obtained or the
// connection is already up.
func (net *Network) DidConnect(one, other enode.ID) error {
	net.lock.Lock()
	defer net.lock.Unlock()

	conn, err := net.getOrCreateConn(one, other)
	switch {
	case err != nil:
		return fmt.Errorf("connection between %v and %v does not exist", one, other)
	case conn.Up:
		return fmt.Errorf("%v and %v already connected", one, other)
	}

	conn.Up = true
	net.events.Send(NewEvent(conn))
	return nil
}
// DidDisconnect records that node "one" disconnected from node "other". The
// connection is marked down, its initiation time is moved DialBanTimeout into
// the past, and a connection event is emitted. An error is returned when no
// connection record exists or the connection is already down.
func (net *Network) DidDisconnect(one, other enode.ID) error {
	net.lock.Lock()
	defer net.lock.Unlock()

	conn := net.getConn(one, other)
	switch {
	case conn == nil:
		return fmt.Errorf("connection between %v and %v does not exist", one, other)
	case !conn.Up:
		return fmt.Errorf("%v and %v already disconnected", one, other)
	}

	conn.Up = false
	// Back-date the attempt so initConn's dial-ban check passes right away.
	conn.initiated = time.Now().Add(-DialBanTimeout)
	net.events.Send(NewEvent(conn))
	return nil
}
// DidSend emits a message event recording that "sender" sent a message with
// the given protocol and code to "receiver". It always returns nil.
func (net *Network) DidSend(sender, receiver enode.ID, proto string, code uint64) error {
	net.events.Send(NewEvent(&Msg{
		One:      sender,
		Other:    receiver,
		Protocol: proto,
		Code:     code,
		Received: false,
	}))
	return nil
}
// DidReceive emits a message event recording that "receiver" received a
// message with the given protocol and code from "sender". It always returns nil.
func (net *Network) DidReceive(sender, receiver enode.ID, proto string, code uint64) error {
	net.events.Send(NewEvent(&Msg{
		One:      sender,
		Other:    receiver,
		Protocol: proto,
		Code:     code,
		Received: true,
	}))
	return nil
}
// GetNode looks up the node with the given ID under the read lock. It returns
// nil when no such node is registered.
func (net *Network) GetNode(id enode.ID) *Node {
	net.lock.RLock()
	defer net.lock.RUnlock()

	node := net.getNode(id)
	return node
}
// getNode resolves a node ID through nodeMap, returning nil for unknown IDs.
// The caller must hold net.lock.
func (net *Network) getNode(id enode.ID) *Node {
	if index, ok := net.nodeMap[id]; ok {
		return net.Nodes[index]
	}
	return nil
}
// GetNodeByName looks up the node with the given name under the read lock.
// It returns nil when no such node is registered.
func (net *Network) GetNodeByName(name string) *Node {
	net.lock.RLock()
	defer net.lock.RUnlock()

	node := net.getNodeByName(name)
	return node
}
// getNodeByName scans the node list for the first node whose configured name
// equals name, returning nil if there is none. The caller must hold net.lock.
func (net *Network) getNodeByName(name string) *Node {
	for i := range net.Nodes {
		if candidate := net.Nodes[i]; candidate.Config.Name == name {
			return candidate
		}
	}
	return nil
}
// GetNodeIDs returns the IDs of all registered nodes, minus any IDs passed in
// excludeIDs. The lookup runs under the read lock.
func (net *Network) GetNodeIDs(excludeIDs ...enode.ID) []enode.ID {
	net.lock.RLock()
	defer net.lock.RUnlock()

	ids := net.getNodeIDs(excludeIDs)
	return ids
}
// getNodeIDs collects the keys of nodeMap and, when excludeIDs is non-empty,
// removes those IDs from the result. The order follows map iteration and is
// therefore unspecified. The caller must hold net.lock.
func (net *Network) getNodeIDs(excludeIDs []enode.ID) []enode.ID {
	all := make([]enode.ID, 0, len(net.nodeMap))
	for nodeID := range net.nodeMap {
		all = append(all, nodeID)
	}
	if len(excludeIDs) == 0 {
		return all
	}
	// Return the difference between all IDs and the excluded ones.
	return filterIDs(all, excludeIDs)
}
// GetNodes returns the registered nodes, minus any whose ID is listed in
// excludeIDs. The lookup runs under the read lock. Without exclusions the
// network's own node slice is returned rather than a copy.
func (net *Network) GetNodes(excludeIDs ...enode.ID) []*Node {
	net.lock.RLock()
	defer net.lock.RUnlock()

	nodes := net.getNodes(excludeIDs)
	return nodes
}
// getNodes returns net.Nodes itself when nothing is excluded; otherwise it
// builds a new slice of the nodes whose IDs are not in excludeIDs. The caller
// must hold net.lock.
func (net *Network) getNodes(excludeIDs []enode.ID) []*Node {
	if len(excludeIDs) == 0 {
		return net.Nodes
	}
	remaining := net.getNodeIDs(excludeIDs)
	return net.getNodesByID(remaining)
}
// GetNodesByID returns the registered nodes matching the given IDs, in the
// order of nodeIDs. IDs without a registered node are skipped. The lookup
// runs under the read lock.
func (net *Network) GetNodesByID(nodeIDs []enode.ID) []*Node {
	net.lock.RLock()
	defer net.lock.RUnlock()

	nodes := net.getNodesByID(nodeIDs)
	return nodes
}
// getNodesByID resolves each ID in nodeIDs and keeps the ones that map to a
// registered node. The caller must hold net.lock.
func (net *Network) getNodesByID(nodeIDs []enode.ID) []*Node {
	found := make([]*Node, 0, len(nodeIDs))
	for i := range nodeIDs {
		if node := net.getNode(nodeIDs[i]); node != nil {
			found = append(found, node)
		}
	}
	return found
}
// GetNodesByProperty returns the nodes that registered the given property
// string in their NodeConfig. The lookup runs under the read lock.
func (net *Network) GetNodesByProperty(property string) []*Node {
	net.lock.RLock()
	defer net.lock.RUnlock()

	nodes := net.getNodesByProperty(property)
	return nodes
}
// getNodesByProperty maps the node indexes stored in propertyMap for property
// to the nodes themselves. The caller must hold net.lock.
func (net *Network) getNodesByProperty(property string) []*Node {
	indexes := net.propertyMap[property]
	matched := make([]*Node, 0, len(indexes))
	for _, idx := range indexes {
		matched = append(matched, net.Nodes[idx])
	}
	return matched
}
// GetNodeIDsByProperty returns the enode IDs of the nodes that registered the
// given property string in their NodeConfig. The lookup runs under the read lock.
func (net *Network) GetNodeIDsByProperty(property string) []enode.ID {
	net.lock.RLock()
	defer net.lock.RUnlock()

	ids := net.getNodeIDsByProperty(property)
	return ids
}
// getNodeIDsByProperty maps the node indexes stored in propertyMap for
// property to the IDs of those nodes. The caller must hold net.lock.
func (net *Network) getNodeIDsByProperty(property string) []enode.ID {
	indexes := net.propertyMap[property]
	ids := make([]enode.ID, 0, len(indexes))
	for _, idx := range indexes {
		ids = append(ids, net.Nodes[idx].ID())
	}
	return ids
}
// GetRandomUpNode picks a random running node, skipping any ID listed in
// excludeIDs. It returns nil when no node qualifies. The lookup runs under
// the read lock.
func (net *Network) GetRandomUpNode(excludeIDs ...enode.ID) *Node {
	net.lock.RLock()
	defer net.lock.RUnlock()

	node := net.getRandomUpNode(excludeIDs...)
	return node
}
// getRandomUpNode is the lock-free body of GetRandomUpNode; the caller must
// hold net.lock.
func (net *Network) getRandomUpNode(excludeIDs ...enode.ID) *Node {
	upIDs := net.getUpNodeIDs()
	return net.getRandomNode(upIDs, excludeIDs)
}
// getUpNodeIDs lists the IDs of all nodes currently marked up, in node-list
// order. The result is nil when no node is up.
func (net *Network) getUpNodeIDs() (ids []enode.ID) {
	for i := range net.Nodes {
		if n := net.Nodes[i]; n.Up() {
			ids = append(ids, n.ID())
		}
	}
	return ids
}
// GetRandomDownNode picks a random stopped node, skipping any ID listed in
// excludeIDs. It returns nil when no node qualifies. The lookup runs under
// the read lock.
func (net *Network) GetRandomDownNode(excludeIDs ...enode.ID) *Node {
	net.lock.RLock()
	defer net.lock.RUnlock()

	downIDs := net.getDownNodeIDs()
	return net.getRandomNode(downIDs, excludeIDs)
}
// getDownNodeIDs lists the IDs of all nodes currently marked down, in
// node-list order. The result is nil when every node is up.
func (net *Network) getDownNodeIDs() (ids []enode.ID) {
	for i := range net.Nodes {
		n := net.Nodes[i]
		if n.Up() {
			continue
		}
		ids = append(ids, n.ID())
	}
	return ids
}
// GetRandomNode picks a random node whether or not it is running, skipping
// any ID listed in excludeIDs. It returns nil when no node qualifies. The
// lookup runs under the read lock.
func (net *Network) GetRandomNode(excludeIDs ...enode.ID) *Node {
	net.lock.RLock()
	defer net.lock.RUnlock()

	// Fetch every ID unfiltered; getRandomNode applies the exclusions, so
	// there is no need to exclude twice.
	allIDs := net.getNodeIDs(nil)
	return net.getRandomNode(allIDs, excludeIDs)
}
// getRandomNode removes excludeIDs from ids and returns the node for a
// randomly chosen remaining ID, or nil when nothing remains.
func (net *Network) getRandomNode(ids []enode.ID, excludeIDs []enode.ID) *Node {
	candidates := filterIDs(ids, excludeIDs)
	if len(candidates) == 0 {
		return nil
	}
	pick := rand.Intn(len(candidates))
	return net.getNode(candidates[pick])
}
// filterIDs returns the elements of ids that do not occur in excludeIDs,
// preserving their order. The result is nil when nothing is kept.
func filterIDs(ids []enode.ID, excludeIDs []enode.ID) []enode.ID {
	excluded := make(map[enode.ID]bool)
	for i := range excludeIDs {
		excluded[excludeIDs[i]] = true
	}

	var kept []enode.ID
	for _, candidate := range ids {
		if excluded[candidate] {
			continue
		}
		kept = append(kept, candidate)
	}
	return kept
}
// GetConn returns the connection record between "one" and "other", whichever
// of the two initiated it, or nil if there is none. The lookup runs under
// the read lock.
func (net *Network) GetConn(oneID, otherID enode.ID) *Conn {
	net.lock.RLock()
	defer net.lock.RUnlock()

	conn := net.getConn(oneID, otherID)
	return conn
}
// GetOrCreateConn behaves like GetConn, except that a missing connection
// record is created. It runs under the write lock.
func (net *Network) GetOrCreateConn(oneID, otherID enode.ID) (*Conn, error) {
	net.lock.Lock()
	defer net.lock.Unlock()

	conn, err := net.getOrCreateConn(oneID, otherID)
	return conn, err
}
// getOrCreateConn returns the existing connection record for the pair, or
// registers a new one with oneID as initiator. It fails if either node is not
// registered. The caller must hold net.lock.
func (net *Network) getOrCreateConn(oneID, otherID enode.ID) (*Conn, error) {
	if existing := net.getConn(oneID, otherID); existing != nil {
		return existing, nil
	}

	oneNode := net.getNode(oneID)
	if oneNode == nil {
		return nil, fmt.Errorf("node %v does not exist", oneID)
	}
	otherNode := net.getNode(otherID)
	if otherNode == nil {
		return nil, fmt.Errorf("node %v does not exist", otherID)
	}

	created := &Conn{One: oneID, Other: otherID, one: oneNode, other: otherNode}
	// Index the new record by its direction-independent label.
	net.connMap[ConnLabel(oneID, otherID)] = len(net.Conns)
	net.Conns = append(net.Conns, created)
	return created, nil
}
// getConn looks up the connection record for the pair through its
// direction-independent label, returning nil if none is registered. The
// caller must hold net.lock.
func (net *Network) getConn(oneID, otherID enode.ID) *Conn {
	if index, ok := net.connMap[ConnLabel(oneID, otherID)]; ok {
		return net.Conns[index]
	}
	return nil
}
// InitConn returns the connection record between peers 'oneID' and 'otherID',
// creating it if it does not exist yet. The order of the two nodes does not
// matter, i.e. Conn(i,j) == Conn(j,i). The call is rejected when the
// connection is already up or when either node is not running.
//
// NOTE: it is also rejected when a connection between the two peers was
// attempted recently. This is cheating: the simulation acts as an oracle that
// knows about a remote peer's dial attempt, so the local node will not
// initiate the connection itself.
func (net *Network) InitConn(oneID, otherID enode.ID) (*Conn, error) {
	net.lock.Lock()
	defer net.lock.Unlock()

	conn, err := net.initConn(oneID, otherID)
	return conn, err
}
// initConn is the lock-free body of InitConn; the caller must hold net.lock.
// It refuses self-connections, connections that are already up, attempts made
// less than DialBanTimeout after the previous one, and pairs where a node is
// down. On success the attempt time is stamped on the connection record.
func (net *Network) initConn(oneID, otherID enode.ID) (*Conn, error) {
	if oneID == otherID {
		return nil, fmt.Errorf("refusing to connect to self %v", oneID)
	}

	conn, err := net.getOrCreateConn(oneID, otherID)
	if err != nil {
		return nil, err
	}

	switch {
	case conn.Up:
		return nil, fmt.Errorf("%v and %v already connected", oneID, otherID)
	case time.Since(conn.initiated) < DialBanTimeout:
		return nil, fmt.Errorf("connection between %v and %v recently attempted", oneID, otherID)
	}

	if upErr := conn.nodesUp(); upErr != nil {
		log.Trace("Nodes not up", "err", upErr)
		return nil, fmt.Errorf("nodes not up: %v", upErr)
	}

	log.Debug("Connection initiated", "id", oneID, "other", otherID)
	conn.initiated = time.Now()
	return conn, nil
}
// Shutdown stops all nodes in the network and closes the quit channel, which
// makes running Subscribe loops return. Failures to stop an individual node
// are logged and do not abort the shutdown.
//
// The node list is read through GetNodes so that the slice header is obtained
// under the network lock. The lock is deliberately not held while the nodes
// are stopped: node.Stop() may invoke the node's Reachable callback, which
// calls the locking InitConn.
//
// Shutdown must be called at most once, since closing the quit channel a
// second time panics.
func (net *Network) Shutdown() {
	for _, node := range net.GetNodes() {
		log.Debug("Stopping node", "id", node.ID())
		if err := node.Stop(); err != nil {
			log.Warn("Can't stop node", "id", node.ID(), "err", err)
		}
	}
	close(net.quitc)
}
// Reset drops all nodes and connections from the network by clearing both
// lists and re-creating the lookup maps. It does not stop any node.
func (net *Network) Reset() {
	net.lock.Lock()
	defer net.lock.Unlock()

	// Empty the lists.
	net.Nodes, net.Conns = nil, nil

	// Re-initialize the lookup maps.
	net.nodeMap = make(map[enode.ID]int)
	net.connMap = make(map[string]int)
	net.propertyMap = make(map[string][]int)
}
// Node is a wrapper around adapters.Node which is used to track the status
// of a node in the network
type Node struct {
// Node is the embedded adapter node; it is excluded from JSON encoding and
// is nil for a Node produced by UnmarshalJSON.
adapters.Node `json:"-"`
// Config is the config used to create the node
Config *adapters.NodeConfig `json:"config"`
// up tracks whether or not the node is running
up bool
// upMu guards up. It is a pointer, so copying a Node value shares the mutex.
upMu *sync.RWMutex
}
// newNode wraps an adapter node and its config in a Node with the given
// initial up state and a fresh mutex for that state.
func newNode(an adapters.Node, ac *adapters.NodeConfig, up bool) *Node {
	node := &Node{
		Node:   an,
		Config: ac,
		up:     up,
	}
	node.upMu = new(sync.RWMutex)
	return node
}
// copy returns a new Node that shares the adapter node, carries a shallow
// copy of the config and the current up state, and has its own mutex.
func (n *Node) copy() *Node {
	cfg := new(adapters.NodeConfig)
	*cfg = *n.Config
	return newNode(n.Node, cfg, n.Up())
}
// Up reports whether the node is currently marked as up (online).
func (n *Node) Up() bool {
	n.upMu.RLock()
	defer n.upMu.RUnlock()

	isUp := n.up
	return isUp
}
// SetUp records the node's up (online) status under the status mutex.
func (n *Node) SetUp(up bool) {
	mu := n.upMu
	mu.Lock()
	defer mu.Unlock()

	n.up = up
}
// ID returns the node's ID as stored in its config.
func (n *Node) ID() enode.ID {
	cfg := n.Config
	return cfg.ID
}
// String renders the node as "Node <id>", using the terminal form of its ID.
func (n *Node) String() string {
	shortID := n.ID().TerminalString()
	return fmt.Sprintf("Node %v", shortID)
}
// NodeInfo returns the adapter node's info with Name overridden by the
// configured node name. It returns nil when there is no adapter node, which
// avoids a panic for nodes that have not been started yet.
func (n *Node) NodeInfo() *p2p.NodeInfo {
	if n.Node != nil {
		nodeInfo := n.Node.NodeInfo()
		nodeInfo.Name = n.Config.Name
		return nodeInfo
	}
	return nil
}
// MarshalJSON implements the json.Marshaler interface. The encoded object
// carries the node's NodeInfo ("info"), its config ("config") and its up
// state ("up").
func (n *Node) MarshalJSON() ([]byte, error) {
	payload := struct {
		Info   *p2p.NodeInfo        `json:"info,omitempty"`
		Config *adapters.NodeConfig `json:"config,omitempty"`
		Up     bool                 `json:"up"`
	}{
		Info:   n.NodeInfo(),
		Config: n.Config,
		Up:     n.Up(),
	}
	return json.Marshal(payload)
}
// UnmarshalJSON implements the json.Unmarshaler interface so that the up
// status survives a round trip. IMPORTANT: the implementation is incomplete;
// p2p.NodeInfo is not restored, so the resulting Node has no adapter node.
func (n *Node) UnmarshalJSON(raw []byte) error {
	// TODO: How should we turn back NodeInfo into n.Node?
	// Ticket: https://github.com/ethersphere/go-ethereum/issues/1177
	var decoded struct {
		Config *adapters.NodeConfig `json:"config,omitempty"`
		Up     bool                 `json:"up"`
	}
	err := json.Unmarshal(raw, &decoded)
	if err != nil {
		return err
	}
	// Replace the receiver wholesale, including a fresh mutex.
	*n = *newNode(nil, decoded.Config, decoded.Up)
	return nil
}
// Conn represents a connection between two nodes in the network
type Conn struct {
// One is the node which initiated the connection
One enode.ID `json:"one"`
// Other is the node which the connection was made to
Other enode.ID `json:"other"`
// Up tracks whether or not the connection is active
Up bool `json:"up"`
// Registers when the connection was grabbed to dial. initConn compares it
// against DialBanTimeout to reject attempts that follow too quickly.
initiated time.Time
// one and other are the Node objects for One and Other. They are set when
// the network creates the record and are unexported, so they are not part
// of the JSON encoding.
one *Node
other *Node
}
// nodesUp returns nil when both endpoints of the connection are up, and
// otherwise an error naming the first endpoint found down ("one" is checked
// before "other").
func (c *Conn) nodesUp() error {
	switch {
	case !c.one.Up():
		return fmt.Errorf("one %v is not up", c.One)
	case !c.other.Up():
		return fmt.Errorf("other %v is not up", c.Other)
	default:
		return nil
	}
}
// String renders the connection as "Conn <one>-><other>", using the terminal
// form of both node IDs.
func (c *Conn) String() string {
	from := c.One.TerminalString()
	to := c.Other.TerminalString()
	return fmt.Sprintf("Conn %v->%v", from, to)
}
// Msg represents a p2p message sent between two nodes in the network
type Msg struct {
// One is the sender of the message (see DidSend / DidReceive).
One enode.ID `json:"one"`
// Other is the receiver of the message.
Other enode.ID `json:"other"`
// Protocol is the name of the protocol the message belongs to.
Protocol string `json:"protocol"`
// Code is the message code.
Code uint64 `json:"code"`
// Received is false for events created by DidSend and true for events
// created by DidReceive.
Received bool `json:"received"`
}
// String renders the message as "Msg(<code>) <one>-><other>", using the
// terminal form of both node IDs.
func (m *Msg) String() string {
	from := m.One.TerminalString()
	to := m.Other.TerminalString()
	return fmt.Sprintf("Msg(%d) %v->%v", m.Code, from, to)
}
// ConnLabel builds a deterministic label for the connection between two
// nodes. The two IDs are ordered by their byte representation (smaller one
// first), so the label is the same whichever node is passed as source.
func ConnLabel(source, target enode.ID) string {
	low, high := source, target
	if bytes.Compare(source.Bytes(), target.Bytes()) > 0 {
		low, high = target, source
	}
	return fmt.Sprintf("%v-%v", low, high)
}
// Snapshot represents the state of a network at a single point in time and can
// be used to restore the state of a network
type Snapshot struct {
// Nodes holds one entry per node of the network.
Nodes []NodeSnapshot `json:"nodes,omitempty"`
// Conns holds the connections; Network.snapshot records only those that
// were up when the snapshot was taken.
Conns []Conn `json:"conns,omitempty"`
}
// NodeSnapshot represents the state of a node in the network
type NodeSnapshot struct {
// Node is a copy of the node's config and up state.
Node Node `json:"node,omitempty"`
// Snapshots is arbitrary data gathered from calling node.Snapshots().
// Network.snapshot leaves it unset for nodes that were down.
Snapshots map[string][]byte `json:"snapshots,omitempty"`
}
// Snapshot captures the current state of the network without adding or
// removing any services.
func (net *Network) Snapshot() (*Snapshot, error) {
	var none []string
	return net.snapshot(none, none)
}
// SnapshotWithServices captures the current state of the network and, for
// every running node, adds addServices to and removes removeServices from
// the lifecycles recorded in the snapshot.
func (net *Network) SnapshotWithServices(addServices []string, removeServices []string) (*Snapshot, error) {
	snap, err := net.snapshot(addServices, removeServices)
	return snap, err
}
// snapshot builds a Snapshot of the network under the write lock.
//
// Every node is copied into the snapshot. For nodes that are up, the node's
// own snapshot data is collected and the copied config's Lifecycles list is
// adjusted: each entry of addServices that is missing is appended, then every
// entry listed in removeServices is dropped. Nodes that are down are recorded
// without snapshot data or lifecycle changes. Only connections that are up
// are included. The first error from node.Snapshots() aborts the operation.
func (net *Network) snapshot(addServices []string, removeServices []string) (*Snapshot, error) {
	net.lock.Lock()
	defer net.lock.Unlock()

	// contains reports whether want occurs in list.
	contains := func(list []string, want string) bool {
		for _, item := range list {
			if item == want {
				return true
			}
		}
		return false
	}

	result := &Snapshot{Nodes: make([]NodeSnapshot, len(net.Nodes))}
	for idx, node := range net.Nodes {
		entry := &result.Nodes[idx]
		*entry = NodeSnapshot{Node: *node.copy()}
		if !node.Up() {
			continue
		}

		data, err := node.Snapshots()
		if err != nil {
			return nil, err
		}
		entry.Snapshots = data

		// cfg is the copied config, so the live node's config struct is
		// not modified.
		cfg := entry.Node.Config
		for _, svc := range addServices {
			if !contains(cfg.Lifecycles, svc) {
				cfg.Lifecycles = append(cfg.Lifecycles, svc)
			}
		}
		if len(removeServices) > 0 {
			var kept []string
			for _, svc := range cfg.Lifecycles {
				if !contains(removeServices, svc) {
					kept = append(kept, svc)
				}
			}
			cfg.Lifecycles = kept
		}
	}

	for _, conn := range net.Conns {
		if !conn.Up {
			continue
		}
		result.Conns = append(result.Conns, *conn)
	}
	return result, nil
}
// snapshotLoadTimeout bounds how long Load waits for the snapshot's
// connections to be established before giving up with an error.
// Long-running tests may need a longer timeout.
var snapshotLoadTimeout = 900 * time.Second
// Load loads a network snapshot: it creates every node in the snapshot,
// starts those recorded as up, dials the snapshot's connections and waits
// until all dialed connections are reported as established.
//
// Connections with at least one endpoint that is not up are skipped and are
// not waited for. If no connection remains to be dialed, Load returns as soon
// as the nodes are started.
//
// It returns an error if a node cannot be created or started, if a
// connection refers to a node that does not exist in the network, if a dial
// request fails, or if the dialed connections are not all established within
// snapshotLoadTimeout.
func (net *Network) Load(snap *Snapshot) error {
	// Start nodes.
	for _, n := range snap.Nodes {
		if _, err := net.NewNodeWithConfig(n.Node.Config); err != nil {
			return err
		}
		if !n.Node.Up() {
			continue
		}
		if err := net.startWithSnapshots(n.Node.Config.ID, n.Snapshots); err != nil {
			return err
		}
	}

	// Work out which connections will actually be dialed. Only these are
	// waited for; counting skipped connections as expected would make Load
	// block until the timeout and then fail.
	// Key element 0 is the connection One field value, element 1 the Other field.
	type connKey = [2]enode.ID
	expected := make(map[connKey]struct{}, len(snap.Conns))
	dial := make([]connKey, 0, len(snap.Conns))
	for _, conn := range snap.Conns {
		one, other := net.GetNode(conn.One), net.GetNode(conn.Other)
		if one == nil || other == nil {
			return fmt.Errorf("snapshot connection %v-%v references a node that does not exist", conn.One, conn.Other)
		}
		if !one.Up() || !other.Up() {
			// At least one of the nodes of the connection is not up, so
			// dialing it would make the snapshot Load fail.
			continue
		}
		key := connKey{conn.One, conn.Other}
		expected[key] = struct{}{}
		dial = append(dial, key)
	}
	if len(dial) == 0 {
		// Nothing to connect, hence nothing to wait for.
		return nil
	}

	// Prepare connection events counter.
	allConnected := make(chan struct{}) // closed when all connections are established
	done := make(chan struct{})         // ensures that the event loop goroutine is terminated
	defer close(done)

	// Subscribe to event channel.
	// It needs to be done outside of the event loop goroutine (created below)
	// to ensure that the event channel is blocking before connect calls are made.
	events := make(chan *Event)
	sub := net.Events().Subscribe(events)
	defer sub.Unsubscribe()

	go func() {
		// Set of established connections from the snapshot, not other
		// connections. expected is only read from here on.
		established := make(map[connKey]struct{}, len(expected))
		for {
			select {
			case e := <-events:
				// Ignore control events as they do not represent
				// connect or disconnect (Up) state change.
				if e.Control {
					continue
				}
				// Detect only connection events.
				if e.Type != EventTypeConn {
					continue
				}
				key := connKey{e.Conn.One, e.Conn.Other}
				// Nodes are still not connected or have been disconnected.
				if !e.Conn.Up {
					// Delete the connection from the set of established connections.
					// This will prevent false positive in case disconnections happen.
					delete(established, key)
					log.Warn("load snapshot: unexpected disconnection", "one", e.Conn.One, "other", e.Conn.Other)
					continue
				}
				// Check that the connection is one we are waiting for.
				if _, ok := expected[key]; !ok {
					continue
				}
				established[key] = struct{}{}
				if len(established) == len(expected) {
					// Signal that all nodes are connected.
					close(allConnected)
					return
				}
			case <-done:
				// Load function returned, terminate this goroutine.
				return
			}
		}
	}()

	// Start connecting.
	for _, key := range dial {
		if err := net.Connect(key[0], key[1]); err != nil {
			return err
		}
	}

	timeout := time.NewTimer(snapshotLoadTimeout)
	defer timeout.Stop()

	select {
	// Wait until all connections from the snapshot are established.
	case <-allConnected:
	// Make sure that we do not wait forever.
	case <-timeout.C:
		return errors.New("snapshot connections not established")
	}
	return nil
}
// Subscribe consumes events from the given channel and executes the ones
// flagged as control events. It blocks until the channel is closed or the
// network's quit channel is closed.
func (net *Network) Subscribe(events chan *Event) {
	for {
		select {
		case <-net.quitc:
			return
		case ev, open := <-events:
			if !open {
				return
			}
			if !ev.Control {
				continue
			}
			net.executeControlEvent(ev)
		}
	}
}
// executeControlEvent dispatches a control event by type: node and connection
// events are executed and their errors logged, message events are ignored
// with a warning.
func (net *Network) executeControlEvent(event *Event) {
	log.Trace("Executing control event", "type", event.Type, "event", event)

	switch event.Type {
	case EventTypeMsg:
		log.Warn("Ignoring control msg event")
	case EventTypeConn:
		err := net.executeConnEvent(event)
		if err != nil {
			log.Error("Error executing conn event", "event", event, "err", err)
		}
	case EventTypeNode:
		err := net.executeNodeEvent(event)
		if err != nil {
			log.Error("Error executing node event", "event", event, "err", err)
		}
	}
}
// executeNodeEvent applies a node control event: if the event's node is
// marked up, the node is created from its config and started; otherwise the
// node with that ID is stopped.
func (net *Network) executeNodeEvent(e *Event) error {
	if e.Node.Up() {
		if _, err := net.NewNodeWithConfig(e.Node.Config); err != nil {
			return err
		}
		return net.Start(e.Node.ID())
	}
	return net.Stop(e.Node.ID())
}
// executeConnEvent applies a connection control event: the two nodes are
// connected if the event's connection is marked up, and disconnected otherwise.
func (net *Network) executeConnEvent(e *Event) error {
	one, other := e.Conn.One, e.Conn.Other
	if !e.Conn.Up {
		return net.Disconnect(one, other)
	}
	return net.Connect(one, other)
}