fix: allow multicall channel to handle multiple items to avoid blocking after inital response is returned. Improve metrics reporting (#35)

This commit is contained in:
Jacob Elias 2024-07-30 13:28:59 -05:00 committed by GitHub
parent ef202f9167
commit ec84826c4c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 21 additions and 7 deletions

@ -835,7 +835,7 @@ func (bg *BackendGroup) ExecuteMulticall(ctx context.Context, rpcReqs []*RPCReq)
"auth", GetAuthCtx(bgCtx), "auth", GetAuthCtx(bgCtx),
) )
var wg sync.WaitGroup var wg sync.WaitGroup
ch := make(chan *multicallTuple) ch := make(chan *multicallTuple, len(bg.Backends))
for _, backend := range bg.Backends { for _, backend := range bg.Backends {
wg.Add(1) wg.Add(1)
go bg.MulticallRequest(backend, rpcReqs, &wg, bgCtx, ch) go bg.MulticallRequest(backend, rpcReqs, &wg, bgCtx, ch)
@ -843,6 +843,10 @@ func (bg *BackendGroup) ExecuteMulticall(ctx context.Context, rpcReqs []*RPCReq)
go func() { go func() {
wg.Wait() wg.Wait()
log.Debug("closing multicall channel",
"req_id", GetReqID(bgCtx),
"auth", GetAuthCtx(bgCtx),
)
close(ch) close(ch)
}() }()
@ -865,13 +869,20 @@ func (bg *BackendGroup) MulticallRequest(backend *Backend, rpcReqs []*RPCReq, wg
backendName: backend.Name, backendName: backend.Name,
} }
ch <- multicallResp log.Debug("placing multicall response into channel",
log.Debug("received multicall response from backend",
"req_id", GetReqID(ctx), "req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx), "auth", GetAuthCtx(ctx),
"backend", backend.Name, "backend", backend.Name,
) )
ch <- multicallResp
log.Trace("placed multicall response into channel",
"req_id", GetReqID(ctx),
"auth", GetAuthCtx(ctx),
"backend", backend.Name,
)
if backendResp.error != nil { if backendResp.error != nil {
log.Error("received multicall error response from backend", log.Error("received multicall error response from backend",
"req_id", GetReqID(ctx), "req_id", GetReqID(ctx),
@ -879,6 +890,9 @@ func (bg *BackendGroup) MulticallRequest(backend *Backend, rpcReqs []*RPCReq, wg
"backend", backend.Name, "backend", backend.Name,
"error", backendResp.error.Error(), "error", backendResp.error.Error(),
) )
RecordBackendGroupMulticallCompletion(bg, backend.Name, backendResp.error.Error())
} else {
RecordBackendGroupMulticallCompletion(bg, backend.Name, "nil")
} }
} }
@ -906,7 +920,6 @@ func (bg *BackendGroup) ProcessMulticallResponses(ch chan *multicallTuple, ctx c
i++ i++
resp := multicallResp.response resp := multicallResp.response
backendName := multicallResp.backendName backendName := multicallResp.backendName
RecordBackendGroupMulticallCompletion(bg, backendName)
if resp.error != nil { if resp.error != nil {
log.Error("received error response from multicall channel", log.Error("received error response from multicall channel",

@ -445,6 +445,7 @@ var (
}, []string{ }, []string{
"backend_group", "backend_group",
"backend_name", "backend_name",
"error",
}) })
) )
@ -615,8 +616,8 @@ func RecordBackendGroupMulticallRequest(bg *BackendGroup, backendName string) {
backendGroupMulticallCounter.WithLabelValues(bg.Name, backendName).Inc() backendGroupMulticallCounter.WithLabelValues(bg.Name, backendName).Inc()
} }
func RecordBackendGroupMulticallCompletion(bg *BackendGroup, backendName string) { func RecordBackendGroupMulticallCompletion(bg *BackendGroup, backendName string, error string) {
backendGroupMulticallCompletionCounter.WithLabelValues(bg.Name, backendName).Inc() backendGroupMulticallCompletionCounter.WithLabelValues(bg.Name, backendName, error).Inc()
} }
func boolToFloat64(b bool) float64 { func boolToFloat64(b bool) float64 {