// Copyright 2018 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . package stream import ( "bytes" "context" "errors" "strconv" "testing" "time" "github.com/ethereum/go-ethereum/crypto/sha3" p2ptest "github.com/ethereum/go-ethereum/p2p/testing" ) func TestStreamerSubscribe(t *testing.T) { tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) } stream := NewStream("foo", "", true) err = streamer.Subscribe(tester.Nodes[0].ID(), stream, NewRange(0, 0), Top) if err == nil || err.Error() != "stream foo not registered" { t.Fatalf("Expected error %v, got %v", "stream foo not registered", err) } } func TestStreamerRequestSubscription(t *testing.T) { tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) } stream := NewStream("foo", "", false) err = streamer.RequestSubscription(tester.Nodes[0].ID(), stream, &Range{}, Top) if err == nil || err.Error() != "stream foo not registered" { t.Fatalf("Expected error %v, got %v", "stream foo not registered", err) } } var ( hash0 = sha3.Sum256([]byte{0}) hash1 = sha3.Sum256([]byte{1}) hash2 = sha3.Sum256([]byte{2}) hashesTmp = append(hash0[:], hash1[:]...) hashes = append(hashesTmp, hash2[:]...) corruptHashes = append(hashes[:40]) ) type testClient struct { t string wait0 chan bool wait2 chan bool batchDone chan bool receivedHashes map[string][]byte } func newTestClient(t string) *testClient { return &testClient{ t: t, wait0: make(chan bool), wait2: make(chan bool), batchDone: make(chan bool), receivedHashes: make(map[string][]byte), } } func (self *testClient) NeedData(ctx context.Context, hash []byte) func(context.Context) error { self.receivedHashes[string(hash)] = hash if bytes.Equal(hash, hash0[:]) { return func(context.Context) error { <-self.wait0 return nil } } else if bytes.Equal(hash, hash2[:]) { return func(context.Context) error { <-self.wait2 return nil } } return nil } func (self *testClient) BatchDone(Stream, uint64, []byte, []byte) func() (*TakeoverProof, error) { close(self.batchDone) return nil } func (self *testClient) Close() {} type testServer struct { t string } func newTestServer(t string) *testServer { return &testServer{ t: t, } } func (self *testServer) SetNextBatch(from uint64, to uint64) ([]byte, uint64, uint64, *HandoverProof, error) { return make([]byte, HashSize), from + 1, to + 1, nil, nil } func (self *testServer) GetData(context.Context, []byte) ([]byte, error) { return nil, nil } func (self *testServer) Close() { } func TestStreamerDownstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) } streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) { return newTestClient(t), nil }) node := tester.Nodes[0] stream := NewStream("foo", "", true) err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top) if err != nil { t.Fatalf("Expected no error, got %v", err) } err = tester.TestExchanges( p2ptest.Exchange{ Label: "Subscribe message", Expects: []p2ptest.Expect{ { Code: 4, Msg: &SubscribeMsg{ Stream: stream, History: NewRange(5, 8), Priority: Top, }, Peer: node.ID(), }, }, }, // trigger OfferedHashesMsg to actually create the client p2ptest.Exchange{ Label: "OfferedHashes message", Triggers: []p2ptest.Trigger{ { Code: 1, Msg: &OfferedHashesMsg{ HandoverProof: &HandoverProof{ Handover: &Handover{}, }, Hashes: hashes, From: 5, To: 8, Stream: stream, }, Peer: node.ID(), }, }, Expects: []p2ptest.Expect{ { Code: 2, Msg: &WantedHashesMsg{ Stream: stream, Want: []byte{5}, From: 9, To: 0, }, Peer: node.ID(), }, }, }, ) if err != nil { t.Fatal(err) } err = streamer.Unsubscribe(node.ID(), stream) if err != nil { t.Fatalf("Expected no error, got %v", err) } err = tester.TestExchanges(p2ptest.Exchange{ Label: "Unsubscribe message", Expects: []p2ptest.Expect{ { Code: 0, Msg: &UnsubscribeMsg{ Stream: stream, }, Peer: node.ID(), }, }, }) if err != nil { t.Fatal(err) } } func TestStreamerUpstreamSubscribeUnsubscribeMsgExchange(t *testing.T) { tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) } stream := NewStream("foo", "", false) streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { return newTestServer(t), nil }) node := tester.Nodes[0] err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", Triggers: []p2ptest.Trigger{ { Code: 4, Msg: &SubscribeMsg{ Stream: stream, History: NewRange(5, 8), Priority: Top, }, Peer: node.ID(), }, }, Expects: []p2ptest.Expect{ { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, HandoverProof: &HandoverProof{ Handover: &Handover{}, }, Hashes: make([]byte, HashSize), From: 6, To: 9, }, Peer: node.ID(), }, }, }) if err != nil { t.Fatal(err) } err = tester.TestExchanges(p2ptest.Exchange{ Label: "unsubscribe message", Triggers: []p2ptest.Trigger{ { Code: 0, Msg: &UnsubscribeMsg{ Stream: stream, }, Peer: node.ID(), }, }, }) if err != nil { t.Fatal(err) } } func TestStreamerUpstreamSubscribeUnsubscribeMsgExchangeLive(t *testing.T) { tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) } stream := NewStream("foo", "", true) streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { return newTestServer(t), nil }) node := tester.Nodes[0] err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", Triggers: []p2ptest.Trigger{ { Code: 4, Msg: &SubscribeMsg{ Stream: stream, Priority: Top, }, Peer: node.ID(), }, }, Expects: []p2ptest.Expect{ { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, HandoverProof: &HandoverProof{ Handover: &Handover{}, }, Hashes: make([]byte, HashSize), From: 1, To: 1, }, Peer: node.ID(), }, }, }) if err != nil { t.Fatal(err) } err = tester.TestExchanges(p2ptest.Exchange{ Label: "unsubscribe message", Triggers: []p2ptest.Trigger{ { Code: 0, Msg: &UnsubscribeMsg{ Stream: stream, }, Peer: node.ID(), }, }, }) if err != nil { t.Fatal(err) } } func TestStreamerUpstreamSubscribeErrorMsgExchange(t *testing.T) { tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) } streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { return newTestServer(t), nil }) stream := NewStream("bar", "", true) node := tester.Nodes[0] err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", Triggers: []p2ptest.Trigger{ { Code: 4, Msg: &SubscribeMsg{ Stream: stream, History: NewRange(5, 8), Priority: Top, }, Peer: node.ID(), }, }, Expects: []p2ptest.Expect{ { Code: 7, Msg: &SubscribeErrorMsg{ Error: "stream bar not registered", }, Peer: node.ID(), }, }, }) if err != nil { t.Fatal(err) } } func TestStreamerUpstreamSubscribeLiveAndHistory(t *testing.T) { tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) } stream := NewStream("foo", "", true) streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { return &testServer{ t: t, }, nil }) node := tester.Nodes[0] err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", Triggers: []p2ptest.Trigger{ { Code: 4, Msg: &SubscribeMsg{ Stream: stream, History: NewRange(5, 8), Priority: Top, }, Peer: node.ID(), }, }, Expects: []p2ptest.Expect{ { Code: 1, Msg: &OfferedHashesMsg{ Stream: NewStream("foo", "", false), HandoverProof: &HandoverProof{ Handover: &Handover{}, }, Hashes: make([]byte, HashSize), From: 6, To: 9, }, Peer: node.ID(), }, { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, HandoverProof: &HandoverProof{ Handover: &Handover{}, }, From: 1, To: 1, Hashes: make([]byte, HashSize), }, Peer: node.ID(), }, }, }) if err != nil { t.Fatal(err) } } func TestStreamerDownstreamCorruptHashesMsgExchange(t *testing.T) { tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) } stream := NewStream("foo", "", true) var tc *testClient streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) { tc = newTestClient(t) return tc, nil }) node := tester.Nodes[0] err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top) if err != nil { t.Fatalf("Expected no error, got %v", err) } err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", Expects: []p2ptest.Expect{ { Code: 4, Msg: &SubscribeMsg{ Stream: stream, History: NewRange(5, 8), Priority: Top, }, Peer: node.ID(), }, }, }, p2ptest.Exchange{ Label: "Corrupt offered hash message", Triggers: []p2ptest.Trigger{ { Code: 1, Msg: &OfferedHashesMsg{ HandoverProof: &HandoverProof{ Handover: &Handover{}, }, Hashes: corruptHashes, From: 5, To: 8, Stream: stream, }, Peer: node.ID(), }, }, }) if err != nil { t.Fatal(err) } expectedError := errors.New("Message handler error: (msg code 1): error invalid hashes length (len: 40)") if err := tester.TestDisconnected(&p2ptest.Disconnect{Peer: node.ID(), Error: expectedError}); err != nil { t.Fatal(err) } } func TestStreamerDownstreamOfferedHashesMsgExchange(t *testing.T) { tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) } stream := NewStream("foo", "", true) var tc *testClient streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) { tc = newTestClient(t) return tc, nil }) node := tester.Nodes[0] err = streamer.Subscribe(node.ID(), stream, NewRange(5, 8), Top) if err != nil { t.Fatalf("Expected no error, got %v", err) } err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", Expects: []p2ptest.Expect{ { Code: 4, Msg: &SubscribeMsg{ Stream: stream, History: NewRange(5, 8), Priority: Top, }, Peer: node.ID(), }, }, }, p2ptest.Exchange{ Label: "WantedHashes message", Triggers: []p2ptest.Trigger{ { Code: 1, Msg: &OfferedHashesMsg{ HandoverProof: &HandoverProof{ Handover: &Handover{}, }, Hashes: hashes, From: 5, To: 8, Stream: stream, }, Peer: node.ID(), }, }, Expects: []p2ptest.Expect{ { Code: 2, Msg: &WantedHashesMsg{ Stream: stream, Want: []byte{5}, From: 9, To: 0, }, Peer: node.ID(), }, }, }) if err != nil { t.Fatal(err) } if len(tc.receivedHashes) != 3 { t.Fatalf("Expected number of received hashes %v, got %v", 3, len(tc.receivedHashes)) } close(tc.wait0) timeout := time.NewTimer(100 * time.Millisecond) defer timeout.Stop() select { case <-tc.batchDone: t.Fatal("batch done early") case <-timeout.C: } close(tc.wait2) timeout2 := time.NewTimer(10000 * time.Millisecond) defer timeout2.Stop() select { case <-tc.batchDone: case <-timeout2.C: t.Fatal("timeout waiting batchdone call") } } func TestStreamerRequestSubscriptionQuitMsgExchange(t *testing.T) { tester, streamer, _, teardown, err := newStreamerTester(t, nil) defer teardown() if err != nil { t.Fatal(err) } streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { return newTestServer(t), nil }) node := tester.Nodes[0] stream := NewStream("foo", "", true) err = streamer.RequestSubscription(node.ID(), stream, NewRange(5, 8), Top) if err != nil { t.Fatalf("Expected no error, got %v", err) } err = tester.TestExchanges( p2ptest.Exchange{ Label: "RequestSubscription message", Expects: []p2ptest.Expect{ { Code: 8, Msg: &RequestSubscriptionMsg{ Stream: stream, History: NewRange(5, 8), Priority: Top, }, Peer: node.ID(), }, }, }, p2ptest.Exchange{ Label: "Subscribe message", Triggers: []p2ptest.Trigger{ { Code: 4, Msg: &SubscribeMsg{ Stream: stream, History: NewRange(5, 8), Priority: Top, }, Peer: node.ID(), }, }, Expects: []p2ptest.Expect{ { Code: 1, Msg: &OfferedHashesMsg{ Stream: NewStream("foo", "", false), HandoverProof: &HandoverProof{ Handover: &Handover{}, }, Hashes: make([]byte, HashSize), From: 6, To: 9, }, Peer: node.ID(), }, { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, HandoverProof: &HandoverProof{ Handover: &Handover{}, }, From: 1, To: 1, Hashes: make([]byte, HashSize), }, Peer: node.ID(), }, }, }, ) if err != nil { t.Fatal(err) } err = streamer.Quit(node.ID(), stream) if err != nil { t.Fatalf("Expected no error, got %v", err) } err = tester.TestExchanges(p2ptest.Exchange{ Label: "Quit message", Expects: []p2ptest.Expect{ { Code: 9, Msg: &QuitMsg{ Stream: stream, }, Peer: node.ID(), }, }, }) if err != nil { t.Fatal(err) } historyStream := getHistoryStream(stream) err = streamer.Quit(node.ID(), historyStream) if err != nil { t.Fatalf("Expected no error, got %v", err) } err = tester.TestExchanges(p2ptest.Exchange{ Label: "Quit message", Expects: []p2ptest.Expect{ { Code: 9, Msg: &QuitMsg{ Stream: historyStream, }, Peer: node.ID(), }, }, }) if err != nil { t.Fatal(err) } } // TestMaxPeerServersWithUnsubscribe creates a registry with a limited // number of stream servers, and performs a test with subscriptions and // unsubscriptions, checking if unsubscriptions will remove streams, // leaving place for new streams. func TestMaxPeerServersWithUnsubscribe(t *testing.T) { var maxPeerServers = 6 tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ MaxPeerServers: maxPeerServers, }) defer teardown() if err != nil { t.Fatal(err) } streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { return newTestServer(t), nil }) node := tester.Nodes[0] for i := 0; i < maxPeerServers+10; i++ { stream := NewStream("foo", strconv.Itoa(i), true) err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", Triggers: []p2ptest.Trigger{ { Code: 4, Msg: &SubscribeMsg{ Stream: stream, Priority: Top, }, Peer: node.ID(), }, }, Expects: []p2ptest.Expect{ { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, HandoverProof: &HandoverProof{ Handover: &Handover{}, }, Hashes: make([]byte, HashSize), From: 1, To: 1, }, Peer: node.ID(), }, }, }) if err != nil { t.Fatal(err) } err = tester.TestExchanges(p2ptest.Exchange{ Label: "unsubscribe message", Triggers: []p2ptest.Trigger{ { Code: 0, Msg: &UnsubscribeMsg{ Stream: stream, }, Peer: node.ID(), }, }, }) if err != nil { t.Fatal(err) } } } // TestMaxPeerServersWithoutUnsubscribe creates a registry with a limited // number of stream servers, and performs subscriptions to detect subscriptions // error message exchange. func TestMaxPeerServersWithoutUnsubscribe(t *testing.T) { var maxPeerServers = 6 tester, streamer, _, teardown, err := newStreamerTester(t, &RegistryOptions{ MaxPeerServers: maxPeerServers, }) defer teardown() if err != nil { t.Fatal(err) } streamer.RegisterServerFunc("foo", func(p *Peer, t string, live bool) (Server, error) { return newTestServer(t), nil }) node := tester.Nodes[0] for i := 0; i < maxPeerServers+10; i++ { stream := NewStream("foo", strconv.Itoa(i), true) if i >= maxPeerServers { err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", Triggers: []p2ptest.Trigger{ { Code: 4, Msg: &SubscribeMsg{ Stream: stream, Priority: Top, }, Peer: node.ID(), }, }, Expects: []p2ptest.Expect{ { Code: 7, Msg: &SubscribeErrorMsg{ Error: ErrMaxPeerServers.Error(), }, Peer: node.ID(), }, }, }) if err != nil { t.Fatal(err) } continue } err = tester.TestExchanges(p2ptest.Exchange{ Label: "Subscribe message", Triggers: []p2ptest.Trigger{ { Code: 4, Msg: &SubscribeMsg{ Stream: stream, Priority: Top, }, Peer: node.ID(), }, }, Expects: []p2ptest.Expect{ { Code: 1, Msg: &OfferedHashesMsg{ Stream: stream, HandoverProof: &HandoverProof{ Handover: &Handover{}, }, Hashes: make([]byte, HashSize), From: 1, To: 1, }, Peer: node.ID(), }, }, }) if err != nil { t.Fatal(err) } } }