les, light: improve txstatus retrieval (#22349)

Transaction unindexing will be enabled by default as of 1.10, which causes tx status retrieval will be broken without this PR. 

This PR introduces a retry mechanism in TxStatus retrieval.
This commit is contained in:
gary rong 2021-02-25 21:24:04 +08:00 committed by GitHub
parent 378e961d85
commit 7a3c890009
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 779 additions and 245 deletions

@ -122,7 +122,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*LightEthereum, error) {
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool.getTimeout)
leth.relay = newLesTxRelay(peers, leth.retriever)
leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever)
leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.peers, leth.retriever)
leth.chtIndexer = light.NewChtIndexer(chainDb, leth.odr, params.CHTFrequency, params.HelperTrieConfirmations, config.LightNoPrune)
leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, leth.odr, params.BloomBitsBlocksClient, params.BloomTrieFrequency, config.LightNoPrune)
leth.odr.SetIndexers(leth.chtIndexer, leth.bloomTrieIndexer, leth.bloomIndexer)

@ -66,7 +66,12 @@ func TestSequentialAnnouncementsLes2(t *testing.T) { testSequentialAnnouncements
func TestSequentialAnnouncementsLes3(t *testing.T) { testSequentialAnnouncements(t, 3) }
func testSequentialAnnouncements(t *testing.T, protocol int) {
s, c, teardown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, false, true)
netconfig := testnetConfig{
blocks: 4,
protocol: protocol,
nopruning: true,
}
s, c, teardown := newClientServerEnv(t, netconfig)
defer teardown()
// Create connected peer pair.
@ -101,7 +106,12 @@ func TestGappedAnnouncementsLes2(t *testing.T) { testGappedAnnouncements(t, 2) }
func TestGappedAnnouncementsLes3(t *testing.T) { testGappedAnnouncements(t, 3) }
func testGappedAnnouncements(t *testing.T, protocol int) {
s, c, teardown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, false, true)
netconfig := testnetConfig{
blocks: 4,
protocol: protocol,
nopruning: true,
}
s, c, teardown := newClientServerEnv(t, netconfig)
defer teardown()
// Create connected peer pair.
@ -183,7 +193,13 @@ func testTrustedAnnouncement(t *testing.T, protocol int) {
ids = append(ids, n.String())
}
}
_, c, teardown := newClientServerEnv(t, 0, protocol, nil, ids, 60, false, false, true)
netconfig := testnetConfig{
protocol: protocol,
nopruning: true,
ulcServers: ids,
ulcFraction: 60,
}
_, c, teardown := newClientServerEnv(t, netconfig)
defer teardown()
defer func() {
for i := 0; i < len(teardowns); i++ {
@ -233,8 +249,17 @@ func testTrustedAnnouncement(t *testing.T, protocol int) {
check([]uint64{10}, 10, func() { <-newHead }) // Sync the whole chain.
}
func TestInvalidAnnounces(t *testing.T) {
s, c, teardown := newClientServerEnv(t, 4, lpv3, nil, nil, 0, false, false, true)
func TestInvalidAnnouncesLES2(t *testing.T) { testInvalidAnnounces(t, lpv2) }
func TestInvalidAnnouncesLES3(t *testing.T) { testInvalidAnnounces(t, lpv3) }
func TestInvalidAnnouncesLES4(t *testing.T) { testInvalidAnnounces(t, lpv4) }
func testInvalidAnnounces(t *testing.T, protocol int) {
netconfig := testnetConfig{
blocks: 4,
protocol: protocol,
nopruning: true,
}
s, c, teardown := newClientServerEnv(t, netconfig)
defer teardown()
// Create connected peer pair.

@ -52,9 +52,16 @@ func TestGetBlockHeadersLes3(t *testing.T) { testGetBlockHeaders(t, 3) }
func TestGetBlockHeadersLes4(t *testing.T) { testGetBlockHeaders(t, 4) }
func testGetBlockHeaders(t *testing.T, protocol int) {
server, tearDown := newServerEnv(t, downloader.MaxHeaderFetch+15, protocol, nil, false, true, 0)
netconfig := testnetConfig{
blocks: downloader.MaxHeaderFetch + 15,
protocol: protocol,
nopruning: true,
}
server, _, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol)
defer closePeer()
bc := server.handler.blockchain
// Create a "random" unknown hash for testing
@ -169,8 +176,8 @@ func testGetBlockHeaders(t *testing.T, protocol int) {
// Send the hash request and verify the response
reqID++
sendRequest(server.peer.app, GetBlockHeadersMsg, reqID, tt.query)
if err := expectResponse(server.peer.app, BlockHeadersMsg, reqID, testBufLimit, headers); err != nil {
sendRequest(rawPeer.app, GetBlockHeadersMsg, reqID, tt.query)
if err := expectResponse(rawPeer.app, BlockHeadersMsg, reqID, testBufLimit, headers); err != nil {
t.Errorf("test %d: headers mismatch: %v", i, err)
}
}
@ -182,9 +189,17 @@ func TestGetBlockBodiesLes3(t *testing.T) { testGetBlockBodies(t, 3) }
func TestGetBlockBodiesLes4(t *testing.T) { testGetBlockBodies(t, 4) }
func testGetBlockBodies(t *testing.T, protocol int) {
server, tearDown := newServerEnv(t, downloader.MaxBlockFetch+15, protocol, nil, false, true, 0)
netconfig := testnetConfig{
blocks: downloader.MaxHeaderFetch + 15,
protocol: protocol,
nopruning: true,
}
server, _, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol)
defer closePeer()
bc := server.handler.blockchain
// Create a batch of tests for various scenarios
@ -247,8 +262,8 @@ func testGetBlockBodies(t *testing.T, protocol int) {
reqID++
// Send the hash request and verify the response
sendRequest(server.peer.app, GetBlockBodiesMsg, reqID, hashes)
if err := expectResponse(server.peer.app, BlockBodiesMsg, reqID, testBufLimit, bodies); err != nil {
sendRequest(rawPeer.app, GetBlockBodiesMsg, reqID, hashes)
if err := expectResponse(rawPeer.app, BlockBodiesMsg, reqID, testBufLimit, bodies); err != nil {
t.Errorf("test %d: bodies mismatch: %v", i, err)
}
}
@ -261,8 +276,17 @@ func TestGetCodeLes4(t *testing.T) { testGetCode(t, 4) }
func testGetCode(t *testing.T, protocol int) {
// Assemble the test environment
server, tearDown := newServerEnv(t, 4, protocol, nil, false, true, 0)
netconfig := testnetConfig{
blocks: 4,
protocol: protocol,
nopruning: true,
}
server, _, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol)
defer closePeer()
bc := server.handler.blockchain
var codereqs []*CodeReq
@ -279,8 +303,8 @@ func testGetCode(t *testing.T, protocol int) {
}
}
sendRequest(server.peer.app, GetCodeMsg, 42, codereqs)
if err := expectResponse(server.peer.app, CodeMsg, 42, testBufLimit, codes); err != nil {
sendRequest(rawPeer.app, GetCodeMsg, 42, codereqs)
if err := expectResponse(rawPeer.app, CodeMsg, 42, testBufLimit, codes); err != nil {
t.Errorf("codes mismatch: %v", err)
}
}
@ -291,8 +315,17 @@ func TestGetStaleCodeLes3(t *testing.T) { testGetStaleCode(t, 3) }
func TestGetStaleCodeLes4(t *testing.T) { testGetStaleCode(t, 4) }
func testGetStaleCode(t *testing.T, protocol int) {
server, tearDown := newServerEnv(t, core.TriesInMemory+4, protocol, nil, false, true, 0)
netconfig := testnetConfig{
blocks: core.TriesInMemory + 4,
protocol: protocol,
nopruning: true,
}
server, _, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol)
defer closePeer()
bc := server.handler.blockchain
check := func(number uint64, expected [][]byte) {
@ -300,8 +333,8 @@ func testGetStaleCode(t *testing.T, protocol int) {
BHash: bc.GetHeaderByNumber(number).Hash(),
AccKey: crypto.Keccak256(testContractAddr[:]),
}
sendRequest(server.peer.app, GetCodeMsg, 42, []*CodeReq{req})
if err := expectResponse(server.peer.app, CodeMsg, 42, testBufLimit, expected); err != nil {
sendRequest(rawPeer.app, GetCodeMsg, 42, []*CodeReq{req})
if err := expectResponse(rawPeer.app, CodeMsg, 42, testBufLimit, expected); err != nil {
t.Errorf("codes mismatch: %v", err)
}
}
@ -317,9 +350,17 @@ func TestGetReceiptLes4(t *testing.T) { testGetReceipt(t, 4) }
func testGetReceipt(t *testing.T, protocol int) {
// Assemble the test environment
server, tearDown := newServerEnv(t, 4, protocol, nil, false, true, 0)
netconfig := testnetConfig{
blocks: 4,
protocol: protocol,
nopruning: true,
}
server, _, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol)
defer closePeer()
bc := server.handler.blockchain
// Collect the hashes to request, and the response to expect
@ -332,8 +373,8 @@ func testGetReceipt(t *testing.T, protocol int) {
receipts = append(receipts, rawdb.ReadRawReceipts(server.db, block.Hash(), block.NumberU64()))
}
// Send the hash request and verify the response
sendRequest(server.peer.app, GetReceiptsMsg, 42, hashes)
if err := expectResponse(server.peer.app, ReceiptsMsg, 42, testBufLimit, receipts); err != nil {
sendRequest(rawPeer.app, GetReceiptsMsg, 42, hashes)
if err := expectResponse(rawPeer.app, ReceiptsMsg, 42, testBufLimit, receipts); err != nil {
t.Errorf("receipts mismatch: %v", err)
}
}
@ -345,9 +386,17 @@ func TestGetProofsLes4(t *testing.T) { testGetProofs(t, 4) }
func testGetProofs(t *testing.T, protocol int) {
// Assemble the test environment
server, tearDown := newServerEnv(t, 4, protocol, nil, false, true, 0)
netconfig := testnetConfig{
blocks: 4,
protocol: protocol,
nopruning: true,
}
server, _, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol)
defer closePeer()
bc := server.handler.blockchain
var proofreqs []ProofReq
@ -368,8 +417,8 @@ func testGetProofs(t *testing.T, protocol int) {
}
}
// Send the proof request and verify the response
sendRequest(server.peer.app, GetProofsV2Msg, 42, proofreqs)
if err := expectResponse(server.peer.app, ProofsV2Msg, 42, testBufLimit, proofsV2.NodeList()); err != nil {
sendRequest(rawPeer.app, GetProofsV2Msg, 42, proofreqs)
if err := expectResponse(rawPeer.app, ProofsV2Msg, 42, testBufLimit, proofsV2.NodeList()); err != nil {
t.Errorf("proofs mismatch: %v", err)
}
}
@ -380,8 +429,17 @@ func TestGetStaleProofLes3(t *testing.T) { testGetStaleProof(t, 3) }
func TestGetStaleProofLes4(t *testing.T) { testGetStaleProof(t, 4) }
func testGetStaleProof(t *testing.T, protocol int) {
server, tearDown := newServerEnv(t, core.TriesInMemory+4, protocol, nil, false, true, 0)
netconfig := testnetConfig{
blocks: core.TriesInMemory + 4,
protocol: protocol,
nopruning: true,
}
server, _, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol)
defer closePeer()
bc := server.handler.blockchain
check := func(number uint64, wantOK bool) {
@ -393,7 +451,7 @@ func testGetStaleProof(t *testing.T, protocol int) {
BHash: header.Hash(),
Key: account,
}
sendRequest(server.peer.app, GetProofsV2Msg, 42, []*ProofReq{req})
sendRequest(rawPeer.app, GetProofsV2Msg, 42, []*ProofReq{req})
var expected []rlp.RawValue
if wantOK {
@ -402,7 +460,7 @@ func testGetStaleProof(t *testing.T, protocol int) {
t.Prove(account, 0, proofsV2)
expected = proofsV2.NodeList()
}
if err := expectResponse(server.peer.app, ProofsV2Msg, 42, testBufLimit, expected); err != nil {
if err := expectResponse(rawPeer.app, ProofsV2Msg, 42, testBufLimit, expected); err != nil {
t.Errorf("codes mismatch: %v", err)
}
}
@ -417,9 +475,9 @@ func TestGetCHTProofsLes3(t *testing.T) { testGetCHTProofs(t, 3) }
func TestGetCHTProofsLes4(t *testing.T) { testGetCHTProofs(t, 4) }
func testGetCHTProofs(t *testing.T, protocol int) {
config := light.TestServerIndexerConfig
waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) {
var (
config = light.TestServerIndexerConfig
waitIndexers = func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) {
for {
cs, _, _ := cIndexer.Sections()
if cs >= 1 {
@ -428,9 +486,19 @@ func testGetCHTProofs(t *testing.T, protocol int) {
time.Sleep(10 * time.Millisecond)
}
}
server, tearDown := newServerEnv(t, int(config.ChtSize+config.ChtConfirms), protocol, waitIndexers, false, true, 0)
netconfig = testnetConfig{
blocks: int(config.ChtSize + config.ChtConfirms),
protocol: protocol,
indexFn: waitIndexers,
nopruning: true,
}
)
server, _, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol)
defer closePeer()
bc := server.handler.blockchain
// Assemble the proofs from the different protocols
@ -454,8 +522,8 @@ func testGetCHTProofs(t *testing.T, protocol int) {
AuxReq: htAuxHeader,
}}
// Send the proof request and verify the response
sendRequest(server.peer.app, GetHelperTrieProofsMsg, 42, requestsV2)
if err := expectResponse(server.peer.app, HelperTrieProofsMsg, 42, testBufLimit, proofsV2); err != nil {
sendRequest(rawPeer.app, GetHelperTrieProofsMsg, 42, requestsV2)
if err := expectResponse(rawPeer.app, HelperTrieProofsMsg, 42, testBufLimit, proofsV2); err != nil {
t.Errorf("proofs mismatch: %v", err)
}
}
@ -466,9 +534,9 @@ func TestGetBloombitsProofsLes4(t *testing.T) { testGetBloombitsProofs(t, 4) }
// Tests that bloombits proofs can be correctly retrieved.
func testGetBloombitsProofs(t *testing.T, protocol int) {
config := light.TestServerIndexerConfig
waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) {
var (
config = light.TestServerIndexerConfig
waitIndexers = func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) {
for {
bts, _, _ := btIndexer.Sections()
if bts >= 1 {
@ -477,9 +545,19 @@ func testGetBloombitsProofs(t *testing.T, protocol int) {
time.Sleep(10 * time.Millisecond)
}
}
server, tearDown := newServerEnv(t, int(config.BloomTrieSize+config.BloomTrieConfirms), protocol, waitIndexers, false, true, 0)
netconfig = testnetConfig{
blocks: int(config.BloomTrieSize + config.BloomTrieConfirms),
protocol: protocol,
indexFn: waitIndexers,
nopruning: true,
}
)
server, _, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol)
defer closePeer()
bc := server.handler.blockchain
// Request and verify each bit of the bloom bits proofs
@ -503,20 +581,28 @@ func testGetBloombitsProofs(t *testing.T, protocol int) {
trie.Prove(key, 0, &proofs.Proofs)
// Send the proof request and verify the response
sendRequest(server.peer.app, GetHelperTrieProofsMsg, 42, requests)
if err := expectResponse(server.peer.app, HelperTrieProofsMsg, 42, testBufLimit, proofs); err != nil {
sendRequest(rawPeer.app, GetHelperTrieProofsMsg, 42, requests)
if err := expectResponse(rawPeer.app, HelperTrieProofsMsg, 42, testBufLimit, proofs); err != nil {
t.Errorf("bit %d: proofs mismatch: %v", bit, err)
}
}
}
func TestTransactionStatusLes2(t *testing.T) { testTransactionStatus(t, 2) }
func TestTransactionStatusLes3(t *testing.T) { testTransactionStatus(t, 3) }
func TestTransactionStatusLes4(t *testing.T) { testTransactionStatus(t, 4) }
func TestTransactionStatusLes2(t *testing.T) { testTransactionStatus(t, lpv2) }
func TestTransactionStatusLes3(t *testing.T) { testTransactionStatus(t, lpv3) }
func TestTransactionStatusLes4(t *testing.T) { testTransactionStatus(t, lpv4) }
func testTransactionStatus(t *testing.T, protocol int) {
server, tearDown := newServerEnv(t, 0, protocol, nil, false, true, 0)
netconfig := testnetConfig{
protocol: protocol,
nopruning: true,
}
server, _, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol)
defer closePeer()
server.handler.addTxsSync = true
chain := server.handler.blockchain
@ -526,11 +612,11 @@ func testTransactionStatus(t *testing.T, protocol int) {
test := func(tx *types.Transaction, send bool, expStatus light.TxStatus) {
reqID++
if send {
sendRequest(server.peer.app, SendTxV2Msg, reqID, types.Transactions{tx})
sendRequest(rawPeer.app, SendTxV2Msg, reqID, types.Transactions{tx})
} else {
sendRequest(server.peer.app, GetTxStatusMsg, reqID, []common.Hash{tx.Hash()})
sendRequest(rawPeer.app, GetTxStatusMsg, reqID, []common.Hash{tx.Hash()})
}
if err := expectResponse(server.peer.app, TxStatusMsg, reqID, testBufLimit, []light.TxStatus{expStatus}); err != nil {
if err := expectResponse(rawPeer.app, TxStatusMsg, reqID, testBufLimit, []light.TxStatus{expStatus}); err != nil {
t.Errorf("transaction status mismatch")
}
}
@ -572,7 +658,7 @@ func testTransactionStatus(t *testing.T, protocol int) {
t.Fatalf("pending count mismatch: have %d, want 1", pending)
}
// Discard new block announcement
msg, _ := server.peer.app.ReadMsg()
msg, _ := rawPeer.app.ReadMsg()
msg.Discard()
// check if their status is included now
@ -597,7 +683,7 @@ func testTransactionStatus(t *testing.T, protocol int) {
t.Fatalf("pending count mismatch: have %d, want 3", pending)
}
// Discard new block announcement
msg, _ = server.peer.app.ReadMsg()
msg, _ = rawPeer.app.ReadMsg()
msg.Discard()
// check if their status is pending again
@ -605,11 +691,23 @@ func testTransactionStatus(t *testing.T, protocol int) {
test(tx2, false, light.TxStatus{Status: core.TxStatusPending})
}
func TestStopResumeLes3(t *testing.T) {
server, tearDown := newServerEnv(t, 0, 3, nil, true, true, testBufLimit/10)
func TestStopResumeLES3(t *testing.T) { testStopResume(t, lpv3) }
func TestStopResumeLES4(t *testing.T) { testStopResume(t, lpv4) }
func testStopResume(t *testing.T, protocol int) {
netconfig := testnetConfig{
protocol: protocol,
simClock: true,
nopruning: true,
}
server, _, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
server.handler.server.costTracker.testing = true
server.handler.server.costTracker.testCostList = testCostList(testBufLimit / 10)
rawPeer, closePeer, _ := server.newRawPeer(t, "peer", protocol)
defer closePeer()
var (
reqID uint64
@ -619,14 +717,14 @@ func TestStopResumeLes3(t *testing.T) {
header := server.handler.blockchain.CurrentHeader()
req := func() {
reqID++
sendRequest(server.peer.app, GetBlockHeadersMsg, reqID, &GetBlockHeadersData{Origin: hashOrNumber{Hash: header.Hash()}, Amount: 1})
sendRequest(rawPeer.app, GetBlockHeadersMsg, reqID, &GetBlockHeadersData{Origin: hashOrNumber{Hash: header.Hash()}, Amount: 1})
}
for i := 1; i <= 5; i++ {
// send requests while we still have enough buffer and expect a response
for expBuf >= testCost {
req()
expBuf -= testCost
if err := expectResponse(server.peer.app, BlockHeadersMsg, reqID, expBuf, []*types.Header{header}); err != nil {
if err := expectResponse(rawPeer.app, BlockHeadersMsg, reqID, expBuf, []*types.Header{header}); err != nil {
t.Errorf("expected response and failed: %v", err)
}
}
@ -636,7 +734,7 @@ func TestStopResumeLes3(t *testing.T) {
req()
c--
}
if err := p2p.ExpectMsg(server.peer.app, StopMsg, nil); err != nil {
if err := p2p.ExpectMsg(rawPeer.app, StopMsg, nil); err != nil {
t.Errorf("expected StopMsg and failed: %v", err)
}
// wait until the buffer is recharged by half of the limit
@ -645,7 +743,7 @@ func TestStopResumeLes3(t *testing.T) {
// expect a ResumeMsg with the partially recharged buffer value
expBuf += testBufRecharge * wait
if err := p2p.ExpectMsg(server.peer.app, ResumeMsg, expBuf); err != nil {
if err := p2p.ExpectMsg(rawPeer.app, ResumeMsg, expBuf); err != nil {
t.Errorf("expected ResumeMsg and failed: %v", err)
}
}

@ -18,6 +18,7 @@ package les
import (
"context"
"sort"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
@ -31,14 +32,16 @@ type LesOdr struct {
db ethdb.Database
indexerConfig *light.IndexerConfig
chtIndexer, bloomTrieIndexer, bloomIndexer *core.ChainIndexer
peers *serverPeerSet
retriever *retrieveManager
stop chan struct{}
}
func NewLesOdr(db ethdb.Database, config *light.IndexerConfig, retriever *retrieveManager) *LesOdr {
func NewLesOdr(db ethdb.Database, config *light.IndexerConfig, peers *serverPeerSet, retriever *retrieveManager) *LesOdr {
return &LesOdr{
db: db,
indexerConfig: config,
peers: peers,
retriever: retriever,
stop: make(chan struct{}),
}
@ -98,7 +101,101 @@ type Msg struct {
Obj interface{}
}
// Retrieve tries to fetch an object from the LES network.
// peerByTxHistory is a heap.Interface implementation which can sort
// the peerset by transaction history.
type peerByTxHistory []*serverPeer
func (h peerByTxHistory) Len() int { return len(h) }
func (h peerByTxHistory) Less(i, j int) bool {
if h[i].txHistory == txIndexUnlimited {
return false
}
if h[j].txHistory == txIndexUnlimited {
return true
}
return h[i].txHistory < h[j].txHistory
}
func (h peerByTxHistory) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
const (
maxTxStatusRetry = 3 // The maximum retrys will be made for tx status request.
maxTxStatusCandidates = 5 // The maximum les servers the tx status requests will be sent to.
)
// RetrieveTxStatus retrieves the transaction status from the LES network.
// There is no guarantee in the LES protocol that the mined transaction will
// be retrieved back for sure because of different reasons(the transaction
// is unindexed, the malicous server doesn't reply it deliberately, etc).
// Therefore, unretrieved transactions(UNKNOWN) will receive a certain number
// of retries, thus giving a weak guarantee.
func (odr *LesOdr) RetrieveTxStatus(ctx context.Context, req *light.TxStatusRequest) error {
// Sort according to the transaction history supported by the peer and
// select the peers with longest history.
var (
retries int
peers []*serverPeer
missing = len(req.Hashes)
result = make([]light.TxStatus, len(req.Hashes))
canSend = make(map[string]bool)
)
for _, peer := range odr.peers.allPeers() {
if peer.txHistory == txIndexDisabled {
continue
}
peers = append(peers, peer)
}
sort.Sort(sort.Reverse(peerByTxHistory(peers)))
for i := 0; i < maxTxStatusCandidates && i < len(peers); i++ {
canSend[peers[i].id] = true
}
// Send out the request and assemble the result.
for {
if retries >= maxTxStatusRetry || len(canSend) == 0 {
break
}
var (
// Deep copy the request, so that the partial result won't be mixed.
req = &TxStatusRequest{Hashes: req.Hashes}
id = genReqID()
distreq = &distReq{
getCost: func(dp distPeer) uint64 { return req.GetCost(dp.(*serverPeer)) },
canSend: func(dp distPeer) bool { return canSend[dp.(*serverPeer).id] },
request: func(dp distPeer) func() {
p := dp.(*serverPeer)
p.fcServer.QueuedRequest(id, req.GetCost(p))
delete(canSend, p.id)
return func() { req.Request(id, p) }
},
}
)
if err := odr.retriever.retrieve(ctx, id, distreq, func(p distPeer, msg *Msg) error { return req.Validate(odr.db, msg) }, odr.stop); err != nil {
return err
}
// Collect the response and assemble them to the final result.
// All the response is not verifiable, so always pick the first
// one we get.
for index, status := range req.Status {
if result[index].Status != core.TxStatusUnknown {
continue
}
if status.Status == core.TxStatusUnknown {
continue
}
result[index], missing = status, missing-1
}
// Abort the procedure if all the status are retrieved
if missing == 0 {
break
}
retries += 1
}
req.Status = result
return nil
}
// Retrieve tries to fetch an object from the LES network. It's a common API
// for most of the LES requests except for the TxStatusRequest which needs
// the additional retry mechanism.
// If the network retrieval was successful, it stores the object in local db.
func (odr *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) {
lreq := LesRequest(req)

@ -487,7 +487,7 @@ func (r *TxStatusRequest) GetCost(peer *serverPeer) uint64 {
// CanSend tells if a certain peer is suitable for serving the given request
func (r *TxStatusRequest) CanSend(peer *serverPeer) bool {
return peer.serveTxLookup
return peer.txHistory != txIndexDisabled
}
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
@ -496,13 +496,12 @@ func (r *TxStatusRequest) Request(reqID uint64, peer *serverPeer) error {
return peer.requestTxStatus(reqID, r.Hashes)
}
// Valid processes an ODR request reply message from the LES network
// Validate processes an ODR request reply message from the LES network
// returns true and stores results in memory if the message was a valid reply
// to the request (implementation of LesOdrRequest)
func (r *TxStatusRequest) Validate(db ethdb.Database, msg *Msg) error {
log.Debug("Validating transaction status", "count", len(r.Hashes))
// Ensure we have a correct message with a single block body
if msg.MsgType != MsgTxStatus {
return errInvalidMessageType
}

@ -19,7 +19,10 @@ package les
import (
"bytes"
"context"
"crypto/rand"
"fmt"
"math/big"
"reflect"
"testing"
"time"
@ -190,7 +193,13 @@ func odrTxStatus(ctx context.Context, db ethdb.Database, config *params.ChainCon
// testOdr tests odr requests whose validation guaranteed by block headers.
func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn odrTestFn) {
// Assemble the test environment
server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true, true)
netconfig := testnetConfig{
blocks: 4,
protocol: protocol,
connect: true,
nopruning: true,
}
server, client, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
// Ensure the client has synced all necessary data.
@ -246,3 +255,184 @@ func testOdr(t *testing.T, protocol int, expFail uint64, checkCached bool, fn od
test(5)
}
}
func TestGetTxStatusFromUnindexedPeersLES4(t *testing.T) { testGetTxStatusFromUnindexedPeers(t, lpv4) }
func testGetTxStatusFromUnindexedPeers(t *testing.T, protocol int) {
var (
blocks = 8
netconfig = testnetConfig{
blocks: blocks,
protocol: protocol,
nopruning: true,
}
)
server, client, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
// Iterate the chain, create the tx indexes locally
var (
testHash common.Hash
testStatus light.TxStatus
txs = make(map[common.Hash]*types.Transaction) // Transaction objects set
blockNumbers = make(map[common.Hash]uint64) // Transaction hash to block number mappings
blockHashes = make(map[common.Hash]common.Hash) // Transaction hash to block hash mappings
intraIndex = make(map[common.Hash]uint64) // Transaction intra-index in block
)
for number := uint64(1); number < server.backend.Blockchain().CurrentBlock().NumberU64(); number++ {
block := server.backend.Blockchain().GetBlockByNumber(number)
if block == nil {
t.Fatalf("Failed to retrieve block %d", number)
}
for index, tx := range block.Transactions() {
txs[tx.Hash()] = tx
blockNumbers[tx.Hash()] = number
blockHashes[tx.Hash()] = block.Hash()
intraIndex[tx.Hash()] = uint64(index)
if testHash == (common.Hash{}) {
testHash = tx.Hash()
testStatus = light.TxStatus{
Status: core.TxStatusIncluded,
Lookup: &rawdb.LegacyTxLookupEntry{
BlockHash: block.Hash(),
BlockIndex: block.NumberU64(),
Index: uint64(index),
},
}
}
}
}
// serveMsg processes incoming GetTxStatusMsg and sends the response back.
serveMsg := func(peer *testPeer, txLookup uint64) error {
msg, err := peer.app.ReadMsg()
if err != nil {
return err
}
if msg.Code != GetTxStatusMsg {
return fmt.Errorf("message code mismatch: got %d, expected %d", msg.Code, GetTxStatusMsg)
}
var r GetTxStatusPacket
if err := msg.Decode(&r); err != nil {
return err
}
stats := make([]light.TxStatus, len(r.Hashes))
for i, hash := range r.Hashes {
number, exist := blockNumbers[hash]
if !exist {
continue // Filter out unknown transactions
}
min := uint64(blocks) - txLookup
if txLookup != txIndexUnlimited && (txLookup == txIndexDisabled || number < min) {
continue // Filter out unindexed transactions
}
stats[i].Status = core.TxStatusIncluded
stats[i].Lookup = &rawdb.LegacyTxLookupEntry{
BlockHash: blockHashes[hash],
BlockIndex: number,
Index: intraIndex[hash],
}
}
data, _ := rlp.EncodeToBytes(stats)
reply := &reply{peer.app, TxStatusMsg, r.ReqID, data}
reply.send(testBufLimit)
return nil
}
var testspecs = []struct {
peers int
txLookups []uint64
txs []common.Hash
results []light.TxStatus
}{
// Retrieve mined transaction from the empty peerset
{
peers: 0,
txLookups: []uint64{},
txs: []common.Hash{testHash},
results: []light.TxStatus{{}},
},
// Retrieve unknown transaction from the full peers
{
peers: 3,
txLookups: []uint64{txIndexUnlimited, txIndexUnlimited, txIndexUnlimited},
txs: []common.Hash{randomHash()},
results: []light.TxStatus{{}},
},
// Retrieve mined transaction from the full peers
{
peers: 3,
txLookups: []uint64{txIndexUnlimited, txIndexUnlimited, txIndexUnlimited},
txs: []common.Hash{testHash},
results: []light.TxStatus{testStatus},
},
// Retrieve mixed transactions from the full peers
{
peers: 3,
txLookups: []uint64{txIndexUnlimited, txIndexUnlimited, txIndexUnlimited},
txs: []common.Hash{randomHash(), testHash},
results: []light.TxStatus{{}, testStatus},
},
// Retrieve mixed transactions from unindexed peer(but the target is still available)
{
peers: 3,
txLookups: []uint64{uint64(blocks) - testStatus.Lookup.BlockIndex, uint64(blocks) - testStatus.Lookup.BlockIndex - 1, uint64(blocks) - testStatus.Lookup.BlockIndex - 2},
txs: []common.Hash{randomHash(), testHash},
results: []light.TxStatus{{}, testStatus},
},
// Retrieve mixed transactions from unindexed peer(but the target is not available)
{
peers: 3,
txLookups: []uint64{uint64(blocks) - testStatus.Lookup.BlockIndex - 1, uint64(blocks) - testStatus.Lookup.BlockIndex - 1, uint64(blocks) - testStatus.Lookup.BlockIndex - 2},
txs: []common.Hash{randomHash(), testHash},
results: []light.TxStatus{{}, {}},
},
}
for _, testspec := range testspecs {
// Create a bunch of server peers with different tx history
var (
serverPeers []*testPeer
closeFns []func()
)
for i := 0; i < testspec.peers; i++ {
peer, closePeer, _ := client.newRawPeer(t, fmt.Sprintf("server-%d", i), protocol, testspec.txLookups[i])
serverPeers = append(serverPeers, peer)
closeFns = append(closeFns, closePeer)
// Create a one-time routine for serving message
go func(i int, peer *testPeer) {
serveMsg(peer, testspec.txLookups[i])
}(i, peer)
}
// Send out the GetTxStatus requests, compare the result with
// expected value.
r := &light.TxStatusRequest{Hashes: testspec.txs}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
err := client.handler.backend.odr.RetrieveTxStatus(ctx, r)
if err != nil {
t.Errorf("Failed to retrieve tx status %v", err)
} else {
if !reflect.DeepEqual(testspec.results, r.Status) {
t.Errorf("Result mismatch, diff")
}
}
// Close all connected peers and start the next round
for _, closeFn := range closeFns {
closeFn()
}
}
}
// randomHash generates a random blob of data and returns it as a hash.
func randomHash() common.Hash {
var hash common.Hash
if n, err := rand.Read(hash[:]); n != common.HashLength || err != nil {
panic(err)
}
return hash
}

@ -341,7 +341,7 @@ type serverPeer struct {
onlyAnnounce bool // The flag whether the server sends announcement only.
chainSince, chainRecent uint64 // The range of chain server peer can serve.
stateSince, stateRecent uint64 // The range of state server peer can serve.
serveTxLookup bool // The server peer can serve tx lookups.
txHistory uint64 // The length of available tx history, 0 means all, 1 means disabled
// Advertised checkpoint fields
checkpointNumber uint64 // The block height which the checkpoint is registered.
@ -634,13 +634,13 @@ func (p *serverPeer) Handshake(genesis common.Hash, forkid forkid.ID, forkFilter
if err := recv.get("recentTxLookup", &recentTx); err != nil {
return err
}
// Note: in the current version we only consider the tx index service useful
// if it is unlimited. This can be made configurable in the future.
p.serveTxLookup = recentTx == txIndexUnlimited
p.txHistory = uint64(recentTx)
} else {
p.serveTxLookup = true
// The weak assumption is held here that legacy les server(les2,3)
// has unlimited transaction history. The les serving in these legacy
// versions is disabled if the transaction is unindexed.
p.txHistory = txIndexUnlimited
}
if p.onlyAnnounce && !p.trusted {
return errResp(ErrUselessPeer, "peer cannot serve requests")
}

@ -28,9 +28,8 @@ import (
)
func TestLightPruner(t *testing.T) {
config := light.TestClientIndexerConfig
waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) {
var (
waitIndexers = func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) {
for {
cs, _, _ := cIndexer.Sections()
bts, _, _ := btIndexer.Sections()
@ -40,7 +39,15 @@ func TestLightPruner(t *testing.T) {
time.Sleep(10 * time.Millisecond)
}
}
server, client, tearDown := newClientServerEnv(t, int(3*config.ChtSize+config.ChtConfirms), 2, waitIndexers, nil, 0, false, true, false)
config = light.TestClientIndexerConfig
netconfig = testnetConfig{
blocks: int(3*config.ChtSize + config.ChtConfirms),
protocol: 3,
indexFn: waitIndexers,
connect: true,
}
)
server, client, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
// checkDB iterates the chain with given prefix, resolves the block number

@ -83,7 +83,14 @@ func tfCodeAccess(db ethdb.Database, bhash common.Hash, num uint64) light.OdrReq
func testAccess(t *testing.T, protocol int, fn accessTestFn) {
// Assemble the test environment
server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, nil, 0, false, true, true)
netconfig := testnetConfig{
blocks: 4,
protocol: protocol,
indexFn: nil,
connect: true,
nopruning: true,
}
server, client, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
// Ensure the client has synced all necessary data.

@ -31,15 +31,15 @@ import (
)
// Test light syncing which will download all headers from genesis.
func TestLightSyncingLes3(t *testing.T) { testCheckpointSyncing(t, 3, 0) }
func TestLightSyncingLes3(t *testing.T) { testCheckpointSyncing(t, lpv3, 0) }
// Test legacy checkpoint syncing which will download tail headers
// based on a hardcoded checkpoint.
func TestLegacyCheckpointSyncingLes3(t *testing.T) { testCheckpointSyncing(t, 3, 1) }
func TestLegacyCheckpointSyncingLes3(t *testing.T) { testCheckpointSyncing(t, lpv3, 1) }
// Test checkpoint syncing which will download tail headers based
// on a verified checkpoint.
func TestCheckpointSyncingLes3(t *testing.T) { testCheckpointSyncing(t, 3, 2) }
func TestCheckpointSyncingLes3(t *testing.T) { testCheckpointSyncing(t, lpv3, 2) }
func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) {
config := light.TestServerIndexerConfig
@ -55,7 +55,13 @@ func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) {
}
}
// Generate 128+1 blocks (totally 1 CHT section)
server, client, tearDown := newClientServerEnv(t, int(config.ChtSize+config.ChtConfirms), protocol, waitIndexers, nil, 0, false, false, true)
netconfig := testnetConfig{
blocks: int(config.ChtSize + config.ChtConfirms),
protocol: protocol,
indexFn: waitIndexers,
nopruning: true,
}
server, client, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
expected := config.ChtSize + config.ChtConfirms
@ -78,7 +84,7 @@ func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) {
// Register the assembled checkpoint into oracle.
header := server.backend.Blockchain().CurrentHeader()
data := append([]byte{0x19, 0x00}, append(registrarAddr.Bytes(), append([]byte{0, 0, 0, 0, 0, 0, 0, 0}, cp.Hash().Bytes()...)...)...)
data := append([]byte{0x19, 0x00}, append(oracleAddr.Bytes(), append([]byte{0, 0, 0, 0, 0, 0, 0, 0}, cp.Hash().Bytes()...)...)...)
sig, _ := crypto.Sign(crypto.Keccak256(data), signerKey)
sig[64] += 27 // Transform V from 0/1 to 27/28 according to the yellow paper
auth, _ := bind.NewKeyedTransactorWithChainID(signerKey, big.NewInt(1337))
@ -128,10 +134,10 @@ func testCheckpointSyncing(t *testing.T, protocol int, syncMode int) {
}
}
func TestMissOracleBackend(t *testing.T) { testMissOracleBackend(t, true) }
func TestMissOracleBackendNoCheckpoint(t *testing.T) { testMissOracleBackend(t, false) }
func TestMissOracleBackendLES3(t *testing.T) { testMissOracleBackend(t, true, lpv3) }
func TestMissOracleBackendNoCheckpointLES3(t *testing.T) { testMissOracleBackend(t, false, lpv3) }
func testMissOracleBackend(t *testing.T, hasCheckpoint bool) {
func testMissOracleBackend(t *testing.T, hasCheckpoint bool, protocol int) {
config := light.TestServerIndexerConfig
waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) {
@ -145,7 +151,13 @@ func testMissOracleBackend(t *testing.T, hasCheckpoint bool) {
}
}
// Generate 128+1 blocks (totally 1 CHT section)
server, client, tearDown := newClientServerEnv(t, int(config.ChtSize+config.ChtConfirms), 3, waitIndexers, nil, 0, false, false, true)
netconfig := testnetConfig{
blocks: int(config.ChtSize + config.ChtConfirms),
protocol: protocol,
indexFn: waitIndexers,
nopruning: true,
}
server, client, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
expected := config.ChtSize + config.ChtConfirms
@ -160,7 +172,7 @@ func testMissOracleBackend(t *testing.T, hasCheckpoint bool) {
// Register the assembled checkpoint into oracle.
header := server.backend.Blockchain().CurrentHeader()
data := append([]byte{0x19, 0x00}, append(registrarAddr.Bytes(), append([]byte{0, 0, 0, 0, 0, 0, 0, 0}, cp.Hash().Bytes()...)...)...)
data := append([]byte{0x19, 0x00}, append(oracleAddr.Bytes(), append([]byte{0, 0, 0, 0, 0, 0, 0, 0}, cp.Hash().Bytes()...)...)...)
sig, _ := crypto.Sign(crypto.Keccak256(data), signerKey)
sig[64] += 27 // Transform V from 0/1 to 27/28 according to the yellow paper
auth, _ := bind.NewKeyedTransactorWithChainID(signerKey, big.NewInt(1337))
@ -220,7 +232,9 @@ func testMissOracleBackend(t *testing.T, hasCheckpoint bool) {
}
}
func TestSyncFromConfiguredCheckpoint(t *testing.T) {
func TestSyncFromConfiguredCheckpointLES3(t *testing.T) { testSyncFromConfiguredCheckpoint(t, lpv3) }
func testSyncFromConfiguredCheckpoint(t *testing.T, protocol int) {
config := light.TestServerIndexerConfig
waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) {
@ -234,7 +248,13 @@ func TestSyncFromConfiguredCheckpoint(t *testing.T) {
}
}
// Generate 256+1 blocks (totally 2 CHT sections)
server, client, tearDown := newClientServerEnv(t, int(2*config.ChtSize+config.ChtConfirms), 3, waitIndexers, nil, 0, false, false, true)
netconfig := testnetConfig{
blocks: int(2*config.ChtSize + config.ChtConfirms),
protocol: protocol,
indexFn: waitIndexers,
nopruning: true,
}
server, client, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
// Configure the local checkpoint(the first section)
@ -296,7 +316,9 @@ func TestSyncFromConfiguredCheckpoint(t *testing.T) {
}
}
func TestSyncAll(t *testing.T) {
func TestSyncAll(t *testing.T) { testSyncAll(t, lpv3) }
func testSyncAll(t *testing.T, protocol int) {
config := light.TestServerIndexerConfig
waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) {
@ -310,7 +332,13 @@ func TestSyncAll(t *testing.T) {
}
}
// Generate 256+1 blocks (totally 2 CHT sections)
server, client, tearDown := newClientServerEnv(t, int(2*config.ChtSize+config.ChtConfirms), 3, waitIndexers, nil, 0, false, false, true)
netconfig := testnetConfig{
blocks: int(2*config.ChtSize + config.ChtConfirms),
protocol: protocol,
indexFn: waitIndexers,
nopruning: true,
}
server, client, tearDown := newClientServerEnv(t, netconfig)
defer tearDown()
client.handler.backend.config.SyncFromCheckpoint = true

@ -15,7 +15,8 @@
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// This file contains some shares testing functionality, common to multiple
// different files and modules being tested.
// different files and modules being tested. Client based network and Server
// based network can be created easily with available APIs.
package les
@ -68,8 +69,8 @@ var (
testEventEmitterCode = common.Hex2Bytes("60606040523415600e57600080fd5b7f57050ab73f6b9ebdd9f76b8d4997793f48cf956e965ee070551b9ca0bb71584e60405160405180910390a160358060476000396000f3006060604052600080fd00a165627a7a723058203f727efcad8b5811f8cb1fc2620ce5e8c63570d697aef968172de296ea3994140029")
// Checkpoint registrar relative
registrarAddr common.Address
// Checkpoint oracle relative fields
oracleAddr common.Address
signerKey, _ = crypto.GenerateKey()
signerAddr = crypto.PubkeyToAddress(signerKey.PublicKey)
)
@ -112,14 +113,23 @@ func prepare(n int, backend *backends.SimulatedBackend) {
for i := 0; i < n; i++ {
switch i {
case 0:
// Builtin-block
// number: 1
// txs: 2
// deploy checkpoint contract
auth, _ := bind.NewKeyedTransactorWithChainID(bankKey, big.NewInt(1337))
registrarAddr, _, _, _ = contract.DeployCheckpointOracle(auth, backend, []common.Address{signerAddr}, sectionSize, processConfirms, big.NewInt(1))
oracleAddr, _, _, _ = contract.DeployCheckpointOracle(auth, backend, []common.Address{signerAddr}, sectionSize, processConfirms, big.NewInt(1))
// bankUser transfers some ether to user1
nonce, _ := backend.PendingNonceAt(ctx, bankAddr)
tx, _ := types.SignTx(types.NewTransaction(nonce, userAddr1, big.NewInt(10000), params.TxGas, nil, nil), signer, bankKey)
backend.SendTransaction(ctx, tx)
case 1:
// Builtin-block
// number: 2
// txs: 4
bankNonce, _ := backend.PendingNonceAt(ctx, bankAddr)
userNonce1, _ := backend.PendingNonceAt(ctx, userAddr1)
@ -140,6 +150,10 @@ func prepare(n int, backend *backends.SimulatedBackend) {
tx4, _ := types.SignTx(types.NewContractCreation(userNonce1+2, big.NewInt(0), 200000, big.NewInt(0), testEventEmitterCode), signer, userKey1)
backend.SendTransaction(ctx, tx4)
case 2:
// Builtin-block
// number: 3
// txs: 2
// bankUser transfer some ether to signer
bankNonce, _ := backend.PendingNonceAt(ctx, bankAddr)
tx1, _ := types.SignTx(types.NewTransaction(bankNonce, signerAddr, big.NewInt(1000000000), params.TxGas, nil, nil), signer, bankKey)
@ -150,6 +164,10 @@ func prepare(n int, backend *backends.SimulatedBackend) {
tx2, _ := types.SignTx(types.NewTransaction(bankNonce+1, testContractAddr, big.NewInt(0), 100000, nil, data), signer, bankKey)
backend.SendTransaction(ctx, tx2)
case 3:
// Builtin-block
// number: 4
// txs: 1
// invoke test contract
bankNonce, _ := backend.PendingNonceAt(ctx, bankAddr)
data := common.Hex2Bytes("C16431B900000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000002")
@ -310,45 +328,61 @@ type testPeer struct {
app *p2p.MsgPipeRW // Application layer reader/writer to simulate the local side
}
// newTestPeer creates a new peer registered at the given protocol manager.
func newTestPeer(t *testing.T, name string, version int, handler *serverHandler, shake bool, testCost uint64) (*testPeer, <-chan error) {
// Create a message pipe to communicate through
app, net := p2p.MsgPipe()
// handshakeWithServer executes the handshake with the remote server peer.
func (p *testPeer) handshakeWithServer(t *testing.T, td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, forkID forkid.ID) {
// It only works for the simulated client peer
if p.cpeer == nil {
t.Fatal("handshake for client peer only")
}
var sendList keyValueList
sendList = sendList.add("protocolVersion", uint64(p.cpeer.version))
sendList = sendList.add("networkId", uint64(NetworkId))
sendList = sendList.add("headTd", td)
sendList = sendList.add("headHash", head)
sendList = sendList.add("headNum", headNum)
sendList = sendList.add("genesisHash", genesis)
if p.cpeer.version >= lpv4 {
sendList = sendList.add("forkID", &forkID)
}
if err := p2p.ExpectMsg(p.app, StatusMsg, nil); err != nil {
t.Fatalf("status recv: %v", err)
}
if err := p2p.Send(p.app, StatusMsg, sendList); err != nil {
t.Fatalf("status send: %v", err)
}
}
// Generate a random id and create the peer
var id enode.ID
rand.Read(id[:])
peer := newClientPeer(version, NetworkId, p2p.NewPeer(id, name, nil), net)
// Start the peer on a new thread
errCh := make(chan error, 1)
go func() {
select {
case <-handler.closeCh:
errCh <- p2p.DiscQuitting
case errCh <- handler.handle(peer):
// handshakeWithClient executes the handshake with the remote client peer.
func (p *testPeer) handshakeWithClient(t *testing.T, td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, forkID forkid.ID, costList RequestCostList, recentTxLookup uint64) {
// It only works for the simulated client peer
if p.speer == nil {
t.Fatal("handshake for server peer only")
}
}()
tp := &testPeer{
app: app,
net: net,
cpeer: peer,
var sendList keyValueList
sendList = sendList.add("protocolVersion", uint64(p.speer.version))
sendList = sendList.add("networkId", uint64(NetworkId))
sendList = sendList.add("headTd", td)
sendList = sendList.add("headHash", head)
sendList = sendList.add("headNum", headNum)
sendList = sendList.add("genesisHash", genesis)
sendList = sendList.add("serveHeaders", nil)
sendList = sendList.add("serveChainSince", uint64(0))
sendList = sendList.add("serveStateSince", uint64(0))
sendList = sendList.add("serveRecentState", uint64(core.TriesInMemory-4))
sendList = sendList.add("txRelay", nil)
sendList = sendList.add("flowControl/BL", testBufLimit)
sendList = sendList.add("flowControl/MRR", testBufRecharge)
sendList = sendList.add("flowControl/MRC", costList)
if p.speer.version >= lpv4 {
sendList = sendList.add("forkID", &forkID)
sendList = sendList.add("recentTxLookup", recentTxLookup)
}
// Execute any implicitly requested handshakes and return
if shake {
// Customize the cost table if required.
if testCost != 0 {
handler.server.costTracker.testCostList = testCostList(testCost)
if err := p2p.ExpectMsg(p.app, StatusMsg, nil); err != nil {
t.Fatalf("status recv: %v", err)
}
var (
genesis = handler.blockchain.Genesis()
head = handler.blockchain.CurrentHeader()
td = handler.blockchain.GetTd(head.Hash(), head.Number.Uint64())
)
forkID := forkid.NewID(handler.blockchain.Config(), genesis.Hash(), head.Number.Uint64())
tp.handshake(t, td, head.Hash(), head.Number.Uint64(), genesis.Hash(), forkID, testCostList(testCost))
if err := p2p.Send(p.app, StatusMsg, sendList); err != nil {
t.Fatalf("status send: %v", err)
}
return tp, errCh
}
// close terminates the local side of the peer, notifying the remote protocol
@ -402,48 +436,9 @@ func newTestPeerPair(name string, version int, server *serverHandler, client *cl
return &testPeer{cpeer: peer1, net: net, app: app}, &testPeer{speer: peer2, net: app, app: net}, nil
}
// handshake simulates a trivial handshake that expects the same state from the
// remote side as we are simulating locally.
func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, forkID forkid.ID, costList RequestCostList) {
var expList keyValueList
expList = expList.add("protocolVersion", uint64(p.cpeer.version))
expList = expList.add("networkId", uint64(NetworkId))
expList = expList.add("headTd", td)
expList = expList.add("headHash", head)
expList = expList.add("headNum", headNum)
expList = expList.add("genesisHash", genesis)
if p.cpeer.version >= lpv4 {
expList = expList.add("forkID", &forkID)
}
sendList := make(keyValueList, len(expList))
copy(sendList, expList)
expList = expList.add("serveHeaders", nil)
expList = expList.add("serveChainSince", uint64(0))
expList = expList.add("serveStateSince", uint64(0))
expList = expList.add("serveRecentState", uint64(core.TriesInMemory-4))
expList = expList.add("txRelay", nil)
if p.cpeer.version >= lpv4 {
expList = expList.add("recentTxLookup", uint64(0))
}
expList = expList.add("flowControl/BL", testBufLimit)
expList = expList.add("flowControl/MRR", testBufRecharge)
expList = expList.add("flowControl/MRC", costList)
if err := p2p.ExpectMsg(p.app, StatusMsg, expList); err != nil {
t.Fatalf("status recv: %v", err)
}
if err := p2p.Send(p.app, StatusMsg, sendList); err != nil {
t.Fatalf("status send: %v", err)
}
p.cpeer.fcParams = flowcontrol.ServerParams{
BufLimit: testBufLimit,
MinRecharge: testBufRecharge,
}
}
type indexerCallback func(*core.ChainIndexer, *core.ChainIndexer, *core.ChainIndexer)
// testClient represents a client for testing with necessary auxiliary fields.
// testClient represents a client object for testing with necessary auxiliary fields.
type testClient struct {
clock mclock.Clock
db ethdb.Database
@ -455,7 +450,58 @@ type testClient struct {
bloomTrieIndexer *core.ChainIndexer
}
// testServer represents a server for testing with necessary auxiliary fields.
// newRawPeer creates a new server peer connects to the server and do the handshake.
func (client *testClient) newRawPeer(t *testing.T, name string, version int, recentTxLookup uint64) (*testPeer, func(), <-chan error) {
// Create a message pipe to communicate through
app, net := p2p.MsgPipe()
// Generate a random id and create the peer
var id enode.ID
rand.Read(id[:])
peer := newServerPeer(version, NetworkId, false, p2p.NewPeer(id, name, nil), net)
// Start the peer on a new thread
errCh := make(chan error, 1)
go func() {
select {
case <-client.handler.closeCh:
errCh <- p2p.DiscQuitting
case errCh <- client.handler.handle(peer):
}
}()
tp := &testPeer{
app: app,
net: net,
speer: peer,
}
var (
genesis = client.handler.backend.blockchain.Genesis()
head = client.handler.backend.blockchain.CurrentHeader()
td = client.handler.backend.blockchain.GetTd(head.Hash(), head.Number.Uint64())
)
forkID := forkid.NewID(client.handler.backend.blockchain.Config(), genesis.Hash(), head.Number.Uint64())
tp.handshakeWithClient(t, td, head.Hash(), head.Number.Uint64(), genesis.Hash(), forkID, testCostList(0), recentTxLookup) // disable flow control by default
// Ensure the connection is established or exits when any error occurs
for {
select {
case <-errCh:
return nil, nil, nil
default:
}
if atomic.LoadUint32(&peer.serving) == 1 {
break
}
time.Sleep(50 * time.Millisecond)
}
closePeer := func() {
tp.speer.close()
tp.close()
}
return tp, closePeer, errCh
}
// testServer represents a server object for testing with necessary auxiliary fields.
type testServer struct {
clock mclock.Clock
backend *backends.SimulatedBackend
@ -468,89 +514,109 @@ type testServer struct {
bloomTrieIndexer *core.ChainIndexer
}
func newServerEnv(t *testing.T, blocks int, protocol int, callback indexerCallback, simClock bool, newPeer bool, testCost uint64) (*testServer, func()) {
db := rawdb.NewMemoryDatabase()
indexers := testIndexers(db, nil, light.TestServerIndexerConfig, true)
// newRawPeer creates a new client peer connects to the server and do the handshake.
func (server *testServer) newRawPeer(t *testing.T, name string, version int) (*testPeer, func(), <-chan error) {
// Create a message pipe to communicate through
app, net := p2p.MsgPipe()
var clock mclock.Clock = &mclock.System{}
if simClock {
clock = &mclock.Simulated{}
}
handler, b := newTestServerHandler(blocks, indexers, db, clock)
// Generate a random id and create the peer
var id enode.ID
rand.Read(id[:])
peer := newClientPeer(version, NetworkId, p2p.NewPeer(id, name, nil), net)
var peer *testPeer
if newPeer {
peer, _ = newTestPeer(t, "peer", protocol, handler, true, testCost)
// Start the peer on a new thread
errCh := make(chan error, 1)
go func() {
select {
case <-server.handler.closeCh:
errCh <- p2p.DiscQuitting
case errCh <- server.handler.handle(peer):
}
}()
tp := &testPeer{
app: app,
net: net,
cpeer: peer,
}
var (
genesis = server.handler.blockchain.Genesis()
head = server.handler.blockchain.CurrentHeader()
td = server.handler.blockchain.GetTd(head.Hash(), head.Number.Uint64())
)
forkID := forkid.NewID(server.handler.blockchain.Config(), genesis.Hash(), head.Number.Uint64())
tp.handshakeWithServer(t, td, head.Hash(), head.Number.Uint64(), genesis.Hash(), forkID)
cIndexer, bIndexer, btIndexer := indexers[0], indexers[1], indexers[2]
cIndexer.Start(handler.blockchain)
bIndexer.Start(handler.blockchain)
// Wait until indexers generate enough index data.
if callback != nil {
callback(cIndexer, bIndexer, btIndexer)
// Ensure the connection is established or exits when any error occurs
for {
select {
case <-errCh:
return nil, nil, nil
default:
}
server := &testServer{
clock: clock,
backend: b,
db: db,
peer: peer,
handler: handler,
chtIndexer: cIndexer,
bloomIndexer: bIndexer,
bloomTrieIndexer: btIndexer,
if atomic.LoadUint32(&peer.serving) == 1 {
break
}
teardown := func() {
if newPeer {
peer.close()
peer.cpeer.close()
b.Close()
time.Sleep(50 * time.Millisecond)
}
cIndexer.Close()
bIndexer.Close()
closePeer := func() {
tp.cpeer.close()
tp.close()
}
return server, teardown
return tp, closePeer, errCh
}
func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexerCallback, ulcServers []string, ulcFraction int, simClock bool, connect bool, disablePruning bool) (*testServer, *testClient, func()) {
sdb, cdb := rawdb.NewMemoryDatabase(), rawdb.NewMemoryDatabase()
speers := newServerPeerSet()
// testnetConfig wraps all the configurations for testing network.
type testnetConfig struct {
blocks int
protocol int
indexFn indexerCallback
ulcServers []string
ulcFraction int
simClock bool
connect bool
nopruning bool
}
func newClientServerEnv(t *testing.T, config testnetConfig) (*testServer, *testClient, func()) {
var (
sdb = rawdb.NewMemoryDatabase()
cdb = rawdb.NewMemoryDatabase()
speers = newServerPeerSet()
)
var clock mclock.Clock = &mclock.System{}
if simClock {
if config.simClock {
clock = &mclock.Simulated{}
}
dist := newRequestDistributor(speers, clock)
rm := newRetrieveManager(speers, dist, func() time.Duration { return time.Millisecond * 500 })
odr := NewLesOdr(cdb, light.TestClientIndexerConfig, rm)
odr := NewLesOdr(cdb, light.TestClientIndexerConfig, speers, rm)
sindexers := testIndexers(sdb, nil, light.TestServerIndexerConfig, true)
cIndexers := testIndexers(cdb, odr, light.TestClientIndexerConfig, disablePruning)
cIndexers := testIndexers(cdb, odr, light.TestClientIndexerConfig, config.nopruning)
scIndexer, sbIndexer, sbtIndexer := sindexers[0], sindexers[1], sindexers[2]
ccIndexer, cbIndexer, cbtIndexer := cIndexers[0], cIndexers[1], cIndexers[2]
odr.SetIndexers(ccIndexer, cbIndexer, cbtIndexer)
server, b := newTestServerHandler(blocks, sindexers, sdb, clock)
client := newTestClientHandler(b, odr, cIndexers, cdb, speers, ulcServers, ulcFraction)
server, b := newTestServerHandler(config.blocks, sindexers, sdb, clock)
client := newTestClientHandler(b, odr, cIndexers, cdb, speers, config.ulcServers, config.ulcFraction)
scIndexer.Start(server.blockchain)
sbIndexer.Start(server.blockchain)
ccIndexer.Start(client.backend.blockchain)
cbIndexer.Start(client.backend.blockchain)
if callback != nil {
callback(scIndexer, sbIndexer, sbtIndexer)
if config.indexFn != nil {
config.indexFn(scIndexer, sbIndexer, sbtIndexer)
}
var (
err error
speer, cpeer *testPeer
)
if connect {
if config.connect {
done := make(chan struct{})
client.syncEnd = func(_ *types.Header) { close(done) }
cpeer, speer, err = newTestPeerPair("peer", protocol, server, client)
cpeer, speer, err = newTestPeerPair("peer", config.protocol, server, client)
if err != nil {
t.Fatalf("Failed to connect testing peers %v", err)
}
@ -580,7 +646,7 @@ func newClientServerEnv(t *testing.T, blocks int, protocol int, callback indexer
bloomTrieIndexer: cbtIndexer,
}
teardown := func() {
if connect {
if config.connect {
speer.close()
cpeer.close()
cpeer.cpeer.close()

@ -126,7 +126,12 @@ func connect(server *serverHandler, serverId enode.ID, client *clientHandler, pr
// newTestServerPeer creates server peer.
func newTestServerPeer(t *testing.T, blocks int, protocol int) (*testServer, *enode.Node, func()) {
s, teardown := newServerEnv(t, blocks, protocol, nil, false, false, 0)
netconfig := testnetConfig{
blocks: blocks,
protocol: protocol,
nopruning: true,
}
s, _, teardown := newClientServerEnv(t, netconfig)
key, err := crypto.GenerateKey()
if err != nil {
t.Fatal("generate key err:", err)
@ -138,6 +143,12 @@ func newTestServerPeer(t *testing.T, blocks int, protocol int) (*testServer, *en
// newTestLightPeer creates node with light sync mode
func newTestLightPeer(t *testing.T, protocol int, ulcServers []string, ulcFraction int) (*testClient, func()) {
_, c, teardown := newClientServerEnv(t, 0, protocol, nil, ulcServers, ulcFraction, false, false, true)
netconfig := testnetConfig{
protocol: protocol,
ulcServers: ulcServers,
ulcFraction: ulcFraction,
nopruning: true,
}
_, c, teardown := newClientServerEnv(t, netconfig)
return c, teardown
}

@ -42,6 +42,7 @@ type OdrBackend interface {
BloomTrieIndexer() *core.ChainIndexer
BloomIndexer() *core.ChainIndexer
Retrieve(ctx context.Context, req OdrRequest) error
RetrieveTxStatus(ctx context.Context, req *TxStatusRequest) error
IndexerConfig() *IndexerConfig
}

@ -269,10 +269,15 @@ func GetBloomBits(ctx context.Context, odr OdrBackend, bit uint, sections []uint
return result, nil
}
// GetTransaction retrieves a canonical transaction by hash and also returns its position in the chain
// GetTransaction retrieves a canonical transaction by hash and also returns
// its position in the chain. There is no guarantee in the LES protocol that
// the mined transaction will be retrieved back for sure because of different
// reasons(the transaction is unindexed, the malicous server doesn't reply it
// deliberately, etc). Therefore, unretrieved transactions will receive a certain
// number of retrys, thus giving a weak guarantee.
func GetTransaction(ctx context.Context, odr OdrBackend, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) {
r := &TxStatusRequest{Hashes: []common.Hash{txHash}}
if err := odr.Retrieve(ctx, r); err != nil || r.Status[0].Status != core.TxStatusIncluded {
if err := odr.RetrieveTxStatus(ctx, r); err != nil || r.Status[0].Status != core.TxStatusIncluded {
return nil, common.Hash{}, 0, 0, err
}
pos := r.Status[0].Lookup