Merge pull request #17747 from ethersphere/max-stream-peer-servers

Add stream peer servers limit
This commit is contained in:
Viktor Trón 2018-09-28 11:04:07 +02:00 committed by GitHub
commit 0da3b17a11
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 276 additions and 75 deletions

@ -59,27 +59,28 @@ var (
//constants for environment variables
const (
SWARM_ENV_CHEQUEBOOK_ADDR = "SWARM_CHEQUEBOOK_ADDR"
SWARM_ENV_ACCOUNT = "SWARM_ACCOUNT"
SWARM_ENV_LISTEN_ADDR = "SWARM_LISTEN_ADDR"
SWARM_ENV_PORT = "SWARM_PORT"
SWARM_ENV_NETWORK_ID = "SWARM_NETWORK_ID"
SWARM_ENV_SWAP_ENABLE = "SWARM_SWAP_ENABLE"
SWARM_ENV_SWAP_API = "SWARM_SWAP_API"
SWARM_ENV_SYNC_DISABLE = "SWARM_SYNC_DISABLE"
SWARM_ENV_SYNC_UPDATE_DELAY = "SWARM_ENV_SYNC_UPDATE_DELAY"
SWARM_ENV_LIGHT_NODE_ENABLE = "SWARM_LIGHT_NODE_ENABLE"
SWARM_ENV_DELIVERY_SKIP_CHECK = "SWARM_DELIVERY_SKIP_CHECK"
SWARM_ENV_ENS_API = "SWARM_ENS_API"
SWARM_ENV_ENS_ADDR = "SWARM_ENS_ADDR"
SWARM_ENV_CORS = "SWARM_CORS"
SWARM_ENV_BOOTNODES = "SWARM_BOOTNODES"
SWARM_ENV_PSS_ENABLE = "SWARM_PSS_ENABLE"
SWARM_ENV_STORE_PATH = "SWARM_STORE_PATH"
SWARM_ENV_STORE_CAPACITY = "SWARM_STORE_CAPACITY"
SWARM_ENV_STORE_CACHE_CAPACITY = "SWARM_STORE_CACHE_CAPACITY"
SWARM_ACCESS_PASSWORD = "SWARM_ACCESS_PASSWORD"
GETH_ENV_DATADIR = "GETH_DATADIR"
SWARM_ENV_CHEQUEBOOK_ADDR = "SWARM_CHEQUEBOOK_ADDR"
SWARM_ENV_ACCOUNT = "SWARM_ACCOUNT"
SWARM_ENV_LISTEN_ADDR = "SWARM_LISTEN_ADDR"
SWARM_ENV_PORT = "SWARM_PORT"
SWARM_ENV_NETWORK_ID = "SWARM_NETWORK_ID"
SWARM_ENV_SWAP_ENABLE = "SWARM_SWAP_ENABLE"
SWARM_ENV_SWAP_API = "SWARM_SWAP_API"
SWARM_ENV_SYNC_DISABLE = "SWARM_SYNC_DISABLE"
SWARM_ENV_SYNC_UPDATE_DELAY = "SWARM_ENV_SYNC_UPDATE_DELAY"
SWARM_ENV_MAX_STREAM_PEER_SERVERS = "SWARM_ENV_MAX_STREAM_PEER_SERVERS"
SWARM_ENV_LIGHT_NODE_ENABLE = "SWARM_LIGHT_NODE_ENABLE"
SWARM_ENV_DELIVERY_SKIP_CHECK = "SWARM_DELIVERY_SKIP_CHECK"
SWARM_ENV_ENS_API = "SWARM_ENS_API"
SWARM_ENV_ENS_ADDR = "SWARM_ENS_ADDR"
SWARM_ENV_CORS = "SWARM_CORS"
SWARM_ENV_BOOTNODES = "SWARM_BOOTNODES"
SWARM_ENV_PSS_ENABLE = "SWARM_PSS_ENABLE"
SWARM_ENV_STORE_PATH = "SWARM_STORE_PATH"
SWARM_ENV_STORE_CAPACITY = "SWARM_STORE_CAPACITY"
SWARM_ENV_STORE_CACHE_CAPACITY = "SWARM_STORE_CACHE_CAPACITY"
SWARM_ACCESS_PASSWORD = "SWARM_ACCESS_PASSWORD"
GETH_ENV_DATADIR = "GETH_DATADIR"
)
// These settings ensure that TOML keys use the same names as Go struct fields.
@ -207,6 +208,9 @@ func cmdLineOverride(currentConfig *bzzapi.Config, ctx *cli.Context) *bzzapi.Con
currentConfig.SyncUpdateDelay = d
}
// any value including 0 is acceptable
currentConfig.MaxStreamPeerServers = ctx.GlobalInt(SwarmMaxStreamPeerServersFlag.Name)
if ctx.GlobalIsSet(SwarmLightNodeEnabled.Name) {
currentConfig.LightNodeEnabled = true
}
@ -308,6 +312,14 @@ func envVarsOverride(currentConfig *bzzapi.Config) (config *bzzapi.Config) {
}
}
if max := os.Getenv(SWARM_ENV_MAX_STREAM_PEER_SERVERS); max != "" {
m, err := strconv.Atoi(max)
if err != nil {
utils.Fatalf("invalid environment variable %s: %v", SWARM_ENV_MAX_STREAM_PEER_SERVERS, err)
}
currentConfig.MaxStreamPeerServers = m
}
if lne := os.Getenv(SWARM_ENV_LIGHT_NODE_ENABLE); lne != "" {
if lightnode, err := strconv.ParseBool(lne); err != nil {
currentConfig.LightNodeEnabled = lightnode

@ -116,6 +116,12 @@ var (
Usage: "Duration for sync subscriptions update after no new peers are added (default 15s)",
EnvVar: SWARM_ENV_SYNC_UPDATE_DELAY,
}
SwarmMaxStreamPeerServersFlag = cli.IntFlag{
Name: "max-stream-peer-servers",
Usage: "Limit of Stream peer servers, 0 denotes unlimited",
EnvVar: SWARM_ENV_MAX_STREAM_PEER_SERVERS,
Value: 10000, // A very large default value is possible as stream servers have very small memory footprint
}
SwarmLightNodeEnabled = cli.BoolFlag{
Name: "lightnode",
Usage: "Enable Swarm LightNode (default false)",
@ -542,6 +548,7 @@ pv(1) tool to get a progress bar:
SwarmSwapAPIFlag,
SwarmSyncDisabledFlag,
SwarmSyncUpdateDelay,
SwarmMaxStreamPeerServersFlag,
SwarmLightNodeEnabled,
SwarmDeliverySkipCheckFlag,
SwarmListenAddrFlag,

@ -50,26 +50,27 @@ type Config struct {
Swap *swap.LocalProfile
Pss *pss.PssParams
//*network.SyncParams
Contract common.Address
EnsRoot common.Address
EnsAPIs []string
Path string
ListenAddr string
Port string
PublicKey string
BzzKey string
NodeID string
NetworkID uint64
SwapEnabled bool
SyncEnabled bool
SyncingSkipCheck bool
DeliverySkipCheck bool
LightNodeEnabled bool
SyncUpdateDelay time.Duration
SwapAPI string
Cors string
BzzAccount string
privateKey *ecdsa.PrivateKey
Contract common.Address
EnsRoot common.Address
EnsAPIs []string
Path string
ListenAddr string
Port string
PublicKey string
BzzKey string
NodeID string
NetworkID uint64
SwapEnabled bool
SyncEnabled bool
SyncingSkipCheck bool
DeliverySkipCheck bool
MaxStreamPeerServers int
LightNodeEnabled bool
SyncUpdateDelay time.Duration
SwapAPI string
Cors string
BzzAccount string
privateKey *ecdsa.PrivateKey
}
//create a default config with all parameters to set to defaults
@ -80,20 +81,21 @@ func NewConfig() (c *Config) {
FileStoreParams: storage.NewFileStoreParams(),
HiveParams: network.NewHiveParams(),
//SyncParams: network.NewDefaultSyncParams(),
Swap: swap.NewDefaultSwapParams(),
Pss: pss.NewPssParams(),
ListenAddr: DefaultHTTPListenAddr,
Port: DefaultHTTPPort,
Path: node.DefaultDataDir(),
EnsAPIs: nil,
EnsRoot: ens.TestNetAddress,
NetworkID: network.DefaultNetworkID,
SwapEnabled: false,
SyncEnabled: true,
SyncingSkipCheck: false,
DeliverySkipCheck: true,
SyncUpdateDelay: 15 * time.Second,
SwapAPI: "",
Swap: swap.NewDefaultSwapParams(),
Pss: pss.NewPssParams(),
ListenAddr: DefaultHTTPListenAddr,
Port: DefaultHTTPPort,
Path: node.DefaultDataDir(),
EnsAPIs: nil,
EnsRoot: ens.TestNetAddress,
NetworkID: network.DefaultNetworkID,
SwapEnabled: false,
SyncEnabled: true,
SyncingSkipCheck: false,
MaxStreamPeerServers: 10000,
DeliverySkipCheck: true,
SyncUpdateDelay: 15 * time.Second,
SwapAPI: "",
}
return

@ -84,7 +84,7 @@ func createGlobalStore() (string, *mockdb.GlobalStore, error) {
return globalStoreDir, globalStore, nil
}
func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
func newStreamerTester(t *testing.T, registryOptions *RegistryOptions) (*p2ptest.ProtocolTester, *Registry, *storage.LocalStore, func(), error) {
// setup
addr := network.RandomAddr() // tested peers peer address
to := network.NewKademlia(addr.OAddr, network.NewKadParams())
@ -114,7 +114,7 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora
delivery := NewDelivery(to, netStore)
netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
streamer := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), nil)
streamer := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), registryOptions)
teardown := func() {
streamer.Close()
removeDataDir()

@ -39,7 +39,7 @@ import (
)
func TestStreamerRetrieveRequest(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -75,7 +75,7 @@ func TestStreamerRetrieveRequest(t *testing.T) {
}
func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -127,7 +127,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
// upstream request server receives a retrieve Request and responds with
// offered hashes or delivery if skipHash is set to true
func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
tester, streamer, localStore, teardown, err := newStreamerTester(t)
tester, streamer, localStore, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -221,7 +221,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
}
func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
tester, streamer, localStore, teardown, err := newStreamerTester(t)
tester, streamer, localStore, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)

@ -84,11 +84,13 @@ func (p *Peer) handleSubscribeMsg(ctx context.Context, req *SubscribeMsg) (err e
defer func() {
if err != nil {
if e := p.Send(context.TODO(), SubscribeErrorMsg{
// The error will be sent as a subscribe error message
// and will not be returned as it will prevent any new message
// exchange between peers over p2p. Instead, error will be returned
// only if there is one from sending subscribe error message.
err = p.Send(context.TODO(), SubscribeErrorMsg{
Error: err.Error(),
}); e != nil {
log.Error("send stream subscribe error message", "err", err)
}
})
}
}()

@ -18,6 +18,7 @@ package stream
import (
"context"
"errors"
"fmt"
"sync"
"time"
@ -46,6 +47,10 @@ func (e *notFoundError) Error() string {
return fmt.Sprintf("%s not found for stream %q", e.t, e.s)
}
// ErrMaxPeerServers will be returned if peer server limit is reached.
// It will be sent in the SubscribeErrorMsg.
var ErrMaxPeerServers = errors.New("max peer servers")
// Peer is the Peer extension for the streaming protocol
type Peer struct {
*protocols.Peer
@ -204,6 +209,11 @@ func (p *Peer) setServer(s Stream, o Server, priority uint8) (*server, error) {
if p.servers[s] != nil {
return nil, fmt.Errorf("server %s already registered", s)
}
if p.streamer.maxPeerServers > 0 && len(p.servers) >= p.streamer.maxPeerServers {
return nil, ErrMaxPeerServers
}
os := &server{
Server: o,
stream: s,
@ -346,6 +356,7 @@ func (p *Peer) removeClient(s Stream) error {
return newNotFoundError("client", s)
}
client.close()
delete(p.clients, s)
return nil
}

@ -60,6 +60,7 @@ type Registry struct {
delivery *Delivery
intervalsStore state.Store
doRetrieve bool
maxPeerServers int
}
// RegistryOptions holds optional values for NewRegistry constructor.
@ -68,6 +69,7 @@ type RegistryOptions struct {
DoSync bool
DoRetrieve bool
SyncUpdateDelay time.Duration
MaxPeerServers int // The limit of servers for each peer in registry
}
// NewRegistry is Streamer constructor
@ -87,6 +89,7 @@ func NewRegistry(localID enode.ID, delivery *Delivery, syncChunkStore storage.Sy
delivery: delivery,
intervalsStore: intervalsStore,
doRetrieve: options.DoRetrieve,
maxPeerServers: options.MaxPeerServers,
}
streamer.api = NewAPI(streamer)
delivery.getPeer = streamer.getPeer

@ -20,6 +20,7 @@ import (
"bytes"
"context"
"errors"
"strconv"
"testing"
"time"
@ -28,7 +29,7 @@ import (
)
func TestStreamerSubscribe(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -42,7 +43,7 @@ func TestStreamerSubscribe(t *testing.T) {
}
func TestStreamerRequestSubscription(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -127,7 +128,7 @@ func (self *testServer) Close() {
}
func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -220,7 +221,7 @@ func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -287,7 +288,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -353,7 +354,7 @@ func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) {
}
func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -397,7 +398,7 @@ func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) {
}
func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -462,7 +463,7 @@ func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) {
}
func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -527,7 +528,7 @@ func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) {
}
func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -626,7 +627,7 @@ func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) {
}
func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
tester, streamer, _, teardown, err := newStreamerTester(t)
tester, streamer, _, teardown, err := newStreamerTester(t, nil)
defer teardown()
if err != nil {
t.Fatal(err)
@ -752,3 +753,165 @@ func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) {
t.Fatal(err)
}
}
// TestMaxPeerServersWithUnsubscribe creates a registry with a limited
// number of stream servers, and performs a test with subscriptions and
// unsubscriptions, checking if unsubscriptions will remove streams,
// leaving place for new streams.
func TestMaxPeerServersWithUnsubscribe(t *testing.T) {
var maxPeerServers = 6
tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
MaxPeerServers: maxPeerServers,
})
defer teardown()
if err != nil {
t.Fatal(err)
}
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t), nil
})
node := tester.Nodes[0]
for i := 0; i < maxPeerServers+10; i++ {
stream := NewStream("foo", strconv.Itoa(i), true)
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
Triggers: []p2ptest.Trigger{
{
Code: 4,
Msg: &SubscribeMsg{
Stream: stream,
Priority: Top,
},
Peer: node.ID(),
},
},
Expects: []p2ptest.Expect{
{
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 1,
To: 1,
},
Peer: node.ID(),
},
},
})
if err != nil {
t.Fatal(err)
}
err = tester.TestExchanges(p2ptest.Exchange{
Label: "unsubscribe message",
Triggers: []p2ptest.Trigger{
{
Code: 0,
Msg: &UnsubscribeMsg{
Stream: stream,
},
Peer: node.ID(),
},
},
})
if err != nil {
t.Fatal(err)
}
}
}
// TestMaxPeerServersWithoutUnsubscribe creates a registry with a limited
// number of stream servers, and performs subscriptions to detect subscriptions
// error message exchange.
func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) {
var maxPeerServers = 6
tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{
MaxPeerServers: maxPeerServers,
})
defer teardown()
if err != nil {
t.Fatal(err)
}
streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) {
return newTestServer(t), nil
})
node := tester.Nodes[0]
for i := 0; i < maxPeerServers+10; i++ {
stream := NewStream("foo", strconv.Itoa(i), true)
if i >= maxPeerServers {
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
Triggers: []p2ptest.Trigger{
{
Code: 4,
Msg: &SubscribeMsg{
Stream: stream,
Priority: Top,
},
Peer: node.ID(),
},
},
Expects: []p2ptest.Expect{
{
Code: 7,
Msg: &SubscribeErrorMsg{
Error: ErrMaxPeerServers.Error(),
},
Peer: node.ID(),
},
},
})
if err != nil {
t.Fatal(err)
}
continue
}
err = tester.TestExchanges(p2ptest.Exchange{
Label: "Subscribe message",
Triggers: []p2ptest.Trigger{
{
Code: 4,
Msg: &SubscribeMsg{
Stream: stream,
Priority: Top,
},
Peer: node.ID(),
},
},
Expects: []p2ptest.Expect{
{
Code: 1,
Msg: &OfferedHashesMsg{
Stream: stream,
HandoverProof: &HandoverProof{
Handover: &Handover{},
},
Hashes: make([]byte, HashSize),
From: 1,
To: 1,
},
Peer: node.ID(),
},
},
})
if err != nil {
t.Fatal(err)
}
}
}

@ -180,6 +180,7 @@ func NewSwarm(config *api.Config, mockStore *mock.NodeStore) (self *Swarm, err e
DoSync: config.SyncEnabled,
DoRetrieve: true,
SyncUpdateDelay: config.SyncUpdateDelay,
MaxPeerServers: config.MaxStreamPeerServers,
})
// Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage