tx_pool, heartbeat

This commit is contained in:
Felipe Andrade 2023-07-12 12:20:23 -07:00
parent 660539b3bf
commit 7de9c9838a
4 changed files with 45 additions and 18 deletions

@ -55,6 +55,7 @@ type WalletConfig struct {
type ProviderConfig struct {
Disabled bool `toml:"disabled"`
Network string `toml:"network"`
URL string `toml:"url"`
ReadOnly bool `toml:"read_only"`
ReadInterval TOMLDuration `toml:"read_interval"`

@ -12,6 +12,7 @@ type Provider struct {
config *config.ProviderConfig
signerConfig *config.SignerServiceConfig
walletConfig *config.WalletConfig
txPool *NetworkTransactionPool
cancelFunc context.CancelFunc
client *http.Client
@ -19,12 +20,14 @@ type Provider struct {
func New(name string, cfg *config.ProviderConfig,
signerConfig *config.SignerServiceConfig,
walletConfig *config.WalletConfig) *Provider {
walletConfig *config.WalletConfig,
txPool *NetworkTransactionPool) *Provider {
p := &Provider{
name: name,
config: cfg,
signerConfig: signerConfig,
walletConfig: walletConfig,
txPool: txPool,
client: http.DefaultClient,
}

@ -16,9 +16,9 @@ import (
"github.com/ethereum/go-ethereum/log"
)
// Heartbeat poll for expected transactions
func (p *Provider) Heartbeat(ctx context.Context) {
log.Debug("heartbeat", "provider", p.name)
// Roundtrip send a new transaction to measure round trip latency
func (p *Provider) Roundtrip(ctx context.Context) {
log.Debug("roundtrip", "provider", p.name)
ethClient, err := p.dial(ctx)
if err != nil {
@ -44,17 +44,26 @@ func (p *Provider) Heartbeat(ctx context.Context) {
txHash := signedTx.Hash()
log.Info("transaction sent", "hash", txHash.Hex())
// add to pool
sentAt := time.Now()
p.txPool.M.Lock()
p.txPool.Transactions[txHash.Hex()] = &TransactionState{
Hash: txHash,
SentAt: sentAt,
SeenBy: make(map[string]time.Time),
}
p.txPool.M.Unlock()
var receipt *types.Receipt
attempt := 0
for receipt == nil {
if time.Since(sentAt) >= time.Duration(p.config.ReceiptRetrievalTimeout) {
log.Error("receipt retrieval timedout", "provider", p.name, "hash", "ellapsed", time.Since(sentAt))
log.Error("receipt retrieval timed out", "provider", p.name, "hash", "elapsed", time.Since(sentAt))
break
}
time.Sleep(time.Duration(p.config.ReceiptRetrievalInterval))
if attempt%10 == 0 {
log.Debug("checking for receipt...", "attempt", attempt, "ellapsed", time.Since(sentAt))
log.Debug("checking for receipt...", "attempt", attempt, "elapsed", time.Since(sentAt))
}
receipt, err = ethClient.TransactionReceipt(ctx, txHash)
if err != nil && !errors.Is(err, ethereum.NotFound) {
@ -72,10 +81,6 @@ func (p *Provider) Heartbeat(ctx context.Context) {
"gasUsed", receipt.GasUsed)
}
func (p *Provider) dial(ctx context.Context) (*ethclient.Client, error) {
return ethclient.Dial(p.config.URL)
}
func (p *Provider) createTx(nonce uint64) *types.Transaction {
toAddress := common.HexToAddress(p.walletConfig.Address)
var data []byte
@ -89,7 +94,7 @@ func (p *Provider) createTx(nonce uint64) *types.Transaction {
Value: &p.walletConfig.TxValue,
Data: data,
})
log.Debug("tx", "tx", tx)
// log.Debug("tx", "tx", tx)
return tx
}
@ -118,7 +123,6 @@ func (p *Provider) sign(ctx context.Context, tx *types.Transaction) (*types.Tran
}
signedTx, err := client.SignTransaction(ctx, &p.walletConfig.ChainID, tx)
log.Debug("signedtx", "tx", signedTx, "err", err)
if err != nil {
return nil, err
}
@ -133,8 +137,3 @@ func (p *Provider) nonce(ctx context.Context, client *ethclient.Client) (uint64,
fromAddress := common.HexToAddress(p.walletConfig.Address)
return client.PendingNonceAt(ctx, fromAddress)
}
// Roundtrip send a new transaction to measure round trip latency
func (p *Provider) Roundtrip(ctx context.Context) {
log.Debug("roundtrip", "provider", p.name)
}

@ -29,12 +29,36 @@ func (s *Service) Start(ctx context.Context) {
s.Healthz.Start(ctx, s.Config.Healthz.Host, s.Config.Healthz.Port)
log.Info("healthz started")
}
// map networks to its providers
networks := make(map[string][]string)
for name, providerConfig := range s.Config.Providers {
if providerConfig.Disabled {
continue
}
networks[providerConfig.Network] = append(networks[providerConfig.Network], name)
}
txpool := &provider.TransactionPool{}
for name, providers := range networks {
if len(providers) == 1 {
log.Warn("can't measure first seen for network, please another provider", "network", name)
}
(*txpool)[name] = &provider.NetworkTransactionPool{}
(*txpool)[name].Transactions = make(map[string]*provider.TransactionState)
(*txpool)[name].Expected = len(providers)
}
for name, providerConfig := range s.Config.Providers {
if providerConfig.Disabled {
log.Info("provider is disabled", "provider", name)
continue
}
s.Providers[name] = provider.New(name, providerConfig, &s.Config.Signer, s.Config.Wallets[providerConfig.Wallet])
s.Providers[name] = provider.New(name,
providerConfig,
&s.Config.Signer,
s.Config.Wallets[providerConfig.Wallet],
(*txpool)[providerConfig.Network])
s.Providers[name].Start(ctx)
log.Info("provider started", "provider", name)
}