Compare commits
2 Commits
ca4e4b24e0
...
9555e15352
Author | SHA1 | Date | |
---|---|---|---|
9555e15352 | |||
75ef474167 |
@ -6,7 +6,7 @@ ARG GITVERSION=docker
|
|||||||
|
|
||||||
RUN apk add make jq git gcc musl-dev linux-headers
|
RUN apk add make jq git gcc musl-dev linux-headers
|
||||||
|
|
||||||
COPY ./proxyd /app
|
COPY . /app
|
||||||
|
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
|
|
||||||
@ -16,7 +16,7 @@ FROM alpine:3.18
|
|||||||
|
|
||||||
RUN apk add bind-tools jq curl bash git redis
|
RUN apk add bind-tools jq curl bash git redis
|
||||||
|
|
||||||
COPY ./proxyd/entrypoint.sh /bin/entrypoint.sh
|
COPY ./entrypoint.sh /bin/entrypoint.sh
|
||||||
|
|
||||||
RUN apk update && \
|
RUN apk update && \
|
||||||
apk add ca-certificates && \
|
apk add ca-certificates && \
|
||||||
@ -24,9 +24,11 @@ RUN apk update && \
|
|||||||
|
|
||||||
EXPOSE 8080
|
EXPOSE 8080
|
||||||
|
|
||||||
VOLUME /etc/proxyd
|
# VOLUME /etc/proxyd
|
||||||
|
|
||||||
|
ADD ./docker.toml /proxyd.toml
|
||||||
|
|
||||||
COPY --from=builder /app/bin/proxyd /bin/proxyd
|
COPY --from=builder /app/bin/proxyd /bin/proxyd
|
||||||
|
|
||||||
ENTRYPOINT ["/bin/entrypoint.sh"]
|
ENTRYPOINT ["/bin/entrypoint.sh"]
|
||||||
CMD ["/bin/proxyd", "/etc/proxyd/proxyd.toml"]
|
CMD ["/bin/proxyd", "/proxyd.toml"]
|
@ -13,12 +13,16 @@ import (
|
|||||||
type ServerConfig struct {
|
type ServerConfig struct {
|
||||||
RPCHost string `toml:"rpc_host"`
|
RPCHost string `toml:"rpc_host"`
|
||||||
RPCPort int `toml:"rpc_port"`
|
RPCPort int `toml:"rpc_port"`
|
||||||
|
EnableWS bool `toml:"enable_ws"`
|
||||||
WSHost string `toml:"ws_host"`
|
WSHost string `toml:"ws_host"`
|
||||||
WSPort int `toml:"ws_port"`
|
WSPort int `toml:"ws_port"`
|
||||||
MaxBodySizeBytes int64 `toml:"max_body_size_bytes"`
|
MaxBodySizeBytes int64 `toml:"max_body_size_bytes"`
|
||||||
MaxConcurrentRPCs int64 `toml:"max_concurrent_rpcs"`
|
MaxConcurrentRPCs int64 `toml:"max_concurrent_rpcs"`
|
||||||
LogLevel string `toml:"log_level"`
|
LogLevel string `toml:"log_level"`
|
||||||
|
|
||||||
|
// Allow direct client connection without x_forwarded_for header for local tests
|
||||||
|
AllowDirect bool `toml:"allow_direct"`
|
||||||
|
|
||||||
// TimeoutSeconds specifies the maximum time spent serving an HTTP request. Note that isn't used for websocket connections
|
// TimeoutSeconds specifies the maximum time spent serving an HTTP request. Note that isn't used for websocket connections
|
||||||
TimeoutSeconds int `toml:"timeout_seconds"`
|
TimeoutSeconds int `toml:"timeout_seconds"`
|
||||||
|
|
||||||
|
20
proxyd/docker-compose.yml
Normal file
20
proxyd/docker-compose.yml
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
services:
|
||||||
|
redis:
|
||||||
|
container_name: redis
|
||||||
|
image: redis:6.2-alpine
|
||||||
|
restart: always
|
||||||
|
#networks:
|
||||||
|
# - tornado_net
|
||||||
|
command: ["redis-server"]
|
||||||
|
proxyd:
|
||||||
|
container_name: proxyd
|
||||||
|
image: proxyd
|
||||||
|
restart: always
|
||||||
|
#networks:
|
||||||
|
# - tornado_net
|
||||||
|
environment:
|
||||||
|
- REDIS_URL=redis://redis:6379
|
||||||
|
- RPC_URL=
|
||||||
|
- WS_URL=
|
||||||
|
ports:
|
||||||
|
- '127.0.0.1:8544:8544'
|
171
proxyd/docker.toml
Normal file
171
proxyd/docker.toml
Normal file
@ -0,0 +1,171 @@
|
|||||||
|
ws_method_whitelist = [
|
||||||
|
"eth_subscribe",
|
||||||
|
"eth_unsubscribe",
|
||||||
|
"eth_blobBaseFee",
|
||||||
|
"eth_blockNumber",
|
||||||
|
"eth_call",
|
||||||
|
"eth_chainId",
|
||||||
|
"eth_estimateGas",
|
||||||
|
"eth_feeHistory",
|
||||||
|
"eth_gasPrice",
|
||||||
|
"eth_getAccount",
|
||||||
|
"eth_getBalance",
|
||||||
|
"eth_getBlockByHash",
|
||||||
|
"eth_getBlockByNumber",
|
||||||
|
"eth_getBlockReceipts",
|
||||||
|
"eth_getBlockTransactionCountByHash",
|
||||||
|
"eth_getBlockTransactionCountByNumber",
|
||||||
|
"eth_getCode",
|
||||||
|
"eth_getFilterChanges",
|
||||||
|
"eth_getFilterLogs",
|
||||||
|
"eth_getLogs",
|
||||||
|
"eth_getProof",
|
||||||
|
"eth_getStorageAt",
|
||||||
|
"eth_getTransactionByBlockHashAndIndex",
|
||||||
|
"eth_getTransactionByBlockNumberAndIndex",
|
||||||
|
"eth_getTransactionByHash",
|
||||||
|
"eth_getTransactionCount",
|
||||||
|
"eth_getTransactionReceipt",
|
||||||
|
"eth_getUncleCountByBlockHash",
|
||||||
|
"eth_getUncleCountByBlockNumber",
|
||||||
|
"eth_maxPriorityFeePerGas",
|
||||||
|
"eth_newBlockFilter",
|
||||||
|
"eth_newFilter",
|
||||||
|
"eth_newPendingTransactionFilter",
|
||||||
|
"eth_syncing",
|
||||||
|
"eth_uninstallFilter",
|
||||||
|
"eth_sendRawTransaction",
|
||||||
|
"net_version",
|
||||||
|
"web3_clientVersion",
|
||||||
|
"web3_sha3",
|
||||||
|
# tracers for archive nodes
|
||||||
|
"trace_block",
|
||||||
|
"trace_call",
|
||||||
|
"trace_callMany",
|
||||||
|
"trace_filter",
|
||||||
|
"trace_rawTransaction",
|
||||||
|
"trace_replayBlockTransactions",
|
||||||
|
"trace_replayTransaction",
|
||||||
|
"trace_transaction",
|
||||||
|
"debug_getBadBlocks",
|
||||||
|
"debug_storageRangeAt",
|
||||||
|
"debug_getTrieFlushInterval",
|
||||||
|
"debug_traceBlock",
|
||||||
|
"debug_traceBlockByHash",
|
||||||
|
"debug_traceBlockByNumber",
|
||||||
|
"debug_traceCall",
|
||||||
|
"debug_traceTransaction",
|
||||||
|
]
|
||||||
|
ws_backend_group = "main"
|
||||||
|
|
||||||
|
[rpc_method_mappings]
|
||||||
|
eth_blobBaseFee = "main"
|
||||||
|
eth_blockNumber = "main"
|
||||||
|
eth_call = "main"
|
||||||
|
eth_chainId = "main"
|
||||||
|
eth_estimateGas = "main"
|
||||||
|
eth_feeHistory = "main"
|
||||||
|
eth_gasPrice = "main"
|
||||||
|
eth_getAccount = "main"
|
||||||
|
eth_getBalance = "main"
|
||||||
|
eth_getBlockByHash = "main"
|
||||||
|
eth_getBlockByNumber = "main"
|
||||||
|
eth_getBlockReceipts = "main"
|
||||||
|
eth_getBlockTransactionCountByHash = "main"
|
||||||
|
eth_getBlockTransactionCountByNumber = "main"
|
||||||
|
eth_getCode = "main"
|
||||||
|
eth_getFilterChanges = "main"
|
||||||
|
eth_getFilterLogs = "main"
|
||||||
|
eth_getLogs = "main"
|
||||||
|
eth_getProof = "main"
|
||||||
|
eth_getStorageAt = "main"
|
||||||
|
eth_getTransactionByBlockHashAndIndex = "main"
|
||||||
|
eth_getTransactionByBlockNumberAndIndex = "main"
|
||||||
|
eth_getTransactionByHash = "main"
|
||||||
|
eth_getTransactionCount = "main"
|
||||||
|
eth_getTransactionReceipt = "main"
|
||||||
|
eth_getUncleCountByBlockHash = "main"
|
||||||
|
eth_getUncleCountByBlockNumber = "main"
|
||||||
|
eth_maxPriorityFeePerGas = "main"
|
||||||
|
eth_newBlockFilter = "main"
|
||||||
|
eth_newFilter = "main"
|
||||||
|
eth_newPendingTransactionFilter = "main"
|
||||||
|
eth_syncing = "main"
|
||||||
|
eth_uninstallFilter = "main"
|
||||||
|
eth_sendRawTransaction = "main"
|
||||||
|
net_version = "main"
|
||||||
|
web3_clientVersion = "main"
|
||||||
|
web3_sha3 = "main"
|
||||||
|
trace_block = "main"
|
||||||
|
trace_call = "main"
|
||||||
|
trace_callMany = "main"
|
||||||
|
trace_filter = "main"
|
||||||
|
trace_rawTransaction = "main"
|
||||||
|
trace_replayBlockTransactions = "main"
|
||||||
|
trace_replayTransaction = "main"
|
||||||
|
trace_transaction = "main"
|
||||||
|
debug_getBadBlocks = "main"
|
||||||
|
debug_storageRangeAt = "main"
|
||||||
|
debug_getTrieFlushInterval = "main"
|
||||||
|
debug_traceBlock = "main"
|
||||||
|
debug_traceBlockByHash = "main"
|
||||||
|
debug_traceBlockByNumber = "main"
|
||||||
|
debug_traceCall = "main"
|
||||||
|
debug_traceTransaction = "main"
|
||||||
|
|
||||||
|
[server]
|
||||||
|
rpc_host = "0.0.0.0"
|
||||||
|
rpc_port = 8544
|
||||||
|
enable_ws = true
|
||||||
|
max_body_size_bytes = 10485760
|
||||||
|
max_concurrent_rpcs = 1000
|
||||||
|
log_level = "info"
|
||||||
|
allow_direct = true
|
||||||
|
|
||||||
|
[cache]
|
||||||
|
enabled = true
|
||||||
|
ttl = "14s"
|
||||||
|
|
||||||
|
[redis]
|
||||||
|
url = "$REDIS_URL"
|
||||||
|
|
||||||
|
[metrics]
|
||||||
|
enabled = true
|
||||||
|
host = "0.0.0.0"
|
||||||
|
port = 9761
|
||||||
|
|
||||||
|
[rate_limit]
|
||||||
|
use_redis = true
|
||||||
|
base_rate = 2000
|
||||||
|
base_interval = "60s"
|
||||||
|
|
||||||
|
[backend]
|
||||||
|
response_timeout_seconds = 120
|
||||||
|
max_response_size_bytes = 5242880
|
||||||
|
max_retries = 3
|
||||||
|
out_of_service_seconds = 600
|
||||||
|
max_latency_threshold = "30s"
|
||||||
|
max_degraded_latency_threshold = "10s"
|
||||||
|
max_error_rate_threshold = 0.3
|
||||||
|
|
||||||
|
[backends]
|
||||||
|
[backends.main]
|
||||||
|
rpc_url = "$RPC_URL"
|
||||||
|
ws_url = "$WS_URL"
|
||||||
|
max_rps = 100
|
||||||
|
max_ws_conns = 100
|
||||||
|
consensus_skip_peer_count = true
|
||||||
|
consensus_receipts_target = "eth_getBlockReceipts"
|
||||||
|
|
||||||
|
[backend_groups]
|
||||||
|
[backend_groups.main]
|
||||||
|
backends = ["main"]
|
||||||
|
consensus_aware = false
|
||||||
|
# consensus_aware = true
|
||||||
|
consensus_ban_period = "1m"
|
||||||
|
consensus_max_update_threshold = "20s"
|
||||||
|
consensus_max_block_lag = 16
|
||||||
|
# Maximum block range (for eth_getLogs method), no default
|
||||||
|
# consensus_max_block_range = 20000
|
||||||
|
# Minimum peer count, default 3
|
||||||
|
# consensus_min_peer_count = 4
|
@ -328,7 +328,7 @@ func Start(config *Config) (*Server, func(), error) {
|
|||||||
|
|
||||||
if config.Server.RPCPort != 0 {
|
if config.Server.RPCPort != 0 {
|
||||||
go func() {
|
go func() {
|
||||||
if err := srv.RPCListenAndServe(config.Server.RPCHost, config.Server.RPCPort); err != nil {
|
if err := srv.RPCListenAndServe(config.Server); err != nil {
|
||||||
if errors.Is(err, http.ErrServerClosed) {
|
if errors.Is(err, http.ErrServerClosed) {
|
||||||
log.Info("RPC server shut down")
|
log.Info("RPC server shut down")
|
||||||
return
|
return
|
||||||
@ -348,7 +348,7 @@ func Start(config *Config) (*Server, func(), error) {
|
|||||||
log.Crit("error starting WS server", "err", err)
|
log.Crit("error starting WS server", "err", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
} else {
|
} else if !config.Server.EnableWS {
|
||||||
log.Info("WS server not enabled (ws_port is set to 0)")
|
log.Info("WS server not enabled (ws_port is set to 0)")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,7 +64,9 @@ func RewriteRequest(rctx RewriteContext, req *RPCReq, res *RPCRes) (RewriteResul
|
|||||||
switch req.Method {
|
switch req.Method {
|
||||||
case "eth_getLogs",
|
case "eth_getLogs",
|
||||||
"eth_newFilter":
|
"eth_newFilter":
|
||||||
return rewriteRange(rctx, req, res, 0)
|
// return rewriteRange(rctx, req, res, 0)
|
||||||
|
// Tornado: disable range check unti the UI is fixed
|
||||||
|
return RewriteNone, nil
|
||||||
case "debug_getRawReceipts", "consensus_getReceipts":
|
case "debug_getRawReceipts", "consensus_getReceipts":
|
||||||
return rewriteParam(rctx, req, res, 0, true, false)
|
return rewriteParam(rctx, req, res, 0, true, false)
|
||||||
case "eth_getBalance",
|
case "eth_getBalance",
|
||||||
@ -307,4 +309,4 @@ func rewriteTagBlockNumberOrHash(rctx RewriteContext, current *rpc.BlockNumberOr
|
|||||||
}
|
}
|
||||||
|
|
||||||
return current, false, nil
|
return current, false, nil
|
||||||
}
|
}
|
@ -200,12 +200,23 @@ func NewServer(
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) RPCListenAndServe(host string, port int) error {
|
func (s *Server) RPCListenAndServe(serverConfig ServerConfig) error {
|
||||||
|
host := serverConfig.RPCHost
|
||||||
|
port := serverConfig.RPCPort
|
||||||
|
enableWS := serverConfig.EnableWS
|
||||||
|
|
||||||
|
var handleRpc ReqHandle = s.GetRPCHandle(serverConfig)
|
||||||
|
|
||||||
s.srvMu.Lock()
|
s.srvMu.Lock()
|
||||||
hdlr := mux.NewRouter()
|
hdlr := mux.NewRouter()
|
||||||
hdlr.HandleFunc("/healthz", s.HandleHealthz).Methods("GET")
|
hdlr.HandleFunc("/healthz", s.HandleHealthz).Methods("GET")
|
||||||
hdlr.HandleFunc("/", s.HandleRPC).Methods("POST")
|
hdlr.HandleFunc("/", handleRpc).Methods("POST")
|
||||||
hdlr.HandleFunc("/{authorization}", s.HandleRPC).Methods("POST")
|
hdlr.HandleFunc("/{authorization}", handleRpc).Methods("POST")
|
||||||
|
if enableWS {
|
||||||
|
var handleWS ReqHandle = s.GetWSHandle(true)
|
||||||
|
hdlr.HandleFunc("/", handleWS)
|
||||||
|
hdlr.HandleFunc("/{authorization}", handleWS)
|
||||||
|
}
|
||||||
c := cors.New(cors.Options{
|
c := cors.New(cors.Options{
|
||||||
AllowedOrigins: []string{"*"},
|
AllowedOrigins: []string{"*"},
|
||||||
})
|
})
|
||||||
@ -215,15 +226,20 @@ func (s *Server) RPCListenAndServe(host string, port int) error {
|
|||||||
Addr: addr,
|
Addr: addr,
|
||||||
}
|
}
|
||||||
log.Info("starting HTTP server", "addr", addr)
|
log.Info("starting HTTP server", "addr", addr)
|
||||||
|
if enableWS {
|
||||||
|
log.Info("starting WS server", "addr", addr)
|
||||||
|
}
|
||||||
s.srvMu.Unlock()
|
s.srvMu.Unlock()
|
||||||
return s.rpcServer.ListenAndServe()
|
return s.rpcServer.ListenAndServe()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) WSListenAndServe(host string, port int) error {
|
func (s *Server) WSListenAndServe(host string, port int) error {
|
||||||
s.srvMu.Lock()
|
s.srvMu.Lock()
|
||||||
|
var handleWS ReqHandle = s.GetWSHandle(false)
|
||||||
|
|
||||||
hdlr := mux.NewRouter()
|
hdlr := mux.NewRouter()
|
||||||
hdlr.HandleFunc("/", s.HandleWS)
|
hdlr.HandleFunc("/", handleWS)
|
||||||
hdlr.HandleFunc("/{authorization}", s.HandleWS)
|
hdlr.HandleFunc("/{authorization}", handleWS)
|
||||||
c := cors.New(cors.Options{
|
c := cors.New(cors.Options{
|
||||||
AllowedOrigins: []string{"*"},
|
AllowedOrigins: []string{"*"},
|
||||||
})
|
})
|
||||||
@ -255,7 +271,15 @@ func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) {
|
|||||||
_, _ = w.Write([]byte("OK"))
|
_, _ = w.Write([]byte("OK"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
|
type ReqHandle func(http.ResponseWriter, *http.Request)
|
||||||
|
|
||||||
|
func (s *Server) GetRPCHandle(serverConfig ServerConfig) ReqHandle {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
s.HandleRPC(w, r, serverConfig)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request, serverConfig ServerConfig) {
|
||||||
ctx := s.populateContext(w, r)
|
ctx := s.populateContext(w, r)
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
return
|
return
|
||||||
@ -272,8 +296,13 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
|
|||||||
isUnlimitedUserAgent := s.isUnlimitedUserAgent(userAgent)
|
isUnlimitedUserAgent := s.isUnlimitedUserAgent(userAgent)
|
||||||
|
|
||||||
if xff == "" {
|
if xff == "" {
|
||||||
writeRPCError(ctx, w, nil, ErrInvalidRequest("request does not include a remote IP"))
|
// Just use remote addr from socket when the request doesn't have x_forwarded_for header
|
||||||
return
|
if (serverConfig.AllowDirect) {
|
||||||
|
xff = r.RemoteAddr
|
||||||
|
} else {
|
||||||
|
writeRPCError(ctx, w, nil, ErrInvalidRequest("request does not include a remote IP"))
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
isLimited := func(method string) bool {
|
isLimited := func(method string) bool {
|
||||||
@ -354,7 +383,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
batchRes, batchContainsCached, servedBy, err := s.handleBatchRPC(ctx, reqs, isLimited, true)
|
batchRes, batchContainsCached, servedBy, err := s.handleBatchRPC(xff, r, ctx, reqs, isLimited, true)
|
||||||
if err == context.DeadlineExceeded {
|
if err == context.DeadlineExceeded {
|
||||||
writeRPCError(ctx, w, nil, ErrGatewayTimeout)
|
writeRPCError(ctx, w, nil, ErrGatewayTimeout)
|
||||||
return
|
return
|
||||||
@ -377,7 +406,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
rawBody := json.RawMessage(body)
|
rawBody := json.RawMessage(body)
|
||||||
backendRes, cached, servedBy, err := s.handleBatchRPC(ctx, []json.RawMessage{rawBody}, isLimited, false)
|
backendRes, cached, servedBy, err := s.handleBatchRPC(xff, r, ctx, []json.RawMessage{rawBody}, isLimited, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
|
if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
|
||||||
errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) {
|
errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) {
|
||||||
@ -394,7 +423,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeRPCRes(ctx, w, backendRes[0])
|
writeRPCRes(ctx, w, backendRes[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isLimited limiterFunc, isBatch bool) ([]*RPCRes, bool, string, error) {
|
func (s *Server) handleBatchRPC(xff string, r *http.Request, ctx context.Context, reqs []json.RawMessage, isLimited limiterFunc, isBatch bool) ([]*RPCRes, bool, string, error) {
|
||||||
// A request set is transformed into groups of batches.
|
// A request set is transformed into groups of batches.
|
||||||
// Each batch group maps to a forwarded JSON-RPC batch request (subject to maxUpstreamBatchSize constraints)
|
// Each batch group maps to a forwarded JSON-RPC batch request (subject to maxUpstreamBatchSize constraints)
|
||||||
// A groupID is used to decouple Requests that have duplicate ID so they're not part of the same batch that's
|
// A groupID is used to decouple Requests that have duplicate ID so they're not part of the same batch that's
|
||||||
@ -406,6 +435,10 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
|
|||||||
backendGroup string
|
backendGroup string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Retrieve info from header
|
||||||
|
origin := r.Header.Get("Origin")
|
||||||
|
userAgent := r.Header.Get("User-Agent")
|
||||||
|
|
||||||
responses := make([]*RPCRes, len(reqs))
|
responses := make([]*RPCRes, len(reqs))
|
||||||
batches := make(map[batchGroup][]batchElem)
|
batches := make(map[batchGroup][]batchElem)
|
||||||
ids := make(map[string]int, len(reqs))
|
ids := make(map[string]int, len(reqs))
|
||||||
@ -418,6 +451,15 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debug(
|
||||||
|
"received RPC method",
|
||||||
|
"req_id", GetReqID(ctx),
|
||||||
|
"method", parsedReq.Method,
|
||||||
|
"user_agent", userAgent,
|
||||||
|
"origin", origin,
|
||||||
|
"remote_ip", xff,
|
||||||
|
)
|
||||||
|
|
||||||
// Simple health check
|
// Simple health check
|
||||||
if len(reqs) == 1 && parsedReq.Method == proxydHealthzMethod {
|
if len(reqs) == 1 && parsedReq.Method == proxydHealthzMethod {
|
||||||
res := &RPCRes{
|
res := &RPCRes{
|
||||||
@ -449,6 +491,9 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
|
|||||||
"source", "rpc",
|
"source", "rpc",
|
||||||
"req_id", GetReqID(ctx),
|
"req_id", GetReqID(ctx),
|
||||||
"method", parsedReq.Method,
|
"method", parsedReq.Method,
|
||||||
|
"user_agent", userAgent,
|
||||||
|
"origin", origin,
|
||||||
|
"remote_ip", xff,
|
||||||
)
|
)
|
||||||
RecordRPCError(ctx, BackendProxyd, MethodUnknown, ErrMethodNotWhitelisted)
|
RecordRPCError(ctx, BackendProxyd, MethodUnknown, ErrMethodNotWhitelisted)
|
||||||
responses[i] = NewRPCErrorRes(parsedReq.ID, ErrMethodNotWhitelisted)
|
responses[i] = NewRPCErrorRes(parsedReq.ID, ErrMethodNotWhitelisted)
|
||||||
@ -475,6 +520,9 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
|
|||||||
"source", "rpc",
|
"source", "rpc",
|
||||||
"req_id", GetReqID(ctx),
|
"req_id", GetReqID(ctx),
|
||||||
"method", parsedReq.Method,
|
"method", parsedReq.Method,
|
||||||
|
"user_agent", userAgent,
|
||||||
|
"origin", origin,
|
||||||
|
"remote_ip", xff,
|
||||||
)
|
)
|
||||||
RecordRPCError(ctx, BackendProxyd, parsedReq.Method, ErrOverRateLimit)
|
RecordRPCError(ctx, BackendProxyd, parsedReq.Method, ErrOverRateLimit)
|
||||||
responses[i] = NewRPCErrorRes(parsedReq.ID, ErrOverRateLimit)
|
responses[i] = NewRPCErrorRes(parsedReq.ID, ErrOverRateLimit)
|
||||||
@ -579,12 +627,31 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
|
|||||||
return responses, cached, servedByString, nil
|
return responses, cached, servedByString, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) GetWSHandle(fromRpc bool) ReqHandle {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
s.HandleWS(w, r, fromRpc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request, fromRpc bool) {
|
||||||
ctx := s.populateContext(w, r)
|
ctx := s.populateContext(w, r)
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle upgrade header request
|
||||||
|
upgrade := false
|
||||||
|
for _, header := range r.Header["Upgrade"] {
|
||||||
|
if header == "websocket" {
|
||||||
|
upgrade = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Filter out non websocket requests
|
||||||
|
if fromRpc && !upgrade {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
log.Info("received WS connection", "req_id", GetReqID(ctx))
|
log.Info("received WS connection", "req_id", GetReqID(ctx))
|
||||||
|
|
||||||
clientConn, err := s.upgrader.Upgrade(w, r, nil)
|
clientConn, err := s.upgrader.Upgrade(w, r, nil)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user