Compare commits

...

12 Commits

Author SHA1 Message Date
Felix Lange
e5eb32acee params: release geth v1.10.26 stable 2022-11-03 11:59:33 +01:00
Jordan Krage
211dbb7197 rpc: handle wrong HTTP batch response length (#26064) 2022-11-03 01:15:36 +01:00
Martin Holst Swende
27600a5b84 eth/filters: change filter block to be by-ref (#26054)
This PR changes the block field in the filter to be a pointer, to disambiguate between empty hash and no hash
2022-11-03 01:15:36 +01:00
Péter Szilágyi
99bbb33701 eth: fix a rare datarace on CHT challenge reply / shutdown (#25831) 2022-11-03 01:15:27 +01:00
Péter Szilágyi
a32e69a28c trie: check childrens' existence concurrently for snap heal (#25694) 2022-11-03 01:15:11 +01:00
Péter Szilágyi
937ea491f9 eth/protocols/snap: throttle trie heal requests when peers DoS us (#25666)
* eth/protocols/snap: throttle trie heal requests when peers DoS us

* eth/protocols/snap: lower heal throttle log to debug

Co-authored-by: Martin Holst Swende <martin@swende.se>

* eth/protocols/snap: fix comment

Co-authored-by: Martin Holst Swende <martin@swende.se>
2022-11-03 01:15:11 +01:00
Martin Holst Swende
85e469f787 eth/protocols/snap: fix problems due to idle-but-busy peers (#25651) 2022-11-03 01:14:47 +01:00
Péter Szilágyi
69568c5548 params: release Geth v1.10.25 2022-09-15 17:55:58 +02:00
Marius van der Wijden
8f61fc8b73 params: set TerminalTotalDifficultyPassed to true (#25769)
* params: set TerminalTotalDifficultyPassed to true

* Update params/config.go

Co-authored-by: Martin Holst Swende <martin@swende.se>
2022-09-15 17:54:48 +02:00
Péter Szilágyi
972007a517 Release Geth v1.10.24 2022-09-14 11:07:10 +02:00
Sina Mahmoodi
3b41be695e graphql: fixes missing tx logs (#25745)
* graphql: fix tx logs

* graphql: refactor test service setup

* graphql: add test for tx logs
2022-09-13 23:03:49 +02:00
Sina Mahmoodi
d0dc349fd3 graphql: return correct logs for tx (#25612)
* graphql: fix tx logs

* minor

* Use optimized search for selecting tx logs
2022-09-13 23:03:21 +02:00
12 changed files with 416 additions and 205 deletions

View File

@@ -34,8 +34,8 @@ type Filter struct {
addresses []common.Address
topics [][]common.Hash
block common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks
block *common.Hash // Block hash if filtering a single block
begin, end int64 // Range interval if filtering multiple blocks
matcher *bloombits.Matcher
}
@@ -78,7 +78,7 @@ func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Add
func (sys *FilterSystem) NewBlockFilter(block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
// Create a generic filter and convert it into a block filter
filter := newFilter(sys, addresses, topics)
filter.block = block
filter.block = &block
return filter
}
@@ -96,8 +96,8 @@ func newFilter(sys *FilterSystem, addresses []common.Address, topics [][]common.
// first block that contains matches, updating the start of the filter accordingly.
func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
// If we're doing singleton block filtering, execute and return
if f.block != (common.Hash{}) {
header, err := f.sys.backend.HeaderByHash(ctx, f.block)
if f.block != nil {
header, err := f.sys.backend.HeaderByHash(ctx, *f.block)
if err != nil {
return nil, err
}

View File

@@ -391,11 +391,16 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
if h.checkpointHash != (common.Hash{}) {
// Request the peer's checkpoint header for chain height/weight validation
resCh := make(chan *eth.Response)
if _, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh); err != nil {
req, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh)
if err != nil {
return err
}
// Start a timer to disconnect if the peer doesn't reply in time
go func() {
// Ensure the request gets cancelled in case of error/drop
defer req.Close()
timeout := time.NewTimer(syncChallengeTimeout)
defer timeout.Stop()
@@ -437,10 +442,15 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
// If we have any explicit peer required block hashes, request them
for number, hash := range h.requiredBlocks {
resCh := make(chan *eth.Response)
if _, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh); err != nil {
req, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh)
if err != nil {
return err
}
go func(number uint64, hash common.Hash) {
go func(number uint64, hash common.Hash, req *eth.Request) {
// Ensure the request gets cancelled in case of error/drop
defer req.Close()
timeout := time.NewTimer(syncChallengeTimeout)
defer timeout.Stop()
@@ -469,7 +479,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
peer.Log().Warn("Required block challenge timed out, dropping", "addr", peer.RemoteAddr(), "type", peer.Name())
h.removePeer(peer.ID())
}
}(number, hash)
}(number, hash, req)
}
// Handle incoming messages until the connection is torn down
return handler(peer)

View File

@@ -21,10 +21,12 @@ import (
"encoding/json"
"errors"
"fmt"
gomath "math"
"math/big"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
@@ -78,6 +80,29 @@ const (
// and waste round trip times. If it's too high, we're capping responses and
// waste bandwidth.
maxTrieRequestCount = maxRequestSize / 512
// trienodeHealRateMeasurementImpact is the impact a single measurement has on
// the local node's trienode processing capacity. A value closer to 0 reacts
// slower to sudden changes, but it is also more stable against temporary hiccups.
trienodeHealRateMeasurementImpact = 0.005
// minTrienodeHealThrottle is the minimum divisor for throttling trie node
// heal requests to avoid overloading the local node and exessively expanding
// the state trie bedth wise.
minTrienodeHealThrottle = 1
// maxTrienodeHealThrottle is the maximum divisor for throttling trie node
// heal requests to avoid overloading the local node and exessively expanding
// the state trie bedth wise.
maxTrienodeHealThrottle = maxTrieRequestCount
// trienodeHealThrottleIncrease is the multiplier for the throttle when the
// rate of arriving data is higher than the rate of processing it.
trienodeHealThrottleIncrease = 1.33
// trienodeHealThrottleDecrease is the divisor for the throttle when the
// rate of arriving data is lower than the rate of processing it.
trienodeHealThrottleDecrease = 1.25
)
var (
@@ -431,6 +456,11 @@ type Syncer struct {
trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running
bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running
trienodeHealRate float64 // Average heal rate for processing trie node data
trienodeHealPend uint64 // Number of trie nodes currently pending for processing
trienodeHealThrottle float64 // Divisor for throttling the amount of trienode heal data requested
trienodeHealThrottled time.Time // Timestamp the last time the throttle was updated
trienodeHealSynced uint64 // Number of state trie nodes downloaded
trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk
trienodeHealDups uint64 // Number of state trie nodes already processed
@@ -476,9 +506,10 @@ func NewSyncer(db ethdb.KeyValueStore) *Syncer {
trienodeHealIdlers: make(map[string]struct{}),
bytecodeHealIdlers: make(map[string]struct{}),
trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
stateWriter: db.NewBatch(),
trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk
stateWriter: db.NewBatch(),
extProgress: new(SyncProgress),
}
@@ -1321,6 +1352,10 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
if cap > maxTrieRequestCount {
cap = maxTrieRequestCount
}
cap = int(float64(cap) / s.trienodeHealThrottle)
if cap <= 0 {
cap = 1
}
var (
hashes = make([]common.Hash, 0, cap)
paths = make([]string, 0, cap)
@@ -2090,6 +2125,10 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
// processTrienodeHealResponse integrates an already validated trienode response
// into the healer tasks.
func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
var (
start = time.Now()
fills int
)
for i, hash := range res.hashes {
node := res.nodes[i]
@@ -2098,6 +2137,8 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
res.task.trieTasks[res.paths[i]] = res.hashes[i]
continue
}
fills++
// Push the trie node into the state syncer
s.trienodeHealSynced++
s.trienodeHealBytes += common.StorageSize(len(node))
@@ -2121,6 +2162,50 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
log.Crit("Failed to persist healing data", "err", err)
}
log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize()))
// Calculate the processing rate of one filled trie node
rate := float64(fills) / (float64(time.Since(start)) / float64(time.Second))
// Update the currently measured trienode queueing and processing throughput.
//
// The processing rate needs to be updated uniformly independent if we've
// processed 1x100 trie nodes or 100x1 to keep the rate consistent even in
// the face of varying network packets. As such, we cannot just measure the
// time it took to process N trie nodes and update once, we need one update
// per trie node.
//
// Naively, that would be:
//
// for i:=0; i<fills; i++ {
// healRate = (1-measurementImpact)*oldRate + measurementImpact*newRate
// }
//
// Essentially, a recursive expansion of HR = (1-MI)*HR + MI*NR.
//
// We can expand that formula for the Nth item as:
// HR(N) = (1-MI)^N*OR + (1-MI)^(N-1)*MI*NR + (1-MI)^(N-2)*MI*NR + ... + (1-MI)^0*MI*NR
//
// The above is a geometric sequence that can be summed to:
// HR(N) = (1-MI)^N*(OR-NR) + NR
s.trienodeHealRate = gomath.Pow(1-trienodeHealRateMeasurementImpact, float64(fills))*(s.trienodeHealRate-rate) + rate
pending := atomic.LoadUint64(&s.trienodeHealPend)
if time.Since(s.trienodeHealThrottled) > time.Second {
// Periodically adjust the trie node throttler
if float64(pending) > 2*s.trienodeHealRate {
s.trienodeHealThrottle *= trienodeHealThrottleIncrease
} else {
s.trienodeHealThrottle /= trienodeHealThrottleDecrease
}
if s.trienodeHealThrottle > maxTrienodeHealThrottle {
s.trienodeHealThrottle = maxTrienodeHealThrottle
} else if s.trienodeHealThrottle < minTrienodeHealThrottle {
s.trienodeHealThrottle = minTrienodeHealThrottle
}
s.trienodeHealThrottled = time.Now()
log.Debug("Updated trie node heal throttler", "rate", s.trienodeHealRate, "pending", pending, "throttle", s.trienodeHealThrottle)
}
}
// processBytecodeHealResponse integrates an already validated bytecode response
@@ -2248,14 +2333,18 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
// Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit.
defer func() {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok {
s.accountIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
}()
s.lock.Lock()
if _, ok := s.peers[peer.ID()]; ok {
s.accountIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
// Ensure the response is for a valid request
req, ok := s.accountReqs[id]
if !ok {
@@ -2360,14 +2449,18 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error
// Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit.
defer func() {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok {
s.bytecodeIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
}()
s.lock.Lock()
if _, ok := s.peers[peer.ID()]; ok {
s.bytecodeIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
// Ensure the response is for a valid request
req, ok := s.bytecodeReqs[id]
if !ok {
@@ -2469,14 +2562,18 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
// Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit.
defer func() {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok {
s.storageIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
}()
s.lock.Lock()
if _, ok := s.peers[peer.ID()]; ok {
s.storageIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
// Ensure the response is for a valid request
req, ok := s.storageReqs[id]
if !ok {
@@ -2596,14 +2693,18 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
// Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit.
defer func() {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok {
s.trienodeHealIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
}()
s.lock.Lock()
if _, ok := s.peers[peer.ID()]; ok {
s.trienodeHealIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
// Ensure the response is for a valid request
req, ok := s.trienodeHealReqs[id]
if !ok {
@@ -2639,10 +2740,12 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
// Cross reference the requested trienodes with the response to find gaps
// that the serving node is missing
hasher := sha3.NewLegacyKeccak256().(crypto.KeccakState)
hash := make([]byte, 32)
nodes := make([][]byte, len(req.hashes))
var (
hasher = sha3.NewLegacyKeccak256().(crypto.KeccakState)
hash = make([]byte, 32)
nodes = make([][]byte, len(req.hashes))
fills uint64
)
for i, j := 0, 0; i < len(trienodes); i++ {
// Find the next hash that we've been served, leaving misses with nils
hasher.Reset()
@@ -2654,16 +2757,22 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
}
if j < len(req.hashes) {
nodes[j] = trienodes[i]
fills++
j++
continue
}
// We've either ran out of hashes, or got unrequested data
logger.Warn("Unexpected healing trienodes", "count", len(trienodes)-i)
// Signal this request as failed, and ready for rescheduling
s.scheduleRevertTrienodeHealRequest(req)
return errors.New("unexpected healing trienode")
}
// Response validated, send it to the scheduler for filling
atomic.AddUint64(&s.trienodeHealPend, fills)
defer func() {
atomic.AddUint64(&s.trienodeHealPend, ^(fills - 1))
}()
response := &trienodeHealResponse{
paths: req.paths,
task: req.task,
@@ -2691,14 +2800,18 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e
// Whether or not the response is valid, we can mark the peer as idle and
// notify the scheduler to assign a new task. If the response is invalid,
// we'll drop the peer in a bit.
defer func() {
s.lock.Lock()
defer s.lock.Unlock()
if _, ok := s.peers[peer.ID()]; ok {
s.bytecodeHealIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
}()
s.lock.Lock()
if _, ok := s.peers[peer.ID()]; ok {
s.bytecodeHealIdlers[peer.ID()] = struct{}{}
}
select {
case s.update <- struct{}{}:
default:
}
// Ensure the response is for a valid request
req, ok := s.bytecodeHealReqs[id]
if !ok {

View File

@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"math/big"
"sort"
"strconv"
"github.com/ethereum/go-ethereum"
@@ -478,13 +479,16 @@ func (t *Transaction) getLogs(ctx context.Context) (*[]*Log, error) {
if err != nil {
return nil, err
}
ret := make([]*Log, 0, len(logs))
for _, log := range logs {
var ret []*Log
// Select tx logs from all block logs
ix := sort.Search(len(logs), func(i int) bool { return uint64(logs[i].TxIndex) >= t.index })
for ix < len(logs) && uint64(logs[ix].TxIndex) == t.index {
ret = append(ret, &Log{
r: t.r,
transaction: t,
log: log,
log: logs[ix],
})
ix++
}
return &ret, nil
}

View File

@@ -17,6 +17,8 @@
package graphql
import (
"context"
"encoding/json"
"fmt"
"io"
"math/big"
@@ -51,15 +53,21 @@ func TestBuildSchema(t *testing.T) {
}
defer stack.Close()
// Make sure the schema can be parsed and matched up to the object model.
if err := newHandler(stack, nil, nil, []string{}, []string{}); err != nil {
if _, err := newHandler(stack, nil, nil, []string{}, []string{}); err != nil {
t.Errorf("Could not construct GraphQL handler: %v", err)
}
}
// Tests that a graphQL request is successfully handled when graphql is enabled on the specified endpoint
func TestGraphQLBlockSerialization(t *testing.T) {
stack := createNode(t, true, false)
stack := createNode(t)
defer stack.Close()
genesis := &core.Genesis{
Config: params.AllEthashProtocolChanges,
GasLimit: 11500000,
Difficulty: big.NewInt(1048576),
}
newGQLService(t, stack, genesis, 10, func(i int, gen *core.BlockGen) {})
// start node
if err := stack.Start(); err != nil {
t.Fatalf("could not start node: %v", err)
@@ -161,8 +169,55 @@ func TestGraphQLBlockSerialization(t *testing.T) {
}
func TestGraphQLBlockSerializationEIP2718(t *testing.T) {
stack := createNode(t, true, true)
// Account for signing txes
var (
key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
address = crypto.PubkeyToAddress(key.PublicKey)
funds = big.NewInt(1000000000000000)
dad = common.HexToAddress("0x0000000000000000000000000000000000000dad")
)
stack := createNode(t)
defer stack.Close()
genesis := &core.Genesis{
Config: params.AllEthashProtocolChanges,
GasLimit: 11500000,
Difficulty: big.NewInt(1048576),
Alloc: core.GenesisAlloc{
address: {Balance: funds},
// The address 0xdad sloads 0x00 and 0x01
dad: {
Code: []byte{byte(vm.PC), byte(vm.PC), byte(vm.SLOAD), byte(vm.SLOAD)},
Nonce: 0,
Balance: big.NewInt(0),
},
},
BaseFee: big.NewInt(params.InitialBaseFee),
}
signer := types.LatestSigner(genesis.Config)
newGQLService(t, stack, genesis, 1, func(i int, gen *core.BlockGen) {
gen.SetCoinbase(common.Address{1})
tx, _ := types.SignNewTx(key, signer, &types.LegacyTx{
Nonce: uint64(0),
To: &dad,
Value: big.NewInt(100),
Gas: 50000,
GasPrice: big.NewInt(params.InitialBaseFee),
})
gen.AddTx(tx)
tx, _ = types.SignNewTx(key, signer, &types.AccessListTx{
ChainID: genesis.Config.ChainID,
Nonce: uint64(1),
To: &dad,
Gas: 30000,
GasPrice: big.NewInt(params.InitialBaseFee),
Value: big.NewInt(50),
AccessList: types.AccessList{{
Address: dad,
StorageKeys: []common.Hash{{0}},
}},
})
gen.AddTx(tx)
})
// start node
if err := stack.Start(); err != nil {
t.Fatalf("could not start node: %v", err)
@@ -198,7 +253,7 @@ func TestGraphQLBlockSerializationEIP2718(t *testing.T) {
// Tests that a graphQL request is not handled successfully when graphql is not enabled on the specified endpoint
func TestGraphQLHTTPOnSamePort_GQLRequest_Unsuccessful(t *testing.T) {
stack := createNode(t, false, false)
stack := createNode(t)
defer stack.Close()
if err := stack.Start(); err != nil {
t.Fatalf("could not start node: %v", err)
@@ -212,7 +267,59 @@ func TestGraphQLHTTPOnSamePort_GQLRequest_Unsuccessful(t *testing.T) {
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}
func createNode(t *testing.T, gqlEnabled bool, txEnabled bool) *node.Node {
func TestGraphQLTransactionLogs(t *testing.T) {
var (
key, _ = crypto.GenerateKey()
addr = crypto.PubkeyToAddress(key.PublicKey)
dadStr = "0x0000000000000000000000000000000000000dad"
dad = common.HexToAddress(dadStr)
genesis = &core.Genesis{
Config: params.AllEthashProtocolChanges,
GasLimit: 11500000,
Difficulty: big.NewInt(1048576),
Alloc: core.GenesisAlloc{
addr: {Balance: big.NewInt(params.Ether)},
dad: {
// LOG0(0, 0), LOG0(0, 0), RETURN(0, 0)
Code: common.Hex2Bytes("60006000a060006000a060006000f3"),
Nonce: 0,
Balance: big.NewInt(0),
},
},
}
signer = types.LatestSigner(genesis.Config)
stack = createNode(t)
)
defer stack.Close()
handler := newGQLService(t, stack, genesis, 1, func(i int, gen *core.BlockGen) {
tx, _ := types.SignNewTx(key, signer, &types.LegacyTx{To: &dad, Gas: 100000, GasPrice: big.NewInt(params.InitialBaseFee)})
gen.AddTx(tx)
tx, _ = types.SignNewTx(key, signer, &types.LegacyTx{To: &dad, Nonce: 1, Gas: 100000, GasPrice: big.NewInt(params.InitialBaseFee)})
gen.AddTx(tx)
tx, _ = types.SignNewTx(key, signer, &types.LegacyTx{To: &dad, Nonce: 2, Gas: 100000, GasPrice: big.NewInt(params.InitialBaseFee)})
gen.AddTx(tx)
})
// start node
if err := stack.Start(); err != nil {
t.Fatalf("could not start node: %v", err)
}
query := `{block { transactions { logs { account { address } } } } }`
res := handler.Schema.Exec(context.Background(), query, "", map[string]interface{}{})
if res.Errors != nil {
t.Fatalf("graphql query failed: %v", res.Errors)
}
have, err := json.Marshal(res.Data)
if err != nil {
t.Fatalf("failed to encode graphql response: %s", err)
}
want := fmt.Sprintf(`{"block":{"transactions":[{"logs":[{"account":{"address":"%s"}},{"account":{"address":"%s"}}]},{"logs":[{"account":{"address":"%s"}},{"account":{"address":"%s"}}]},{"logs":[{"account":{"address":"%s"}},{"account":{"address":"%s"}}]}]}}`, dadStr, dadStr, dadStr, dadStr, dadStr, dadStr)
if string(have) != want {
t.Errorf("response unmatch. expected %s, got %s", want, have)
}
}
func createNode(t *testing.T) *node.Node {
stack, err := node.New(&node.Config{
HTTPHost: "127.0.0.1",
HTTPPort: 0,
@@ -222,25 +329,12 @@ func createNode(t *testing.T, gqlEnabled bool, txEnabled bool) *node.Node {
if err != nil {
t.Fatalf("could not create node: %v", err)
}
if !gqlEnabled {
return stack
}
if !txEnabled {
createGQLService(t, stack)
} else {
createGQLServiceWithTransactions(t, stack)
}
return stack
}
func createGQLService(t *testing.T, stack *node.Node) {
// create backend
func newGQLService(t *testing.T, stack *node.Node, gspec *core.Genesis, genBlocks int, genfunc func(i int, gen *core.BlockGen)) *handler {
ethConf := &ethconfig.Config{
Genesis: &core.Genesis{
Config: params.AllEthashProtocolChanges,
GasLimit: 11500000,
Difficulty: big.NewInt(1048576),
},
Genesis: gspec,
Ethash: ethash.Config{
PowMode: ethash.ModeFake,
},
@@ -258,101 +352,16 @@ func createGQLService(t *testing.T, stack *node.Node) {
}
// Create some blocks and import them
chain, _ := core.GenerateChain(params.AllEthashProtocolChanges, ethBackend.BlockChain().Genesis(),
ethash.NewFaker(), ethBackend.ChainDb(), 10, func(i int, gen *core.BlockGen) {})
ethash.NewFaker(), ethBackend.ChainDb(), genBlocks, genfunc)
_, err = ethBackend.BlockChain().InsertChain(chain)
if err != nil {
t.Fatalf("could not create import blocks: %v", err)
}
// create gql service
// Set up handler
filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{})
err = New(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{})
if err != nil {
t.Fatalf("could not create graphql service: %v", err)
}
}
func createGQLServiceWithTransactions(t *testing.T, stack *node.Node) {
// create backend
key, _ := crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
address := crypto.PubkeyToAddress(key.PublicKey)
funds := big.NewInt(1000000000000000)
dad := common.HexToAddress("0x0000000000000000000000000000000000000dad")
ethConf := &ethconfig.Config{
Genesis: &core.Genesis{
Config: params.AllEthashProtocolChanges,
GasLimit: 11500000,
Difficulty: big.NewInt(1048576),
Alloc: core.GenesisAlloc{
address: {Balance: funds},
// The address 0xdad sloads 0x00 and 0x01
dad: {
Code: []byte{
byte(vm.PC),
byte(vm.PC),
byte(vm.SLOAD),
byte(vm.SLOAD),
},
Nonce: 0,
Balance: big.NewInt(0),
},
},
BaseFee: big.NewInt(params.InitialBaseFee),
},
Ethash: ethash.Config{
PowMode: ethash.ModeFake,
},
NetworkId: 1337,
TrieCleanCache: 5,
TrieCleanCacheJournal: "triecache",
TrieCleanCacheRejournal: 60 * time.Minute,
TrieDirtyCache: 5,
TrieTimeout: 60 * time.Minute,
SnapshotCache: 5,
}
ethBackend, err := eth.New(stack, ethConf)
if err != nil {
t.Fatalf("could not create eth backend: %v", err)
}
signer := types.LatestSigner(ethConf.Genesis.Config)
legacyTx, _ := types.SignNewTx(key, signer, &types.LegacyTx{
Nonce: uint64(0),
To: &dad,
Value: big.NewInt(100),
Gas: 50000,
GasPrice: big.NewInt(params.InitialBaseFee),
})
envelopTx, _ := types.SignNewTx(key, signer, &types.AccessListTx{
ChainID: ethConf.Genesis.Config.ChainID,
Nonce: uint64(1),
To: &dad,
Gas: 30000,
GasPrice: big.NewInt(params.InitialBaseFee),
Value: big.NewInt(50),
AccessList: types.AccessList{{
Address: dad,
StorageKeys: []common.Hash{{0}},
}},
})
// Create some blocks and import them
chain, _ := core.GenerateChain(params.AllEthashProtocolChanges, ethBackend.BlockChain().Genesis(),
ethash.NewFaker(), ethBackend.ChainDb(), 1, func(i int, b *core.BlockGen) {
b.SetCoinbase(common.Address{1})
b.AddTx(legacyTx)
b.AddTx(envelopTx)
})
_, err = ethBackend.BlockChain().InsertChain(chain)
if err != nil {
t.Fatalf("could not create import blocks: %v", err)
}
// create gql service
filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{})
err = New(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{})
handler, err := newHandler(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{})
if err != nil {
t.Fatalf("could not create graphql service: %v", err)
}
return handler
}

View File

@@ -57,17 +57,18 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// New constructs a new GraphQL service instance.
func New(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) error {
return newHandler(stack, backend, filterSystem, cors, vhosts)
_, err := newHandler(stack, backend, filterSystem, cors, vhosts)
return err
}
// newHandler returns a new `http.Handler` that will answer GraphQL queries.
// It additionally exports an interactive query browser on the / endpoint.
func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) error {
func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) (*handler, error) {
q := Resolver{backend, filterSystem}
s, err := graphql.ParseSchema(schema, &q)
if err != nil {
return err
return nil, err
}
h := handler{Schema: s}
handler := node.NewHTTPHandlerStack(h, cors, vhosts, nil)
@@ -76,5 +77,5 @@ func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters.
stack.RegisterHandler("GraphQL", "/graphql", handler)
stack.RegisterHandler("GraphQL", "/graphql/", handler)
return nil
return &h, nil
}

View File

@@ -59,25 +59,26 @@ var (
// MainnetChainConfig is the chain parameters to run a node on the main network.
MainnetChainConfig = &ChainConfig{
ChainID: big.NewInt(1),
HomesteadBlock: big.NewInt(1_150_000),
DAOForkBlock: big.NewInt(1_920_000),
DAOForkSupport: true,
EIP150Block: big.NewInt(2_463_000),
EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"),
EIP155Block: big.NewInt(2_675_000),
EIP158Block: big.NewInt(2_675_000),
ByzantiumBlock: big.NewInt(4_370_000),
ConstantinopleBlock: big.NewInt(7_280_000),
PetersburgBlock: big.NewInt(7_280_000),
IstanbulBlock: big.NewInt(9_069_000),
MuirGlacierBlock: big.NewInt(9_200_000),
BerlinBlock: big.NewInt(12_244_000),
LondonBlock: big.NewInt(12_965_000),
ArrowGlacierBlock: big.NewInt(13_773_000),
GrayGlacierBlock: big.NewInt(15_050_000),
TerminalTotalDifficulty: MainnetTerminalTotalDifficulty, // 58_750_000_000_000_000_000_000
Ethash: new(EthashConfig),
ChainID: big.NewInt(1),
HomesteadBlock: big.NewInt(1_150_000),
DAOForkBlock: big.NewInt(1_920_000),
DAOForkSupport: true,
EIP150Block: big.NewInt(2_463_000),
EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"),
EIP155Block: big.NewInt(2_675_000),
EIP158Block: big.NewInt(2_675_000),
ByzantiumBlock: big.NewInt(4_370_000),
ConstantinopleBlock: big.NewInt(7_280_000),
PetersburgBlock: big.NewInt(7_280_000),
IstanbulBlock: big.NewInt(9_069_000),
MuirGlacierBlock: big.NewInt(9_200_000),
BerlinBlock: big.NewInt(12_244_000),
LondonBlock: big.NewInt(12_965_000),
ArrowGlacierBlock: big.NewInt(13_773_000),
GrayGlacierBlock: big.NewInt(15_050_000),
TerminalTotalDifficulty: MainnetTerminalTotalDifficulty, // 58_750_000_000_000_000_000_000
TerminalTotalDifficultyPassed: true,
Ethash: new(EthashConfig),
}
// MainnetTrustedCheckpoint contains the light client trusted checkpoint for the main network.

View File

@@ -23,7 +23,7 @@ import (
const (
VersionMajor = 1 // Major version component of the current release
VersionMinor = 10 // Minor version component of the current release
VersionPatch = 23 // Patch version component of the current release
VersionPatch = 26 // Patch version component of the current release
VersionMeta = "stable" // Version metadata to append to the version string
)

View File

@@ -31,6 +31,7 @@ import (
)
var (
ErrBadResult = errors.New("bad result in JSON-RPC response")
ErrClientQuit = errors.New("client is closed")
ErrNoResult = errors.New("no result in JSON-RPC response")
ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow")

View File

@@ -19,6 +19,7 @@ package rpc
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"net"
@@ -144,6 +145,53 @@ func TestClientBatchRequest(t *testing.T) {
}
}
func TestClientBatchRequest_len(t *testing.T) {
b, err := json.Marshal([]jsonrpcMessage{
{Version: "2.0", ID: json.RawMessage("1"), Method: "foo", Result: json.RawMessage(`"0x1"`)},
{Version: "2.0", ID: json.RawMessage("2"), Method: "bar", Result: json.RawMessage(`"0x2"`)},
})
if err != nil {
t.Fatal("failed to encode jsonrpc message:", err)
}
s := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
_, err := rw.Write(b)
if err != nil {
t.Error("failed to write response:", err)
}
}))
t.Cleanup(s.Close)
client, err := Dial(s.URL)
if err != nil {
t.Fatal("failed to dial test server:", err)
}
defer client.Close()
t.Run("too-few", func(t *testing.T) {
batch := []BatchElem{
{Method: "foo"},
{Method: "bar"},
{Method: "baz"},
}
ctx, cancelFn := context.WithTimeout(context.Background(), time.Second)
defer cancelFn()
if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) {
t.Errorf("expected %q but got: %v", ErrBadResult, err)
}
})
t.Run("too-many", func(t *testing.T) {
batch := []BatchElem{
{Method: "foo"},
}
ctx, cancelFn := context.WithTimeout(context.Background(), time.Second)
defer cancelFn()
if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) {
t.Errorf("expected %q but got: %v", ErrBadResult, err)
}
})
}
func TestClientNotify(t *testing.T) {
server := newTestServer()
defer server.Stop()

View File

@@ -173,6 +173,9 @@ func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonr
if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil {
return err
}
if len(respmsgs) != len(msgs) {
return fmt.Errorf("batch has %d requests but response has %d: %w", len(msgs), len(respmsgs), ErrBadResult)
}
for i := 0; i < len(respmsgs); i++ {
op.resp <- &respmsgs[i]
}

View File

@@ -19,6 +19,7 @@ package trie
import (
"errors"
"fmt"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/prque"
@@ -381,11 +382,11 @@ func (s *Sync) scheduleCodeRequest(req *codeRequest) {
// retrieval scheduling.
func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
// Gather all the children of the node, irrelevant whether known or not
type child struct {
type childNode struct {
path []byte
node node
}
var children []child
var children []childNode
switch node := (object).(type) {
case *shortNode:
@@ -393,14 +394,14 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
if hasTerm(key) {
key = key[:len(key)-1]
}
children = []child{{
children = []childNode{{
node: node.Val,
path: append(append([]byte(nil), req.path...), key...),
}}
case *fullNode:
for i := 0; i < 17; i++ {
if node.Children[i] != nil {
children = append(children, child{
children = append(children, childNode{
node: node.Children[i],
path: append(append([]byte(nil), req.path...), byte(i)),
})
@@ -410,7 +411,10 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
panic(fmt.Sprintf("unknown node: %+v", node))
}
// Iterate over the children, and request all unknown ones
requests := make([]*nodeRequest, 0, len(children))
var (
missing = make(chan *nodeRequest, len(children))
pending sync.WaitGroup
)
for _, child := range children {
// Notify any external watcher of a new key/value node
if req.callback != nil {
@@ -433,19 +437,36 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
if s.membatch.hasNode(child.path) {
continue
}
// If database says duplicate, then at least the trie node is present
// and we hold the assumption that it's NOT legacy contract code.
chash := common.BytesToHash(node)
if rawdb.HasTrieNode(s.database, chash) {
continue
}
// Locally unknown node, schedule for retrieval
requests = append(requests, &nodeRequest{
path: child.path,
hash: chash,
parent: req,
callback: req.callback,
})
// Check the presence of children concurrently
pending.Add(1)
go func(child childNode) {
defer pending.Done()
// If database says duplicate, then at least the trie node is present
// and we hold the assumption that it's NOT legacy contract code.
chash := common.BytesToHash(node)
if rawdb.HasTrieNode(s.database, chash) {
return
}
// Locally unknown node, schedule for retrieval
missing <- &nodeRequest{
path: child.path,
hash: chash,
parent: req,
callback: req.callback,
}
}(child)
}
}
pending.Wait()
requests := make([]*nodeRequest, 0, len(children))
for done := false; !done; {
select {
case miss := <-missing:
requests = append(requests, miss)
default:
done = true
}
}
return requests, nil