diff --git a/beacon/engine/types.go b/beacon/engine/types.go index 34365ecfa8..9f41aa04ca 100644 --- a/beacon/engine/types.go +++ b/beacon/engine/types.go @@ -118,6 +118,11 @@ type BlobsBundleV1 struct { Blobs []hexutil.Bytes `json:"blobs"` } +type BlobAndProofV1 struct { + Blob hexutil.Bytes `json:"blob"` + Proof hexutil.Bytes `json:"proof"` +} + // JSON type overrides for ExecutionPayloadEnvelope. type executionPayloadEnvelopeMarshaling struct { BlockValue *hexutil.Big diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 82df09a4bf..76cb6801fa 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -36,6 +36,7 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" @@ -88,9 +89,11 @@ const ( // bare minimum needed fields to keep the size down (and thus number of entries // larger with the same memory consumption). 
type blobTxMeta struct { - hash common.Hash // Transaction hash to maintain the lookup table - id uint64 // Storage ID in the pool's persistent store - size uint32 // Byte size in the pool's persistent store + hash common.Hash // Transaction hash to maintain the lookup table + vhashes []common.Hash // Blob versioned hashes to maintain the lookup table + + id uint64 // Storage ID in the pool's persistent store + size uint32 // Byte size in the pool's persistent store nonce uint64 // Needed to prioritize inclusion order within an account costCap *uint256.Int // Needed to validate cumulative balance sufficiency @@ -113,6 +116,7 @@ type blobTxMeta struct { func newBlobTxMeta(id uint64, size uint32, tx *types.Transaction) *blobTxMeta { meta := &blobTxMeta{ hash: tx.Hash(), + vhashes: tx.BlobHashes(), id: id, size: size, nonce: tx.Nonce(), @@ -306,7 +310,7 @@ type BlobPool struct { state *state.StateDB // Current state at the head of the chain gasTip *uint256.Int // Currently accepted minimum gas tip - lookup map[common.Hash]uint64 // Lookup table mapping hashes to tx billy entries + lookup *lookup // Lookup table mapping blobs to txs and txs to billy entries index map[common.Address][]*blobTxMeta // Blob transactions grouped by accounts, sorted by nonce spent map[common.Address]*uint256.Int // Expenditure tracking for individual accounts evict *evictHeap // Heap of cheapest accounts for eviction when full @@ -328,7 +332,7 @@ func New(config Config, chain BlockChain) *BlobPool { config: config, signer: types.LatestSigner(chain.Config()), chain: chain, - lookup: make(map[common.Hash]uint64), + lookup: newLookup(), index: make(map[common.Address][]*blobTxMeta), spent: make(map[common.Address]*uint256.Int), } @@ -471,7 +475,7 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error { } meta := newBlobTxMeta(id, size, tx) - if _, exists := p.lookup[meta.hash]; exists { + if p.lookup.exists(meta.hash) { // This path is only possible after a crash, 
where deleted items are not // removed via the normal shutdown-startup procedure and thus may get // partially resurrected. @@ -496,9 +500,8 @@ func (p *BlobPool) parseTransaction(id uint64, size uint32, blob []byte) error { p.index[sender] = append(p.index[sender], meta) p.spent[sender] = new(uint256.Int).Add(p.spent[sender], meta.costCap) - p.lookup[meta.hash] = meta.id + p.lookup.track(meta) p.stored += uint64(meta.size) - return nil } @@ -531,7 +534,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 nonces = append(nonces, txs[i].nonce) p.stored -= uint64(txs[i].size) - delete(p.lookup, txs[i].hash) + p.lookup.untrack(txs[i]) // Included transactions blobs need to be moved to the limbo if filled && inclusions != nil { @@ -572,7 +575,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[0].costCap) p.stored -= uint64(txs[0].size) - delete(p.lookup, txs[0].hash) + p.lookup.untrack(txs[0]) // Included transactions blobs need to be moved to the limbo if inclusions != nil { @@ -621,14 +624,14 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 // crash would result in previously deleted entities being resurrected. // That could potentially cause a duplicate nonce to appear. 
if txs[i].nonce == txs[i-1].nonce { - id := p.lookup[txs[i].hash] + id, _ := p.lookup.storeidOfTx(txs[i].hash) log.Error("Dropping repeat nonce blob transaction", "from", addr, "nonce", txs[i].nonce, "id", id) dropRepeatedMeter.Mark(1) p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap) p.stored -= uint64(txs[i].size) - delete(p.lookup, txs[i].hash) + p.lookup.untrack(txs[i]) if err := p.store.Delete(id); err != nil { log.Error("Failed to delete blob transaction", "from", addr, "id", id, "err", err) @@ -650,7 +653,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[j].costCap) p.stored -= uint64(txs[j].size) - delete(p.lookup, txs[j].hash) + p.lookup.untrack(txs[j]) } txs = txs[:i] @@ -688,7 +691,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], last.costCap) p.stored -= uint64(last.size) - delete(p.lookup, last.hash) + p.lookup.untrack(last) } if len(txs) == 0 { delete(p.index, addr) @@ -728,7 +731,7 @@ func (p *BlobPool) recheck(addr common.Address, inclusions map[common.Hash]uint6 p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], last.costCap) p.stored -= uint64(last.size) - delete(p.lookup, last.hash) + p.lookup.untrack(last) } p.index[addr] = txs @@ -1006,7 +1009,7 @@ func (p *BlobPool) reinject(addr common.Address, txhash common.Hash) error { p.index[addr] = append(p.index[addr], meta) p.spent[addr] = new(uint256.Int).Add(p.spent[addr], meta.costCap) } - p.lookup[meta.hash] = meta.id + p.lookup.track(meta) p.stored += uint64(meta.size) return nil } @@ -1033,7 +1036,7 @@ func (p *BlobPool) SetGasTip(tip *big.Int) { ) p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], txs[i].costCap) p.stored -= uint64(tx.size) - delete(p.lookup, tx.hash) + p.lookup.untrack(tx) txs[i] = nil // Drop everything afterwards, no gaps allowed @@ -1043,7 +1046,7 @@ func (p *BlobPool) 
SetGasTip(tip *big.Int) { p.spent[addr] = new(uint256.Int).Sub(p.spent[addr], tx.costCap) p.stored -= uint64(tx.size) - delete(p.lookup, tx.hash) + p.lookup.untrack(tx) txs[i+1+j] = nil } // Clear out the dropped transactions from the index @@ -1171,8 +1174,7 @@ func (p *BlobPool) Has(hash common.Hash) bool { p.lock.RLock() defer p.lock.RUnlock() - _, ok := p.lookup[hash] - return ok + return p.lookup.exists(hash) } // Get returns a transaction if it is contained in the pool, or nil otherwise. @@ -1189,7 +1191,7 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction { }(time.Now()) // Pull the blob from disk and return an assembled response - id, ok := p.lookup[hash] + id, ok := p.lookup.storeidOfTx(hash) if !ok { return nil } @@ -1206,6 +1208,58 @@ func (p *BlobPool) Get(hash common.Hash) *types.Transaction { return item } +// GetBlobs returns a number of blobs and proofs for the given versioned hashes. +// This is a utility method for the engine API, enabling consensus clients to +// retrieve blobs from the pools directly instead of the network. +func (p *BlobPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Proof) { + // Create a map of the blob hash to indices for faster fills + var ( + blobs = make([]*kzg4844.Blob, len(vhashes)) + proofs = make([]*kzg4844.Proof, len(vhashes)) + ) + index := make(map[common.Hash]int) + for i, vhash := range vhashes { + index[vhash] = i + } + // Iterate over the blob hashes, pulling transactions that fill it. Take care + // to also fill anything else the transaction might include (probably will). 
+ for i, vhash := range vhashes { + // If already filled by a previous fetch, skip + if blobs[i] != nil { + continue + } + // Unfilled, retrieve the datastore item (in a short lock) + p.lock.RLock() + id, exists := p.lookup.storeidOfBlob(vhash) + if !exists { + p.lock.RUnlock() + continue + } + data, err := p.store.Get(id) + p.lock.RUnlock() + + // After releasing the lock, try to fill any blobs requested + if err != nil { + log.Error("Tracked blob transaction missing from store", "id", id, "err", err) + continue + } + item := new(types.Transaction) + if err = rlp.DecodeBytes(data, item); err != nil { + log.Error("Blobs corrupted for traced transaction", "id", id, "err", err) + continue + } + // Fill anything requested, not just the current versioned hash + sidecar := item.BlobTxSidecar() + for j, blobhash := range item.BlobHashes() { + if idx, ok := index[blobhash]; ok { + blobs[idx] = &sidecar.Blobs[j] + proofs[idx] = &sidecar.Proofs[j] + } + } + } + return blobs, proofs +} + // Add inserts a set of blob transactions into the pool if they pass validation (both // consensus validity and pool restrictions). func (p *BlobPool) Add(txs []*types.Transaction, local bool, sync bool) []error { @@ -1319,8 +1373,8 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { p.spent[from] = new(uint256.Int).Sub(p.spent[from], prev.costCap) p.spent[from] = new(uint256.Int).Add(p.spent[from], meta.costCap) - delete(p.lookup, prev.hash) - p.lookup[meta.hash] = meta.id + p.lookup.untrack(prev) + p.lookup.track(meta) p.stored += uint64(meta.size) - uint64(prev.size) } else { // Transaction extends previously scheduled ones @@ -1330,7 +1384,7 @@ func (p *BlobPool) add(tx *types.Transaction) (err error) { newacc = true } p.spent[from] = new(uint256.Int).Add(p.spent[from], meta.costCap) - p.lookup[meta.hash] = meta.id + p.lookup.track(meta) p.stored += uint64(meta.size) } // Recompute the rolling eviction fields. 
In case of a replacement, this will @@ -1419,7 +1473,7 @@ func (p *BlobPool) drop() { p.spent[from] = new(uint256.Int).Sub(p.spent[from], drop.costCap) } p.stored -= uint64(drop.size) - delete(p.lookup, drop.hash) + p.lookup.untrack(drop) // Remove the transaction from the pool's eviction heap: // - If the entire account was dropped, pop off the address diff --git a/core/txpool/blobpool/blobpool_test.go b/core/txpool/blobpool/blobpool_test.go index 9c36cf39b3..721f7c6c2e 100644 --- a/core/txpool/blobpool/blobpool_test.go +++ b/core/txpool/blobpool/blobpool_test.go @@ -45,12 +45,28 @@ import ( ) var ( - emptyBlob = new(kzg4844.Blob) - emptyBlobCommit, _ = kzg4844.BlobToCommitment(emptyBlob) - emptyBlobProof, _ = kzg4844.ComputeBlobProof(emptyBlob, emptyBlobCommit) - emptyBlobVHash = kzg4844.CalcBlobHashV1(sha256.New(), &emptyBlobCommit) + testBlobs []*kzg4844.Blob + testBlobCommits []kzg4844.Commitment + testBlobProofs []kzg4844.Proof + testBlobVHashes [][32]byte ) +func init() { + for i := 0; i < 10; i++ { + testBlob := &kzg4844.Blob{byte(i)} + testBlobs = append(testBlobs, testBlob) + + testBlobCommit, _ := kzg4844.BlobToCommitment(testBlob) + testBlobCommits = append(testBlobCommits, testBlobCommit) + + testBlobProof, _ := kzg4844.ComputeBlobProof(testBlob, testBlobCommit) + testBlobProofs = append(testBlobProofs, testBlobProof) + + testBlobVHash := kzg4844.CalcBlobHashV1(sha256.New(), &testBlobCommit) + testBlobVHashes = append(testBlobVHashes, testBlobVHash) + } +} + // testBlockChain is a mock of the live chain for testing the pool. type testBlockChain struct { config *params.ChainConfig @@ -181,6 +197,12 @@ func makeTx(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCap uint64, // makeUnsignedTx is a utility method to construct a random blob transaction // without signing it. 
func makeUnsignedTx(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCap uint64) *types.BlobTx { + return makeUnsignedTxWithTestBlob(nonce, gasTipCap, gasFeeCap, blobFeeCap, rand.Intn(len(testBlobs))) +} + +// makeUnsignedTx is a utility method to construct a random blob transaction +// without signing it. +func makeUnsignedTxWithTestBlob(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCap uint64, blobIdx int) *types.BlobTx { return &types.BlobTx{ ChainID: uint256.MustFromBig(params.MainnetChainConfig.ChainID), Nonce: nonce, @@ -188,12 +210,12 @@ func makeUnsignedTx(nonce uint64, gasTipCap uint64, gasFeeCap uint64, blobFeeCap GasFeeCap: uint256.NewInt(gasFeeCap), Gas: 21000, BlobFeeCap: uint256.NewInt(blobFeeCap), - BlobHashes: []common.Hash{emptyBlobVHash}, + BlobHashes: []common.Hash{testBlobVHashes[blobIdx]}, Value: uint256.NewInt(100), Sidecar: &types.BlobTxSidecar{ - Blobs: []kzg4844.Blob{*emptyBlob}, - Commitments: []kzg4844.Commitment{emptyBlobCommit}, - Proofs: []kzg4844.Proof{emptyBlobProof}, + Blobs: []kzg4844.Blob{*testBlobs[blobIdx]}, + Commitments: []kzg4844.Commitment{testBlobCommits[blobIdx]}, + Proofs: []kzg4844.Proof{testBlobProofs[blobIdx]}, }, } } @@ -204,7 +226,7 @@ func verifyPoolInternals(t *testing.T, pool *BlobPool) { // Mark this method as a helper to remove from stack traces t.Helper() - // Verify that all items in the index are present in the lookup and nothing more + // Verify that all items in the index are present in the tx lookup and nothing more seen := make(map[common.Hash]struct{}) for addr, txs := range pool.index { for _, tx := range txs { @@ -214,14 +236,40 @@ func verifyPoolInternals(t *testing.T, pool *BlobPool) { seen[tx.hash] = struct{}{} } } - for hash, id := range pool.lookup { + for hash, id := range pool.lookup.txIndex { if _, ok := seen[hash]; !ok { - t.Errorf("lookup entry missing from transaction index: hash #%x, id %d", hash, id) + t.Errorf("tx lookup entry missing from transaction index: hash #%x, 
id %d", hash, id) } delete(seen, hash) } for hash := range seen { - t.Errorf("indexed transaction hash #%x missing from lookup table", hash) + t.Errorf("indexed transaction hash #%x missing from tx lookup table", hash) + } + // Verify that all blobs in the index are present in the blob lookup and nothing more + blobs := make(map[common.Hash]map[common.Hash]struct{}) + for _, txs := range pool.index { + for _, tx := range txs { + for _, vhash := range tx.vhashes { + if blobs[vhash] == nil { + blobs[vhash] = make(map[common.Hash]struct{}) + } + blobs[vhash][tx.hash] = struct{}{} + } + } + } + for vhash, txs := range pool.lookup.blobIndex { + for txhash := range txs { + if _, ok := blobs[vhash][txhash]; !ok { + t.Errorf("blob lookup entry missing from transaction index: blob hash #%x, tx hash #%x", vhash, txhash) + } + delete(blobs[vhash], txhash) + if len(blobs[vhash]) == 0 { + delete(blobs, vhash) + } + } + } + for vhash := range blobs { + t.Errorf("indexed transaction blob hash #%x missing from blob lookup table", vhash) } // Verify that transactions are sorted per account and contain no nonce gaps, // and that the first nonce is the next expected one based on the state. @@ -294,6 +342,53 @@ func verifyPoolInternals(t *testing.T, pool *BlobPool) { } // Verify the price heap internals verifyHeapInternals(t, pool.evict) + + // Verify that all the blobs can be retrieved + verifyBlobRetrievals(t, pool) +} + +// verifyBlobRetrievals attempts to retrieve all testing blobs and checks that +// whatever is in the pool, it can be retrieved correctly. 
+func verifyBlobRetrievals(t *testing.T, pool *BlobPool) { + // Collect all the blobs tracked by the pool + known := make(map[common.Hash]struct{}) + for _, txs := range pool.index { + for _, tx := range txs { + for _, vhash := range tx.vhashes { + known[vhash] = struct{}{} + } + } + } + // Attempt to retrieve all test blobs + hashes := make([]common.Hash, len(testBlobVHashes)) + for i := range testBlobVHashes { + copy(hashes[i][:], testBlobVHashes[i][:]) + } + blobs, proofs := pool.GetBlobs(hashes) + + // Cross validate what we received vs what we wanted + if len(blobs) != len(hashes) || len(proofs) != len(hashes) { + t.Errorf("retrieved blobs/proofs size mismatch: have %d/%d, want %d", len(blobs), len(proofs), len(hashes)) + return + } + for i, hash := range hashes { + // If an item is missing, but shouldn't, error + if blobs[i] == nil || proofs[i] == nil { + if _, ok := known[hash]; ok { + t.Errorf("tracked blob retrieval failed: item %d, hash %x", i, hash) + } + continue + } + // Item retrieved, make sure it matches the expectation + if *blobs[i] != *testBlobs[i] || *proofs[i] != testBlobProofs[i] { + t.Errorf("retrieved blob or proof mismatch: item %d, hash %x", i, hash) + continue + } + delete(known, hash) + } + for hash := range known { + t.Errorf("indexed blob #%x missing from retrieval", hash) + } } // Tests that transactions can be loaded from disk on startup and that they are @@ -969,21 +1064,21 @@ func TestAdd(t *testing.T) { "alice": { balance: 1000000, txs: []*types.BlobTx{ - makeUnsignedTx(0, 1, 1, 1), + makeUnsignedTxWithTestBlob(0, 1, 1, 1, 0), }, }, "bob": { balance: 1000000, nonce: 1, txs: []*types.BlobTx{ - makeUnsignedTx(1, 1, 1, 1), + makeUnsignedTxWithTestBlob(1, 1, 1, 1, 1), }, }, }, adds: []addtx{ { // New account, 1 tx pending: reject duplicate nonce 0 from: "alice", - tx: makeUnsignedTx(0, 1, 1, 1), + tx: makeUnsignedTxWithTestBlob(0, 1, 1, 1, 0), err: txpool.ErrAlreadyKnown, }, { // New account, 1 tx pending: reject replacement nonce 0 
(ignore price for now) @@ -1013,7 +1108,7 @@ func TestAdd(t *testing.T) { }, { // Old account, 1 tx in chain, 1 tx pending: reject duplicate nonce 1 from: "bob", - tx: makeUnsignedTx(1, 1, 1, 1), + tx: makeUnsignedTxWithTestBlob(1, 1, 1, 1, 1), err: txpool.ErrAlreadyKnown, }, { // Old account, 1 tx in chain, 1 tx pending: accept nonce 2 (ignore price for now) diff --git a/core/txpool/blobpool/lookup.go b/core/txpool/blobpool/lookup.go new file mode 100644 index 0000000000..2d8d0fd2bf --- /dev/null +++ b/core/txpool/blobpool/lookup.go @@ -0,0 +1,91 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package blobpool + +import ( + "github.com/ethereum/go-ethereum/common" +) + +// lookup maps blob versioned hashes to transaction hashes that include them, +// and transaction hashes to billy entries that include them. +type lookup struct { + blobIndex map[common.Hash]map[common.Hash]struct{} + txIndex map[common.Hash]uint64 +} + +// newLookup creates a new index for tracking blob to tx; and tx to billy mappings. +func newLookup() *lookup { + return &lookup{ + blobIndex: make(map[common.Hash]map[common.Hash]struct{}), + txIndex: make(map[common.Hash]uint64), + } +} + +// exists returns whether a transaction is already tracked or not. 
+func (l *lookup) exists(txhash common.Hash) bool { + _, exists := l.txIndex[txhash] + return exists +} + +// storeidOfTx returns the datastore storage item id of a transaction. +func (l *lookup) storeidOfTx(txhash common.Hash) (uint64, bool) { + id, ok := l.txIndex[txhash] + return id, ok +} + +// storeidOfBlob returns the datastore storage item id of a blob. +func (l *lookup) storeidOfBlob(vhash common.Hash) (uint64, bool) { + // If the blob is unknown, return a miss + txs, ok := l.blobIndex[vhash] + if !ok { + return 0, false + } + // If the blob is known, return any tx for it + for tx := range txs { + return l.storeidOfTx(tx) + } + return 0, false // Weird, don't choke +} + +// track inserts a new set of mappings from blob versioned hashes to transaction +// hashes; and from transaction hashes to datastore storage item ids. +func (l *lookup) track(tx *blobTxMeta) { + // Map all the blobs to the transaction hash + for _, vhash := range tx.vhashes { + if _, ok := l.blobIndex[vhash]; !ok { + l.blobIndex[vhash] = make(map[common.Hash]struct{}) + } + l.blobIndex[vhash][tx.hash] = struct{}{} // may be double mapped if a tx contains the same blob twice + } + // Map the transaction hash to the datastore id + l.txIndex[tx.hash] = tx.id +} + +// untrack removes a set of mappings from blob versioned hashes to transaction +// hashes from the blob index. 
+func (l *lookup) untrack(tx *blobTxMeta) { + // Unmap the transaction hash from the datastore id + delete(l.txIndex, tx.hash) + + // Unmap all the blobs from the transaction hash + for _, vhash := range tx.vhashes { + delete(l.blobIndex[vhash], tx.hash) // may be double deleted if a tx contains the same blob twice + if len(l.blobIndex[vhash]) == 0 { + delete(l.blobIndex, vhash) + } + } +} diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 5506ecc31f..f7495dd39f 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -33,6 +33,7 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" @@ -1077,6 +1078,12 @@ func (pool *LegacyPool) get(hash common.Hash) *types.Transaction { return pool.all.Get(hash) } +// GetBlobs is not supported by the legacy transaction pool, it is just here to +// implement the txpool.SubPool interface. +func (pool *LegacyPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Proof) { + return nil, nil +} + // Has returns an indicator whether txpool has a transaction cached with the // given hash. func (pool *LegacyPool) Has(hash common.Hash) bool { diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index 9881ed1b8f..180facd217 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -23,6 +23,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/event" "github.com/holiman/uint256" ) @@ -123,6 +124,11 @@ type SubPool interface { // Get returns a transaction if it is contained in the pool, or nil otherwise. 
Get(hash common.Hash) *types.Transaction + // GetBlobs returns a number of blobs and proofs for the given versioned hashes. + // This is a utility method for the engine API, enabling consensus clients to + // retrieve blobs from the pools directly instead of the network. + GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Proof) + // Add enqueues a batch of transactions into the pool if they are valid. Due // to the large transaction churn, add may postpone fully integrating the tx // to a later point to batch multiple ones together. diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 5ce69e3763..363fa29c02 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto/kzg4844" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" @@ -305,6 +306,22 @@ func (p *TxPool) Get(hash common.Hash) *types.Transaction { return nil } +// GetBlobs returns a number of blobs and proofs for the given versioned hashes. +// This is a utility method for the engine API, enabling consensus clients to +// retrieve blobs from the pools directly instead of the network. +func (p *TxPool) GetBlobs(vhashes []common.Hash) ([]*kzg4844.Blob, []*kzg4844.Proof) { + for _, subpool := range p.subpools { + // It's ugly to assume that only one pool will be capable of returning + // anything meaningful for this call, but anything else requires merging + // partial responses and that's too annoying to do until we get a second + // blobpool (probably never). + if blobs, proofs := subpool.GetBlobs(vhashes); blobs != nil { + return blobs, proofs + } + } + return nil, nil +} + // Add enqueues a batch of transactions into the pool if they are valid. 
Due // to the large transaction churn, add may postpone fully integrating the tx // to a later point to batch multiple ones together. diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index c91b4fe546..75b49bf336 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -93,6 +93,7 @@ var caps = []string{ "engine_getPayloadV2", "engine_getPayloadV3", "engine_getPayloadV4", + "engine_getBlobsV1", "engine_newPayloadV1", "engine_newPayloadV2", "engine_newPayloadV3", @@ -536,6 +537,25 @@ func (api *ConsensusAPI) getPayload(payloadID engine.PayloadID, full bool) (*eng return data, nil } +// GetBlobsV1 returns a blob from the transaction pool. +func (api *ConsensusAPI) GetBlobsV1(hashes []common.Hash) ([]*engine.BlobAndProofV1, error) { + if len(hashes) > 128 { + return nil, engine.TooLargeRequest.With(fmt.Errorf("requested blob count too large: %v", len(hashes))) + } + res := make([]*engine.BlobAndProofV1, len(hashes)) + + blobs, proofs := api.eth.TxPool().GetBlobs(hashes) + for i := 0; i < len(blobs); i++ { + if blobs[i] != nil { + res[i] = &engine.BlobAndProofV1{ + Blob: (*blobs[i])[:], + Proof: (*proofs[i])[:], + } + } + } + return res, nil +} + // NewPayloadV1 creates an Eth1 block, inserts it in the chain, and returns the status of the chain. func (api *ConsensusAPI) NewPayloadV1(params engine.ExecutableData) (engine.PayloadStatusV1, error) { if params.Withdrawals != nil {