From 4ea6a054c374a3c59d7624fb74d1ef0e72c3642c Mon Sep 17 00:00:00 2001 From: Matthew Slipper Date: Wed, 3 Aug 2022 19:36:07 -0600 Subject: [PATCH] proxyd: Unwrap single RPC batches (#3165) * proxyd: Unwrap single RPC batches * Update backend.go --- proxyd/proxyd/backend.go | 32 +++++++++++++++---- .../integration_tests/mock_backend_test.go | 6 +++- proxyd/proxyd/integration_tests/ws_test.go | 5 +++ 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/proxyd/proxyd/backend.go b/proxyd/proxyd/backend.go index 46e5225..f4d7d93 100644 --- a/proxyd/proxyd/backend.go +++ b/proxyd/proxyd/backend.go @@ -349,7 +349,17 @@ func (b *Backend) setOffline() { } func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) { - body := mustMarshalJSON(rpcReqs) + isSingleElementBatch := len(rpcReqs) == 1 + + // Single element batches are unwrapped before being sent + // since Alchemy handles single requests better than batches. + + var body []byte + if isSingleElementBatch { + body = mustMarshalJSON(rpcReqs[0]) + } else { + body = mustMarshalJSON(rpcReqs) + } httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body)) if err != nil { @@ -402,12 +412,22 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool } var res []*RPCRes - if err := json.Unmarshal(resB, &res); err != nil { - // Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method - if responseIsNotBatched(resB) { - return nil, ErrBackendUnexpectedJSONRPC + if isSingleElementBatch { + var singleRes RPCRes + if err := json.Unmarshal(resB, &singleRes); err != nil { + return nil, ErrBackendBadResponse + } + res = []*RPCRes{ + &singleRes, + } + } else { + if err := json.Unmarshal(resB, &res); err != nil { + // Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method + if responseIsNotBatched(resB) { + return nil, ErrBackendUnexpectedJSONRPC + } + return nil, ErrBackendBadResponse } - return nil, ErrBackendBadResponse } if len(rpcReqs) != len(res) { diff --git a/proxyd/proxyd/integration_tests/mock_backend_test.go b/proxyd/proxyd/integration_tests/mock_backend_test.go index b84b7d7..aa8f0ba 100644 --- a/proxyd/proxyd/integration_tests/mock_backend_test.go +++ b/proxyd/proxyd/integration_tests/mock_backend_test.go @@ -35,8 +35,12 @@ func SingleResponseHandler(code int, response string) http.HandlerFunc { } func BatchedResponseHandler(code int, responses ...string) http.HandlerFunc { - // all proxyd upstream requests are batched return func(w http.ResponseWriter, r *http.Request) { + if len(responses) == 1 { + SingleResponseHandler(code, responses[0])(w, r) + return + } + var body string body += "[" for i, response := range responses { diff --git a/proxyd/proxyd/integration_tests/ws_test.go b/proxyd/proxyd/integration_tests/ws_test.go index 6a63f71..d93d3a7 100644 --- a/proxyd/proxyd/integration_tests/ws_test.go +++ b/proxyd/proxyd/integration_tests/ws_test.go @@ -44,11 +44,14 @@ func TestConcurrentWSPanic(t *testing.T) { <-readyCh + var wg sync.WaitGroup + wg.Add(2) // spam messages go func() { for { select { case <-quitC: + wg.Done() return default: _ = backendToProxyConn.WriteMessage(websocket.TextMessage, []byte("garbage")) @@ -61,6 +64,7 @@ func TestConcurrentWSPanic(t *testing.T) { for { select { case <-quitC: + wg.Done() return default: _ = client.WriteMessage(websocket.TextMessage, []byte("{\"id\": 1, \"method\": \"eth_foo\", \"params\": [\"newHeads\"]}")) @@ -72,6 +76,7 @@ func TestConcurrentWSPanic(t *testing.T) { // concurrent write to websocket connection time.Sleep(time.Second) close(quitC) + wg.Wait() } type backendHandler struct {