proxyd: Unwrap single RPC batches (#3165)

* proxyd: Unwrap single RPC batches

* Update backend.go

Matthew Slipper 2022-08-03 19:36:07 -06:00 committed by GitHub
parent cd0afa3176
commit 4ea6a054c3
3 changed files with 36 additions and 7 deletions
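The change below stops proxyd from forwarding one-element batches as JSON arrays: a single request is marshaled as a bare object before being sent upstream, and the bare response is rewrapped afterwards so the rest of the pipeline still sees a batch. A minimal standalone sketch of the request side (unwrapIfSingle is an illustrative name and RPCReq is trimmed for the example; this is not the commit's code):

package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed request shape for illustration; proxyd's RPCReq marshals to
// the same JSON fields.
type RPCReq struct {
	JSONRPC string          `json:"jsonrpc"`
	Method  string          `json:"method"`
	Params  json.RawMessage `json:"params"`
	ID      json.RawMessage `json:"id"`
}

// unwrapIfSingle marshals a one-element batch as a bare object and
// anything else as a JSON array.
func unwrapIfSingle(reqs []*RPCReq) ([]byte, error) {
	if len(reqs) == 1 {
		return json.Marshal(reqs[0])
	}
	return json.Marshal(reqs)
}

func main() {
	req := &RPCReq{JSONRPC: "2.0", Method: "eth_chainId", ID: json.RawMessage(`1`)}

	single, _ := unwrapIfSingle([]*RPCReq{req})
	batch, _ := unwrapIfSingle([]*RPCReq{req, req})

	fmt.Println(string(single)) // {"jsonrpc":"2.0","method":"eth_chainId","params":null,"id":1}
	fmt.Println(string(batch))  // a two-element JSON array
}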

@@ -349,7 +349,17 @@ func (b *Backend) setOffline() {
 }
 
 func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
-	body := mustMarshalJSON(rpcReqs)
+	isSingleElementBatch := len(rpcReqs) == 1
+
+	// Single element batches are unwrapped before being sent
+	// since Alchemy handles single requests better than batches.
+	var body []byte
+	if isSingleElementBatch {
+		body = mustMarshalJSON(rpcReqs[0])
+	} else {
+		body = mustMarshalJSON(rpcReqs)
+	}
+
 	httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body))
 	if err != nil {
@@ -402,12 +412,22 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
 	}
 
 	var res []*RPCRes
-	if err := json.Unmarshal(resB, &res); err != nil {
-		// Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method
-		if responseIsNotBatched(resB) {
-			return nil, ErrBackendUnexpectedJSONRPC
-		}
-		return nil, ErrBackendBadResponse
-	}
+	if isSingleElementBatch {
+		var singleRes RPCRes
+		if err := json.Unmarshal(resB, &singleRes); err != nil {
+			return nil, ErrBackendBadResponse
+		}
+		res = []*RPCRes{
+			&singleRes,
+		}
+	} else {
+		if err := json.Unmarshal(resB, &res); err != nil {
+			// Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method
+			if responseIsNotBatched(resB) {
+				return nil, ErrBackendUnexpectedJSONRPC
+			}
+			return nil, ErrBackendBadResponse
+		}
+	}
 
 	if len(rpcReqs) != len(res) {
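Because the unwrapped request comes back as a bare JSON-RPC object rather than a one-element array, the response is decoded into a single RPCRes and rewrapped, which keeps the len(rpcReqs) != len(res) guard at the end of the hunk above working unchanged. A hypothetical check of that invariant (not part of this commit; RPCRes is trimmed to the fields used here, proxyd's type carries more):

package main

import (
	"encoding/json"
	"testing"
)

type RPCRes struct {
	JSONRPC string          `json:"jsonrpc"`
	Result  json.RawMessage `json:"result,omitempty"`
	ID      json.RawMessage `json:"id"`
}

func TestBareObjectIsNotABatch(t *testing.T) {
	// A bare object, as an upstream returns it for an unwrapped request.
	resB := []byte(`{"jsonrpc":"2.0","result":"0xa","id":1}`)

	// Decoding it as a batch fails, which is why the forwarder needs
	// the isSingleElementBatch branch above.
	var batch []*RPCRes
	if err := json.Unmarshal(resB, &batch); err == nil {
		t.Fatal("expected an error decoding a bare object as a batch")
	}

	// The single-element path decodes it and rewraps it into a slice,
	// so one request still maps to exactly one response.
	var singleRes RPCRes
	if err := json.Unmarshal(resB, &singleRes); err != nil {
		t.Fatal(err)
	}
	if res := []*RPCRes{&singleRes}; len(res) != 1 {
		t.Fatalf("want 1 response, got %d", len(res))
	}
}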

@@ -35,8 +35,12 @@ func SingleResponseHandler(code int, response string) http.HandlerFunc {
 }
 
 func BatchedResponseHandler(code int, responses ...string) http.HandlerFunc {
 	// all proxyd upstream requests are batched
 	return func(w http.ResponseWriter, r *http.Request) {
+		if len(responses) == 1 {
+			SingleResponseHandler(code, responses[0])(w, r)
+			return
+		}
 		var body string
 		body += "["
 		for i, response := range responses {
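The mock backend mirrors the new behavior: when a test registers exactly one canned response, the handler now delegates to SingleResponseHandler and writes a bare object, which is what the unwrapping forwarder expects on the wire. A hypothetical test-side usage (assumes the handlers above are in scope and that SingleResponseHandler writes the response string verbatim):

import (
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"
)

func TestOneResponseIsUnbatched(t *testing.T) {
	srv := httptest.NewServer(BatchedResponseHandler(200,
		`{"jsonrpc":"2.0","result":"0xa","id":1}`,
	))
	defer srv.Close()

	res, err := http.Post(srv.URL, "application/json", strings.NewReader(`{}`))
	if err != nil {
		t.Fatal(err)
	}
	defer res.Body.Close()

	body, _ := io.ReadAll(res.Body)
	// One registered response → a bare object, not a one-element array.
	if strings.HasPrefix(strings.TrimSpace(string(body)), "[") {
		t.Fatalf("expected an unbatched response, got %s", body)
	}
}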

@@ -44,11 +44,14 @@ func TestConcurrentWSPanic(t *testing.T) {
 	<-readyCh
 
+	var wg sync.WaitGroup
+	wg.Add(2)
 	// spam messages
 	go func() {
 		for {
 			select {
 			case <-quitC:
+				wg.Done()
 				return
 			default:
 				_ = backendToProxyConn.WriteMessage(websocket.TextMessage, []byte("garbage"))
@@ -61,6 +64,7 @@ func TestConcurrentWSPanic(t *testing.T) {
 		for {
 			select {
 			case <-quitC:
+				wg.Done()
 				return
 			default:
 				_ = client.WriteMessage(websocket.TextMessage, []byte("{\"id\": 1, \"method\": \"eth_foo\", \"params\": [\"newHeads\"]}"))
@@ -72,6 +76,7 @@ func TestConcurrentWSPanic(t *testing.T) {
 	// concurrent write to websocket connection
 	time.Sleep(time.Second)
 	close(quitC)
+	wg.Wait()
 }
 
 type backendHandler struct {
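The test edits are a shutdown fix: close(quitC) broadcasts to both writer goroutines, and the new WaitGroup makes the test block until each has observed the close and returned, so neither goroutine can keep writing after the test tears the connections down. The general pattern, as a minimal sketch (illustrative, not the test's code):

package main

import "sync"

func main() {
	quitC := make(chan struct{})
	var wg sync.WaitGroup

	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-quitC:
					return
				default:
					// write spam here
				}
			}
		}()
	}

	close(quitC) // a closed channel unblocks every receiver
	wg.Wait()    // do not proceed until both goroutines have exited
}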