From ab3b5de5d151d206eb5a5793803097558ddac5d6 Mon Sep 17 00:00:00 2001 From: Felipe Andrade Date: Wed, 12 Jul 2023 12:23:52 -0700 Subject: [PATCH] add . --- op-ufm/op-ufm/pkg/provider/common.go | 11 +++++ op-ufm/op-ufm/pkg/provider/heartbeat.go | 54 +++++++++++++++++++++++++ op-ufm/op-ufm/pkg/provider/tx_pool.go | 34 ++++++++++++++++ 3 files changed, 99 insertions(+) create mode 100644 op-ufm/op-ufm/pkg/provider/common.go create mode 100644 op-ufm/op-ufm/pkg/provider/heartbeat.go create mode 100644 op-ufm/op-ufm/pkg/provider/tx_pool.go diff --git a/op-ufm/op-ufm/pkg/provider/common.go b/op-ufm/op-ufm/pkg/provider/common.go new file mode 100644 index 0000000..96ec90f --- /dev/null +++ b/op-ufm/op-ufm/pkg/provider/common.go @@ -0,0 +1,11 @@ +package provider + +import ( + "context" + + "github.com/ethereum/go-ethereum/ethclient" +) + +func (p *Provider) dial(ctx context.Context) (*ethclient.Client, error) { + return ethclient.Dial(p.config.URL) +} diff --git a/op-ufm/op-ufm/pkg/provider/heartbeat.go b/op-ufm/op-ufm/pkg/provider/heartbeat.go new file mode 100644 index 0000000..223be9a --- /dev/null +++ b/op-ufm/op-ufm/pkg/provider/heartbeat.go @@ -0,0 +1,54 @@ +package provider + +import ( + "context" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/log" + "github.com/pkg/errors" +) + +// Heartbeat polls for expected in-flight transactions +func (p *Provider) Heartbeat(ctx context.Context) { + log.Debug("heartbeat", "provider", p.name) + + if len(p.txPool.Transactions) == 0 { + log.Debug("no in-flight txs", "provider", p.name) + return + } + + ethClient, err := p.dial(ctx) + if err != nil { + log.Error("cant dial to provider", "provider", p.name, "url", p.config.URL, "err", err) + } + + log.Debug("checking in-flight tx", "count", len(p.txPool.Transactions), "provider", p.name) + for hash, st := range p.txPool.Transactions { + log.Debug(hash, "st", st) + + _, isPending, err := ethClient.TransactionByHash(ctx, st.Hash) + if err != nil && !errors.Is(err, ethereum.NotFound) { + log.Error("cant check transaction", "provider", p.name, "url", p.config.URL, "err", err) + continue + } + + log.Debug("got transaction", "provider", p.name, "hash", hash, "isPending", isPending) + st.M.Lock() + if st.FirstSeen.IsZero() { + st.FirstSeen = time.Now() + } + if _, exist := st.SeenBy[p.name]; !exist { + st.SeenBy[p.name] = time.Now() + } + st.M.Unlock() + + p.txPool.M.Lock() + // every provider has seen this transaction + if len(st.SeenBy) == p.txPool.Expected { + log.Debug("transaction seen by all", "hash", hash) + delete(p.txPool.Transactions, hash) + } + p.txPool.M.Unlock() + } +} diff --git a/op-ufm/op-ufm/pkg/provider/tx_pool.go b/op-ufm/op-ufm/pkg/provider/tx_pool.go new file mode 100644 index 0000000..562d77a --- /dev/null +++ b/op-ufm/op-ufm/pkg/provider/tx_pool.go @@ -0,0 +1,34 @@ +package provider + +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" +) + +// TransactionPool is used locally to share transactions between providers under the same pool +type TransactionPool map[string]*NetworkTransactionPool + +// NetworkTransactionPool is used locally to share transactions between providers under the same network +type NetworkTransactionPool struct { + M sync.Mutex + Transactions map[string]*TransactionState + Expected int +} + +type TransactionState struct { + // Transaction hash + Hash common.Hash + + // Mutex + M sync.Mutex + + SentAt time.Time + + FirstSeen time.Time + + // Map of providers that have seen this transaction, and when + // Once all providers have seen the transaction it is removed from the pool + SeenBy map[string]time.Time +}