infra/proxyd/integration_tests/ws_test.go

242 lines
6.1 KiB
Go

package integration_tests
import (
"os"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/ethereum-optimism/infra/proxyd"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/require"
)
type backendHandler struct {
msgCB atomic.Value
closeCB atomic.Value
}
func (b *backendHandler) MsgCB(conn *websocket.Conn, msgType int, data []byte) {
cb := b.msgCB.Load()
if cb == nil {
return
}
cb.(MockWSBackendOnMessage)(conn, msgType, data)
}
func (b *backendHandler) SetMsgCB(cb MockWSBackendOnMessage) {
b.msgCB.Store(cb)
}
func (b *backendHandler) CloseCB(conn *websocket.Conn, err error) {
cb := b.closeCB.Load()
if cb == nil {
return
}
cb.(MockWSBackendOnClose)(conn, err)
}
func (b *backendHandler) SetCloseCB(cb MockWSBackendOnClose) {
b.closeCB.Store(cb)
}
type clientHandler struct {
msgCB atomic.Value
}
func (c *clientHandler) MsgCB(msgType int, data []byte) {
cb := c.msgCB.Load().(ProxydWSClientOnMessage)
if cb == nil {
return
}
cb(msgType, data)
}
func (c *clientHandler) SetMsgCB(cb ProxydWSClientOnMessage) {
c.msgCB.Store(cb)
}
func TestWS(t *testing.T) {
backendHdlr := new(backendHandler)
clientHdlr := new(clientHandler)
backend := NewMockWSBackend(nil, func(conn *websocket.Conn, msgType int, data []byte) {
backendHdlr.MsgCB(conn, msgType, data)
}, func(conn *websocket.Conn, err error) {
backendHdlr.CloseCB(conn, err)
})
defer backend.Close()
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
config := ReadConfig("ws")
_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
client, err := NewProxydWSClient("ws://127.0.0.1:8546", func(msgType int, data []byte) {
clientHdlr.MsgCB(msgType, data)
}, nil)
defer client.HardClose()
require.NoError(t, err)
defer shutdown()
tests := []struct {
name string
backendRes string
expRes string
clientReq string
}{
{
"ok response",
"{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"0xcd0c3e8af590364c09d0fa6a1210faf5\"}",
"{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"0xcd0c3e8af590364c09d0fa6a1210faf5\"}",
"{\"id\": 1, \"method\": \"eth_subscribe\", \"params\": [\"newHeads\"]}",
},
{
"garbage backend response",
"gibblegabble",
"{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32013,\"message\":\"backend returned an invalid response\"},\"id\":null}",
"{\"id\": 1, \"method\": \"eth_subscribe\", \"params\": [\"newHeads\"]}",
},
{
"blacklisted RPC",
"}",
"{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32601,\"message\":\"rpc method is not whitelisted\"},\"id\":1}",
"{\"id\": 1, \"method\": \"eth_whatever\", \"params\": []}",
},
{
"garbage client request",
"{}",
"{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32700,\"message\":\"parse error\"},\"id\":null}",
"barf",
},
{
"invalid client request",
"{}",
"{\"jsonrpc\":\"2.0\",\"error\":{\"code\":-32700,\"message\":\"parse error\"},\"id\":null}",
"{\"jsonrpc\": \"2.0\", \"method\": true}",
},
{
"eth_accounts",
"{}",
"{\"jsonrpc\":\"2.0\",\"result\":[],\"id\":1}",
"{\"jsonrpc\": \"2.0\", \"method\": \"eth_accounts\", \"id\": 1}",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
timeout := time.NewTicker(10 * time.Second)
doneCh := make(chan struct{}, 1)
backendHdlr.SetMsgCB(func(conn *websocket.Conn, msgType int, data []byte) {
require.NoError(t, conn.WriteMessage(websocket.TextMessage, []byte(tt.backendRes)))
})
clientHdlr.SetMsgCB(func(msgType int, data []byte) {
require.Equal(t, tt.expRes, string(data))
doneCh <- struct{}{}
})
require.NoError(t, client.WriteMessage(
websocket.TextMessage,
[]byte(tt.clientReq),
))
select {
case <-timeout.C:
t.Fatalf("timed out")
case <-doneCh:
return
}
})
}
}
func TestWSClientClosure(t *testing.T) {
backendHdlr := new(backendHandler)
clientHdlr := new(clientHandler)
backend := NewMockWSBackend(nil, func(conn *websocket.Conn, msgType int, data []byte) {
backendHdlr.MsgCB(conn, msgType, data)
}, func(conn *websocket.Conn, err error) {
backendHdlr.CloseCB(conn, err)
})
defer backend.Close()
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
config := ReadConfig("ws")
_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
for _, closeType := range []string{"soft", "hard"} {
t.Run(closeType, func(t *testing.T) {
client, err := NewProxydWSClient("ws://127.0.0.1:8546", func(msgType int, data []byte) {
clientHdlr.MsgCB(msgType, data)
}, nil)
require.NoError(t, err)
timeout := time.NewTicker(30 * time.Second)
doneCh := make(chan struct{}, 1)
backendHdlr.SetCloseCB(func(conn *websocket.Conn, err error) {
doneCh <- struct{}{}
})
if closeType == "soft" {
require.NoError(t, client.SoftClose())
} else {
client.HardClose()
}
select {
case <-timeout.C:
t.Fatalf("timed out")
case <-doneCh:
return
}
})
}
}
func TestWSClientExceedReadLimit(t *testing.T) {
backendHdlr := new(backendHandler)
clientHdlr := new(clientHandler)
backend := NewMockWSBackend(nil, func(conn *websocket.Conn, msgType int, data []byte) {
backendHdlr.MsgCB(conn, msgType, data)
}, func(conn *websocket.Conn, err error) {
backendHdlr.CloseCB(conn, err)
})
defer backend.Close()
require.NoError(t, os.Setenv("GOOD_BACKEND_RPC_URL", backend.URL()))
config := ReadConfig("ws")
_, shutdown, err := proxyd.Start(config)
require.NoError(t, err)
defer shutdown()
client, err := NewProxydWSClient("ws://127.0.0.1:8546", func(msgType int, data []byte) {
clientHdlr.MsgCB(msgType, data)
}, nil)
require.NoError(t, err)
closed := false
originalHandler := client.conn.CloseHandler()
client.conn.SetCloseHandler(func(code int, text string) error {
closed = true
return originalHandler(code, text)
})
backendHdlr.SetMsgCB(func(conn *websocket.Conn, msgType int, data []byte) {
t.Fatalf("backend should not get the large message")
})
payload := strings.Repeat("barf", 1024*1024)
clientReq := "{\"id\": 1, \"method\": \"eth_subscribe\", \"params\": [\"" + payload + "\"]}"
err = client.WriteMessage(
websocket.TextMessage,
[]byte(clientReq),
)
require.Error(t, err)
require.True(t, closed)
}