infra/op-ufm/pkg/provider/heartbeat.go

105 lines
2.7 KiB
Go
Raw Normal View History

2023-07-12 22:23:52 +03:00
package provider
import (
"context"
"time"
2023-07-15 01:17:02 +03:00
"github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics"
"github.com/ethereum-optimism/optimism/op-ufm/pkg/metrics/clients"
2023-07-12 22:23:52 +03:00
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/log"
"github.com/pkg/errors"
)
// Heartbeat polls for expected in-flight transactions
func (p *Provider) Heartbeat(ctx context.Context) {
2023-08-31 23:31:34 +03:00
log.Debug("heartbeat",
"provider", p.name,
"count", len(p.txPool.Transactions))
2023-07-12 22:23:52 +03:00
metrics.RecordTransactionsInFlight(p.config.Network, len(p.txPool.Transactions))
// let's exclude transactions already seen by this provider, or originated by it
expectedTransactions := make([]*TransactionState, 0, len(p.txPool.Transactions))
alreadySeen := 0
for _, st := range p.txPool.Transactions {
2023-07-18 20:21:19 +03:00
if st.ProviderSource == p.name {
continue
}
if _, exist := st.SeenBy[p.name]; exist {
alreadySeen++
continue
}
expectedTransactions = append(expectedTransactions, st)
}
if len(expectedTransactions) == 0 {
2023-08-31 23:31:34 +03:00
log.Debug("no expected txs",
"count", len(p.txPool.Transactions),
"provider", p.name,
"alreadySeen", alreadySeen)
2023-07-12 22:23:52 +03:00
return
}
client, err := clients.Dial(p.name, p.config.URL)
2023-07-12 22:23:52 +03:00
if err != nil {
2023-08-31 23:31:34 +03:00
log.Error("cant dial to provider",
"provider", p.name,
"url", p.config.URL,
"err", err)
2023-07-12 22:23:52 +03:00
}
2023-08-31 23:31:34 +03:00
log.Debug("checking in-flight tx",
"count", len(p.txPool.Transactions),
"provider", p.name,
"alreadySeen", alreadySeen)
for _, st := range expectedTransactions {
hash := st.Hash.Hex()
2023-07-12 22:23:52 +03:00
_, isPending, err := client.TransactionByHash(ctx, st.Hash)
2023-07-12 22:23:52 +03:00
if err != nil && !errors.Is(err, ethereum.NotFound) {
2023-08-31 23:31:34 +03:00
log.Error("cant check transaction",
"provider", p.name,
"hash", hash,
"url", p.config.URL,
"err", err)
2023-07-12 22:23:52 +03:00
continue
}
2023-08-31 23:31:34 +03:00
log.Debug("got transaction",
"provider", p.name,
"hash", hash,
"isPending", isPending)
// mark transaction as seen by this provider
2023-07-12 22:23:52 +03:00
st.M.Lock()
2023-07-15 00:08:02 +03:00
latency := time.Since(st.SentAt)
2023-07-12 22:23:52 +03:00
if st.FirstSeen.IsZero() {
st.FirstSeen = time.Now()
2023-07-18 20:21:19 +03:00
metrics.RecordFirstSeenLatency(st.ProviderSource, p.name, latency)
log.Info("transaction first seen",
"hash", hash,
2023-07-15 00:08:02 +03:00
"firstSeenLatency", latency,
2023-07-18 20:21:19 +03:00
"providerSource", st.ProviderSource,
2023-07-15 00:08:02 +03:00
"providerSeen", p.name)
2023-07-12 22:23:52 +03:00
}
if _, exist := st.SeenBy[p.name]; !exist {
st.SeenBy[p.name] = time.Now()
2023-07-18 20:21:19 +03:00
metrics.RecordProviderToProviderLatency(st.ProviderSource, p.name, latency)
2023-07-12 22:23:52 +03:00
}
st.M.Unlock()
// check if transaction have been seen by all providers
2023-07-12 22:23:52 +03:00
p.txPool.M.Lock()
if len(st.SeenBy) == p.txPool.Expected {
2023-08-31 23:31:34 +03:00
log.Debug("transaction seen by all",
"hash", hash,
"expected", p.txPool.Expected,
"seenBy", len(st.SeenBy))
delete(p.txPool.Transactions, st.Hash.Hex())
2023-07-12 22:23:52 +03:00
}
p.txPool.M.Unlock()
}
}