all: make indexer configurable (#17188)
This commit is contained in:
parent
c64d72bea2
commit
b69476b372
@ -322,7 +322,6 @@ func (c *ChainIndexer) updateLoop() {
|
|||||||
updating = false
|
updating = false
|
||||||
c.log.Info("Finished upgrading chain index")
|
c.log.Info("Finished upgrading chain index")
|
||||||
}
|
}
|
||||||
|
|
||||||
c.cascadedHead = c.storedSections*c.sectionSize - 1
|
c.cascadedHead = c.storedSections*c.sectionSize - 1
|
||||||
for _, child := range c.children {
|
for _, child := range c.children {
|
||||||
c.log.Trace("Cascading chain index update", "head", c.cascadedHead)
|
c.log.Trace("Cascading chain index update", "head", c.cascadedHead)
|
||||||
|
@ -136,7 +136,7 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
|
|||||||
gasPrice: config.MinerGasPrice,
|
gasPrice: config.MinerGasPrice,
|
||||||
etherbase: config.Etherbase,
|
etherbase: config.Etherbase,
|
||||||
bloomRequests: make(chan chan *bloombits.Retrieval),
|
bloomRequests: make(chan chan *bloombits.Retrieval),
|
||||||
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, bloomConfirms),
|
bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
|
log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
|
||||||
@ -426,7 +426,7 @@ func (s *Ethereum) Protocols() []p2p.Protocol {
|
|||||||
// Ethereum protocol implementation.
|
// Ethereum protocol implementation.
|
||||||
func (s *Ethereum) Start(srvr *p2p.Server) error {
|
func (s *Ethereum) Start(srvr *p2p.Server) error {
|
||||||
// Start the bloom bits servicing goroutines
|
// Start the bloom bits servicing goroutines
|
||||||
s.startBloomHandlers()
|
s.startBloomHandlers(params.BloomBitsBlocks)
|
||||||
|
|
||||||
// Start the RPC service
|
// Start the RPC service
|
||||||
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())
|
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())
|
||||||
|
@ -27,7 +27,6 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/ethereum/go-ethereum/params"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -50,7 +49,7 @@ const (
|
|||||||
|
|
||||||
// startBloomHandlers starts a batch of goroutines to accept bloom bit database
|
// startBloomHandlers starts a batch of goroutines to accept bloom bit database
|
||||||
// retrievals from possibly a range of filters and serving the data to satisfy.
|
// retrievals from possibly a range of filters and serving the data to satisfy.
|
||||||
func (eth *Ethereum) startBloomHandlers() {
|
func (eth *Ethereum) startBloomHandlers(sectionSize uint64) {
|
||||||
for i := 0; i < bloomServiceThreads; i++ {
|
for i := 0; i < bloomServiceThreads; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
@ -62,9 +61,9 @@ func (eth *Ethereum) startBloomHandlers() {
|
|||||||
task := <-request
|
task := <-request
|
||||||
task.Bitsets = make([][]byte, len(task.Sections))
|
task.Bitsets = make([][]byte, len(task.Sections))
|
||||||
for i, section := range task.Sections {
|
for i, section := range task.Sections {
|
||||||
head := rawdb.ReadCanonicalHash(eth.chainDb, (section+1)*params.BloomBitsBlocks-1)
|
head := rawdb.ReadCanonicalHash(eth.chainDb, (section+1)*sectionSize-1)
|
||||||
if compVector, err := rawdb.ReadBloomBits(eth.chainDb, task.Bit, section, head); err == nil {
|
if compVector, err := rawdb.ReadBloomBits(eth.chainDb, task.Bit, section, head); err == nil {
|
||||||
if blob, err := bitutil.DecompressBytes(compVector, int(params.BloomBitsBlocks)/8); err == nil {
|
if blob, err := bitutil.DecompressBytes(compVector, int(sectionSize/8)); err == nil {
|
||||||
task.Bitsets[i] = blob
|
task.Bitsets[i] = blob
|
||||||
} else {
|
} else {
|
||||||
task.Error = err
|
task.Error = err
|
||||||
@ -81,10 +80,6 @@ func (eth *Ethereum) startBloomHandlers() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// bloomConfirms is the number of confirmation blocks before a bloom section is
|
|
||||||
// considered probably final and its rotated bits are calculated.
|
|
||||||
bloomConfirms = 256
|
|
||||||
|
|
||||||
// bloomThrottling is the time to wait between processing two consecutive index
|
// bloomThrottling is the time to wait between processing two consecutive index
|
||||||
// sections. It's useful during chain upgrades to prevent disk overload.
|
// sections. It's useful during chain upgrades to prevent disk overload.
|
||||||
bloomThrottling = 100 * time.Millisecond
|
bloomThrottling = 100 * time.Millisecond
|
||||||
@ -102,14 +97,14 @@ type BloomIndexer struct {
|
|||||||
|
|
||||||
// NewBloomIndexer returns a chain indexer that generates bloom bits data for the
|
// NewBloomIndexer returns a chain indexer that generates bloom bits data for the
|
||||||
// canonical chain for fast logs filtering.
|
// canonical chain for fast logs filtering.
|
||||||
func NewBloomIndexer(db ethdb.Database, size, confReq uint64) *core.ChainIndexer {
|
func NewBloomIndexer(db ethdb.Database, size, confirms uint64) *core.ChainIndexer {
|
||||||
backend := &BloomIndexer{
|
backend := &BloomIndexer{
|
||||||
db: db,
|
db: db,
|
||||||
size: size,
|
size: size,
|
||||||
}
|
}
|
||||||
table := ethdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix))
|
table := ethdb.NewTable(db, string(rawdb.BloomBitsIndexPrefix))
|
||||||
|
|
||||||
return core.NewChainIndexer(db, table, backend, size, confReq, bloomThrottling, "bloombits")
|
return core.NewChainIndexer(db, table, backend, size, confirms, bloomThrottling, "bloombits")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset implements core.ChainIndexerBackend, starting a new bloombits index
|
// Reset implements core.ChainIndexerBackend, starting a new bloombits index
|
||||||
|
@ -192,7 +192,7 @@ func (b *LesApiBackend) BloomStatus() (uint64, uint64) {
|
|||||||
return 0, 0
|
return 0, 0
|
||||||
}
|
}
|
||||||
sections, _, _ := b.eth.bloomIndexer.Sections()
|
sections, _, _ := b.eth.bloomIndexer.Sections()
|
||||||
return light.BloomTrieFrequency, sections
|
return params.BloomBitsBlocksClient, sections
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *LesApiBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
|
func (b *LesApiBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
|
||||||
|
@ -95,6 +95,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
|
|||||||
lesCommons: lesCommons{
|
lesCommons: lesCommons{
|
||||||
chainDb: chainDb,
|
chainDb: chainDb,
|
||||||
config: config,
|
config: config,
|
||||||
|
iConfig: light.DefaultClientIndexerConfig,
|
||||||
},
|
},
|
||||||
chainConfig: chainConfig,
|
chainConfig: chainConfig,
|
||||||
eventMux: ctx.EventMux,
|
eventMux: ctx.EventMux,
|
||||||
@ -105,16 +106,16 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
|
|||||||
shutdownChan: make(chan bool),
|
shutdownChan: make(chan bool),
|
||||||
networkId: config.NetworkId,
|
networkId: config.NetworkId,
|
||||||
bloomRequests: make(chan chan *bloombits.Retrieval),
|
bloomRequests: make(chan chan *bloombits.Retrieval),
|
||||||
bloomIndexer: eth.NewBloomIndexer(chainDb, light.BloomTrieFrequency, light.HelperTrieConfirmations),
|
bloomIndexer: eth.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations),
|
||||||
}
|
}
|
||||||
|
|
||||||
leth.relay = NewLesTxRelay(peers, leth.reqDist)
|
leth.relay = NewLesTxRelay(peers, leth.reqDist)
|
||||||
leth.serverPool = newServerPool(chainDb, quitSync, &leth.wg)
|
leth.serverPool = newServerPool(chainDb, quitSync, &leth.wg)
|
||||||
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool)
|
leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool)
|
||||||
|
|
||||||
leth.odr = NewLesOdr(chainDb, leth.retriever)
|
leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever)
|
||||||
leth.chtIndexer = light.NewChtIndexer(chainDb, true, leth.odr)
|
leth.chtIndexer = light.NewChtIndexer(chainDb, leth.odr, params.CHTFrequencyClient, params.HelperTrieConfirmations)
|
||||||
leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, true, leth.odr)
|
leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, leth.odr, params.BloomBitsBlocksClient, params.BloomTrieFrequency)
|
||||||
leth.odr.SetIndexers(leth.chtIndexer, leth.bloomTrieIndexer, leth.bloomIndexer)
|
leth.odr.SetIndexers(leth.chtIndexer, leth.bloomTrieIndexer, leth.bloomIndexer)
|
||||||
|
|
||||||
// Note: NewLightChain adds the trusted checkpoint so it needs an ODR with
|
// Note: NewLightChain adds the trusted checkpoint so it needs an ODR with
|
||||||
@ -135,7 +136,7 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
leth.txPool = light.NewTxPool(leth.chainConfig, leth.blockchain, leth.relay)
|
leth.txPool = light.NewTxPool(leth.chainConfig, leth.blockchain, leth.relay)
|
||||||
if leth.protocolManager, err = NewProtocolManager(leth.chainConfig, true, config.NetworkId, leth.eventMux, leth.engine, leth.peers, leth.blockchain, nil, chainDb, leth.odr, leth.relay, leth.serverPool, quitSync, &leth.wg); err != nil {
|
if leth.protocolManager, err = NewProtocolManager(leth.chainConfig, light.DefaultClientIndexerConfig, true, config.NetworkId, leth.eventMux, leth.engine, leth.peers, leth.blockchain, nil, chainDb, leth.odr, leth.relay, leth.serverPool, quitSync, &leth.wg); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
leth.ApiBackend = &LesApiBackend{leth, nil}
|
leth.ApiBackend = &LesApiBackend{leth, nil}
|
||||||
@ -230,8 +231,8 @@ func (s *LightEthereum) Protocols() []p2p.Protocol {
|
|||||||
// Start implements node.Service, starting all internal goroutines needed by the
|
// Start implements node.Service, starting all internal goroutines needed by the
|
||||||
// Ethereum protocol implementation.
|
// Ethereum protocol implementation.
|
||||||
func (s *LightEthereum) Start(srvr *p2p.Server) error {
|
func (s *LightEthereum) Start(srvr *p2p.Server) error {
|
||||||
s.startBloomHandlers()
|
|
||||||
log.Warn("Light client mode is an experimental feature")
|
log.Warn("Light client mode is an experimental feature")
|
||||||
|
s.startBloomHandlers(params.BloomBitsBlocksClient)
|
||||||
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.networkId)
|
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.networkId)
|
||||||
// clients are searching for the first advertised protocol in the list
|
// clients are searching for the first advertised protocol in the list
|
||||||
protocolVersion := AdvertiseProtocolVersions[0]
|
protocolVersion := AdvertiseProtocolVersions[0]
|
||||||
|
@ -43,7 +43,7 @@ const (
|
|||||||
|
|
||||||
// startBloomHandlers starts a batch of goroutines to accept bloom bit database
|
// startBloomHandlers starts a batch of goroutines to accept bloom bit database
|
||||||
// retrievals from possibly a range of filters and serving the data to satisfy.
|
// retrievals from possibly a range of filters and serving the data to satisfy.
|
||||||
func (eth *LightEthereum) startBloomHandlers() {
|
func (eth *LightEthereum) startBloomHandlers(sectionSize uint64) {
|
||||||
for i := 0; i < bloomServiceThreads; i++ {
|
for i := 0; i < bloomServiceThreads; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
@ -57,7 +57,7 @@ func (eth *LightEthereum) startBloomHandlers() {
|
|||||||
compVectors, err := light.GetBloomBits(task.Context, eth.odr, task.Bit, task.Sections)
|
compVectors, err := light.GetBloomBits(task.Context, eth.odr, task.Bit, task.Sections)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
for i := range task.Sections {
|
for i := range task.Sections {
|
||||||
if blob, err := bitutil.DecompressBytes(compVectors[i], int(light.BloomTrieFrequency/8)); err == nil {
|
if blob, err := bitutil.DecompressBytes(compVectors[i], int(sectionSize/8)); err == nil {
|
||||||
task.Bitsets[i] = blob
|
task.Bitsets[i] = blob
|
||||||
} else {
|
} else {
|
||||||
task.Error = err
|
task.Error = err
|
||||||
|
@ -33,6 +33,7 @@ import (
|
|||||||
// lesCommons contains fields needed by both server and client.
|
// lesCommons contains fields needed by both server and client.
|
||||||
type lesCommons struct {
|
type lesCommons struct {
|
||||||
config *eth.Config
|
config *eth.Config
|
||||||
|
iConfig *light.IndexerConfig
|
||||||
chainDb ethdb.Database
|
chainDb ethdb.Database
|
||||||
protocolManager *ProtocolManager
|
protocolManager *ProtocolManager
|
||||||
chtIndexer, bloomTrieIndexer *core.ChainIndexer
|
chtIndexer, bloomTrieIndexer *core.ChainIndexer
|
||||||
@ -81,7 +82,7 @@ func (c *lesCommons) nodeInfo() interface{} {
|
|||||||
|
|
||||||
if !c.protocolManager.lightSync {
|
if !c.protocolManager.lightSync {
|
||||||
// convert to client section size if running in server mode
|
// convert to client section size if running in server mode
|
||||||
sections /= light.CHTFrequencyClient / light.CHTFrequencyServer
|
sections /= c.iConfig.PairChtSize / c.iConfig.ChtSize
|
||||||
}
|
}
|
||||||
|
|
||||||
if sections2 < sections {
|
if sections2 < sections {
|
||||||
@ -94,7 +95,8 @@ func (c *lesCommons) nodeInfo() interface{} {
|
|||||||
if c.protocolManager.lightSync {
|
if c.protocolManager.lightSync {
|
||||||
chtRoot = light.GetChtRoot(c.chainDb, sectionIndex, sectionHead)
|
chtRoot = light.GetChtRoot(c.chainDb, sectionIndex, sectionHead)
|
||||||
} else {
|
} else {
|
||||||
chtRoot = light.GetChtV2Root(c.chainDb, sectionIndex, sectionHead)
|
idxV2 := (sectionIndex+1)*c.iConfig.PairChtSize/c.iConfig.ChtSize - 1
|
||||||
|
chtRoot = light.GetChtRoot(c.chainDb, idxV2, sectionHead)
|
||||||
}
|
}
|
||||||
cht = light.TrustedCheckpoint{
|
cht = light.TrustedCheckpoint{
|
||||||
SectionIdx: sectionIndex,
|
SectionIdx: sectionIndex,
|
||||||
|
@ -94,6 +94,7 @@ type ProtocolManager struct {
|
|||||||
txrelay *LesTxRelay
|
txrelay *LesTxRelay
|
||||||
networkId uint64
|
networkId uint64
|
||||||
chainConfig *params.ChainConfig
|
chainConfig *params.ChainConfig
|
||||||
|
iConfig *light.IndexerConfig
|
||||||
blockchain BlockChain
|
blockchain BlockChain
|
||||||
chainDb ethdb.Database
|
chainDb ethdb.Database
|
||||||
odr *LesOdr
|
odr *LesOdr
|
||||||
@ -123,13 +124,14 @@ type ProtocolManager struct {
|
|||||||
|
|
||||||
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
|
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
|
||||||
// with the ethereum network.
|
// with the ethereum network.
|
||||||
func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
|
func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.IndexerConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) {
|
||||||
// Create the protocol manager with the base fields
|
// Create the protocol manager with the base fields
|
||||||
manager := &ProtocolManager{
|
manager := &ProtocolManager{
|
||||||
lightSync: lightSync,
|
lightSync: lightSync,
|
||||||
eventMux: mux,
|
eventMux: mux,
|
||||||
blockchain: blockchain,
|
blockchain: blockchain,
|
||||||
chainConfig: chainConfig,
|
chainConfig: chainConfig,
|
||||||
|
iConfig: indexerConfig,
|
||||||
chainDb: chainDb,
|
chainDb: chainDb,
|
||||||
odr: odr,
|
odr: odr,
|
||||||
networkId: networkId,
|
networkId: networkId,
|
||||||
@ -882,7 +884,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
|||||||
trieDb := trie.NewDatabase(ethdb.NewTable(pm.chainDb, light.ChtTablePrefix))
|
trieDb := trie.NewDatabase(ethdb.NewTable(pm.chainDb, light.ChtTablePrefix))
|
||||||
for _, req := range req.Reqs {
|
for _, req := range req.Reqs {
|
||||||
if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
|
if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil {
|
||||||
sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, req.ChtNum*light.CHTFrequencyServer-1)
|
sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, req.ChtNum*pm.iConfig.ChtSize-1)
|
||||||
if root := light.GetChtRoot(pm.chainDb, req.ChtNum-1, sectionHead); root != (common.Hash{}) {
|
if root := light.GetChtRoot(pm.chainDb, req.ChtNum-1, sectionHead); root != (common.Hash{}) {
|
||||||
trie, err := trie.New(root, trieDb)
|
trie, err := trie.New(root, trieDb)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1137,10 +1139,11 @@ func (pm *ProtocolManager) getAccount(statedb *state.StateDB, root, hash common.
|
|||||||
func (pm *ProtocolManager) getHelperTrie(id uint, idx uint64) (common.Hash, string) {
|
func (pm *ProtocolManager) getHelperTrie(id uint, idx uint64) (common.Hash, string) {
|
||||||
switch id {
|
switch id {
|
||||||
case htCanonical:
|
case htCanonical:
|
||||||
sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idx+1)*light.CHTFrequencyClient-1)
|
idxV1 := (idx+1)*(pm.iConfig.PairChtSize/pm.iConfig.ChtSize) - 1
|
||||||
return light.GetChtV2Root(pm.chainDb, idx, sectionHead), light.ChtTablePrefix
|
sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idxV1+1)*pm.iConfig.ChtSize-1)
|
||||||
|
return light.GetChtRoot(pm.chainDb, idxV1, sectionHead), light.ChtTablePrefix
|
||||||
case htBloomBits:
|
case htBloomBits:
|
||||||
sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idx+1)*light.BloomTrieFrequency-1)
|
sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idx+1)*pm.iConfig.BloomTrieSize-1)
|
||||||
return light.GetBloomTrieRoot(pm.chainDb, idx, sectionHead), light.BloomTrieTablePrefix
|
return light.GetBloomTrieRoot(pm.chainDb, idx, sectionHead), light.BloomTrieTablePrefix
|
||||||
}
|
}
|
||||||
return common.Hash{}, ""
|
return common.Hash{}, ""
|
||||||
|
@ -51,10 +51,9 @@ func TestGetBlockHeadersLes1(t *testing.T) { testGetBlockHeaders(t, 1) }
|
|||||||
func TestGetBlockHeadersLes2(t *testing.T) { testGetBlockHeaders(t, 2) }
|
func TestGetBlockHeadersLes2(t *testing.T) { testGetBlockHeaders(t, 2) }
|
||||||
|
|
||||||
func testGetBlockHeaders(t *testing.T, protocol int) {
|
func testGetBlockHeaders(t *testing.T, protocol int) {
|
||||||
pm := newTestProtocolManagerMust(t, false, downloader.MaxHashFetch+15, nil, nil, nil, ethdb.NewMemDatabase())
|
server, tearDown := newServerEnv(t, downloader.MaxHashFetch+15, protocol, nil)
|
||||||
bc := pm.blockchain.(*core.BlockChain)
|
defer tearDown()
|
||||||
peer, _ := newTestPeer(t, "peer", protocol, pm, true)
|
bc := server.pm.blockchain.(*core.BlockChain)
|
||||||
defer peer.close()
|
|
||||||
|
|
||||||
// Create a "random" unknown hash for testing
|
// Create a "random" unknown hash for testing
|
||||||
var unknown common.Hash
|
var unknown common.Hash
|
||||||
@ -167,9 +166,9 @@ func testGetBlockHeaders(t *testing.T, protocol int) {
|
|||||||
}
|
}
|
||||||
// Send the hash request and verify the response
|
// Send the hash request and verify the response
|
||||||
reqID++
|
reqID++
|
||||||
cost := peer.GetRequestCost(GetBlockHeadersMsg, int(tt.query.Amount))
|
cost := server.tPeer.GetRequestCost(GetBlockHeadersMsg, int(tt.query.Amount))
|
||||||
sendRequest(peer.app, GetBlockHeadersMsg, reqID, cost, tt.query)
|
sendRequest(server.tPeer.app, GetBlockHeadersMsg, reqID, cost, tt.query)
|
||||||
if err := expectResponse(peer.app, BlockHeadersMsg, reqID, testBufLimit, headers); err != nil {
|
if err := expectResponse(server.tPeer.app, BlockHeadersMsg, reqID, testBufLimit, headers); err != nil {
|
||||||
t.Errorf("test %d: headers mismatch: %v", i, err)
|
t.Errorf("test %d: headers mismatch: %v", i, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -180,10 +179,9 @@ func TestGetBlockBodiesLes1(t *testing.T) { testGetBlockBodies(t, 1) }
|
|||||||
func TestGetBlockBodiesLes2(t *testing.T) { testGetBlockBodies(t, 2) }
|
func TestGetBlockBodiesLes2(t *testing.T) { testGetBlockBodies(t, 2) }
|
||||||
|
|
||||||
func testGetBlockBodies(t *testing.T, protocol int) {
|
func testGetBlockBodies(t *testing.T, protocol int) {
|
||||||
pm := newTestProtocolManagerMust(t, false, downloader.MaxBlockFetch+15, nil, nil, nil, ethdb.NewMemDatabase())
|
server, tearDown := newServerEnv(t, downloader.MaxBlockFetch+15, protocol, nil)
|
||||||
bc := pm.blockchain.(*core.BlockChain)
|
defer tearDown()
|
||||||
peer, _ := newTestPeer(t, "peer", protocol, pm, true)
|
bc := server.pm.blockchain.(*core.BlockChain)
|
||||||
defer peer.close()
|
|
||||||
|
|
||||||
// Create a batch of tests for various scenarios
|
// Create a batch of tests for various scenarios
|
||||||
limit := MaxBodyFetch
|
limit := MaxBodyFetch
|
||||||
@ -243,9 +241,9 @@ func testGetBlockBodies(t *testing.T, protocol int) {
|
|||||||
}
|
}
|
||||||
reqID++
|
reqID++
|
||||||
// Send the hash request and verify the response
|
// Send the hash request and verify the response
|
||||||
cost := peer.GetRequestCost(GetBlockBodiesMsg, len(hashes))
|
cost := server.tPeer.GetRequestCost(GetBlockBodiesMsg, len(hashes))
|
||||||
sendRequest(peer.app, GetBlockBodiesMsg, reqID, cost, hashes)
|
sendRequest(server.tPeer.app, GetBlockBodiesMsg, reqID, cost, hashes)
|
||||||
if err := expectResponse(peer.app, BlockBodiesMsg, reqID, testBufLimit, bodies); err != nil {
|
if err := expectResponse(server.tPeer.app, BlockBodiesMsg, reqID, testBufLimit, bodies); err != nil {
|
||||||
t.Errorf("test %d: bodies mismatch: %v", i, err)
|
t.Errorf("test %d: bodies mismatch: %v", i, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -257,10 +255,9 @@ func TestGetCodeLes2(t *testing.T) { testGetCode(t, 2) }
|
|||||||
|
|
||||||
func testGetCode(t *testing.T, protocol int) {
|
func testGetCode(t *testing.T, protocol int) {
|
||||||
// Assemble the test environment
|
// Assemble the test environment
|
||||||
pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, ethdb.NewMemDatabase())
|
server, tearDown := newServerEnv(t, 4, protocol, nil)
|
||||||
bc := pm.blockchain.(*core.BlockChain)
|
defer tearDown()
|
||||||
peer, _ := newTestPeer(t, "peer", protocol, pm, true)
|
bc := server.pm.blockchain.(*core.BlockChain)
|
||||||
defer peer.close()
|
|
||||||
|
|
||||||
var codereqs []*CodeReq
|
var codereqs []*CodeReq
|
||||||
var codes [][]byte
|
var codes [][]byte
|
||||||
@ -277,9 +274,9 @@ func testGetCode(t *testing.T, protocol int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cost := peer.GetRequestCost(GetCodeMsg, len(codereqs))
|
cost := server.tPeer.GetRequestCost(GetCodeMsg, len(codereqs))
|
||||||
sendRequest(peer.app, GetCodeMsg, 42, cost, codereqs)
|
sendRequest(server.tPeer.app, GetCodeMsg, 42, cost, codereqs)
|
||||||
if err := expectResponse(peer.app, CodeMsg, 42, testBufLimit, codes); err != nil {
|
if err := expectResponse(server.tPeer.app, CodeMsg, 42, testBufLimit, codes); err != nil {
|
||||||
t.Errorf("codes mismatch: %v", err)
|
t.Errorf("codes mismatch: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -290,11 +287,9 @@ func TestGetReceiptLes2(t *testing.T) { testGetReceipt(t, 2) }
|
|||||||
|
|
||||||
func testGetReceipt(t *testing.T, protocol int) {
|
func testGetReceipt(t *testing.T, protocol int) {
|
||||||
// Assemble the test environment
|
// Assemble the test environment
|
||||||
db := ethdb.NewMemDatabase()
|
server, tearDown := newServerEnv(t, 4, protocol, nil)
|
||||||
pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db)
|
defer tearDown()
|
||||||
bc := pm.blockchain.(*core.BlockChain)
|
bc := server.pm.blockchain.(*core.BlockChain)
|
||||||
peer, _ := newTestPeer(t, "peer", protocol, pm, true)
|
|
||||||
defer peer.close()
|
|
||||||
|
|
||||||
// Collect the hashes to request, and the response to expect
|
// Collect the hashes to request, and the response to expect
|
||||||
hashes, receipts := []common.Hash{}, []types.Receipts{}
|
hashes, receipts := []common.Hash{}, []types.Receipts{}
|
||||||
@ -302,12 +297,12 @@ func testGetReceipt(t *testing.T, protocol int) {
|
|||||||
block := bc.GetBlockByNumber(i)
|
block := bc.GetBlockByNumber(i)
|
||||||
|
|
||||||
hashes = append(hashes, block.Hash())
|
hashes = append(hashes, block.Hash())
|
||||||
receipts = append(receipts, rawdb.ReadReceipts(db, block.Hash(), block.NumberU64()))
|
receipts = append(receipts, rawdb.ReadReceipts(server.db, block.Hash(), block.NumberU64()))
|
||||||
}
|
}
|
||||||
// Send the hash request and verify the response
|
// Send the hash request and verify the response
|
||||||
cost := peer.GetRequestCost(GetReceiptsMsg, len(hashes))
|
cost := server.tPeer.GetRequestCost(GetReceiptsMsg, len(hashes))
|
||||||
sendRequest(peer.app, GetReceiptsMsg, 42, cost, hashes)
|
sendRequest(server.tPeer.app, GetReceiptsMsg, 42, cost, hashes)
|
||||||
if err := expectResponse(peer.app, ReceiptsMsg, 42, testBufLimit, receipts); err != nil {
|
if err := expectResponse(server.tPeer.app, ReceiptsMsg, 42, testBufLimit, receipts); err != nil {
|
||||||
t.Errorf("receipts mismatch: %v", err)
|
t.Errorf("receipts mismatch: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -318,11 +313,9 @@ func TestGetProofsLes2(t *testing.T) { testGetProofs(t, 2) }
|
|||||||
|
|
||||||
func testGetProofs(t *testing.T, protocol int) {
|
func testGetProofs(t *testing.T, protocol int) {
|
||||||
// Assemble the test environment
|
// Assemble the test environment
|
||||||
db := ethdb.NewMemDatabase()
|
server, tearDown := newServerEnv(t, 4, protocol, nil)
|
||||||
pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db)
|
defer tearDown()
|
||||||
bc := pm.blockchain.(*core.BlockChain)
|
bc := server.pm.blockchain.(*core.BlockChain)
|
||||||
peer, _ := newTestPeer(t, "peer", protocol, pm, true)
|
|
||||||
defer peer.close()
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
proofreqs []ProofReq
|
proofreqs []ProofReq
|
||||||
@ -334,7 +327,7 @@ func testGetProofs(t *testing.T, protocol int) {
|
|||||||
for i := uint64(0); i <= bc.CurrentBlock().NumberU64(); i++ {
|
for i := uint64(0); i <= bc.CurrentBlock().NumberU64(); i++ {
|
||||||
header := bc.GetHeaderByNumber(i)
|
header := bc.GetHeaderByNumber(i)
|
||||||
root := header.Root
|
root := header.Root
|
||||||
trie, _ := trie.New(root, trie.NewDatabase(db))
|
trie, _ := trie.New(root, trie.NewDatabase(server.db))
|
||||||
|
|
||||||
for _, acc := range accounts {
|
for _, acc := range accounts {
|
||||||
req := ProofReq{
|
req := ProofReq{
|
||||||
@ -356,15 +349,15 @@ func testGetProofs(t *testing.T, protocol int) {
|
|||||||
// Send the proof request and verify the response
|
// Send the proof request and verify the response
|
||||||
switch protocol {
|
switch protocol {
|
||||||
case 1:
|
case 1:
|
||||||
cost := peer.GetRequestCost(GetProofsV1Msg, len(proofreqs))
|
cost := server.tPeer.GetRequestCost(GetProofsV1Msg, len(proofreqs))
|
||||||
sendRequest(peer.app, GetProofsV1Msg, 42, cost, proofreqs)
|
sendRequest(server.tPeer.app, GetProofsV1Msg, 42, cost, proofreqs)
|
||||||
if err := expectResponse(peer.app, ProofsV1Msg, 42, testBufLimit, proofsV1); err != nil {
|
if err := expectResponse(server.tPeer.app, ProofsV1Msg, 42, testBufLimit, proofsV1); err != nil {
|
||||||
t.Errorf("proofs mismatch: %v", err)
|
t.Errorf("proofs mismatch: %v", err)
|
||||||
}
|
}
|
||||||
case 2:
|
case 2:
|
||||||
cost := peer.GetRequestCost(GetProofsV2Msg, len(proofreqs))
|
cost := server.tPeer.GetRequestCost(GetProofsV2Msg, len(proofreqs))
|
||||||
sendRequest(peer.app, GetProofsV2Msg, 42, cost, proofreqs)
|
sendRequest(server.tPeer.app, GetProofsV2Msg, 42, cost, proofreqs)
|
||||||
if err := expectResponse(peer.app, ProofsV2Msg, 42, testBufLimit, proofsV2.NodeList()); err != nil {
|
if err := expectResponse(server.tPeer.app, ProofsV2Msg, 42, testBufLimit, proofsV2.NodeList()); err != nil {
|
||||||
t.Errorf("proofs mismatch: %v", err)
|
t.Errorf("proofs mismatch: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -375,28 +368,33 @@ func TestGetCHTProofsLes1(t *testing.T) { testGetCHTProofs(t, 1) }
|
|||||||
func TestGetCHTProofsLes2(t *testing.T) { testGetCHTProofs(t, 2) }
|
func TestGetCHTProofsLes2(t *testing.T) { testGetCHTProofs(t, 2) }
|
||||||
|
|
||||||
func testGetCHTProofs(t *testing.T, protocol int) {
|
func testGetCHTProofs(t *testing.T, protocol int) {
|
||||||
// Figure out the client's CHT frequency
|
config := light.TestServerIndexerConfig
|
||||||
frequency := uint64(light.CHTFrequencyClient)
|
frequency := config.ChtSize
|
||||||
if protocol == 1 {
|
if protocol == 2 {
|
||||||
frequency = uint64(light.CHTFrequencyServer)
|
frequency = config.PairChtSize
|
||||||
}
|
}
|
||||||
// Assemble the test environment
|
|
||||||
db := ethdb.NewMemDatabase()
|
|
||||||
pm := newTestProtocolManagerMust(t, false, int(frequency)+light.HelperTrieProcessConfirmations, testChainGen, nil, nil, db)
|
|
||||||
bc := pm.blockchain.(*core.BlockChain)
|
|
||||||
peer, _ := newTestPeer(t, "peer", protocol, pm, true)
|
|
||||||
defer peer.close()
|
|
||||||
|
|
||||||
// Wait a while for the CHT indexer to process the new headers
|
waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) {
|
||||||
time.Sleep(100 * time.Millisecond * time.Duration(frequency/light.CHTFrequencyServer)) // Chain indexer throttling
|
expectSections := frequency / config.ChtSize
|
||||||
time.Sleep(250 * time.Millisecond) // CI tester slack
|
for {
|
||||||
|
cs, _, _ := cIndexer.Sections()
|
||||||
|
bs, _, _ := bIndexer.Sections()
|
||||||
|
if cs >= expectSections && bs >= expectSections {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
server, tearDown := newServerEnv(t, int(frequency+config.ChtConfirms), protocol, waitIndexers)
|
||||||
|
defer tearDown()
|
||||||
|
bc := server.pm.blockchain.(*core.BlockChain)
|
||||||
|
|
||||||
// Assemble the proofs from the different protocols
|
// Assemble the proofs from the different protocols
|
||||||
header := bc.GetHeaderByNumber(frequency)
|
header := bc.GetHeaderByNumber(frequency - 1)
|
||||||
rlp, _ := rlp.EncodeToBytes(header)
|
rlp, _ := rlp.EncodeToBytes(header)
|
||||||
|
|
||||||
key := make([]byte, 8)
|
key := make([]byte, 8)
|
||||||
binary.BigEndian.PutUint64(key, frequency)
|
binary.BigEndian.PutUint64(key, frequency-1)
|
||||||
|
|
||||||
proofsV1 := []ChtResp{{
|
proofsV1 := []ChtResp{{
|
||||||
Header: header,
|
Header: header,
|
||||||
@ -406,41 +404,41 @@ func testGetCHTProofs(t *testing.T, protocol int) {
|
|||||||
}
|
}
|
||||||
switch protocol {
|
switch protocol {
|
||||||
case 1:
|
case 1:
|
||||||
root := light.GetChtRoot(db, 0, bc.GetHeaderByNumber(frequency-1).Hash())
|
root := light.GetChtRoot(server.db, 0, bc.GetHeaderByNumber(frequency-1).Hash())
|
||||||
trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(db, light.ChtTablePrefix)))
|
trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(server.db, light.ChtTablePrefix)))
|
||||||
|
|
||||||
var proof light.NodeList
|
var proof light.NodeList
|
||||||
trie.Prove(key, 0, &proof)
|
trie.Prove(key, 0, &proof)
|
||||||
proofsV1[0].Proof = proof
|
proofsV1[0].Proof = proof
|
||||||
|
|
||||||
case 2:
|
case 2:
|
||||||
root := light.GetChtV2Root(db, 0, bc.GetHeaderByNumber(frequency-1).Hash())
|
root := light.GetChtRoot(server.db, (frequency/config.ChtSize)-1, bc.GetHeaderByNumber(frequency-1).Hash())
|
||||||
trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(db, light.ChtTablePrefix)))
|
trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(server.db, light.ChtTablePrefix)))
|
||||||
trie.Prove(key, 0, &proofsV2.Proofs)
|
trie.Prove(key, 0, &proofsV2.Proofs)
|
||||||
}
|
}
|
||||||
// Assemble the requests for the different protocols
|
// Assemble the requests for the different protocols
|
||||||
requestsV1 := []ChtReq{{
|
requestsV1 := []ChtReq{{
|
||||||
ChtNum: 1,
|
ChtNum: frequency / config.ChtSize,
|
||||||
BlockNum: frequency,
|
BlockNum: frequency - 1,
|
||||||
}}
|
}}
|
||||||
requestsV2 := []HelperTrieReq{{
|
requestsV2 := []HelperTrieReq{{
|
||||||
Type: htCanonical,
|
Type: htCanonical,
|
||||||
TrieIdx: 0,
|
TrieIdx: frequency/config.PairChtSize - 1,
|
||||||
Key: key,
|
Key: key,
|
||||||
AuxReq: auxHeader,
|
AuxReq: auxHeader,
|
||||||
}}
|
}}
|
||||||
// Send the proof request and verify the response
|
// Send the proof request and verify the response
|
||||||
switch protocol {
|
switch protocol {
|
||||||
case 1:
|
case 1:
|
||||||
cost := peer.GetRequestCost(GetHeaderProofsMsg, len(requestsV1))
|
cost := server.tPeer.GetRequestCost(GetHeaderProofsMsg, len(requestsV1))
|
||||||
sendRequest(peer.app, GetHeaderProofsMsg, 42, cost, requestsV1)
|
sendRequest(server.tPeer.app, GetHeaderProofsMsg, 42, cost, requestsV1)
|
||||||
if err := expectResponse(peer.app, HeaderProofsMsg, 42, testBufLimit, proofsV1); err != nil {
|
if err := expectResponse(server.tPeer.app, HeaderProofsMsg, 42, testBufLimit, proofsV1); err != nil {
|
||||||
t.Errorf("proofs mismatch: %v", err)
|
t.Errorf("proofs mismatch: %v", err)
|
||||||
}
|
}
|
||||||
case 2:
|
case 2:
|
||||||
cost := peer.GetRequestCost(GetHelperTrieProofsMsg, len(requestsV2))
|
cost := server.tPeer.GetRequestCost(GetHelperTrieProofsMsg, len(requestsV2))
|
||||||
sendRequest(peer.app, GetHelperTrieProofsMsg, 42, cost, requestsV2)
|
sendRequest(server.tPeer.app, GetHelperTrieProofsMsg, 42, cost, requestsV2)
|
||||||
if err := expectResponse(peer.app, HelperTrieProofsMsg, 42, testBufLimit, proofsV2); err != nil {
|
if err := expectResponse(server.tPeer.app, HelperTrieProofsMsg, 42, testBufLimit, proofsV2); err != nil {
|
||||||
t.Errorf("proofs mismatch: %v", err)
|
t.Errorf("proofs mismatch: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -448,24 +446,31 @@ func testGetCHTProofs(t *testing.T, protocol int) {
|
|||||||
|
|
||||||
// Tests that bloombits proofs can be correctly retrieved.
|
// Tests that bloombits proofs can be correctly retrieved.
|
||||||
func TestGetBloombitsProofs(t *testing.T) {
|
func TestGetBloombitsProofs(t *testing.T) {
|
||||||
// Assemble the test environment
|
config := light.TestServerIndexerConfig
|
||||||
db := ethdb.NewMemDatabase()
|
|
||||||
pm := newTestProtocolManagerMust(t, false, light.BloomTrieFrequency+256, testChainGen, nil, nil, db)
|
|
||||||
bc := pm.blockchain.(*core.BlockChain)
|
|
||||||
peer, _ := newTestPeer(t, "peer", 2, pm, true)
|
|
||||||
defer peer.close()
|
|
||||||
|
|
||||||
// Wait a while for the bloombits indexer to process the new headers
|
waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) {
|
||||||
time.Sleep(100 * time.Millisecond * time.Duration(light.BloomTrieFrequency/4096)) // Chain indexer throttling
|
for {
|
||||||
time.Sleep(250 * time.Millisecond) // CI tester slack
|
cs, _, _ := cIndexer.Sections()
|
||||||
|
bs, _, _ := bIndexer.Sections()
|
||||||
|
bts, _, _ := btIndexer.Sections()
|
||||||
|
if cs >= 8 && bs >= 8 && bts >= 1 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
server, tearDown := newServerEnv(t, int(config.BloomTrieSize+config.BloomTrieConfirms), 2, waitIndexers)
|
||||||
|
defer tearDown()
|
||||||
|
bc := server.pm.blockchain.(*core.BlockChain)
|
||||||
|
|
||||||
// Request and verify each bit of the bloom bits proofs
|
// Request and verify each bit of the bloom bits proofs
|
||||||
for bit := 0; bit < 2048; bit++ {
|
for bit := 0; bit < 2048; bit++ {
|
||||||
// Assemble therequest and proofs for the bloombits
|
// Assemble the request and proofs for the bloombits
|
||||||
key := make([]byte, 10)
|
key := make([]byte, 10)
|
||||||
|
|
||||||
binary.BigEndian.PutUint16(key[:2], uint16(bit))
|
binary.BigEndian.PutUint16(key[:2], uint16(bit))
|
||||||
binary.BigEndian.PutUint64(key[2:], uint64(light.BloomTrieFrequency))
|
// Only the first bloom section has data.
|
||||||
|
binary.BigEndian.PutUint64(key[2:], 0)
|
||||||
|
|
||||||
requests := []HelperTrieReq{{
|
requests := []HelperTrieReq{{
|
||||||
Type: htBloomBits,
|
Type: htBloomBits,
|
||||||
@ -474,14 +479,14 @@ func TestGetBloombitsProofs(t *testing.T) {
|
|||||||
}}
|
}}
|
||||||
var proofs HelperTrieResps
|
var proofs HelperTrieResps
|
||||||
|
|
||||||
root := light.GetBloomTrieRoot(db, 0, bc.GetHeaderByNumber(light.BloomTrieFrequency-1).Hash())
|
root := light.GetBloomTrieRoot(server.db, 0, bc.GetHeaderByNumber(config.BloomTrieSize-1).Hash())
|
||||||
trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(db, light.BloomTrieTablePrefix)))
|
trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(server.db, light.BloomTrieTablePrefix)))
|
||||||
trie.Prove(key, 0, &proofs.Proofs)
|
trie.Prove(key, 0, &proofs.Proofs)
|
||||||
|
|
||||||
// Send the proof request and verify the response
|
// Send the proof request and verify the response
|
||||||
cost := peer.GetRequestCost(GetHelperTrieProofsMsg, len(requests))
|
cost := server.tPeer.GetRequestCost(GetHelperTrieProofsMsg, len(requests))
|
||||||
sendRequest(peer.app, GetHelperTrieProofsMsg, 42, cost, requests)
|
sendRequest(server.tPeer.app, GetHelperTrieProofsMsg, 42, cost, requests)
|
||||||
if err := expectResponse(peer.app, HelperTrieProofsMsg, 42, testBufLimit, proofs); err != nil {
|
if err := expectResponse(server.tPeer.app, HelperTrieProofsMsg, 42, testBufLimit, proofs); err != nil {
|
||||||
t.Errorf("bit %d: proofs mismatch: %v", bit, err)
|
t.Errorf("bit %d: proofs mismatch: %v", bit, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"math/big"
|
"math/big"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/consensus/ethash"
|
"github.com/ethereum/go-ethereum/consensus/ethash"
|
||||||
@ -123,6 +124,15 @@ func testChainGen(i int, block *core.BlockGen) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// testIndexers creates a set of indexers with specified params for testing purpose.
|
||||||
|
func testIndexers(db ethdb.Database, odr light.OdrBackend, iConfig *light.IndexerConfig) (*core.ChainIndexer, *core.ChainIndexer, *core.ChainIndexer) {
|
||||||
|
chtIndexer := light.NewChtIndexer(db, odr, iConfig.ChtSize, iConfig.ChtConfirms)
|
||||||
|
bloomIndexer := eth.NewBloomIndexer(db, iConfig.BloomSize, iConfig.BloomConfirms)
|
||||||
|
bloomTrieIndexer := light.NewBloomTrieIndexer(db, odr, iConfig.BloomSize, iConfig.BloomTrieSize)
|
||||||
|
bloomIndexer.AddChildIndexer(bloomTrieIndexer)
|
||||||
|
return chtIndexer, bloomIndexer, bloomTrieIndexer
|
||||||
|
}
|
||||||
|
|
||||||
func testRCL() RequestCostList {
|
func testRCL() RequestCostList {
|
||||||
cl := make(RequestCostList, len(reqList))
|
cl := make(RequestCostList, len(reqList))
|
||||||
for i, code := range reqList {
|
for i, code := range reqList {
|
||||||
@ -134,9 +144,9 @@ func testRCL() RequestCostList {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// newTestProtocolManager creates a new protocol manager for testing purposes,
|
// newTestProtocolManager creates a new protocol manager for testing purposes,
|
||||||
// with the given number of blocks already known, and potential notification
|
// with the given number of blocks already known, potential notification
|
||||||
// channels for different events.
|
// channels for different events and relative chain indexers array.
|
||||||
func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen), peers *peerSet, odr *LesOdr, db ethdb.Database) (*ProtocolManager, error) {
|
func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database) (*ProtocolManager, error) {
|
||||||
var (
|
var (
|
||||||
evmux = new(event.TypeMux)
|
evmux = new(event.TypeMux)
|
||||||
engine = ethash.NewFaker()
|
engine = ethash.NewFaker()
|
||||||
@ -155,16 +165,6 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
|
|||||||
chain, _ = light.NewLightChain(odr, gspec.Config, engine)
|
chain, _ = light.NewLightChain(odr, gspec.Config, engine)
|
||||||
} else {
|
} else {
|
||||||
blockchain, _ := core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{})
|
blockchain, _ := core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{})
|
||||||
|
|
||||||
chtIndexer := light.NewChtIndexer(db, false, nil)
|
|
||||||
chtIndexer.Start(blockchain)
|
|
||||||
|
|
||||||
bbtIndexer := light.NewBloomTrieIndexer(db, false, nil)
|
|
||||||
|
|
||||||
bloomIndexer := eth.NewBloomIndexer(db, params.BloomBitsBlocks, light.HelperTrieProcessConfirmations)
|
|
||||||
bloomIndexer.AddChildIndexer(bbtIndexer)
|
|
||||||
bloomIndexer.Start(blockchain)
|
|
||||||
|
|
||||||
gchain, _ := core.GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, blocks, generator)
|
gchain, _ := core.GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, blocks, generator)
|
||||||
if _, err := blockchain.InsertChain(gchain); err != nil {
|
if _, err := blockchain.InsertChain(gchain); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -172,7 +172,11 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
|
|||||||
chain = blockchain
|
chain = blockchain
|
||||||
}
|
}
|
||||||
|
|
||||||
pm, err := NewProtocolManager(gspec.Config, lightSync, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup))
|
indexConfig := light.TestServerIndexerConfig
|
||||||
|
if lightSync {
|
||||||
|
indexConfig = light.TestClientIndexerConfig
|
||||||
|
}
|
||||||
|
pm, err := NewProtocolManager(gspec.Config, indexConfig, lightSync, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -193,11 +197,11 @@ func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *cor
|
|||||||
}
|
}
|
||||||
|
|
||||||
// newTestProtocolManagerMust creates a new protocol manager for testing purposes,
|
// newTestProtocolManagerMust creates a new protocol manager for testing purposes,
|
||||||
// with the given number of blocks already known, and potential notification
|
// with the given number of blocks already known, potential notification
|
||||||
// channels for different events. In case of an error, the constructor force-
|
// channels for different events and relative chain indexers array. In case of an error, the constructor force-
|
||||||
// fails the test.
|
// fails the test.
|
||||||
func newTestProtocolManagerMust(t *testing.T, lightSync bool, blocks int, generator func(int, *core.BlockGen), peers *peerSet, odr *LesOdr, db ethdb.Database) *ProtocolManager {
|
func newTestProtocolManagerMust(t *testing.T, lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database) *ProtocolManager {
|
||||||
pm, err := newTestProtocolManager(lightSync, blocks, generator, peers, odr, db)
|
pm, err := newTestProtocolManager(lightSync, blocks, generator, odr, peers, db)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to create protocol manager: %v", err)
|
t.Fatalf("Failed to create protocol manager: %v", err)
|
||||||
}
|
}
|
||||||
@ -320,3 +324,122 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNu
|
|||||||
func (p *testPeer) close() {
|
func (p *testPeer) close() {
|
||||||
p.app.Close()
|
p.app.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestEntity represents a network entity for testing with necessary auxiliary fields.
|
||||||
|
type TestEntity struct {
|
||||||
|
db ethdb.Database
|
||||||
|
rPeer *peer
|
||||||
|
tPeer *testPeer
|
||||||
|
peers *peerSet
|
||||||
|
pm *ProtocolManager
|
||||||
|
// Indexers
|
||||||
|
chtIndexer *core.ChainIndexer
|
||||||
|
bloomIndexer *core.ChainIndexer
|
||||||
|
bloomTrieIndexer *core.ChainIndexer
|
||||||
|
}
|
||||||
|
|
||||||
|
// newServerEnv creates a server testing environment with a connected test peer for testing purpose.
|
||||||
|
func newServerEnv(t *testing.T, blocks int, protocol int, waitIndexers func(*core.ChainIndexer, *core.ChainIndexer, *core.ChainIndexer)) (*TestEntity, func()) {
|
||||||
|
db := ethdb.NewMemDatabase()
|
||||||
|
cIndexer, bIndexer, btIndexer := testIndexers(db, nil, light.TestServerIndexerConfig)
|
||||||
|
|
||||||
|
pm := newTestProtocolManagerMust(t, false, blocks, testChainGen, nil, nil, db)
|
||||||
|
peer, _ := newTestPeer(t, "peer", protocol, pm, true)
|
||||||
|
|
||||||
|
cIndexer.Start(pm.blockchain.(*core.BlockChain))
|
||||||
|
bIndexer.Start(pm.blockchain.(*core.BlockChain))
|
||||||
|
|
||||||
|
// Wait until indexers generate enough index data.
|
||||||
|
if waitIndexers != nil {
|
||||||
|
waitIndexers(cIndexer, bIndexer, btIndexer)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &TestEntity{
|
||||||
|
db: db,
|
||||||
|
tPeer: peer,
|
||||||
|
pm: pm,
|
||||||
|
chtIndexer: cIndexer,
|
||||||
|
bloomIndexer: bIndexer,
|
||||||
|
bloomTrieIndexer: btIndexer,
|
||||||
|
}, func() {
|
||||||
|
peer.close()
|
||||||
|
// Note bloom trie indexer will be closed by it parent recursively.
|
||||||
|
cIndexer.Close()
|
||||||
|
bIndexer.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// newClientServerEnv creates a client/server arch environment with a connected les server and light client pair
|
||||||
|
// for testing purpose.
|
||||||
|
func newClientServerEnv(t *testing.T, blocks int, protocol int, waitIndexers func(*core.ChainIndexer, *core.ChainIndexer, *core.ChainIndexer), newPeer bool) (*TestEntity, *TestEntity, func()) {
|
||||||
|
db, ldb := ethdb.NewMemDatabase(), ethdb.NewMemDatabase()
|
||||||
|
peers, lPeers := newPeerSet(), newPeerSet()
|
||||||
|
|
||||||
|
dist := newRequestDistributor(lPeers, make(chan struct{}))
|
||||||
|
rm := newRetrieveManager(lPeers, dist, nil)
|
||||||
|
odr := NewLesOdr(ldb, light.TestClientIndexerConfig, rm)
|
||||||
|
|
||||||
|
cIndexer, bIndexer, btIndexer := testIndexers(db, nil, light.TestServerIndexerConfig)
|
||||||
|
lcIndexer, lbIndexer, lbtIndexer := testIndexers(ldb, odr, light.TestClientIndexerConfig)
|
||||||
|
odr.SetIndexers(lcIndexer, lbtIndexer, lbIndexer)
|
||||||
|
|
||||||
|
pm := newTestProtocolManagerMust(t, false, blocks, testChainGen, nil, peers, db)
|
||||||
|
lpm := newTestProtocolManagerMust(t, true, 0, nil, odr, lPeers, ldb)
|
||||||
|
|
||||||
|
startIndexers := func(clientMode bool, pm *ProtocolManager) {
|
||||||
|
if clientMode {
|
||||||
|
lcIndexer.Start(pm.blockchain.(*light.LightChain))
|
||||||
|
lbIndexer.Start(pm.blockchain.(*light.LightChain))
|
||||||
|
} else {
|
||||||
|
cIndexer.Start(pm.blockchain.(*core.BlockChain))
|
||||||
|
bIndexer.Start(pm.blockchain.(*core.BlockChain))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
startIndexers(false, pm)
|
||||||
|
startIndexers(true, lpm)
|
||||||
|
|
||||||
|
// Execute wait until function if it is specified.
|
||||||
|
if waitIndexers != nil {
|
||||||
|
waitIndexers(cIndexer, bIndexer, btIndexer)
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
peer, lPeer *peer
|
||||||
|
err1, err2 <-chan error
|
||||||
|
)
|
||||||
|
if newPeer {
|
||||||
|
peer, err1, lPeer, err2 = newTestPeerPair("peer", protocol, pm, lpm)
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Millisecond * 100):
|
||||||
|
case err := <-err1:
|
||||||
|
t.Fatalf("peer 1 handshake error: %v", err)
|
||||||
|
case err := <-err2:
|
||||||
|
t.Fatalf("peer 2 handshake error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &TestEntity{
|
||||||
|
db: db,
|
||||||
|
pm: pm,
|
||||||
|
rPeer: peer,
|
||||||
|
peers: peers,
|
||||||
|
chtIndexer: cIndexer,
|
||||||
|
bloomIndexer: bIndexer,
|
||||||
|
bloomTrieIndexer: btIndexer,
|
||||||
|
}, &TestEntity{
|
||||||
|
db: ldb,
|
||||||
|
pm: lpm,
|
||||||
|
rPeer: lPeer,
|
||||||
|
peers: lPeers,
|
||||||
|
chtIndexer: lcIndexer,
|
||||||
|
bloomIndexer: lbIndexer,
|
||||||
|
bloomTrieIndexer: lbtIndexer,
|
||||||
|
}, func() {
|
||||||
|
// Note bloom trie indexers will be closed by their parents recursively.
|
||||||
|
cIndexer.Close()
|
||||||
|
bIndexer.Close()
|
||||||
|
lcIndexer.Close()
|
||||||
|
lbIndexer.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
15
les/odr.go
15
les/odr.go
@ -28,16 +28,18 @@ import (
|
|||||||
// LesOdr implements light.OdrBackend
|
// LesOdr implements light.OdrBackend
|
||||||
type LesOdr struct {
|
type LesOdr struct {
|
||||||
db ethdb.Database
|
db ethdb.Database
|
||||||
|
indexerConfig *light.IndexerConfig
|
||||||
chtIndexer, bloomTrieIndexer, bloomIndexer *core.ChainIndexer
|
chtIndexer, bloomTrieIndexer, bloomIndexer *core.ChainIndexer
|
||||||
retriever *retrieveManager
|
retriever *retrieveManager
|
||||||
stop chan struct{}
|
stop chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLesOdr(db ethdb.Database, retriever *retrieveManager) *LesOdr {
|
func NewLesOdr(db ethdb.Database, config *light.IndexerConfig, retriever *retrieveManager) *LesOdr {
|
||||||
return &LesOdr{
|
return &LesOdr{
|
||||||
db: db,
|
db: db,
|
||||||
retriever: retriever,
|
indexerConfig: config,
|
||||||
stop: make(chan struct{}),
|
retriever: retriever,
|
||||||
|
stop: make(chan struct{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -73,6 +75,11 @@ func (odr *LesOdr) BloomIndexer() *core.ChainIndexer {
|
|||||||
return odr.bloomIndexer
|
return odr.bloomIndexer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IndexerConfig returns the indexer config.
|
||||||
|
func (odr *LesOdr) IndexerConfig() *light.IndexerConfig {
|
||||||
|
return odr.indexerConfig
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
MsgBlockBodies = iota
|
MsgBlockBodies = iota
|
||||||
MsgCode
|
MsgCode
|
||||||
|
@ -365,7 +365,7 @@ func (r *ChtRequest) CanSend(peer *peer) bool {
|
|||||||
peer.lock.RLock()
|
peer.lock.RLock()
|
||||||
defer peer.lock.RUnlock()
|
defer peer.lock.RUnlock()
|
||||||
|
|
||||||
return peer.headInfo.Number >= light.HelperTrieConfirmations && r.ChtNum <= (peer.headInfo.Number-light.HelperTrieConfirmations)/light.CHTFrequencyClient
|
return peer.headInfo.Number >= r.Config.ChtConfirms && r.ChtNum <= (peer.headInfo.Number-r.Config.ChtConfirms)/r.Config.ChtSize
|
||||||
}
|
}
|
||||||
|
|
||||||
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
|
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
|
||||||
@ -379,7 +379,21 @@ func (r *ChtRequest) Request(reqID uint64, peer *peer) error {
|
|||||||
Key: encNum[:],
|
Key: encNum[:],
|
||||||
AuxReq: auxHeader,
|
AuxReq: auxHeader,
|
||||||
}
|
}
|
||||||
return peer.RequestHelperTrieProofs(reqID, r.GetCost(peer), []HelperTrieReq{req})
|
switch peer.version {
|
||||||
|
case lpv1:
|
||||||
|
var reqsV1 ChtReq
|
||||||
|
if req.Type != htCanonical || req.AuxReq != auxHeader || len(req.Key) != 8 {
|
||||||
|
return fmt.Errorf("Request invalid in LES/1 mode")
|
||||||
|
}
|
||||||
|
blockNum := binary.BigEndian.Uint64(req.Key)
|
||||||
|
// convert HelperTrie request to old CHT request
|
||||||
|
reqsV1 = ChtReq{ChtNum: (req.TrieIdx + 1) * (r.Config.ChtSize / r.Config.PairChtSize), BlockNum: blockNum, FromLevel: req.FromLevel}
|
||||||
|
return peer.RequestHelperTrieProofs(reqID, r.GetCost(peer), []ChtReq{reqsV1})
|
||||||
|
case lpv2:
|
||||||
|
return peer.RequestHelperTrieProofs(reqID, r.GetCost(peer), []HelperTrieReq{req})
|
||||||
|
default:
|
||||||
|
panic(nil)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Valid processes an ODR request reply message from the LES network
|
// Valid processes an ODR request reply message from the LES network
|
||||||
@ -484,7 +498,7 @@ func (r *BloomRequest) CanSend(peer *peer) bool {
|
|||||||
if peer.version < lpv2 {
|
if peer.version < lpv2 {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return peer.headInfo.Number >= light.HelperTrieConfirmations && r.BloomTrieNum <= (peer.headInfo.Number-light.HelperTrieConfirmations)/light.BloomTrieFrequency
|
return peer.headInfo.Number >= r.Config.BloomTrieConfirms && r.BloomTrieNum <= (peer.headInfo.Number-r.Config.BloomTrieConfirms)/r.Config.BloomTrieSize
|
||||||
}
|
}
|
||||||
|
|
||||||
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
|
// Request sends an ODR request to the LES network (implementation of LesOdrRequest)
|
||||||
|
@ -30,7 +30,6 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/core/state"
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/core/vm"
|
"github.com/ethereum/go-ethereum/core/vm"
|
||||||
"github.com/ethereum/go-ethereum/eth"
|
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/ethereum/go-ethereum/light"
|
"github.com/ethereum/go-ethereum/light"
|
||||||
"github.com/ethereum/go-ethereum/params"
|
"github.com/ethereum/go-ethereum/params"
|
||||||
@ -160,36 +159,21 @@ func odrContractCall(ctx context.Context, db ethdb.Database, config *params.Chai
|
|||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// testOdr tests odr requests whose validation guaranteed by block headers.
|
||||||
func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
|
func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
|
||||||
// Assemble the test environment
|
// Assemble the test environment
|
||||||
peers := newPeerSet()
|
server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, true)
|
||||||
dist := newRequestDistributor(peers, make(chan struct{}))
|
defer tearDown()
|
||||||
rm := newRetrieveManager(peers, dist, nil)
|
client.pm.synchronise(client.rPeer)
|
||||||
db := ethdb.NewMemDatabase()
|
|
||||||
ldb := ethdb.NewMemDatabase()
|
|
||||||
odr := NewLesOdr(ldb, rm)
|
|
||||||
odr.SetIndexers(light.NewChtIndexer(db, true, nil), light.NewBloomTrieIndexer(db, true, nil), eth.NewBloomIndexer(db, light.BloomTrieFrequency, light.HelperTrieConfirmations))
|
|
||||||
pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db)
|
|
||||||
lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb)
|
|
||||||
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
|
|
||||||
select {
|
|
||||||
case <-time.After(time.Millisecond * 100):
|
|
||||||
case err := <-err1:
|
|
||||||
t.Fatalf("peer 1 handshake error: %v", err)
|
|
||||||
case err := <-err2:
|
|
||||||
t.Fatalf("peer 1 handshake error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
lpm.synchronise(lpeer)
|
|
||||||
|
|
||||||
test := func(expFail uint64) {
|
test := func(expFail uint64) {
|
||||||
for i := uint64(0); i <= pm.blockchain.CurrentHeader().Number.Uint64(); i++ {
|
for i := uint64(0); i <= server.pm.blockchain.CurrentHeader().Number.Uint64(); i++ {
|
||||||
bhash := rawdb.ReadCanonicalHash(db, i)
|
bhash := rawdb.ReadCanonicalHash(server.db, i)
|
||||||
b1 := fn(light.NoOdr, db, pm.chainConfig, pm.blockchain.(*core.BlockChain), nil, bhash)
|
b1 := fn(light.NoOdr, server.db, server.pm.chainConfig, server.pm.blockchain.(*core.BlockChain), nil, bhash)
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
|
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
b2 := fn(ctx, ldb, lpm.chainConfig, nil, lpm.blockchain.(*light.LightChain), bhash)
|
b2 := fn(ctx, client.db, client.pm.chainConfig, nil, client.pm.blockchain.(*light.LightChain), bhash)
|
||||||
|
|
||||||
eq := bytes.Equal(b1, b2)
|
eq := bytes.Equal(b1, b2)
|
||||||
exp := i < expFail
|
exp := i < expFail
|
||||||
@ -201,21 +185,20 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// temporarily remove peer to test odr fails
|
// temporarily remove peer to test odr fails
|
||||||
// expect retrievals to fail (except genesis block) without a les peer
|
// expect retrievals to fail (except genesis block) without a les peer
|
||||||
peers.Unregister(lpeer.id)
|
client.peers.Unregister(client.rPeer.id)
|
||||||
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
|
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
|
||||||
test(expFail)
|
test(expFail)
|
||||||
// expect all retrievals to pass
|
// expect all retrievals to pass
|
||||||
peers.Register(lpeer)
|
client.peers.Register(client.rPeer)
|
||||||
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
|
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
|
||||||
lpeer.lock.Lock()
|
client.peers.lock.Lock()
|
||||||
lpeer.hasBlock = func(common.Hash, uint64) bool { return true }
|
client.rPeer.hasBlock = func(common.Hash, uint64) bool { return true }
|
||||||
lpeer.lock.Unlock()
|
client.peers.lock.Unlock()
|
||||||
test(5)
|
test(5)
|
||||||
// still expect all retrievals to pass, now data should be cached locally
|
// still expect all retrievals to pass, now data should be cached locally
|
||||||
peers.Unregister(lpeer.id)
|
client.peers.Unregister(client.rPeer.id)
|
||||||
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
|
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
|
||||||
test(5)
|
test(5)
|
||||||
}
|
}
|
||||||
|
30
les/peer.go
30
les/peer.go
@ -19,7 +19,6 @@ package les
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"encoding/binary"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
@ -36,9 +35,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
errClosed = errors.New("peer set is closed")
|
errClosed = errors.New("peer set is closed")
|
||||||
errAlreadyRegistered = errors.New("peer is already registered")
|
errAlreadyRegistered = errors.New("peer is already registered")
|
||||||
errNotRegistered = errors.New("peer is not registered")
|
errNotRegistered = errors.New("peer is not registered")
|
||||||
|
errInvalidHelpTrieReq = errors.New("invalid help trie request")
|
||||||
)
|
)
|
||||||
|
|
||||||
const maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam)
|
const maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam)
|
||||||
@ -284,21 +284,21 @@ func (p *peer) RequestProofs(reqID, cost uint64, reqs []ProofReq) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// RequestHelperTrieProofs fetches a batch of HelperTrie merkle proofs from a remote node.
|
// RequestHelperTrieProofs fetches a batch of HelperTrie merkle proofs from a remote node.
|
||||||
func (p *peer) RequestHelperTrieProofs(reqID, cost uint64, reqs []HelperTrieReq) error {
|
func (p *peer) RequestHelperTrieProofs(reqID, cost uint64, data interface{}) error {
|
||||||
p.Log().Debug("Fetching batch of HelperTrie proofs", "count", len(reqs))
|
|
||||||
switch p.version {
|
switch p.version {
|
||||||
case lpv1:
|
case lpv1:
|
||||||
reqsV1 := make([]ChtReq, len(reqs))
|
reqs, ok := data.([]ChtReq)
|
||||||
for i, req := range reqs {
|
if !ok {
|
||||||
if req.Type != htCanonical || req.AuxReq != auxHeader || len(req.Key) != 8 {
|
return errInvalidHelpTrieReq
|
||||||
return fmt.Errorf("Request invalid in LES/1 mode")
|
|
||||||
}
|
|
||||||
blockNum := binary.BigEndian.Uint64(req.Key)
|
|
||||||
// convert HelperTrie request to old CHT request
|
|
||||||
reqsV1[i] = ChtReq{ChtNum: (req.TrieIdx + 1) * (light.CHTFrequencyClient / light.CHTFrequencyServer), BlockNum: blockNum, FromLevel: req.FromLevel}
|
|
||||||
}
|
}
|
||||||
return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqsV1)
|
p.Log().Debug("Fetching batch of header proofs", "count", len(reqs))
|
||||||
|
return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqs)
|
||||||
case lpv2:
|
case lpv2:
|
||||||
|
reqs, ok := data.([]HelperTrieReq)
|
||||||
|
if !ok {
|
||||||
|
return errInvalidHelpTrieReq
|
||||||
|
}
|
||||||
|
p.Log().Debug("Fetching batch of HelperTrie proofs", "count", len(reqs))
|
||||||
return sendRequest(p.rw, GetHelperTrieProofsMsg, reqID, cost, reqs)
|
return sendRequest(p.rw, GetHelperTrieProofsMsg, reqID, cost, reqs)
|
||||||
default:
|
default:
|
||||||
panic(nil)
|
panic(nil)
|
||||||
|
@ -24,7 +24,6 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/ethereum/go-ethereum/eth"
|
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/ethereum/go-ethereum/light"
|
"github.com/ethereum/go-ethereum/light"
|
||||||
)
|
)
|
||||||
@ -84,35 +83,17 @@ func tfCodeAccess(db ethdb.Database, bhash common.Hash, num uint64) light.OdrReq
|
|||||||
|
|
||||||
func testAccess(t *testing.T, protocol int, fn accessTestFn) {
|
func testAccess(t *testing.T, protocol int, fn accessTestFn) {
|
||||||
// Assemble the test environment
|
// Assemble the test environment
|
||||||
peers := newPeerSet()
|
server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, true)
|
||||||
dist := newRequestDistributor(peers, make(chan struct{}))
|
defer tearDown()
|
||||||
rm := newRetrieveManager(peers, dist, nil)
|
client.pm.synchronise(client.rPeer)
|
||||||
db := ethdb.NewMemDatabase()
|
|
||||||
ldb := ethdb.NewMemDatabase()
|
|
||||||
odr := NewLesOdr(ldb, rm)
|
|
||||||
odr.SetIndexers(light.NewChtIndexer(db, true, nil), light.NewBloomTrieIndexer(db, true, nil), eth.NewBloomIndexer(db, light.BloomTrieFrequency, light.HelperTrieConfirmations))
|
|
||||||
|
|
||||||
pm := newTestProtocolManagerMust(t, false, 4, testChainGen, nil, nil, db)
|
|
||||||
lpm := newTestProtocolManagerMust(t, true, 0, nil, peers, odr, ldb)
|
|
||||||
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
|
|
||||||
select {
|
|
||||||
case <-time.After(time.Millisecond * 100):
|
|
||||||
case err := <-err1:
|
|
||||||
t.Fatalf("peer 1 handshake error: %v", err)
|
|
||||||
case err := <-err2:
|
|
||||||
t.Fatalf("peer 1 handshake error: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
lpm.synchronise(lpeer)
|
|
||||||
|
|
||||||
test := func(expFail uint64) {
|
test := func(expFail uint64) {
|
||||||
for i := uint64(0); i <= pm.blockchain.CurrentHeader().Number.Uint64(); i++ {
|
for i := uint64(0); i <= server.pm.blockchain.CurrentHeader().Number.Uint64(); i++ {
|
||||||
bhash := rawdb.ReadCanonicalHash(db, i)
|
bhash := rawdb.ReadCanonicalHash(server.db, i)
|
||||||
if req := fn(ldb, bhash, i); req != nil {
|
if req := fn(client.db, bhash, i); req != nil {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
|
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
err := client.pm.odr.Retrieve(ctx, req)
|
||||||
err := odr.Retrieve(ctx, req)
|
|
||||||
got := err == nil
|
got := err == nil
|
||||||
exp := i < expFail
|
exp := i < expFail
|
||||||
if exp && !got {
|
if exp && !got {
|
||||||
@ -126,16 +107,16 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// temporarily remove peer to test odr fails
|
// temporarily remove peer to test odr fails
|
||||||
peers.Unregister(lpeer.id)
|
client.peers.Unregister(client.rPeer.id)
|
||||||
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
|
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
|
||||||
// expect retrievals to fail (except genesis block) without a les peer
|
// expect retrievals to fail (except genesis block) without a les peer
|
||||||
test(0)
|
test(0)
|
||||||
|
|
||||||
peers.Register(lpeer)
|
client.peers.Register(client.rPeer)
|
||||||
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
|
time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed
|
||||||
lpeer.lock.Lock()
|
client.rPeer.lock.Lock()
|
||||||
lpeer.hasBlock = func(common.Hash, uint64) bool { return true }
|
client.rPeer.hasBlock = func(common.Hash, uint64) bool { return true }
|
||||||
lpeer.lock.Unlock()
|
client.rPeer.lock.Unlock()
|
||||||
// expect all retrievals to pass
|
// expect all retrievals to pass
|
||||||
test(5)
|
test(5)
|
||||||
}
|
}
|
||||||
|
@ -34,6 +34,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
"github.com/ethereum/go-ethereum/p2p"
|
"github.com/ethereum/go-ethereum/p2p"
|
||||||
"github.com/ethereum/go-ethereum/p2p/discv5"
|
"github.com/ethereum/go-ethereum/p2p/discv5"
|
||||||
|
"github.com/ethereum/go-ethereum/params"
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -50,7 +51,7 @@ type LesServer struct {
|
|||||||
|
|
||||||
func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
|
func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
|
||||||
quitSync := make(chan struct{})
|
quitSync := make(chan struct{})
|
||||||
pm, err := NewProtocolManager(eth.BlockChain().Config(), false, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup))
|
pm, err := NewProtocolManager(eth.BlockChain().Config(), light.DefaultServerIndexerConfig, false, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -64,8 +65,9 @@ func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
|
|||||||
lesCommons: lesCommons{
|
lesCommons: lesCommons{
|
||||||
config: config,
|
config: config,
|
||||||
chainDb: eth.ChainDb(),
|
chainDb: eth.ChainDb(),
|
||||||
chtIndexer: light.NewChtIndexer(eth.ChainDb(), false, nil),
|
iConfig: light.DefaultServerIndexerConfig,
|
||||||
bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), false, nil),
|
chtIndexer: light.NewChtIndexer(eth.ChainDb(), nil, params.CHTFrequencyServer, params.HelperTrieProcessConfirmations),
|
||||||
|
bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency),
|
||||||
protocolManager: pm,
|
protocolManager: pm,
|
||||||
},
|
},
|
||||||
quitSync: quitSync,
|
quitSync: quitSync,
|
||||||
@ -75,14 +77,14 @@ func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
|
|||||||
logger := log.New()
|
logger := log.New()
|
||||||
|
|
||||||
chtV1SectionCount, _, _ := srv.chtIndexer.Sections() // indexer still uses LES/1 4k section size for backwards server compatibility
|
chtV1SectionCount, _, _ := srv.chtIndexer.Sections() // indexer still uses LES/1 4k section size for backwards server compatibility
|
||||||
chtV2SectionCount := chtV1SectionCount / (light.CHTFrequencyClient / light.CHTFrequencyServer)
|
chtV2SectionCount := chtV1SectionCount / (params.CHTFrequencyClient / params.CHTFrequencyServer)
|
||||||
if chtV2SectionCount != 0 {
|
if chtV2SectionCount != 0 {
|
||||||
// convert to LES/2 section
|
// convert to LES/2 section
|
||||||
chtLastSection := chtV2SectionCount - 1
|
chtLastSection := chtV2SectionCount - 1
|
||||||
// convert last LES/2 section index back to LES/1 index for chtIndexer.SectionHead
|
// convert last LES/2 section index back to LES/1 index for chtIndexer.SectionHead
|
||||||
chtLastSectionV1 := (chtLastSection+1)*(light.CHTFrequencyClient/light.CHTFrequencyServer) - 1
|
chtLastSectionV1 := (chtLastSection+1)*(params.CHTFrequencyClient/params.CHTFrequencyServer) - 1
|
||||||
chtSectionHead := srv.chtIndexer.SectionHead(chtLastSectionV1)
|
chtSectionHead := srv.chtIndexer.SectionHead(chtLastSectionV1)
|
||||||
chtRoot := light.GetChtV2Root(pm.chainDb, chtLastSection, chtSectionHead)
|
chtRoot := light.GetChtRoot(pm.chainDb, chtLastSectionV1, chtSectionHead)
|
||||||
logger.Info("Loaded CHT", "section", chtLastSection, "head", chtSectionHead, "root", chtRoot)
|
logger.Info("Loaded CHT", "section", chtLastSection, "head", chtSectionHead, "root", chtRoot)
|
||||||
}
|
}
|
||||||
bloomTrieSectionCount, _, _ := srv.bloomTrieIndexer.Sections()
|
bloomTrieSectionCount, _, _ := srv.bloomTrieIndexer.Sections()
|
||||||
|
@ -48,6 +48,7 @@ var (
|
|||||||
// interface. It only does header validation during chain insertion.
|
// interface. It only does header validation during chain insertion.
|
||||||
type LightChain struct {
|
type LightChain struct {
|
||||||
hc *core.HeaderChain
|
hc *core.HeaderChain
|
||||||
|
indexerConfig *IndexerConfig
|
||||||
chainDb ethdb.Database
|
chainDb ethdb.Database
|
||||||
odr OdrBackend
|
odr OdrBackend
|
||||||
chainFeed event.Feed
|
chainFeed event.Feed
|
||||||
@ -81,13 +82,14 @@ func NewLightChain(odr OdrBackend, config *params.ChainConfig, engine consensus.
|
|||||||
blockCache, _ := lru.New(blockCacheLimit)
|
blockCache, _ := lru.New(blockCacheLimit)
|
||||||
|
|
||||||
bc := &LightChain{
|
bc := &LightChain{
|
||||||
chainDb: odr.Database(),
|
chainDb: odr.Database(),
|
||||||
odr: odr,
|
indexerConfig: odr.IndexerConfig(),
|
||||||
quit: make(chan struct{}),
|
odr: odr,
|
||||||
bodyCache: bodyCache,
|
quit: make(chan struct{}),
|
||||||
bodyRLPCache: bodyRLPCache,
|
bodyCache: bodyCache,
|
||||||
blockCache: blockCache,
|
bodyRLPCache: bodyRLPCache,
|
||||||
engine: engine,
|
blockCache: blockCache,
|
||||||
|
engine: engine,
|
||||||
}
|
}
|
||||||
var err error
|
var err error
|
||||||
bc.hc, err = core.NewHeaderChain(odr.Database(), config, bc.engine, bc.getProcInterrupt)
|
bc.hc, err = core.NewHeaderChain(odr.Database(), config, bc.engine, bc.getProcInterrupt)
|
||||||
@ -128,7 +130,7 @@ func (self *LightChain) addTrustedCheckpoint(cp TrustedCheckpoint) {
|
|||||||
if self.odr.BloomIndexer() != nil {
|
if self.odr.BloomIndexer() != nil {
|
||||||
self.odr.BloomIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead)
|
self.odr.BloomIndexer().AddKnownSectionHead(cp.SectionIdx, cp.SectionHead)
|
||||||
}
|
}
|
||||||
log.Info("Added trusted checkpoint", "chain", cp.name, "block", (cp.SectionIdx+1)*CHTFrequencyClient-1, "hash", cp.SectionHead)
|
log.Info("Added trusted checkpoint", "chain", cp.name, "block", (cp.SectionIdx+1)*self.indexerConfig.ChtSize-1, "hash", cp.SectionHead)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *LightChain) getProcInterrupt() bool {
|
func (self *LightChain) getProcInterrupt() bool {
|
||||||
@ -472,7 +474,7 @@ func (self *LightChain) SyncCht(ctx context.Context) bool {
|
|||||||
head := self.CurrentHeader().Number.Uint64()
|
head := self.CurrentHeader().Number.Uint64()
|
||||||
sections, _, _ := self.odr.ChtIndexer().Sections()
|
sections, _, _ := self.odr.ChtIndexer().Sections()
|
||||||
|
|
||||||
latest := sections*CHTFrequencyClient - 1
|
latest := sections*self.indexerConfig.ChtSize - 1
|
||||||
if clique := self.hc.Config().Clique; clique != nil {
|
if clique := self.hc.Config().Clique; clique != nil {
|
||||||
latest -= latest % clique.Epoch // epoch snapshot for clique
|
latest -= latest % clique.Epoch // epoch snapshot for clique
|
||||||
}
|
}
|
||||||
|
@ -55,7 +55,7 @@ func newCanonical(n int) (ethdb.Database, *LightChain, error) {
|
|||||||
db := ethdb.NewMemDatabase()
|
db := ethdb.NewMemDatabase()
|
||||||
gspec := core.Genesis{Config: params.TestChainConfig}
|
gspec := core.Genesis{Config: params.TestChainConfig}
|
||||||
genesis := gspec.MustCommit(db)
|
genesis := gspec.MustCommit(db)
|
||||||
blockchain, _ := NewLightChain(&dummyOdr{db: db}, gspec.Config, ethash.NewFaker())
|
blockchain, _ := NewLightChain(&dummyOdr{db: db, indexerConfig: TestClientIndexerConfig}, gspec.Config, ethash.NewFaker())
|
||||||
|
|
||||||
// Create and inject the requested chain
|
// Create and inject the requested chain
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
@ -265,7 +265,8 @@ func makeHeaderChainWithDiff(genesis *types.Block, d []int, seed byte) []*types.
|
|||||||
|
|
||||||
type dummyOdr struct {
|
type dummyOdr struct {
|
||||||
OdrBackend
|
OdrBackend
|
||||||
db ethdb.Database
|
db ethdb.Database
|
||||||
|
indexerConfig *IndexerConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func (odr *dummyOdr) Database() ethdb.Database {
|
func (odr *dummyOdr) Database() ethdb.Database {
|
||||||
@ -276,6 +277,10 @@ func (odr *dummyOdr) Retrieve(ctx context.Context, req OdrRequest) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (odr *dummyOdr) IndexerConfig() *IndexerConfig {
|
||||||
|
return odr.indexerConfig
|
||||||
|
}
|
||||||
|
|
||||||
// Tests that reorganizing a long difficult chain after a short easy one
|
// Tests that reorganizing a long difficult chain after a short easy one
|
||||||
// overwrites the canonical numbers and links in the database.
|
// overwrites the canonical numbers and links in the database.
|
||||||
func TestReorgLongHeaders(t *testing.T) {
|
func TestReorgLongHeaders(t *testing.T) {
|
||||||
|
@ -44,6 +44,7 @@ type OdrBackend interface {
|
|||||||
BloomTrieIndexer() *core.ChainIndexer
|
BloomTrieIndexer() *core.ChainIndexer
|
||||||
BloomIndexer() *core.ChainIndexer
|
BloomIndexer() *core.ChainIndexer
|
||||||
Retrieve(ctx context.Context, req OdrRequest) error
|
Retrieve(ctx context.Context, req OdrRequest) error
|
||||||
|
IndexerConfig() *IndexerConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
// OdrRequest is an interface for retrieval requests
|
// OdrRequest is an interface for retrieval requests
|
||||||
@ -136,6 +137,7 @@ func (req *ReceiptsRequest) StoreResult(db ethdb.Database) {
|
|||||||
// ChtRequest is the ODR request type for state/storage trie entries
|
// ChtRequest is the ODR request type for state/storage trie entries
|
||||||
type ChtRequest struct {
|
type ChtRequest struct {
|
||||||
OdrRequest
|
OdrRequest
|
||||||
|
Config *IndexerConfig
|
||||||
ChtNum, BlockNum uint64
|
ChtNum, BlockNum uint64
|
||||||
ChtRoot common.Hash
|
ChtRoot common.Hash
|
||||||
Header *types.Header
|
Header *types.Header
|
||||||
@ -155,6 +157,7 @@ func (req *ChtRequest) StoreResult(db ethdb.Database) {
|
|||||||
// BloomRequest is the ODR request type for retrieving bloom filters from a CHT structure
|
// BloomRequest is the ODR request type for retrieving bloom filters from a CHT structure
|
||||||
type BloomRequest struct {
|
type BloomRequest struct {
|
||||||
OdrRequest
|
OdrRequest
|
||||||
|
Config *IndexerConfig
|
||||||
BloomTrieNum uint64
|
BloomTrieNum uint64
|
||||||
BitIdx uint
|
BitIdx uint
|
||||||
SectionIdxList []uint64
|
SectionIdxList []uint64
|
||||||
@ -166,7 +169,7 @@ type BloomRequest struct {
|
|||||||
// StoreResult stores the retrieved data in local database
|
// StoreResult stores the retrieved data in local database
|
||||||
func (req *BloomRequest) StoreResult(db ethdb.Database) {
|
func (req *BloomRequest) StoreResult(db ethdb.Database) {
|
||||||
for i, sectionIdx := range req.SectionIdxList {
|
for i, sectionIdx := range req.SectionIdxList {
|
||||||
sectionHead := rawdb.ReadCanonicalHash(db, (sectionIdx+1)*BloomTrieFrequency-1)
|
sectionHead := rawdb.ReadCanonicalHash(db, (sectionIdx+1)*req.Config.BloomTrieSize-1)
|
||||||
// if we don't have the canonical hash stored for this section head number, we'll still store it under
|
// if we don't have the canonical hash stored for this section head number, we'll still store it under
|
||||||
// a key with a zero sectionHead. GetBloomBits will look there too if we still don't have the canonical
|
// a key with a zero sectionHead. GetBloomBits will look there too if we still don't have the canonical
|
||||||
// hash. In the unlikely case we've retrieved the section head hash since then, we'll just retrieve the
|
// hash. In the unlikely case we've retrieved the section head hash since then, we'll just retrieve the
|
||||||
|
@ -55,8 +55,9 @@ var (
|
|||||||
|
|
||||||
type testOdr struct {
|
type testOdr struct {
|
||||||
OdrBackend
|
OdrBackend
|
||||||
sdb, ldb ethdb.Database
|
indexerConfig *IndexerConfig
|
||||||
disable bool
|
sdb, ldb ethdb.Database
|
||||||
|
disable bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (odr *testOdr) Database() ethdb.Database {
|
func (odr *testOdr) Database() ethdb.Database {
|
||||||
@ -92,6 +93,10 @@ func (odr *testOdr) Retrieve(ctx context.Context, req OdrRequest) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (odr *testOdr) IndexerConfig() *IndexerConfig {
|
||||||
|
return odr.indexerConfig
|
||||||
|
}
|
||||||
|
|
||||||
type odrTestFn func(ctx context.Context, db ethdb.Database, bc *core.BlockChain, lc *LightChain, bhash common.Hash) ([]byte, error)
|
type odrTestFn func(ctx context.Context, db ethdb.Database, bc *core.BlockChain, lc *LightChain, bhash common.Hash) ([]byte, error)
|
||||||
|
|
||||||
func TestOdrGetBlockLes1(t *testing.T) { testChainOdr(t, 1, odrGetBlock) }
|
func TestOdrGetBlockLes1(t *testing.T) { testChainOdr(t, 1, odrGetBlock) }
|
||||||
@ -258,7 +263,7 @@ func testChainOdr(t *testing.T, protocol int, fn odrTestFn) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
odr := &testOdr{sdb: sdb, ldb: ldb}
|
odr := &testOdr{sdb: sdb, ldb: ldb, indexerConfig: TestClientIndexerConfig}
|
||||||
lightchain, err := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker())
|
lightchain, err := NewLightChain(odr, params.TestChainConfig, ethash.NewFullFaker())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -53,16 +53,16 @@ func GetHeaderByNumber(ctx context.Context, odr OdrBackend, number uint64) (*typ
|
|||||||
for chtCount > 0 && canonicalHash != sectionHead && canonicalHash != (common.Hash{}) {
|
for chtCount > 0 && canonicalHash != sectionHead && canonicalHash != (common.Hash{}) {
|
||||||
chtCount--
|
chtCount--
|
||||||
if chtCount > 0 {
|
if chtCount > 0 {
|
||||||
sectionHeadNum = chtCount*CHTFrequencyClient - 1
|
sectionHeadNum = chtCount*odr.IndexerConfig().ChtSize - 1
|
||||||
sectionHead = odr.ChtIndexer().SectionHead(chtCount - 1)
|
sectionHead = odr.ChtIndexer().SectionHead(chtCount - 1)
|
||||||
canonicalHash = rawdb.ReadCanonicalHash(db, sectionHeadNum)
|
canonicalHash = rawdb.ReadCanonicalHash(db, sectionHeadNum)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if number >= chtCount*CHTFrequencyClient {
|
if number >= chtCount*odr.IndexerConfig().ChtSize {
|
||||||
return nil, ErrNoTrustedCht
|
return nil, ErrNoTrustedCht
|
||||||
}
|
}
|
||||||
r := &ChtRequest{ChtRoot: GetChtRoot(db, chtCount-1, sectionHead), ChtNum: chtCount - 1, BlockNum: number}
|
r := &ChtRequest{ChtRoot: GetChtRoot(db, chtCount-1, sectionHead), ChtNum: chtCount - 1, BlockNum: number, Config: odr.IndexerConfig()}
|
||||||
if err := odr.Retrieve(ctx, r); err != nil {
|
if err := odr.Retrieve(ctx, r); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -175,9 +175,9 @@ func GetBlockLogs(ctx context.Context, odr OdrBackend, hash common.Hash, number
|
|||||||
|
|
||||||
// GetBloomBits retrieves a batch of compressed bloomBits vectors belonging to the given bit index and section indexes
|
// GetBloomBits retrieves a batch of compressed bloomBits vectors belonging to the given bit index and section indexes
|
||||||
func GetBloomBits(ctx context.Context, odr OdrBackend, bitIdx uint, sectionIdxList []uint64) ([][]byte, error) {
|
func GetBloomBits(ctx context.Context, odr OdrBackend, bitIdx uint, sectionIdxList []uint64) ([][]byte, error) {
|
||||||
db := odr.Database()
|
|
||||||
result := make([][]byte, len(sectionIdxList))
|
|
||||||
var (
|
var (
|
||||||
|
db = odr.Database()
|
||||||
|
result = make([][]byte, len(sectionIdxList))
|
||||||
reqList []uint64
|
reqList []uint64
|
||||||
reqIdx []int
|
reqIdx []int
|
||||||
)
|
)
|
||||||
@ -193,7 +193,7 @@ func GetBloomBits(ctx context.Context, odr OdrBackend, bitIdx uint, sectionIdxLi
|
|||||||
for bloomTrieCount > 0 && canonicalHash != sectionHead && canonicalHash != (common.Hash{}) {
|
for bloomTrieCount > 0 && canonicalHash != sectionHead && canonicalHash != (common.Hash{}) {
|
||||||
bloomTrieCount--
|
bloomTrieCount--
|
||||||
if bloomTrieCount > 0 {
|
if bloomTrieCount > 0 {
|
||||||
sectionHeadNum = bloomTrieCount*BloomTrieFrequency - 1
|
sectionHeadNum = bloomTrieCount*odr.IndexerConfig().BloomTrieSize - 1
|
||||||
sectionHead = odr.BloomTrieIndexer().SectionHead(bloomTrieCount - 1)
|
sectionHead = odr.BloomTrieIndexer().SectionHead(bloomTrieCount - 1)
|
||||||
canonicalHash = rawdb.ReadCanonicalHash(db, sectionHeadNum)
|
canonicalHash = rawdb.ReadCanonicalHash(db, sectionHeadNum)
|
||||||
}
|
}
|
||||||
@ -201,7 +201,7 @@ func GetBloomBits(ctx context.Context, odr OdrBackend, bitIdx uint, sectionIdxLi
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i, sectionIdx := range sectionIdxList {
|
for i, sectionIdx := range sectionIdxList {
|
||||||
sectionHead := rawdb.ReadCanonicalHash(db, (sectionIdx+1)*BloomTrieFrequency-1)
|
sectionHead := rawdb.ReadCanonicalHash(db, (sectionIdx+1)*odr.IndexerConfig().BloomSize-1)
|
||||||
// if we don't have the canonical hash stored for this section head number, we'll still look for
|
// if we don't have the canonical hash stored for this section head number, we'll still look for
|
||||||
// an entry with a zero sectionHead (we store it with zero section head too if we don't know it
|
// an entry with a zero sectionHead (we store it with zero section head too if we don't know it
|
||||||
// at the time of the retrieval)
|
// at the time of the retrieval)
|
||||||
@ -209,6 +209,7 @@ func GetBloomBits(ctx context.Context, odr OdrBackend, bitIdx uint, sectionIdxLi
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
result[i] = bloomBits
|
result[i] = bloomBits
|
||||||
} else {
|
} else {
|
||||||
|
// TODO(rjl493456442) Convert sectionIndex to BloomTrie relative index
|
||||||
if sectionIdx >= bloomTrieCount {
|
if sectionIdx >= bloomTrieCount {
|
||||||
return nil, ErrNoTrustedBloomTrie
|
return nil, ErrNoTrustedBloomTrie
|
||||||
}
|
}
|
||||||
@ -220,7 +221,8 @@ func GetBloomBits(ctx context.Context, odr OdrBackend, bitIdx uint, sectionIdxLi
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
r := &BloomRequest{BloomTrieRoot: GetBloomTrieRoot(db, bloomTrieCount-1, sectionHead), BloomTrieNum: bloomTrieCount - 1, BitIdx: bitIdx, SectionIdxList: reqList}
|
r := &BloomRequest{BloomTrieRoot: GetBloomTrieRoot(db, bloomTrieCount-1, sectionHead), BloomTrieNum: bloomTrieCount - 1,
|
||||||
|
BitIdx: bitIdx, SectionIdxList: reqList, Config: odr.IndexerConfig()}
|
||||||
if err := odr.Retrieve(ctx, r); err != nil {
|
if err := odr.Retrieve(ctx, r); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else {
|
} else {
|
||||||
|
@ -36,20 +36,75 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/trie"
|
"github.com/ethereum/go-ethereum/trie"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
// IndexerConfig includes a set of configs for chain indexers.
|
||||||
// CHTFrequencyClient is the block frequency for creating CHTs on the client side.
|
type IndexerConfig struct {
|
||||||
CHTFrequencyClient = 32768
|
// The block frequency for creating CHTs.
|
||||||
|
ChtSize uint64
|
||||||
|
|
||||||
// CHTFrequencyServer is the block frequency for creating CHTs on the server side.
|
// A special auxiliary field represents client's chtsize for server config, otherwise represents server's chtsize.
|
||||||
// Eventually this can be merged back with the client version, but that requires a
|
PairChtSize uint64
|
||||||
// full database upgrade, so that should be left for a suitable moment.
|
|
||||||
CHTFrequencyServer = 4096
|
|
||||||
|
|
||||||
HelperTrieConfirmations = 2048 // number of confirmations before a server is expected to have the given HelperTrie available
|
// The number of confirmations needed to generate/accept a canonical hash help trie.
|
||||||
HelperTrieProcessConfirmations = 256 // number of confirmations before a HelperTrie is generated
|
ChtConfirms uint64
|
||||||
|
|
||||||
|
// The block frequency for creating new bloom bits.
|
||||||
|
BloomSize uint64
|
||||||
|
|
||||||
|
// The number of confirmation needed before a bloom section is considered probably final and its rotated bits
|
||||||
|
// are calculated.
|
||||||
|
BloomConfirms uint64
|
||||||
|
|
||||||
|
// The block frequency for creating BloomTrie.
|
||||||
|
BloomTrieSize uint64
|
||||||
|
|
||||||
|
// The number of confirmations needed to generate/accept a bloom trie.
|
||||||
|
BloomTrieConfirms uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
// DefaultServerIndexerConfig wraps a set of configs as a default indexer config for server side.
|
||||||
|
DefaultServerIndexerConfig = &IndexerConfig{
|
||||||
|
ChtSize: params.CHTFrequencyServer,
|
||||||
|
PairChtSize: params.CHTFrequencyClient,
|
||||||
|
ChtConfirms: params.HelperTrieProcessConfirmations,
|
||||||
|
BloomSize: params.BloomBitsBlocks,
|
||||||
|
BloomConfirms: params.BloomConfirms,
|
||||||
|
BloomTrieSize: params.BloomTrieFrequency,
|
||||||
|
BloomTrieConfirms: params.HelperTrieProcessConfirmations,
|
||||||
|
}
|
||||||
|
// DefaultClientIndexerConfig wraps a set of configs as a default indexer config for client side.
|
||||||
|
DefaultClientIndexerConfig = &IndexerConfig{
|
||||||
|
ChtSize: params.CHTFrequencyClient,
|
||||||
|
PairChtSize: params.CHTFrequencyServer,
|
||||||
|
ChtConfirms: params.HelperTrieConfirmations,
|
||||||
|
BloomSize: params.BloomBitsBlocksClient,
|
||||||
|
BloomConfirms: params.HelperTrieConfirmations,
|
||||||
|
BloomTrieSize: params.BloomTrieFrequency,
|
||||||
|
BloomTrieConfirms: params.HelperTrieConfirmations,
|
||||||
|
}
|
||||||
|
// TestServerIndexerConfig wraps a set of configs as a test indexer config for server side.
|
||||||
|
TestServerIndexerConfig = &IndexerConfig{
|
||||||
|
ChtSize: 256,
|
||||||
|
PairChtSize: 2048,
|
||||||
|
ChtConfirms: 16,
|
||||||
|
BloomSize: 256,
|
||||||
|
BloomConfirms: 16,
|
||||||
|
BloomTrieSize: 2048,
|
||||||
|
BloomTrieConfirms: 16,
|
||||||
|
}
|
||||||
|
// TestClientIndexerConfig wraps a set of configs as a test indexer config for client side.
|
||||||
|
TestClientIndexerConfig = &IndexerConfig{
|
||||||
|
ChtSize: 2048,
|
||||||
|
PairChtSize: 256,
|
||||||
|
ChtConfirms: 128,
|
||||||
|
BloomSize: 2048,
|
||||||
|
BloomConfirms: 128,
|
||||||
|
BloomTrieSize: 2048,
|
||||||
|
BloomTrieConfirms: 128,
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// TrustedCheckpoint represents a set of post-processed trie roots (CHT and BloomTrie) associated with
|
// trustedCheckpoint represents a set of post-processed trie roots (CHT and BloomTrie) associated with
|
||||||
// the appropriate section index and head hash. It is used to start light syncing from this checkpoint
|
// the appropriate section index and head hash. It is used to start light syncing from this checkpoint
|
||||||
// and avoid downloading the entire header chain while still being able to securely access old headers/logs.
|
// and avoid downloading the entire header chain while still being able to securely access old headers/logs.
|
||||||
type TrustedCheckpoint struct {
|
type TrustedCheckpoint struct {
|
||||||
@ -84,9 +139,9 @@ var trustedCheckpoints = map[common.Hash]TrustedCheckpoint{
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ErrNoTrustedCht = errors.New("No trusted canonical hash trie")
|
ErrNoTrustedCht = errors.New("no trusted canonical hash trie")
|
||||||
ErrNoTrustedBloomTrie = errors.New("No trusted bloom trie")
|
ErrNoTrustedBloomTrie = errors.New("no trusted bloom trie")
|
||||||
ErrNoHeader = errors.New("Header not found")
|
ErrNoHeader = errors.New("header not found")
|
||||||
chtPrefix = []byte("chtRoot-") // chtPrefix + chtNum (uint64 big endian) -> trie root hash
|
chtPrefix = []byte("chtRoot-") // chtPrefix + chtNum (uint64 big endian) -> trie root hash
|
||||||
ChtTablePrefix = "cht-"
|
ChtTablePrefix = "cht-"
|
||||||
)
|
)
|
||||||
@ -97,8 +152,8 @@ type ChtNode struct {
|
|||||||
Td *big.Int
|
Td *big.Int
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetChtRoot reads the CHT root assoctiated to the given section from the database
|
// GetChtRoot reads the CHT root associated to the given section from the database
|
||||||
// Note that sectionIdx is specified according to LES/1 CHT section size
|
// Note that sectionIdx is specified according to LES/1 CHT section size.
|
||||||
func GetChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead common.Hash) common.Hash {
|
func GetChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead common.Hash) common.Hash {
|
||||||
var encNumber [8]byte
|
var encNumber [8]byte
|
||||||
binary.BigEndian.PutUint64(encNumber[:], sectionIdx)
|
binary.BigEndian.PutUint64(encNumber[:], sectionIdx)
|
||||||
@ -106,21 +161,15 @@ func GetChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead common.Hash) c
|
|||||||
return common.BytesToHash(data)
|
return common.BytesToHash(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetChtV2Root reads the CHT root assoctiated to the given section from the database
|
// StoreChtRoot writes the CHT root associated to the given section into the database
|
||||||
// Note that sectionIdx is specified according to LES/2 CHT section size
|
// Note that sectionIdx is specified according to LES/1 CHT section size.
|
||||||
func GetChtV2Root(db ethdb.Database, sectionIdx uint64, sectionHead common.Hash) common.Hash {
|
|
||||||
return GetChtRoot(db, (sectionIdx+1)*(CHTFrequencyClient/CHTFrequencyServer)-1, sectionHead)
|
|
||||||
}
|
|
||||||
|
|
||||||
// StoreChtRoot writes the CHT root assoctiated to the given section into the database
|
|
||||||
// Note that sectionIdx is specified according to LES/1 CHT section size
|
|
||||||
func StoreChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root common.Hash) {
|
func StoreChtRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root common.Hash) {
|
||||||
var encNumber [8]byte
|
var encNumber [8]byte
|
||||||
binary.BigEndian.PutUint64(encNumber[:], sectionIdx)
|
binary.BigEndian.PutUint64(encNumber[:], sectionIdx)
|
||||||
db.Put(append(append(chtPrefix, encNumber[:]...), sectionHead.Bytes()...), root.Bytes())
|
db.Put(append(append(chtPrefix, encNumber[:]...), sectionHead.Bytes()...), root.Bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
// ChtIndexerBackend implements core.ChainIndexerBackend
|
// ChtIndexerBackend implements core.ChainIndexerBackend.
|
||||||
type ChtIndexerBackend struct {
|
type ChtIndexerBackend struct {
|
||||||
diskdb, trieTable ethdb.Database
|
diskdb, trieTable ethdb.Database
|
||||||
odr OdrBackend
|
odr OdrBackend
|
||||||
@ -130,33 +179,24 @@ type ChtIndexerBackend struct {
|
|||||||
trie *trie.Trie
|
trie *trie.Trie
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBloomTrieIndexer creates a BloomTrie chain indexer
|
// NewChtIndexer creates a Cht chain indexer
|
||||||
func NewChtIndexer(db ethdb.Database, clientMode bool, odr OdrBackend) *core.ChainIndexer {
|
func NewChtIndexer(db ethdb.Database, odr OdrBackend, size, confirms uint64) *core.ChainIndexer {
|
||||||
var sectionSize, confirmReq uint64
|
|
||||||
if clientMode {
|
|
||||||
sectionSize = CHTFrequencyClient
|
|
||||||
confirmReq = HelperTrieConfirmations
|
|
||||||
} else {
|
|
||||||
sectionSize = CHTFrequencyServer
|
|
||||||
confirmReq = HelperTrieProcessConfirmations
|
|
||||||
}
|
|
||||||
idb := ethdb.NewTable(db, "chtIndex-")
|
|
||||||
trieTable := ethdb.NewTable(db, ChtTablePrefix)
|
trieTable := ethdb.NewTable(db, ChtTablePrefix)
|
||||||
backend := &ChtIndexerBackend{
|
backend := &ChtIndexerBackend{
|
||||||
diskdb: db,
|
diskdb: db,
|
||||||
odr: odr,
|
odr: odr,
|
||||||
trieTable: trieTable,
|
trieTable: trieTable,
|
||||||
triedb: trie.NewDatabase(trieTable),
|
triedb: trie.NewDatabase(trieTable),
|
||||||
sectionSize: sectionSize,
|
sectionSize: size,
|
||||||
}
|
}
|
||||||
return core.NewChainIndexer(db, idb, backend, sectionSize, confirmReq, time.Millisecond*100, "cht")
|
return core.NewChainIndexer(db, ethdb.NewTable(db, "chtIndex-"), backend, size, confirms, time.Millisecond*100, "cht")
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetchMissingNodes tries to retrieve the last entry of the latest trusted CHT from the
|
// fetchMissingNodes tries to retrieve the last entry of the latest trusted CHT from the
|
||||||
// ODR backend in order to be able to add new entries and calculate subsequent root hashes
|
// ODR backend in order to be able to add new entries and calculate subsequent root hashes
|
||||||
func (c *ChtIndexerBackend) fetchMissingNodes(ctx context.Context, section uint64, root common.Hash) error {
|
func (c *ChtIndexerBackend) fetchMissingNodes(ctx context.Context, section uint64, root common.Hash) error {
|
||||||
batch := c.trieTable.NewBatch()
|
batch := c.trieTable.NewBatch()
|
||||||
r := &ChtRequest{ChtRoot: root, ChtNum: section - 1, BlockNum: section*c.sectionSize - 1}
|
r := &ChtRequest{ChtRoot: root, ChtNum: section - 1, BlockNum: section*c.sectionSize - 1, Config: c.odr.IndexerConfig()}
|
||||||
for {
|
for {
|
||||||
err := c.odr.Retrieve(ctx, r)
|
err := c.odr.Retrieve(ctx, r)
|
||||||
switch err {
|
switch err {
|
||||||
@ -221,18 +261,13 @@ func (c *ChtIndexerBackend) Commit() error {
|
|||||||
}
|
}
|
||||||
c.triedb.Commit(root, false)
|
c.triedb.Commit(root, false)
|
||||||
|
|
||||||
if ((c.section+1)*c.sectionSize)%CHTFrequencyClient == 0 {
|
if ((c.section+1)*c.sectionSize)%params.CHTFrequencyClient == 0 {
|
||||||
log.Info("Storing CHT", "section", c.section*c.sectionSize/CHTFrequencyClient, "head", fmt.Sprintf("%064x", c.lastHash), "root", fmt.Sprintf("%064x", root))
|
log.Info("Storing CHT", "section", c.section*c.sectionSize/params.CHTFrequencyClient, "head", fmt.Sprintf("%064x", c.lastHash), "root", fmt.Sprintf("%064x", root))
|
||||||
}
|
}
|
||||||
StoreChtRoot(c.diskdb, c.section, c.lastHash, root)
|
StoreChtRoot(c.diskdb, c.section, c.lastHash, root)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
|
||||||
BloomTrieFrequency = 32768
|
|
||||||
ethBloomBitsSection = 4096
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
bloomTriePrefix = []byte("bltRoot-") // bloomTriePrefix + bloomTrieNum (uint64 big endian) -> trie root hash
|
bloomTriePrefix = []byte("bltRoot-") // bloomTriePrefix + bloomTrieNum (uint64 big endian) -> trie root hash
|
||||||
BloomTrieTablePrefix = "blt-"
|
BloomTrieTablePrefix = "blt-"
|
||||||
@ -255,33 +290,31 @@ func StoreBloomTrieRoot(db ethdb.Database, sectionIdx uint64, sectionHead, root
|
|||||||
|
|
||||||
// BloomTrieIndexerBackend implements core.ChainIndexerBackend
|
// BloomTrieIndexerBackend implements core.ChainIndexerBackend
|
||||||
type BloomTrieIndexerBackend struct {
|
type BloomTrieIndexerBackend struct {
|
||||||
diskdb, trieTable ethdb.Database
|
diskdb, trieTable ethdb.Database
|
||||||
odr OdrBackend
|
triedb *trie.Database
|
||||||
triedb *trie.Database
|
odr OdrBackend
|
||||||
section, parentSectionSize, bloomTrieRatio uint64
|
section uint64
|
||||||
trie *trie.Trie
|
parentSize uint64
|
||||||
sectionHeads []common.Hash
|
size uint64
|
||||||
|
bloomTrieRatio uint64
|
||||||
|
trie *trie.Trie
|
||||||
|
sectionHeads []common.Hash
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBloomTrieIndexer creates a BloomTrie chain indexer
|
// NewBloomTrieIndexer creates a BloomTrie chain indexer
|
||||||
func NewBloomTrieIndexer(db ethdb.Database, clientMode bool, odr OdrBackend) *core.ChainIndexer {
|
func NewBloomTrieIndexer(db ethdb.Database, odr OdrBackend, parentSize, size uint64) *core.ChainIndexer {
|
||||||
trieTable := ethdb.NewTable(db, BloomTrieTablePrefix)
|
trieTable := ethdb.NewTable(db, BloomTrieTablePrefix)
|
||||||
backend := &BloomTrieIndexerBackend{
|
backend := &BloomTrieIndexerBackend{
|
||||||
diskdb: db,
|
diskdb: db,
|
||||||
odr: odr,
|
odr: odr,
|
||||||
trieTable: trieTable,
|
trieTable: trieTable,
|
||||||
triedb: trie.NewDatabase(trieTable),
|
triedb: trie.NewDatabase(trieTable),
|
||||||
|
parentSize: parentSize,
|
||||||
|
size: size,
|
||||||
}
|
}
|
||||||
idb := ethdb.NewTable(db, "bltIndex-")
|
backend.bloomTrieRatio = size / parentSize
|
||||||
|
|
||||||
if clientMode {
|
|
||||||
backend.parentSectionSize = BloomTrieFrequency
|
|
||||||
} else {
|
|
||||||
backend.parentSectionSize = ethBloomBitsSection
|
|
||||||
}
|
|
||||||
backend.bloomTrieRatio = BloomTrieFrequency / backend.parentSectionSize
|
|
||||||
backend.sectionHeads = make([]common.Hash, backend.bloomTrieRatio)
|
backend.sectionHeads = make([]common.Hash, backend.bloomTrieRatio)
|
||||||
return core.NewChainIndexer(db, idb, backend, BloomTrieFrequency, 0, time.Millisecond*100, "bloomtrie")
|
return core.NewChainIndexer(db, ethdb.NewTable(db, "bltIndex-"), backend, size, 0, time.Millisecond*100, "bloomtrie")
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetchMissingNodes tries to retrieve the last entries of the latest trusted bloom trie from the
|
// fetchMissingNodes tries to retrieve the last entries of the latest trusted bloom trie from the
|
||||||
@ -296,7 +329,7 @@ func (b *BloomTrieIndexerBackend) fetchMissingNodes(ctx context.Context, section
|
|||||||
for i := 0; i < 20; i++ {
|
for i := 0; i < 20; i++ {
|
||||||
go func() {
|
go func() {
|
||||||
for bitIndex := range indexCh {
|
for bitIndex := range indexCh {
|
||||||
r := &BloomRequest{BloomTrieRoot: root, BloomTrieNum: section - 1, BitIdx: bitIndex, SectionIdxList: []uint64{section - 1}}
|
r := &BloomRequest{BloomTrieRoot: root, BloomTrieNum: section - 1, BitIdx: bitIndex, SectionIdxList: []uint64{section - 1}, Config: b.odr.IndexerConfig()}
|
||||||
for {
|
for {
|
||||||
if err := b.odr.Retrieve(ctx, r); err == ErrNoPeers {
|
if err := b.odr.Retrieve(ctx, r); err == ErrNoPeers {
|
||||||
// if there are no peers to serve, retry later
|
// if there are no peers to serve, retry later
|
||||||
@ -351,9 +384,9 @@ func (b *BloomTrieIndexerBackend) Reset(ctx context.Context, section uint64, las
|
|||||||
|
|
||||||
// Process implements core.ChainIndexerBackend
|
// Process implements core.ChainIndexerBackend
|
||||||
func (b *BloomTrieIndexerBackend) Process(ctx context.Context, header *types.Header) error {
|
func (b *BloomTrieIndexerBackend) Process(ctx context.Context, header *types.Header) error {
|
||||||
num := header.Number.Uint64() - b.section*BloomTrieFrequency
|
num := header.Number.Uint64() - b.section*b.size
|
||||||
if (num+1)%b.parentSectionSize == 0 {
|
if (num+1)%b.parentSize == 0 {
|
||||||
b.sectionHeads[num/b.parentSectionSize] = header.Hash()
|
b.sectionHeads[num/b.parentSize] = header.Hash()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -372,7 +405,7 @@ func (b *BloomTrieIndexerBackend) Commit() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
decompData, err2 := bitutil.DecompressBytes(data, int(b.parentSectionSize/8))
|
decompData, err2 := bitutil.DecompressBytes(data, int(b.parentSize/8))
|
||||||
if err2 != nil {
|
if err2 != nil {
|
||||||
return err2
|
return err2
|
||||||
}
|
}
|
||||||
@ -397,6 +430,5 @@ func (b *BloomTrieIndexerBackend) Commit() error {
|
|||||||
sectionHead := b.sectionHeads[b.bloomTrieRatio-1]
|
sectionHead := b.sectionHeads[b.bloomTrieRatio-1]
|
||||||
log.Info("Storing bloom trie", "section", b.section, "head", fmt.Sprintf("%064x", sectionHead), "root", fmt.Sprintf("%064x", root), "compression", float64(compSize)/float64(decompSize))
|
log.Info("Storing bloom trie", "section", b.section, "head", fmt.Sprintf("%064x", sectionHead), "root", fmt.Sprintf("%064x", root), "compression", float64(compSize)/float64(decompSize))
|
||||||
StoreBloomTrieRoot(b.diskdb, b.section, sectionHead, root)
|
StoreBloomTrieRoot(b.diskdb, b.section, sectionHead, root)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -47,7 +47,7 @@ func TestNodeIterator(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
odr := &testOdr{sdb: fulldb, ldb: lightdb}
|
odr := &testOdr{sdb: fulldb, ldb: lightdb, indexerConfig: TestClientIndexerConfig}
|
||||||
head := blockchain.CurrentHeader()
|
head := blockchain.CurrentHeader()
|
||||||
lightTrie, _ := NewStateDatabase(ctx, head, odr).OpenTrie(head.Root)
|
lightTrie, _ := NewStateDatabase(ctx, head, odr).OpenTrie(head.Root)
|
||||||
fullTrie, _ := state.NewDatabase(fulldb).OpenTrie(head.Root)
|
fullTrie, _ := state.NewDatabase(fulldb).OpenTrie(head.Root)
|
||||||
|
@ -94,7 +94,7 @@ func TestTxPool(t *testing.T) {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
odr := &testOdr{sdb: sdb, ldb: ldb}
|
odr := &testOdr{sdb: sdb, ldb: ldb, indexerConfig: TestClientIndexerConfig}
|
||||||
relay := &testTxRelay{
|
relay := &testTxRelay{
|
||||||
send: make(chan int, 1),
|
send: make(chan int, 1),
|
||||||
discard: make(chan int, 1),
|
discard: make(chan int, 1),
|
||||||
|
@ -17,10 +17,38 @@
|
|||||||
package params
|
package params
|
||||||
|
|
||||||
// These are network parameters that need to be constant between clients, but
|
// These are network parameters that need to be constant between clients, but
|
||||||
// aren't necesarilly consensus related.
|
// aren't necessarily consensus related.
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// BloomBitsBlocks is the number of blocks a single bloom bit section vector
|
// BloomBitsBlocks is the number of blocks a single bloom bit section vector
|
||||||
// contains.
|
// contains on the server side.
|
||||||
BloomBitsBlocks uint64 = 4096
|
BloomBitsBlocks uint64 = 4096
|
||||||
|
|
||||||
|
// BloomBitsBlocksClient is the number of blocks a single bloom bit section vector
|
||||||
|
// contains on the light client side
|
||||||
|
BloomBitsBlocksClient uint64 = 32768
|
||||||
|
|
||||||
|
// BloomConfirms is the number of confirmation blocks before a bloom section is
|
||||||
|
// considered probably final and its rotated bits are calculated.
|
||||||
|
BloomConfirms = 256
|
||||||
|
|
||||||
|
// CHTFrequencyClient is the block frequency for creating CHTs on the client side.
|
||||||
|
CHTFrequencyClient = 32768
|
||||||
|
|
||||||
|
// CHTFrequencyServer is the block frequency for creating CHTs on the server side.
|
||||||
|
// Eventually this can be merged back with the client version, but that requires a
|
||||||
|
// full database upgrade, so that should be left for a suitable moment.
|
||||||
|
CHTFrequencyServer = 4096
|
||||||
|
|
||||||
|
// BloomTrieFrequency is the block frequency for creating BloomTrie on both
|
||||||
|
// server/client sides.
|
||||||
|
BloomTrieFrequency = 32768
|
||||||
|
|
||||||
|
// HelperTrieConfirmations is the number of confirmations before a client is expected
|
||||||
|
// to have the given HelperTrie available.
|
||||||
|
HelperTrieConfirmations = 2048
|
||||||
|
|
||||||
|
// HelperTrieProcessConfirmations is the number of confirmations before a HelperTrie
|
||||||
|
// is generated
|
||||||
|
HelperTrieProcessConfirmations = 256
|
||||||
)
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user