diff --git a/les/client.go b/les/client.go index faaf6095e8..053118df5a 100644 --- a/les/client.go +++ b/les/client.go @@ -122,7 +122,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) { leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.getTimeout) leth.relay = newLesTxRelay(peers, leth.retriever) - leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever) + leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.peers, leth.retriever) leth.chtIndexer = light.NewChtIndexer(chainDb, leth.odr, params.CHTFrequency, params.HelperTrieConfirmations, config.LightNoPrune) leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, leth.odr, params.BloomBitsBlocksClient, params.BloomTrieFrequency, config.LightNoPrune) leth.odr.SetIndexers(leth.chtIndexer, leth.bloomTrieIndexer, leth.bloomIndexer) diff --git a/les/fetcher_test.go b/les/fetcher_test.go index a9e6e6835e..d3a74d25c2 100644 --- a/les/fetcher_test.go +++ b/les/fetcher_test.go @@ -66,7 +66,12 @@ func TestSequentialAnnouncementsLes2(t *testing.T) { testSequentialAnnouncements func TestSequentialAnnouncementsLes3(t *testing.T) { testSequentialAnnouncements(t, 3) } func testSequentialAnnouncements(t *testing.T, protocol int) { - s, c, teardown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, false, true) + netconfig := testnetConfig{ + blocks: 4, + protocol: protocol, + nopruning: true, + } + s, c, teardown := newClientServerEnv(t, netconfig) defer teardown() // Create connected peer pair. @@ -101,7 +106,12 @@ func TestGappedAnnouncementsLes2(t *testing.T) { testGappedAnnouncements(t, 2) } func TestGappedAnnouncementsLes3(t *testing.T) { testGappedAnnouncements(t, 3) } func testGappedAnnouncements(t *testing.T, protocol int) { - s, c, teardown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, false, true) + netconfig := testnetConfig{ + blocks: 4, + protocol: protocol, + nopruning: true, + } + s, c, teardown := newClientServerEnv(t, netconfig) defer teardown() // Create connected peer pair. @@ -183,7 +193,13 @@ func testTrustedAnnouncement(t *testing.T, protocol int) { ids = append(ids, n.String()) } } - _, c, teardown := newClientServerEnv(t, 0, protocol, nil, ids, 60, false, false, true) + netconfig := testnetConfig{ + protocol: protocol, + nopruning: true, + ulcServers: ids, + ulcFraction: 60, + } + _, c, teardown := newClientServerEnv(t, netconfig) defer teardown() defer func() { for i := 0; i < len(teardowns); i++ { @@ -233,8 +249,17 @@ func testTrustedAnnouncement(t *testing.T, protocol int) { check([]uint64{10}, 10, func() { <-newHead }) // Sync the whole chain. } -func TestInvalidAnnounces(t *testing.T) { - s, c, teardown := newClientServerEnv(t, 4, lpv3, nil, nil, 0, false, false, true) +func TestInvalidAnnouncesLES2(t *testing.T) { testInvalidAnnounces(t, lpv2) } +func TestInvalidAnnouncesLES3(t *testing.T) { testInvalidAnnounces(t, lpv3) } +func TestInvalidAnnouncesLES4(t *testing.T) { testInvalidAnnounces(t, lpv4) } + +func testInvalidAnnounces(t *testing.T, protocol int) { + netconfig := testnetConfig{ + blocks: 4, + protocol: protocol, + nopruning: true, + } + s, c, teardown := newClientServerEnv(t, netconfig) defer teardown() // Create connected peer pair. diff --git a/les/handler_test.go b/les/handler_test.go index e251f4503b..d1dbee6bdf 100644 --- a/les/handler_test.go +++ b/les/handler_test.go @@ -52,9 +52,16 @@ func TestGetBlockHeadersLes3(t *testing.T) { testGetBlockHeaders(t, 3) } func TestGetBlockHeadersLes4(t *testing.T) { testGetBlockHeaders(t, 4) } func testGetBlockHeaders(t *testing.T, protocol int) { - server, tearDown := newServerEnv(t, downloader.MaxHeaderFetch+15, protocol, nil, false, true, 0) + netconfig := testnetConfig{ + blocks: downloader.MaxHeaderFetch + 15, + protocol: protocol, + nopruning: true, + } + server, _, tearDown := newClientServerEnv(t, netconfig) defer tearDown() + rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol) + defer closePeer() bc := server.handler.blockchain // Create a "random" unknown hash for testing @@ -169,8 +176,8 @@ func testGetBlockHeaders(t *testing.T, protocol int) { // Send the hash request and verify the response reqID++ - sendRequest(server.peer.app, GetBlockHeadersMsg, reqID, tt.query) - if err := expectResponse(server.peer.app, BlockHeadersMsg, reqID, testBufLimit, headers); err != nil { + sendRequest(rawPeer.app, GetBlockHeadersMsg, reqID, tt.query) + if err := expectResponse(rawPeer.app, BlockHeadersMsg, reqID, testBufLimit, headers); err != nil { t.Errorf("test %d: headers mismatch: %v", i, err) } } @@ -182,9 +189,17 @@ func TestGetBlockBodiesLes3(t *testing.T) { testGetBlockBodies(t, 3) } func TestGetBlockBodiesLes4(t *testing.T) { testGetBlockBodies(t, 4) } func testGetBlockBodies(t *testing.T, protocol int) { - server, tearDown := newServerEnv(t, downloader.MaxBlockFetch+15, protocol, nil, false, true, 0) + netconfig := testnetConfig{ + blocks: downloader.MaxHeaderFetch + 15, + protocol: protocol, + nopruning: true, + } + server, _, tearDown := newClientServerEnv(t, netconfig) defer tearDown() + rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol) + defer closePeer() + bc := server.handler.blockchain // Create a batch of tests for various scenarios @@ -247,8 +262,8 @@ func testGetBlockBodies(t *testing.T, protocol int) { reqID++ // Send the hash request and verify the response - sendRequest(server.peer.app, GetBlockBodiesMsg, reqID, hashes) - if err := expectResponse(server.peer.app, BlockBodiesMsg, reqID, testBufLimit, bodies); err != nil { + sendRequest(rawPeer.app, GetBlockBodiesMsg, reqID, hashes) + if err := expectResponse(rawPeer.app, BlockBodiesMsg, reqID, testBufLimit, bodies); err != nil { t.Errorf("test %d: bodies mismatch: %v", i, err) } } @@ -261,8 +276,17 @@ func TestGetCodeLes4(t *testing.T) { testGetCode(t, 4) } func testGetCode(t *testing.T, protocol int) { // Assemble the test environment - server, tearDown := newServerEnv(t, 4, protocol, nil, false, true, 0) + netconfig := testnetConfig{ + blocks: 4, + protocol: protocol, + nopruning: true, + } + server, _, tearDown := newClientServerEnv(t, netconfig) defer tearDown() + + rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol) + defer closePeer() + bc := server.handler.blockchain var codereqs []*CodeReq @@ -279,8 +303,8 @@ func testGetCode(t *testing.T, protocol int) { } } - sendRequest(server.peer.app, GetCodeMsg, 42, codereqs) - if err := expectResponse(server.peer.app, CodeMsg, 42, testBufLimit, codes); err != nil { + sendRequest(rawPeer.app, GetCodeMsg, 42, codereqs) + if err := expectResponse(rawPeer.app, CodeMsg, 42, testBufLimit, codes); err != nil { t.Errorf("codes mismatch: %v", err) } } @@ -291,8 +315,17 @@ func TestGetStaleCodeLes3(t *testing.T) { testGetStaleCode(t, 3) } func TestGetStaleCodeLes4(t *testing.T) { testGetStaleCode(t, 4) } func testGetStaleCode(t *testing.T, protocol int) { - server, tearDown := newServerEnv(t, core.TriesInMemory+4, protocol, nil, false, true, 0) + netconfig := testnetConfig{ + blocks: core.TriesInMemory + 4, + protocol: protocol, + nopruning: true, + } + server, _, tearDown := newClientServerEnv(t, netconfig) defer tearDown() + + rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol) + defer closePeer() + bc := server.handler.blockchain check := func(number uint64, expected [][]byte) { @@ -300,8 +333,8 @@ func testGetStaleCode(t *testing.T, protocol int) { BHash: bc.GetHeaderByNumber(number).Hash(), AccKey: crypto.Keccak256(testContractAddr[:]), } - sendRequest(server.peer.app, GetCodeMsg, 42, []*CodeReq{req}) - if err := expectResponse(server.peer.app, CodeMsg, 42, testBufLimit, expected); err != nil { + sendRequest(rawPeer.app, GetCodeMsg, 42, []*CodeReq{req}) + if err := expectResponse(rawPeer.app, CodeMsg, 42, testBufLimit, expected); err != nil { t.Errorf("codes mismatch: %v", err) } } @@ -317,9 +350,17 @@ func TestGetReceiptLes4(t *testing.T) { testGetReceipt(t, 4) } func testGetReceipt(t *testing.T, protocol int) { // Assemble the test environment - server, tearDown := newServerEnv(t, 4, protocol, nil, false, true, 0) + netconfig := testnetConfig{ + blocks: 4, + protocol: protocol, + nopruning: true, + } + server, _, tearDown := newClientServerEnv(t, netconfig) defer tearDown() + rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol) + defer closePeer() + bc := server.handler.blockchain // Collect the hashes to request, and the response to expect @@ -332,8 +373,8 @@ func testGetReceipt(t *testing.T, protocol int) { receipts = append(receipts, rawdb.ReadRawReceipts(server.db, block.Hash(), block.NumberU64())) } // Send the hash request and verify the response - sendRequest(server.peer.app, GetReceiptsMsg, 42, hashes) - if err := expectResponse(server.peer.app, ReceiptsMsg, 42, testBufLimit, receipts); err != nil { + sendRequest(rawPeer.app, GetReceiptsMsg, 42, hashes) + if err := expectResponse(rawPeer.app, ReceiptsMsg, 42, testBufLimit, receipts); err != nil { t.Errorf("receipts mismatch: %v", err) } } @@ -345,9 +386,17 @@ func TestGetProofsLes4(t *testing.T) { testGetProofs(t, 4) } func testGetProofs(t *testing.T, protocol int) { // Assemble the test environment - server, tearDown := newServerEnv(t, 4, protocol, nil, false, true, 0) + netconfig := testnetConfig{ + blocks: 4, + protocol: protocol, + nopruning: true, + } + server, _, tearDown := newClientServerEnv(t, netconfig) defer tearDown() + rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol) + defer closePeer() + bc := server.handler.blockchain var proofreqs []ProofReq @@ -368,8 +417,8 @@ func testGetProofs(t *testing.T, protocol int) { } } // Send the proof request and verify the response - sendRequest(server.peer.app, GetProofsV2Msg, 42, proofreqs) - if err := expectResponse(server.peer.app, ProofsV2Msg, 42, testBufLimit, proofsV2.NodeList()); err != nil { + sendRequest(rawPeer.app, GetProofsV2Msg, 42, proofreqs) + if err := expectResponse(rawPeer.app, ProofsV2Msg, 42, testBufLimit, proofsV2.NodeList()); err != nil { t.Errorf("proofs mismatch: %v", err) } } @@ -380,8 +429,17 @@ func TestGetStaleProofLes3(t *testing.T) { testGetStaleProof(t, 3) } func TestGetStaleProofLes4(t *testing.T) { testGetStaleProof(t, 4) } func testGetStaleProof(t *testing.T, protocol int) { - server, tearDown := newServerEnv(t, core.TriesInMemory+4, protocol, nil, false, true, 0) + netconfig := testnetConfig{ + blocks: core.TriesInMemory + 4, + protocol: protocol, + nopruning: true, + } + server, _, tearDown := newClientServerEnv(t, netconfig) defer tearDown() + + rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol) + defer closePeer() + bc := server.handler.blockchain check := func(number uint64, wantOK bool) { @@ -393,7 +451,7 @@ func testGetStaleProof(t *testing.T, protocol int) { BHash: header.Hash(), Key: account, } - sendRequest(server.peer.app, GetProofsV2Msg, 42, []*ProofReq{req}) + sendRequest(rawPeer.app, GetProofsV2Msg, 42, []*ProofReq{req}) var expected []rlp.RawValue if wantOK { @@ -402,7 +460,7 @@ func testGetStaleProof(t *testing.T, protocol int) { t.Prove(account, 0, proofsV2) expected = proofsV2.NodeList() } - if err := expectResponse(server.peer.app, ProofsV2Msg, 42, testBufLimit, expected); err != nil { + if err := expectResponse(rawPeer.app, ProofsV2Msg, 42, testBufLimit, expected); err != nil { t.Errorf("codes mismatch: %v", err) } } @@ -417,20 +475,30 @@ func TestGetCHTProofsLes3(t *testing.T) { testGetCHTProofs(t, 3) } func TestGetCHTProofsLes4(t *testing.T) { testGetCHTProofs(t, 4) } func testGetCHTProofs(t *testing.T, protocol int) { - config := light.TestServerIndexerConfig - - waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) { - for { - cs, _, _ := cIndexer.Sections() - if cs >= 1 { - break + var ( + config = light.TestServerIndexerConfig + waitIndexers = func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) { + for { + cs, _, _ := cIndexer.Sections() + if cs >= 1 { + break + } + time.Sleep(10 * time.Millisecond) } - time.Sleep(10 * time.Millisecond) } - } - server, tearDown := newServerEnv(t, int(config.ChtSize+config.ChtConfirms), protocol, waitIndexers, false, true, 0) + netconfig = testnetConfig{ + blocks: int(config.ChtSize + config.ChtConfirms), + protocol: protocol, + indexFn: waitIndexers, + nopruning: true, + } + ) + server, _, tearDown := newClientServerEnv(t, netconfig) defer tearDown() + rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol) + defer closePeer() + bc := server.handler.blockchain // Assemble the proofs from the different protocols @@ -454,8 +522,8 @@ func testGetCHTProofs(t *testing.T, protocol int) { AuxReq: htAuxHeader, }} // Send the proof request and verify the response - sendRequest(server.peer.app, GetHelperTrieProofsMsg, 42, requestsV2) - if err := expectResponse(server.peer.app, HelperTrieProofsMsg, 42, testBufLimit, proofsV2); err != nil { + sendRequest(rawPeer.app, GetHelperTrieProofsMsg, 42, requestsV2) + if err := expectResponse(rawPeer.app, HelperTrieProofsMsg, 42, testBufLimit, proofsV2); err != nil { t.Errorf("proofs mismatch: %v", err) } } @@ -466,20 +534,30 @@ func TestGetBloombitsProofsLes4(t *testing.T) { testGetBloombitsProofs(t, 4) } // Tests that bloombits proofs can be correctly retrieved. func testGetBloombitsProofs(t *testing.T, protocol int) { - config := light.TestServerIndexerConfig - - waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) { - for { - bts, _, _ := btIndexer.Sections() - if bts >= 1 { - break + var ( + config = light.TestServerIndexerConfig + waitIndexers = func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) { + for { + bts, _, _ := btIndexer.Sections() + if bts >= 1 { + break + } + time.Sleep(10 * time.Millisecond) } - time.Sleep(10 * time.Millisecond) } - } - server, tearDown := newServerEnv(t, int(config.BloomTrieSize+config.BloomTrieConfirms), protocol, waitIndexers, false, true, 0) + netconfig = testnetConfig{ + blocks: int(config.BloomTrieSize + config.BloomTrieConfirms), + protocol: protocol, + indexFn: waitIndexers, + nopruning: true, + } + ) + server, _, tearDown := newClientServerEnv(t, netconfig) defer tearDown() + rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol) + defer closePeer() + bc := server.handler.blockchain // Request and verify each bit of the bloom bits proofs @@ -503,20 +581,28 @@ func testGetBloombitsProofs(t *testing.T, protocol int) { trie.Prove(key, 0, &proofs.Proofs) // Send the proof request and verify the response - sendRequest(server.peer.app, GetHelperTrieProofsMsg, 42, requests) - if err := expectResponse(server.peer.app, HelperTrieProofsMsg, 42, testBufLimit, proofs); err != nil { + sendRequest(rawPeer.app, GetHelperTrieProofsMsg, 42, requests) + if err := expectResponse(rawPeer.app, HelperTrieProofsMsg, 42, testBufLimit, proofs); err != nil { t.Errorf("bit %d: proofs mismatch: %v", bit, err) } } } -func TestTransactionStatusLes2(t *testing.T) { testTransactionStatus(t, 2) } -func TestTransactionStatusLes3(t *testing.T) { testTransactionStatus(t, 3) } -func TestTransactionStatusLes4(t *testing.T) { testTransactionStatus(t, 4) } +func TestTransactionStatusLes2(t *testing.T) { testTransactionStatus(t, lpv2) } +func TestTransactionStatusLes3(t *testing.T) { testTransactionStatus(t, lpv3) } +func TestTransactionStatusLes4(t *testing.T) { testTransactionStatus(t, lpv4) } func testTransactionStatus(t *testing.T, protocol int) { - server, tearDown := newServerEnv(t, 0, protocol, nil, false, true, 0) + netconfig := testnetConfig{ + protocol: protocol, + nopruning: true, + } + server, _, tearDown := newClientServerEnv(t, netconfig) defer tearDown() + + rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol) + defer closePeer() + server.handler.addTxsSync = true chain := server.handler.blockchain @@ -526,11 +612,11 @@ func testTransactionStatus(t *testing.T, protocol int) { test := func(tx *types.Transaction, send bool, expStatus light.TxStatus) { reqID++ if send { - sendRequest(server.peer.app, SendTxV2Msg, reqID, types.Transactions{tx}) + sendRequest(rawPeer.app, SendTxV2Msg, reqID, types.Transactions{tx}) } else { - sendRequest(server.peer.app, GetTxStatusMsg, reqID, []common.Hash{tx.Hash()}) + sendRequest(rawPeer.app, GetTxStatusMsg, reqID, []common.Hash{tx.Hash()}) } - if err := expectResponse(server.peer.app, TxStatusMsg, reqID, testBufLimit, []light.TxStatus{expStatus}); err != nil { + if err := expectResponse(rawPeer.app, TxStatusMsg, reqID, testBufLimit, []light.TxStatus{expStatus}); err != nil { t.Errorf("transaction status mismatch") } } @@ -572,7 +658,7 @@ func testTransactionStatus(t *testing.T, protocol int) { t.Fatalf("pending count mismatch: have %d, want 1", pending) } // Discard new block announcement - msg, _ := server.peer.app.ReadMsg() + msg, _ := rawPeer.app.ReadMsg() msg.Discard() // check if their status is included now @@ -597,7 +683,7 @@ func testTransactionStatus(t *testing.T, protocol int) { t.Fatalf("pending count mismatch: have %d, want 3", pending) } // Discard new block announcement - msg, _ = server.peer.app.ReadMsg() + msg, _ = rawPeer.app.ReadMsg() msg.Discard() // check if their status is pending again @@ -605,11 +691,23 @@ func testTransactionStatus(t *testing.T, protocol int) { test(tx2, false, light.TxStatus{Status: core.TxStatusPending}) } -func TestStopResumeLes3(t *testing.T) { - server, tearDown := newServerEnv(t, 0, 3, nil, true, true, testBufLimit/10) +func TestStopResumeLES3(t *testing.T) { testStopResume(t, lpv3) } +func TestStopResumeLES4(t *testing.T) { testStopResume(t, lpv4) } + +func testStopResume(t *testing.T, protocol int) { + netconfig := testnetConfig{ + protocol: protocol, + simClock: true, + nopruning: true, + } + server, _, tearDown := newClientServerEnv(t, netconfig) defer tearDown() server.handler.server.costTracker.testing = true + server.handler.server.costTracker.testCostList = testCostList(testBufLimit / 10) + + rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol) + defer closePeer() var ( reqID uint64 @@ -619,14 +717,14 @@ func TestStopResumeLes3(t *testing.T) { header := server.handler.blockchain.CurrentHeader() req := func() { reqID++ - sendRequest(server.peer.app, GetBlockHeadersMsg, reqID, &GetBlockHeadersData{Origin: hashOrNumber{Hash: header.Hash()}, Amount: 1}) + sendRequest(rawPeer.app, GetBlockHeadersMsg, reqID, &GetBlockHeadersData{Origin: hashOrNumber{Hash: header.Hash()}, Amount: 1}) } for i := 1; i <= 5; i++ { // send requests while we still have enough buffer and expect a response for expBuf >= testCost { req() expBuf -= testCost - if err := expectResponse(server.peer.app, BlockHeadersMsg, reqID, expBuf, []*types.Header{header}); err != nil { + if err := expectResponse(rawPeer.app, BlockHeadersMsg, reqID, expBuf, []*types.Header{header}); err != nil { t.Errorf("expected response and failed: %v", err) } } @@ -636,7 +734,7 @@ func TestStopResumeLes3(t *testing.T) { req() c-- } - if err := p2p.ExpectMsg(server.peer.app, StopMsg, nil); err != nil { + if err := p2p.ExpectMsg(rawPeer.app, StopMsg, nil); err != nil { t.Errorf("expected StopMsg and failed: %v", err) } // wait until the buffer is recharged by half of the limit @@ -645,7 +743,7 @@ func TestStopResumeLes3(t *testing.T) { // expect a ResumeMsg with the partially recharged buffer value expBuf += testBufRecharge * wait - if err := p2p.ExpectMsg(server.peer.app, ResumeMsg, expBuf); err != nil { + if err := p2p.ExpectMsg(rawPeer.app, ResumeMsg, expBuf); err != nil { t.Errorf("expected ResumeMsg and failed: %v", err) } } diff --git a/les/odr.go b/les/odr.go index 2c36f512de..d45c6a1a5d 100644 --- a/les/odr.go +++ b/les/odr.go @@ -18,6 +18,7 @@ package les import ( "context" + "sort" "time" "github.com/ethereum/go-ethereum/common/mclock" @@ -31,14 +32,16 @@ type LesOdr struct { db ethdb.Database indexerConfig *light.IndexerConfig chtIndexer, bloomTrieIndexer, bloomIndexer *core.ChainIndexer + peers *serverPeerSet retriever *retrieveManager stop chan struct{} } -func NewLesOdr(db ethdb.Database, config *light.IndexerConfig, retriever *retrieveManager) *LesOdr { +func NewLesOdr(db ethdb.Database, config *light.IndexerConfig, peers *serverPeerSet, retriever *retrieveManager) *LesOdr { return &LesOdr{ db: db, indexerConfig: config, + peers: peers, retriever: retriever, stop: make(chan struct{}), } @@ -98,7 +101,101 @@ type Msg struct { Obj interface{} } -// Retrieve tries to fetch an object from the LES network. +// peerByTxHistory is a heap.Interface implementation which can sort +// the peerset by transaction history. +type peerByTxHistory []*serverPeer + +func (h peerByTxHistory) Len() int { return len(h) } +func (h peerByTxHistory) Less(i, j int) bool { + if h[i].txHistory == txIndexUnlimited { + return false + } + if h[j].txHistory == txIndexUnlimited { + return true + } + return h[i].txHistory < h[j].txHistory +} +func (h peerByTxHistory) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +const ( + maxTxStatusRetry = 3 // The maximum retrys will be made for tx status request. + maxTxStatusCandidates = 5 // The maximum les servers the tx status requests will be sent to. +) + +// RetrieveTxStatus retrieves the transaction status from the LES network. +// There is no guarantee in the LES protocol that the mined transaction will +// be retrieved back for sure because of different reasons(the transaction +// is unindexed, the malicous server doesn't reply it deliberately, etc). +// Therefore, unretrieved transactions(UNKNOWN) will receive a certain number +// of retries, thus giving a weak guarantee. +func (odr *LesOdr) RetrieveTxStatus(ctx context.Context, req *light.TxStatusRequest) error { + // Sort according to the transaction history supported by the peer and + // select the peers with longest history. + var ( + retries int + peers []*serverPeer + missing = len(req.Hashes) + result = make([]light.TxStatus, len(req.Hashes)) + canSend = make(map[string]bool) + ) + for _, peer := range odr.peers.allPeers() { + if peer.txHistory == txIndexDisabled { + continue + } + peers = append(peers, peer) + } + sort.Sort(sort.Reverse(peerByTxHistory(peers))) + for i := 0; i < maxTxStatusCandidates && i < len(peers); i++ { + canSend[peers[i].id] = true + } + // Send out the request and assemble the result. + for { + if retries >= maxTxStatusRetry || len(canSend) == 0 { + break + } + var ( + // Deep copy the request, so that the partial result won't be mixed. + req = &TxStatusRequest{Hashes: req.Hashes} + id = genReqID() + distreq = &distReq{ + getCost: func(dp distPeer) uint64 { return req.GetCost(dp.(*serverPeer)) }, + canSend: func(dp distPeer) bool { return canSend[dp.(*serverPeer).id] }, + request: func(dp distPeer) func() { + p := dp.(*serverPeer) + p.fcServer.QueuedRequest(id, req.GetCost(p)) + delete(canSend, p.id) + return func() { req.Request(id, p) } + }, + } + ) + if err := odr.retriever.retrieve(ctx, id, distreq, func(p distPeer, msg *Msg) error { return req.Validate(odr.db, msg) }, odr.stop); err != nil { + return err + } + // Collect the response and assemble them to the final result. + // All the response is not verifiable, so always pick the first + // one we get. + for index, status := range req.Status { + if result[index].Status != core.TxStatusUnknown { + continue + } + if status.Status == core.TxStatusUnknown { + continue + } + result[index], missing = status, missing-1 + } + // Abort the procedure if all the status are retrieved + if missing == 0 { + break + } + retries += 1 + } + req.Status = result + return nil +} + +// Retrieve tries to fetch an object from the LES network. It's a common API +// for most of the LES requests except for the TxStatusRequest which needs +// the additional retry mechanism. // If the network retrieval was successful, it stores the object in local db. func (odr *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) { lreq := LesRequest(req) diff --git a/les/odr_requests.go b/les/odr_requests.go index 711c54b40f..d548fb1ee0 100644 --- a/les/odr_requests.go +++ b/les/odr_requests.go @@ -487,7 +487,7 @@ func (r *TxStatusRequest) GetCost(peer *serverPeer) uint64 { // CanSend tells if a certain peer is suitable for serving the given request func (r *TxStatusRequest) CanSend(peer *serverPeer) bool { - return peer.serveTxLookup + return peer.txHistory != txIndexDisabled } // Request sends an ODR request to the LES network (implementation of LesOdrRequest) @@ -496,13 +496,12 @@ func (r *TxStatusRequest) Request(reqID uint64, peer *serverPeer) error { return peer.requestTxStatus(reqID, r.Hashes) } -// Valid processes an ODR request reply message from the LES network +// Validate processes an ODR request reply message from the LES network // returns true and stores results in memory if the message was a valid reply // to the request (implementation of LesOdrRequest) func (r *TxStatusRequest) Validate(db ethdb.Database, msg *Msg) error { log.Debug("Validating transaction status", "count", len(r.Hashes)) - // Ensure we have a correct message with a single block body if msg.MsgType != MsgTxStatus { return errInvalidMessageType } diff --git a/les/odr_test.go b/les/odr_test.go index 2a70a296c8..a43382e05e 100644 --- a/les/odr_test.go +++ b/les/odr_test.go @@ -19,7 +19,10 @@ package les import ( "bytes" "context" + "crypto/rand" + "fmt" "math/big" + "reflect" "testing" "time" @@ -190,7 +193,13 @@ func odrTxStatus(ctx context.Context, db ethdb.Database, config *params.ChainCon // testOdr tests odr requests whose validation guaranteed by block headers. func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn odrTestFn) { // Assemble the test environment - server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true, true) + netconfig := testnetConfig{ + blocks: 4, + protocol: protocol, + connect: true, + nopruning: true, + } + server, client, tearDown := newClientServerEnv(t, netconfig) defer tearDown() // Ensure the client has synced all necessary data. @@ -246,3 +255,184 @@ func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn od test(5) } } + +func TestGetTxStatusFromUnindexedPeersLES4(t *testing.T) { testGetTxStatusFromUnindexedPeers(t, lpv4) } + +func testGetTxStatusFromUnindexedPeers(t *testing.T, protocol int) { + var ( + blocks = 8 + netconfig = testnetConfig{ + blocks: blocks, + protocol: protocol, + nopruning: true, + } + ) + server, client, tearDown := newClientServerEnv(t, netconfig) + defer tearDown() + + // Iterate the chain, create the tx indexes locally + var ( + testHash common.Hash + testStatus light.TxStatus + + txs = make(map[common.Hash]*types.Transaction) // Transaction objects set + blockNumbers = make(map[common.Hash]uint64) // Transaction hash to block number mappings + blockHashes = make(map[common.Hash]common.Hash) // Transaction hash to block hash mappings + intraIndex = make(map[common.Hash]uint64) // Transaction intra-index in block + ) + for number := uint64(1); number < server.backend.Blockchain().CurrentBlock().NumberU64(); number++ { + block := server.backend.Blockchain().GetBlockByNumber(number) + if block == nil { + t.Fatalf("Failed to retrieve block %d", number) + } + for index, tx := range block.Transactions() { + txs[tx.Hash()] = tx + blockNumbers[tx.Hash()] = number + blockHashes[tx.Hash()] = block.Hash() + intraIndex[tx.Hash()] = uint64(index) + + if testHash == (common.Hash{}) { + testHash = tx.Hash() + testStatus = light.TxStatus{ + Status: core.TxStatusIncluded, + Lookup: &rawdb.LegacyTxLookupEntry{ + BlockHash: block.Hash(), + BlockIndex: block.NumberU64(), + Index: uint64(index), + }, + } + } + } + } + // serveMsg processes incoming GetTxStatusMsg and sends the response back. + serveMsg := func(peer *testPeer, txLookup uint64) error { + msg, err := peer.app.ReadMsg() + if err != nil { + return err + } + if msg.Code != GetTxStatusMsg { + return fmt.Errorf("message code mismatch: got %d, expected %d", msg.Code, GetTxStatusMsg) + } + var r GetTxStatusPacket + if err := msg.Decode(&r); err != nil { + return err + } + stats := make([]light.TxStatus, len(r.Hashes)) + for i, hash := range r.Hashes { + number, exist := blockNumbers[hash] + if !exist { + continue // Filter out unknown transactions + } + min := uint64(blocks) - txLookup + if txLookup != txIndexUnlimited && (txLookup == txIndexDisabled || number < min) { + continue // Filter out unindexed transactions + } + stats[i].Status = core.TxStatusIncluded + stats[i].Lookup = &rawdb.LegacyTxLookupEntry{ + BlockHash: blockHashes[hash], + BlockIndex: number, + Index: intraIndex[hash], + } + } + data, _ := rlp.EncodeToBytes(stats) + reply := &reply{peer.app, TxStatusMsg, r.ReqID, data} + reply.send(testBufLimit) + return nil + } + + var testspecs = []struct { + peers int + txLookups []uint64 + txs []common.Hash + results []light.TxStatus + }{ + // Retrieve mined transaction from the empty peerset + { + peers: 0, + txLookups: []uint64{}, + txs: []common.Hash{testHash}, + results: []light.TxStatus{{}}, + }, + // Retrieve unknown transaction from the full peers + { + peers: 3, + txLookups: []uint64{txIndexUnlimited, txIndexUnlimited, txIndexUnlimited}, + txs: []common.Hash{randomHash()}, + results: []light.TxStatus{{}}, + }, + // Retrieve mined transaction from the full peers + { + peers: 3, + txLookups: []uint64{txIndexUnlimited, txIndexUnlimited, txIndexUnlimited}, + txs: []common.Hash{testHash}, + results: []light.TxStatus{testStatus}, + }, + // Retrieve mixed transactions from the full peers + { + peers: 3, + txLookups: []uint64{txIndexUnlimited, txIndexUnlimited, txIndexUnlimited}, + txs: []common.Hash{randomHash(), testHash}, + results: []light.TxStatus{{}, testStatus}, + }, + // Retrieve mixed transactions from unindexed peer(but the target is still available) + { + peers: 3, + txLookups: []uint64{uint64(blocks) - testStatus.Lookup.BlockIndex, uint64(blocks) - testStatus.Lookup.BlockIndex - 1, uint64(blocks) - testStatus.Lookup.BlockIndex - 2}, + txs: []common.Hash{randomHash(), testHash}, + results: []light.TxStatus{{}, testStatus}, + }, + // Retrieve mixed transactions from unindexed peer(but the target is not available) + { + peers: 3, + txLookups: []uint64{uint64(blocks) - testStatus.Lookup.BlockIndex - 1, uint64(blocks) - testStatus.Lookup.BlockIndex - 1, uint64(blocks) - testStatus.Lookup.BlockIndex - 2}, + txs: []common.Hash{randomHash(), testHash}, + results: []light.TxStatus{{}, {}}, + }, + } + for _, testspec := range testspecs { + // Create a bunch of server peers with different tx history + var ( + serverPeers []*testPeer + closeFns []func() + ) + for i := 0; i < testspec.peers; i++ { + peer, closePeer, _ := client.newRawPeer(t, fmt.Sprintf("server-%d", i), protocol, testspec.txLookups[i]) + serverPeers = append(serverPeers, peer) + closeFns = append(closeFns, closePeer) + + // Create a one-time routine for serving message + go func(i int, peer *testPeer) { + serveMsg(peer, testspec.txLookups[i]) + }(i, peer) + } + + // Send out the GetTxStatus requests, compare the result with + // expected value. + r := &light.TxStatusRequest{Hashes: testspec.txs} + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + err := client.handler.backend.odr.RetrieveTxStatus(ctx, r) + if err != nil { + t.Errorf("Failed to retrieve tx status %v", err) + } else { + if !reflect.DeepEqual(testspec.results, r.Status) { + t.Errorf("Result mismatch, diff") + } + } + + // Close all connected peers and start the next round + for _, closeFn := range closeFns { + closeFn() + } + } +} + +// randomHash generates a random blob of data and returns it as a hash. +func randomHash() common.Hash { + var hash common.Hash + if n, err := rand.Read(hash[:]); n != common.HashLength || err != nil { + panic(err) + } + return hash +} diff --git a/les/peer.go b/les/peer.go index 479b4034bc..0361167ee1 100644 --- a/les/peer.go +++ b/les/peer.go @@ -341,7 +341,7 @@ type serverPeer struct { onlyAnnounce bool // The flag whether the server sends announcement only. chainSince, chainRecent uint64 // The range of chain server peer can serve. stateSince, stateRecent uint64 // The range of state server peer can serve. - serveTxLookup bool // The server peer can serve tx lookups. + txHistory uint64 // The length of available tx history, 0 means all, 1 means disabled // Advertised checkpoint fields checkpointNumber uint64 // The block height which the checkpoint is registered. @@ -634,13 +634,13 @@ func (p *serverPeer) Handshake(genesis common.Hash, forkid forkid.ID, forkFilter if err := recv.get("recentTxLookup", &recentTx); err != nil { return err } - // Note: in the current version we only consider the tx index service useful - // if it is unlimited. This can be made configurable in the future. - p.serveTxLookup = recentTx == txIndexUnlimited + p.txHistory = uint64(recentTx) } else { - p.serveTxLookup = true + // The weak assumption is held here that legacy les server(les2,3) + // has unlimited transaction history. The les serving in these legacy + // versions is disabled if the transaction is unindexed. + p.txHistory = txIndexUnlimited } - if p.onlyAnnounce && !p.trusted { return errResp(ErrUselessPeer, "peer cannot serve requests") } diff --git a/les/pruner_test.go b/les/pruner_test.go index 62b4e9a950..c6f198c088 100644 --- a/les/pruner_test.go +++ b/les/pruner_test.go @@ -28,19 +28,26 @@ import ( ) func TestLightPruner(t *testing.T) { - config := light.TestClientIndexerConfig - - waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) { - for { - cs, _, _ := cIndexer.Sections() - bts, _, _ := btIndexer.Sections() - if cs >= 3 && bts >= 3 { - break + var ( + waitIndexers = func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) { + for { + cs, _, _ := cIndexer.Sections() + bts, _, _ := btIndexer.Sections() + if cs >= 3 && bts >= 3 { + break + } + time.Sleep(10 * time.Millisecond) } - time.Sleep(10 * time.Millisecond) } - } - server, client, tearDown := newClientServerEnv(t, int(3*config.ChtSize+config.ChtConfirms), 2, waitIndexers, nil, 0, false, true, false) + config = light.TestClientIndexerConfig + netconfig = testnetConfig{ + blocks: int(3*config.ChtSize + config.ChtConfirms), + protocol: 3, + indexFn: waitIndexers, + connect: true, + } + ) + server, client, tearDown := newClientServerEnv(t, netconfig) defer tearDown() // checkDB iterates the chain with given prefix, resolves the block number diff --git a/les/request_test.go b/les/request_test.go index b054fd88df..c65405e375 100644 --- a/les/request_test.go +++ b/les/request_test.go @@ -83,7 +83,14 @@ func tfCodeAccess(db ethdb.Database, bhash common.Hash, num uint64) light.OdrReq func testAccess(t *testing.T, protocol int, fn accessTestFn) { // Assemble the test environment - server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true, true) + netconfig := testnetConfig{ + blocks: 4, + protocol: protocol, + indexFn: nil, + connect: true, + nopruning: true, + } + server, client, tearDown := newClientServerEnv(t, netconfig) defer tearDown() // Ensure the client has synced all necessary data. diff --git a/les/sync_test.go b/les/sync_test.go index 64e7283663..d3bb90df02 100644 --- a/les/sync_test.go +++ b/les/sync_test.go @@ -31,15 +31,15 @@ import ( ) // Test light syncing which will download all headers from genesis. -func TestLightSyncingLes3(t *testing.T) { testCheckpointSyncing(t, 3, 0) } +func TestLightSyncingLes3(t *testing.T) { testCheckpointSyncing(t, lpv3, 0) } // Test legacy checkpoint syncing which will download tail headers // based on a hardcoded checkpoint. -func TestLegacyCheckpointSyncingLes3(t *testing.T) { testCheckpointSyncing(t, 3, 1) } +func TestLegacyCheckpointSyncingLes3(t *testing.T) { testCheckpointSyncing(t, lpv3, 1) } // Test checkpoint syncing which will download tail headers based // on a verified checkpoint. -func TestCheckpointSyncingLes3(t *testing.T) { testCheckpointSyncing(t, 3, 2) } +func TestCheckpointSyncingLes3(t *testing.T) { testCheckpointSyncing(t, lpv3, 2) } func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) { config := light.TestServerIndexerConfig @@ -55,7 +55,13 @@ func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) { } } // Generate 128+1 blocks (totally 1 CHT section) - server, client, tearDown := newClientServerEnv(t, int(config.ChtSize+config.ChtConfirms), protocol, waitIndexers, nil, 0, false, false, true) + netconfig := testnetConfig{ + blocks: int(config.ChtSize + config.ChtConfirms), + protocol: protocol, + indexFn: waitIndexers, + nopruning: true, + } + server, client, tearDown := newClientServerEnv(t, netconfig) defer tearDown() expected := config.ChtSize + config.ChtConfirms @@ -78,7 +84,7 @@ func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) { // Register the assembled checkpoint into oracle. header := server.backend.Blockchain().CurrentHeader() - data := append([]byte{0x19, 0x00}, append(registrarAddr.Bytes(), append([]byte{0, 0, 0, 0, 0, 0, 0, 0}, cp.Hash().Bytes()...)...)...) + data := append([]byte{0x19, 0x00}, append(oracleAddr.Bytes(), append([]byte{0, 0, 0, 0, 0, 0, 0, 0}, cp.Hash().Bytes()...)...)...) sig, _ := crypto.Sign(crypto.Keccak256(data), signerKey) sig[64] += 27 // Transform V from 0/1 to 27/28 according to the yellow paper auth, _ := bind.NewKeyedTransactorWithChainID(signerKey, big.NewInt(1337)) @@ -128,10 +134,10 @@ func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) { } } -func TestMissOracleBackend(t *testing.T) { testMissOracleBackend(t, true) } -func TestMissOracleBackendNoCheckpoint(t *testing.T) { testMissOracleBackend(t, false) } +func TestMissOracleBackendLES3(t *testing.T) { testMissOracleBackend(t, true, lpv3) } +func TestMissOracleBackendNoCheckpointLES3(t *testing.T) { testMissOracleBackend(t, false, lpv3) } -func testMissOracleBackend(t *testing.T, hasCheckpoint bool) { +func testMissOracleBackend(t *testing.T, hasCheckpoint bool, protocol int) { config := light.TestServerIndexerConfig waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) { @@ -145,7 +151,13 @@ func testMissOracleBackend(t *testing.T, hasCheckpoint bool) { } } // Generate 128+1 blocks (totally 1 CHT section) - server, client, tearDown := newClientServerEnv(t, int(config.ChtSize+config.ChtConfirms), 3, waitIndexers, nil, 0, false, false, true) + netconfig := testnetConfig{ + blocks: int(config.ChtSize + config.ChtConfirms), + protocol: protocol, + indexFn: waitIndexers, + nopruning: true, + } + server, client, tearDown := newClientServerEnv(t, netconfig) defer tearDown() expected := config.ChtSize + config.ChtConfirms @@ -160,7 +172,7 @@ func testMissOracleBackend(t *testing.T, hasCheckpoint bool) { // Register the assembled checkpoint into oracle. header := server.backend.Blockchain().CurrentHeader() - data := append([]byte{0x19, 0x00}, append(registrarAddr.Bytes(), append([]byte{0, 0, 0, 0, 0, 0, 0, 0}, cp.Hash().Bytes()...)...)...) + data := append([]byte{0x19, 0x00}, append(oracleAddr.Bytes(), append([]byte{0, 0, 0, 0, 0, 0, 0, 0}, cp.Hash().Bytes()...)...)...) sig, _ := crypto.Sign(crypto.Keccak256(data), signerKey) sig[64] += 27 // Transform V from 0/1 to 27/28 according to the yellow paper auth, _ := bind.NewKeyedTransactorWithChainID(signerKey, big.NewInt(1337)) @@ -220,7 +232,9 @@ func testMissOracleBackend(t *testing.T, hasCheckpoint bool) { } } -func TestSyncFromConfiguredCheckpoint(t *testing.T) { +func TestSyncFromConfiguredCheckpointLES3(t *testing.T) { testSyncFromConfiguredCheckpoint(t, lpv3) } + +func testSyncFromConfiguredCheckpoint(t *testing.T, protocol int) { config := light.TestServerIndexerConfig waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) { @@ -234,7 +248,13 @@ func TestSyncFromConfiguredCheckpoint(t *testing.T) { } } // Generate 256+1 blocks (totally 2 CHT sections) - server, client, tearDown := newClientServerEnv(t, int(2*config.ChtSize+config.ChtConfirms), 3, waitIndexers, nil, 0, false, false, true) + netconfig := testnetConfig{ + blocks: int(2*config.ChtSize + config.ChtConfirms), + protocol: protocol, + indexFn: waitIndexers, + nopruning: true, + } + server, client, tearDown := newClientServerEnv(t, netconfig) defer tearDown() // Configure the local checkpoint(the first section) @@ -296,7 +316,9 @@ func TestSyncFromConfiguredCheckpoint(t *testing.T) { } } -func TestSyncAll(t *testing.T) { +func TestSyncAll(t *testing.T) { testSyncAll(t, lpv3) } + +func testSyncAll(t *testing.T, protocol int) { config := light.TestServerIndexerConfig waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) { @@ -310,7 +332,13 @@ func TestSyncAll(t *testing.T) { } } // Generate 256+1 blocks (totally 2 CHT sections) - server, client, tearDown := newClientServerEnv(t, int(2*config.ChtSize+config.ChtConfirms), 3, waitIndexers, nil, 0, false, false, true) + netconfig := testnetConfig{ + blocks: int(2*config.ChtSize + config.ChtConfirms), + protocol: protocol, + indexFn: waitIndexers, + nopruning: true, + } + server, client, tearDown := newClientServerEnv(t, netconfig) defer tearDown() client.handler.backend.config.SyncFromCheckpoint = true diff --git a/les/test_helper.go b/les/test_helper.go index 6f93c8a48f..39313b1e3b 100644 --- a/les/test_helper.go +++ b/les/test_helper.go @@ -14,8 +14,9 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -// This file contains some shares testing functionality, common to multiple -// different files and modules being tested. +// This file contains some shares testing functionality, common to multiple +// different files and modules being tested. Client based network and Server +// based network can be created easily with available APIs. package les @@ -68,10 +69,10 @@ var ( testEventEmitterCode = common.Hex2Bytes("60606040523415600e57600080fd5b7f57050ab73f6b9ebdd9f76b8d4997793f48cf956e965ee070551b9ca0bb71584e60405160405180910390a160358060476000396000f3006060604052600080fd00a165627a7a723058203f727efcad8b5811f8cb1fc2620ce5e8c63570d697aef968172de296ea3994140029") - // Checkpoint registrar relative - registrarAddr common.Address - signerKey, _ = crypto.GenerateKey() - signerAddr = crypto.PubkeyToAddress(signerKey.PublicKey) + // Checkpoint oracle relative fields + oracleAddr common.Address + signerKey, _ = crypto.GenerateKey() + signerAddr = crypto.PubkeyToAddress(signerKey.PublicKey) ) var ( @@ -112,14 +113,23 @@ func prepare(n int, backend *backends.SimulatedBackend) { for i := 0; i < n; i++ { switch i { case 0: + // Builtin-block + // number: 1 + // txs: 2 + // deploy checkpoint contract auth, _ := bind.NewKeyedTransactorWithChainID(bankKey, big.NewInt(1337)) - registrarAddr, _, _, _ = contract.DeployCheckpointOracle(auth, backend, []common.Address{signerAddr}, sectionSize, processConfirms, big.NewInt(1)) + oracleAddr, _, _, _ = contract.DeployCheckpointOracle(auth, backend, []common.Address{signerAddr}, sectionSize, processConfirms, big.NewInt(1)) + // bankUser transfers some ether to user1 nonce, _ := backend.PendingNonceAt(ctx, bankAddr) tx, _ := types.SignTx(types.NewTransaction(nonce, userAddr1, big.NewInt(10000), params.TxGas, nil, nil), signer, bankKey) backend.SendTransaction(ctx, tx) case 1: + // Builtin-block + // number: 2 + // txs: 4 + bankNonce, _ := backend.PendingNonceAt(ctx, bankAddr) userNonce1, _ := backend.PendingNonceAt(ctx, userAddr1) @@ -140,6 +150,10 @@ func prepare(n int, backend *backends.SimulatedBackend) { tx4, _ := types.SignTx(types.NewContractCreation(userNonce1+2, big.NewInt(0), 200000, big.NewInt(0), testEventEmitterCode), signer, userKey1) backend.SendTransaction(ctx, tx4) case 2: + // Builtin-block + // number: 3 + // txs: 2 + // bankUser transfer some ether to signer bankNonce, _ := backend.PendingNonceAt(ctx, bankAddr) tx1, _ := types.SignTx(types.NewTransaction(bankNonce, signerAddr, big.NewInt(1000000000), params.TxGas, nil, nil), signer, bankKey) @@ -150,6 +164,10 @@ func prepare(n int, backend *backends.SimulatedBackend) { tx2, _ := types.SignTx(types.NewTransaction(bankNonce+1, testContractAddr, big.NewInt(0), 100000, nil, data), signer, bankKey) backend.SendTransaction(ctx, tx2) case 3: + // Builtin-block + // number: 4 + // txs: 1 + // invoke test contract bankNonce, _ := backend.PendingNonceAt(ctx, bankAddr) data := common.Hex2Bytes("C16431B900000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000002") @@ -310,45 +328,61 @@ type testPeer struct { app *p2p.MsgPipeRW // Application layer reader/writer to simulate the local side } -// newTestPeer creates a new peer registered at the given protocol manager. -func newTestPeer(t *testing.T, name string, version int, handler *serverHandler, shake bool, testCost uint64) (*testPeer, <-chan error) { - // Create a message pipe to communicate through - app, net := p2p.MsgPipe() - - // Generate a random id and create the peer - var id enode.ID - rand.Read(id[:]) - peer := newClientPeer(version, NetworkId, p2p.NewPeer(id, name, nil), net) - - // Start the peer on a new thread - errCh := make(chan error, 1) - go func() { - select { - case <-handler.closeCh: - errCh <- p2p.DiscQuitting - case errCh <- handler.handle(peer): - } - }() - tp := &testPeer{ - app: app, - net: net, - cpeer: peer, +// handshakeWithServer executes the handshake with the remote server peer. +func (p *testPeer) handshakeWithServer(t *testing.T, td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, forkID forkid.ID) { + // It only works for the simulated client peer + if p.cpeer == nil { + t.Fatal("handshake for client peer only") } - // Execute any implicitly requested handshakes and return - if shake { - // Customize the cost table if required. - if testCost != 0 { - handler.server.costTracker.testCostList = testCostList(testCost) - } - var ( - genesis = handler.blockchain.Genesis() - head = handler.blockchain.CurrentHeader() - td = handler.blockchain.GetTd(head.Hash(), head.Number.Uint64()) - ) - forkID := forkid.NewID(handler.blockchain.Config(), genesis.Hash(), head.Number.Uint64()) - tp.handshake(t, td, head.Hash(), head.Number.Uint64(), genesis.Hash(), forkID, testCostList(testCost)) + var sendList keyValueList + sendList = sendList.add("protocolVersion", uint64(p.cpeer.version)) + sendList = sendList.add("networkId", uint64(NetworkId)) + sendList = sendList.add("headTd", td) + sendList = sendList.add("headHash", head) + sendList = sendList.add("headNum", headNum) + sendList = sendList.add("genesisHash", genesis) + if p.cpeer.version >= lpv4 { + sendList = sendList.add("forkID", &forkID) + } + if err := p2p.ExpectMsg(p.app, StatusMsg, nil); err != nil { + t.Fatalf("status recv: %v", err) + } + if err := p2p.Send(p.app, StatusMsg, sendList); err != nil { + t.Fatalf("status send: %v", err) + } +} + +// handshakeWithClient executes the handshake with the remote client peer. +func (p *testPeer) handshakeWithClient(t *testing.T, td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, forkID forkid.ID, costList RequestCostList, recentTxLookup uint64) { + // It only works for the simulated client peer + if p.speer == nil { + t.Fatal("handshake for server peer only") + } + var sendList keyValueList + sendList = sendList.add("protocolVersion", uint64(p.speer.version)) + sendList = sendList.add("networkId", uint64(NetworkId)) + sendList = sendList.add("headTd", td) + sendList = sendList.add("headHash", head) + sendList = sendList.add("headNum", headNum) + sendList = sendList.add("genesisHash", genesis) + sendList = sendList.add("serveHeaders", nil) + sendList = sendList.add("serveChainSince", uint64(0)) + sendList = sendList.add("serveStateSince", uint64(0)) + sendList = sendList.add("serveRecentState", uint64(core.TriesInMemory-4)) + sendList = sendList.add("txRelay", nil) + sendList = sendList.add("flowControl/BL", testBufLimit) + sendList = sendList.add("flowControl/MRR", testBufRecharge) + sendList = sendList.add("flowControl/MRC", costList) + if p.speer.version >= lpv4 { + sendList = sendList.add("forkID", &forkID) + sendList = sendList.add("recentTxLookup", recentTxLookup) + } + if err := p2p.ExpectMsg(p.app, StatusMsg, nil); err != nil { + t.Fatalf("status recv: %v", err) + } + if err := p2p.Send(p.app, StatusMsg, sendList); err != nil { + t.Fatalf("status send: %v", err) } - return tp, errCh } // close terminates the local side of the peer, notifying the remote protocol @@ -402,48 +436,9 @@ func newTestPeerPair(name string, version int, server *serverHandler, client *cl return &testPeer{cpeer: peer1, net: net, app: app}, &testPeer{speer: peer2, net: app, app: net}, nil } -// handshake simulates a trivial handshake that expects the same state from the -// remote side as we are simulating locally. -func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, forkID forkid.ID, costList RequestCostList) { - var expList keyValueList - expList = expList.add("protocolVersion", uint64(p.cpeer.version)) - expList = expList.add("networkId", uint64(NetworkId)) - expList = expList.add("headTd", td) - expList = expList.add("headHash", head) - expList = expList.add("headNum", headNum) - expList = expList.add("genesisHash", genesis) - if p.cpeer.version >= lpv4 { - expList = expList.add("forkID", &forkID) - } - sendList := make(keyValueList, len(expList)) - copy(sendList, expList) - expList = expList.add("serveHeaders", nil) - expList = expList.add("serveChainSince", uint64(0)) - expList = expList.add("serveStateSince", uint64(0)) - expList = expList.add("serveRecentState", uint64(core.TriesInMemory-4)) - expList = expList.add("txRelay", nil) - if p.cpeer.version >= lpv4 { - expList = expList.add("recentTxLookup", uint64(0)) - } - expList = expList.add("flowControl/BL", testBufLimit) - expList = expList.add("flowControl/MRR", testBufRecharge) - expList = expList.add("flowControl/MRC", costList) - - if err := p2p.ExpectMsg(p.app, StatusMsg, expList); err != nil { - t.Fatalf("status recv: %v", err) - } - if err := p2p.Send(p.app, StatusMsg, sendList); err != nil { - t.Fatalf("status send: %v", err) - } - p.cpeer.fcParams = flowcontrol.ServerParams{ - BufLimit: testBufLimit, - MinRecharge: testBufRecharge, - } -} - type indexerCallback func(*core.ChainIndexer, *core.ChainIndexer, *core.ChainIndexer) -// testClient represents a client for testing with necessary auxiliary fields. +// testClient represents a client object for testing with necessary auxiliary fields. type testClient struct { clock mclock.Clock db ethdb.Database @@ -455,7 +450,58 @@ type testClient struct { bloomTrieIndexer *core.ChainIndexer } -// testServer represents a server for testing with necessary auxiliary fields. +// newRawPeer creates a new server peer connects to the server and do the handshake. +func (client *testClient) newRawPeer(t *testing.T, name string, version int, recentTxLookup uint64) (*testPeer, func(), <-chan error) { + // Create a message pipe to communicate through + app, net := p2p.MsgPipe() + + // Generate a random id and create the peer + var id enode.ID + rand.Read(id[:]) + peer := newServerPeer(version, NetworkId, false, p2p.NewPeer(id, name, nil), net) + + // Start the peer on a new thread + errCh := make(chan error, 1) + go func() { + select { + case <-client.handler.closeCh: + errCh <- p2p.DiscQuitting + case errCh <- client.handler.handle(peer): + } + }() + tp := &testPeer{ + app: app, + net: net, + speer: peer, + } + var ( + genesis = client.handler.backend.blockchain.Genesis() + head = client.handler.backend.blockchain.CurrentHeader() + td = client.handler.backend.blockchain.GetTd(head.Hash(), head.Number.Uint64()) + ) + forkID := forkid.NewID(client.handler.backend.blockchain.Config(), genesis.Hash(), head.Number.Uint64()) + tp.handshakeWithClient(t, td, head.Hash(), head.Number.Uint64(), genesis.Hash(), forkID, testCostList(0), recentTxLookup) // disable flow control by default + + // Ensure the connection is established or exits when any error occurs + for { + select { + case <-errCh: + return nil, nil, nil + default: + } + if atomic.LoadUint32(&peer.serving) == 1 { + break + } + time.Sleep(50 * time.Millisecond) + } + closePeer := func() { + tp.speer.close() + tp.close() + } + return tp, closePeer, errCh +} + +// testServer represents a server object for testing with necessary auxiliary fields. type testServer struct { clock mclock.Clock backend *backends.SimulatedBackend @@ -468,89 +514,109 @@ type testServer struct { bloomTrieIndexer *core.ChainIndexer } -func newServerEnv(t *testing.T, blocks int, protocol int, callback indexerCallback, simClock bool, newPeer bool, testCost uint64) (*testServer, func()) { - db := rawdb.NewMemoryDatabase() - indexers := testIndexers(db, nil, light.TestServerIndexerConfig, true) +// newRawPeer creates a new client peer connects to the server and do the handshake. +func (server *testServer) newRawPeer(t *testing.T, name string, version int) (*testPeer, func(), <-chan error) { + // Create a message pipe to communicate through + app, net := p2p.MsgPipe() - var clock mclock.Clock = &mclock.System{} - if simClock { - clock = &mclock.Simulated{} - } - handler, b := newTestServerHandler(blocks, indexers, db, clock) + // Generate a random id and create the peer + var id enode.ID + rand.Read(id[:]) + peer := newClientPeer(version, NetworkId, p2p.NewPeer(id, name, nil), net) - var peer *testPeer - if newPeer { - peer, _ = newTestPeer(t, "peer", protocol, handler, true, testCost) - } - - cIndexer, bIndexer, btIndexer := indexers[0], indexers[1], indexers[2] - cIndexer.Start(handler.blockchain) - bIndexer.Start(handler.blockchain) - - // Wait until indexers generate enough index data. - if callback != nil { - callback(cIndexer, bIndexer, btIndexer) - } - server := &testServer{ - clock: clock, - backend: b, - db: db, - peer: peer, - handler: handler, - chtIndexer: cIndexer, - bloomIndexer: bIndexer, - bloomTrieIndexer: btIndexer, - } - teardown := func() { - if newPeer { - peer.close() - peer.cpeer.close() - b.Close() + // Start the peer on a new thread + errCh := make(chan error, 1) + go func() { + select { + case <-server.handler.closeCh: + errCh <- p2p.DiscQuitting + case errCh <- server.handler.handle(peer): } - cIndexer.Close() - bIndexer.Close() + }() + tp := &testPeer{ + app: app, + net: net, + cpeer: peer, } - return server, teardown + var ( + genesis = server.handler.blockchain.Genesis() + head = server.handler.blockchain.CurrentHeader() + td = server.handler.blockchain.GetTd(head.Hash(), head.Number.Uint64()) + ) + forkID := forkid.NewID(server.handler.blockchain.Config(), genesis.Hash(), head.Number.Uint64()) + tp.handshakeWithServer(t, td, head.Hash(), head.Number.Uint64(), genesis.Hash(), forkID) + + // Ensure the connection is established or exits when any error occurs + for { + select { + case <-errCh: + return nil, nil, nil + default: + } + if atomic.LoadUint32(&peer.serving) == 1 { + break + } + time.Sleep(50 * time.Millisecond) + } + closePeer := func() { + tp.cpeer.close() + tp.close() + } + return tp, closePeer, errCh } -func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexerCallback, ulcServers []string, ulcFraction int, simClock bool, connect bool, disablePruning bool) (*testServer, *testClient, func()) { - sdb, cdb := rawdb.NewMemoryDatabase(), rawdb.NewMemoryDatabase() - speers := newServerPeerSet() +// testnetConfig wraps all the configurations for testing network. +type testnetConfig struct { + blocks int + protocol int + indexFn indexerCallback + ulcServers []string + ulcFraction int + simClock bool + connect bool + nopruning bool +} +func newClientServerEnv(t *testing.T, config testnetConfig) (*testServer, *testClient, func()) { + var ( + sdb = rawdb.NewMemoryDatabase() + cdb = rawdb.NewMemoryDatabase() + speers = newServerPeerSet() + ) var clock mclock.Clock = &mclock.System{} - if simClock { + if config.simClock { clock = &mclock.Simulated{} } dist := newRequestDistributor(speers, clock) rm := newRetrieveManager(speers, dist, func() time.Duration { return time.Millisecond * 500 }) - odr := NewLesOdr(cdb, light.TestClientIndexerConfig, rm) + odr := NewLesOdr(cdb, light.TestClientIndexerConfig, speers, rm) sindexers := testIndexers(sdb, nil, light.TestServerIndexerConfig, true) - cIndexers := testIndexers(cdb, odr, light.TestClientIndexerConfig, disablePruning) + cIndexers := testIndexers(cdb, odr, light.TestClientIndexerConfig, config.nopruning) scIndexer, sbIndexer, sbtIndexer := sindexers[0], sindexers[1], sindexers[2] ccIndexer, cbIndexer, cbtIndexer := cIndexers[0], cIndexers[1], cIndexers[2] odr.SetIndexers(ccIndexer, cbIndexer, cbtIndexer) - server, b := newTestServerHandler(blocks, sindexers, sdb, clock) - client := newTestClientHandler(b, odr, cIndexers, cdb, speers, ulcServers, ulcFraction) + server, b := newTestServerHandler(config.blocks, sindexers, sdb, clock) + client := newTestClientHandler(b, odr, cIndexers, cdb, speers, config.ulcServers, config.ulcFraction) scIndexer.Start(server.blockchain) sbIndexer.Start(server.blockchain) ccIndexer.Start(client.backend.blockchain) cbIndexer.Start(client.backend.blockchain) - if callback != nil { - callback(scIndexer, sbIndexer, sbtIndexer) + if config.indexFn != nil { + config.indexFn(scIndexer, sbIndexer, sbtIndexer) } var ( err error speer, cpeer *testPeer ) - if connect { + if config.connect { done := make(chan struct{}) client.syncEnd = func(_ *types.Header) { close(done) } - cpeer, speer, err = newTestPeerPair("peer", protocol, server, client) + cpeer, speer, err = newTestPeerPair("peer", config.protocol, server, client) if err != nil { t.Fatalf("Failed to connect testing peers %v", err) } @@ -580,7 +646,7 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexer bloomTrieIndexer: cbtIndexer, } teardown := func() { - if connect { + if config.connect { speer.close() cpeer.close() cpeer.cpeer.close() diff --git a/les/ulc_test.go b/les/ulc_test.go index 657b13db2c..d7308fa593 100644 --- a/les/ulc_test.go +++ b/les/ulc_test.go @@ -126,7 +126,12 @@ func connect(server *serverHandler, serverId enode.ID, client *clientHandler, pr // newTestServerPeer creates server peer. func newTestServerPeer(t *testing.T, blocks int, protocol int) (*testServer, *enode.Node, func()) { - s, teardown := newServerEnv(t, blocks, protocol, nil, false, false, 0) + netconfig := testnetConfig{ + blocks: blocks, + protocol: protocol, + nopruning: true, + } + s, _, teardown := newClientServerEnv(t, netconfig) key, err := crypto.GenerateKey() if err != nil { t.Fatal("generate key err:", err) @@ -138,6 +143,12 @@ func newTestServerPeer(t *testing.T, blocks int, protocol int) (*testServer, *en // newTestLightPeer creates node with light sync mode func newTestLightPeer(t *testing.T, protocol int, ulcServers []string, ulcFraction int) (*testClient, func()) { - _, c, teardown := newClientServerEnv(t, 0, protocol, nil, ulcServers, ulcFraction, false, false, true) + netconfig := testnetConfig{ + protocol: protocol, + ulcServers: ulcServers, + ulcFraction: ulcFraction, + nopruning: true, + } + _, c, teardown := newClientServerEnv(t, netconfig) return c, teardown } diff --git a/light/odr.go b/light/odr.go index bb243f9152..9521dd53e8 100644 --- a/light/odr.go +++ b/light/odr.go @@ -42,6 +42,7 @@ type OdrBackend interface { BloomTrieIndexer() *core.ChainIndexer BloomIndexer() *core.ChainIndexer Retrieve(ctx context.Context, req OdrRequest) error + RetrieveTxStatus(ctx context.Context, req *TxStatusRequest) error IndexerConfig() *IndexerConfig } diff --git a/light/odr_util.go b/light/odr_util.go index ec2d1e6491..bbbcdbce21 100644 --- a/light/odr_util.go +++ b/light/odr_util.go @@ -269,10 +269,15 @@ func GetBloomBits(ctx context.Context, odr OdrBackend, bit uint, sections []uint return result, nil } -// GetTransaction retrieves a canonical transaction by hash and also returns its position in the chain +// GetTransaction retrieves a canonical transaction by hash and also returns +// its position in the chain. There is no guarantee in the LES protocol that +// the mined transaction will be retrieved back for sure because of different +// reasons(the transaction is unindexed, the malicous server doesn't reply it +// deliberately, etc). Therefore, unretrieved transactions will receive a certain +// number of retrys, thus giving a weak guarantee. func GetTransaction(ctx context.Context, odr OdrBackend, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { r := &TxStatusRequest{Hashes: []common.Hash{txHash}} - if err := odr.Retrieve(ctx, r); err != nil || r.Status[0].Status != core.TxStatusIncluded { + if err := odr.RetrieveTxStatus(ctx, r); err != nil || r.Status[0].Status != core.TxStatusIncluded { return nil, common.Hash{}, 0, 0, err } pos := r.Status[0].Lookup