From 7de9c9838ae665e37c6a39c6bb0a5cbfd554388d Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Wed, 12 Jul 2023 12:20:23 -0700 Subject: [PATCH] tx_pool, heartbeat --- op-ufm/op-ufm/pkg/config/config.go | 1 + op-ufm/op-ufm/pkg/provider/provider.go | 5 ++- .../provider/{handlers.go => roundtrip.go} | 31 +++++++++---------- op-ufm/op-ufm/pkg/service/service.go | 26 +++++++++++++++- 4 files changed, 45 insertions(+), 18 deletions(-) rename op-ufm/op-ufm/pkg/provider/{handlers.go => roundtrip.go} (87%) diff --git a/op-ufm/op-ufm/pkg/config/config.go b/op-ufm/op-ufm/pkg/config/config.go index 1c24274..ee125e5 100644 --- a/op-ufm/op-ufm/pkg/config/config.go +++ b/op-ufm/op-ufm/pkg/config/config.go @@ -55,6 +55,7 @@ type WalletConfig struct { type ProviderConfig struct { Disabled bool `toml:"disabled"` + Network string `toml:"network"` URL string `toml:"url"` ReadOnly bool `toml:"read_only"` ReadInterval TOMLDuration `toml:"read_interval"` diff --git a/op-ufm/op-ufm/pkg/provider/provider.go b/op-ufm/op-ufm/pkg/provider/provider.go index f3a553e..e7afd82 100644 --- a/op-ufm/op-ufm/pkg/provider/provider.go +++ b/op-ufm/op-ufm/pkg/provider/provider.go @@ -12,6 +12,7 @@ type Provider struct { config *config.ProviderConfig signerConfig *config.SignerServiceConfig walletConfig *config.WalletConfig + txPool *NetworkTransactionPool cancelFunc context.CancelFunc client *http.Client @@ -19,12 +20,14 @@ type Provider struct { func New(name string, cfg *config.ProviderConfig, signerConfig *config.SignerServiceConfig, - walletConfig *config.WalletConfig) *Provider { + walletConfig *config.WalletConfig, + txPool *NetworkTransactionPool) *Provider { p := &Provider{ name: name, config: cfg, signerConfig: signerConfig, walletConfig: walletConfig, + txPool: txPool, client: http.DefaultClient, } diff --git a/op-ufm/op-ufm/pkg/provider/handlers.go b/op-ufm/op-ufm/pkg/provider/roundtrip.go similarity index 87% rename from op-ufm/op-ufm/pkg/provider/handlers.go rename to op-ufm/op-ufm/pkg/provider/roundtrip.go index adb60e4..684af58 100644 --- a/op-ufm/op-ufm/pkg/provider/handlers.go +++ b/op-ufm/op-ufm/pkg/provider/roundtrip.go @@ -16,9 +16,9 @@ import ( "github.com/ethereum/go-ethereum/log" ) -// Heartbeat poll for expected transactions -func (p *Provider) Heartbeat(ctx context.Context) { - log.Debug("heartbeat", "provider", p.name) +// Roundtrip send a new transaction to measure round trip latency +func (p *Provider) Roundtrip(ctx context.Context) { + log.Debug("roundtrip", "provider", p.name) ethClient, err := p.dial(ctx) if err != nil { @@ -44,17 +44,26 @@ func (p *Provider) Heartbeat(ctx context.Context) { txHash := signedTx.Hash() log.Info("transaction sent", "hash", txHash.Hex()) + // add to pool sentAt := time.Now() + p.txPool.M.Lock() + p.txPool.Transactions[txHash.Hex()] = &TransactionState{ + Hash: txHash, + SentAt: sentAt, + SeenBy: make(map[string]time.Time), + } + p.txPool.M.Unlock() + var receipt *types.Receipt attempt := 0 for receipt == nil { if time.Since(sentAt) >= time.Duration(p.config.ReceiptRetrievalTimeout) { - log.Error("receipt retrieval timedout", "provider", p.name, "hash", "ellapsed", time.Since(sentAt)) + log.Error("receipt retrieval timed out", "provider", p.name, "hash", "elapsed", time.Since(sentAt)) break } time.Sleep(time.Duration(p.config.ReceiptRetrievalInterval)) if attempt%10 == 0 { - log.Debug("checking for receipt...", "attempt", attempt, "ellapsed", time.Since(sentAt)) + log.Debug("checking for receipt...", "attempt", attempt, "elapsed", time.Since(sentAt)) } receipt, err = ethClient.TransactionReceipt(ctx, txHash) if err != nil && !errors.Is(err, ethereum.NotFound) { @@ -72,10 +81,6 @@ func (p *Provider) Heartbeat(ctx context.Context) { "gasUsed", receipt.GasUsed) } -func (p *Provider) dial(ctx context.Context) (*ethclient.Client, error) { - return ethclient.Dial(p.config.URL) -} - func (p *Provider) createTx(nonce uint64) *types.Transaction { toAddress := common.HexToAddress(p.walletConfig.Address) var data []byte @@ -89,7 +94,7 @@ func (p *Provider) createTx(nonce uint64) *types.Transaction { Value: &p.walletConfig.TxValue, Data: data, }) - log.Debug("tx", "tx", tx) + // log.Debug("tx", "tx", tx) return tx } @@ -118,7 +123,6 @@ func (p *Provider) sign(ctx context.Context, tx *types.Transaction) (*types.Tran } signedTx, err := client.SignTransaction(ctx, &p.walletConfig.ChainID, tx) - log.Debug("signedtx", "tx", signedTx, "err", err) if err != nil { return nil, err } @@ -133,8 +137,3 @@ func (p *Provider) nonce(ctx context.Context, client *ethclient.Client) (uint64, fromAddress := common.HexToAddress(p.walletConfig.Address) return client.PendingNonceAt(ctx, fromAddress) } - -// Roundtrip send a new transaction to measure round trip latency -func (p *Provider) Roundtrip(ctx context.Context) { - log.Debug("roundtrip", "provider", p.name) -} diff --git a/op-ufm/op-ufm/pkg/service/service.go b/op-ufm/op-ufm/pkg/service/service.go index 055ce1a..17f0b56 100644 --- a/op-ufm/op-ufm/pkg/service/service.go +++ b/op-ufm/op-ufm/pkg/service/service.go @@ -29,12 +29,36 @@ func (s *Service) Start(ctx context.Context) { s.Healthz.Start(ctx, s.Config.Healthz.Host, s.Config.Healthz.Port) log.Info("healthz started") } + + // map networks to its providers + networks := make(map[string][]string) + for name, providerConfig := range s.Config.Providers { + if providerConfig.Disabled { + continue + } + networks[providerConfig.Network] = append(networks[providerConfig.Network], name) + } + + txpool := &provider.TransactionPool{} + for name, providers := range networks { + if len(providers) == 1 { + log.Warn("can't measure first seen for network, please another provider", "network", name) + } + (*txpool)[name] = &provider.NetworkTransactionPool{} + (*txpool)[name].Transactions = make(map[string]*provider.TransactionState) + (*txpool)[name].Expected = len(providers) + } + for name, providerConfig := range s.Config.Providers { if providerConfig.Disabled { log.Info("provider is disabled", "provider", name) continue } - s.Providers[name] = provider.New(name, providerConfig, &s.Config.Signer, s.Config.Wallets[providerConfig.Wallet]) + s.Providers[name] = provider.New(name, + providerConfig, + &s.Config.Signer, + s.Config.Wallets[providerConfig.Wallet], + (*txpool)[providerConfig.Network]) s.Providers[name].Start(ctx) log.Info("provider started", "provider", name) }