|
|
@ -200,12 +200,23 @@ func NewServer(
|
|
|
|
}, nil
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (s *Server) RPCListenAndServe(host string, port int) error {
|
|
|
|
func (s *Server) RPCListenAndServe(serverConfig ServerConfig) error {
|
|
|
|
|
|
|
|
host := serverConfig.RPCHost
|
|
|
|
|
|
|
|
port := serverConfig.RPCPort
|
|
|
|
|
|
|
|
enableWS := serverConfig.EnableWS
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var handleRpc ReqHandle = s.GetRPCHandle(serverConfig)
|
|
|
|
|
|
|
|
|
|
|
|
s.srvMu.Lock()
|
|
|
|
s.srvMu.Lock()
|
|
|
|
hdlr := mux.NewRouter()
|
|
|
|
hdlr := mux.NewRouter()
|
|
|
|
hdlr.HandleFunc("/healthz", s.HandleHealthz).Methods("GET")
|
|
|
|
hdlr.HandleFunc("/healthz", s.HandleHealthz).Methods("GET")
|
|
|
|
hdlr.HandleFunc("/", s.HandleRPC).Methods("POST")
|
|
|
|
hdlr.HandleFunc("/", handleRpc).Methods("POST")
|
|
|
|
hdlr.HandleFunc("/{authorization}", s.HandleRPC).Methods("POST")
|
|
|
|
hdlr.HandleFunc("/{authorization}", handleRpc).Methods("POST")
|
|
|
|
|
|
|
|
if enableWS {
|
|
|
|
|
|
|
|
var handleWS ReqHandle = s.GetWSHandle(true)
|
|
|
|
|
|
|
|
hdlr.HandleFunc("/", handleWS)
|
|
|
|
|
|
|
|
hdlr.HandleFunc("/{authorization}", handleWS)
|
|
|
|
|
|
|
|
}
|
|
|
|
c := cors.New(cors.Options{
|
|
|
|
c := cors.New(cors.Options{
|
|
|
|
AllowedOrigins: []string{"*"},
|
|
|
|
AllowedOrigins: []string{"*"},
|
|
|
|
})
|
|
|
|
})
|
|
|
@ -215,15 +226,20 @@ func (s *Server) RPCListenAndServe(host string, port int) error {
|
|
|
|
Addr: addr,
|
|
|
|
Addr: addr,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
log.Info("starting HTTP server", "addr", addr)
|
|
|
|
log.Info("starting HTTP server", "addr", addr)
|
|
|
|
|
|
|
|
if enableWS {
|
|
|
|
|
|
|
|
log.Info("starting WS server", "addr", addr)
|
|
|
|
|
|
|
|
}
|
|
|
|
s.srvMu.Unlock()
|
|
|
|
s.srvMu.Unlock()
|
|
|
|
return s.rpcServer.ListenAndServe()
|
|
|
|
return s.rpcServer.ListenAndServe()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (s *Server) WSListenAndServe(host string, port int) error {
|
|
|
|
func (s *Server) WSListenAndServe(host string, port int) error {
|
|
|
|
s.srvMu.Lock()
|
|
|
|
s.srvMu.Lock()
|
|
|
|
|
|
|
|
var handleWS ReqHandle = s.GetWSHandle(false)
|
|
|
|
|
|
|
|
|
|
|
|
hdlr := mux.NewRouter()
|
|
|
|
hdlr := mux.NewRouter()
|
|
|
|
hdlr.HandleFunc("/", s.HandleWS)
|
|
|
|
hdlr.HandleFunc("/", handleWS)
|
|
|
|
hdlr.HandleFunc("/{authorization}", s.HandleWS)
|
|
|
|
hdlr.HandleFunc("/{authorization}", handleWS)
|
|
|
|
c := cors.New(cors.Options{
|
|
|
|
c := cors.New(cors.Options{
|
|
|
|
AllowedOrigins: []string{"*"},
|
|
|
|
AllowedOrigins: []string{"*"},
|
|
|
|
})
|
|
|
|
})
|
|
|
@ -255,7 +271,15 @@ func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) {
|
|
|
|
_, _ = w.Write([]byte("OK"))
|
|
|
|
_, _ = w.Write([]byte("OK"))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
|
|
|
|
type ReqHandle func(http.ResponseWriter, *http.Request)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (s *Server) GetRPCHandle(serverConfig ServerConfig) ReqHandle {
|
|
|
|
|
|
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
|
|
|
s.HandleRPC(w, r, serverConfig)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request, serverConfig ServerConfig) {
|
|
|
|
ctx := s.populateContext(w, r)
|
|
|
|
ctx := s.populateContext(w, r)
|
|
|
|
if ctx == nil {
|
|
|
|
if ctx == nil {
|
|
|
|
return
|
|
|
|
return
|
|
|
@ -272,9 +296,14 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
|
|
|
|
isUnlimitedUserAgent := s.isUnlimitedUserAgent(userAgent)
|
|
|
|
isUnlimitedUserAgent := s.isUnlimitedUserAgent(userAgent)
|
|
|
|
|
|
|
|
|
|
|
|
if xff == "" {
|
|
|
|
if xff == "" {
|
|
|
|
|
|
|
|
// Just use remote addr from socket when the request doesn't have x_forwarded_for header
|
|
|
|
|
|
|
|
if (serverConfig.AllowDirect) {
|
|
|
|
|
|
|
|
xff = r.RemoteAddr
|
|
|
|
|
|
|
|
} else {
|
|
|
|
writeRPCError(ctx, w, nil, ErrInvalidRequest("request does not include a remote IP"))
|
|
|
|
writeRPCError(ctx, w, nil, ErrInvalidRequest("request does not include a remote IP"))
|
|
|
|
return
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
isLimited := func(method string) bool {
|
|
|
|
isLimited := func(method string) bool {
|
|
|
|
isGloballyLimitedMethod := s.isGlobalLimit(method)
|
|
|
|
isGloballyLimitedMethod := s.isGlobalLimit(method)
|
|
|
@ -368,7 +397,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
|
|
|
|
return
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
batchRes, batchContainsCached, servedBy, err := s.handleBatchRPC(ctx, reqs, isLimited, true)
|
|
|
|
batchRes, batchContainsCached, servedBy, err := s.handleBatchRPC(xff, r, ctx, reqs, isLimited, true)
|
|
|
|
if err == context.DeadlineExceeded {
|
|
|
|
if err == context.DeadlineExceeded {
|
|
|
|
writeRPCError(ctx, w, nil, ErrGatewayTimeout)
|
|
|
|
writeRPCError(ctx, w, nil, ErrGatewayTimeout)
|
|
|
|
return
|
|
|
|
return
|
|
|
@ -391,7 +420,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
rawBody := json.RawMessage(body)
|
|
|
|
rawBody := json.RawMessage(body)
|
|
|
|
backendRes, cached, servedBy, err := s.handleBatchRPC(ctx, []json.RawMessage{rawBody}, isLimited, false)
|
|
|
|
backendRes, cached, servedBy, err := s.handleBatchRPC(xff, r, ctx, []json.RawMessage{rawBody}, isLimited, false)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
|
|
|
|
if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
|
|
|
|
errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) {
|
|
|
|
errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) {
|
|
|
@ -408,7 +437,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
|
|
|
|
writeRPCRes(ctx, w, backendRes[0])
|
|
|
|
writeRPCRes(ctx, w, backendRes[0])
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isLimited limiterFunc, isBatch bool) ([]*RPCRes, bool, string, error) {
|
|
|
|
func (s *Server) handleBatchRPC(xff string, r *http.Request, ctx context.Context, reqs []json.RawMessage, isLimited limiterFunc, isBatch bool) ([]*RPCRes, bool, string, error) {
|
|
|
|
// A request set is transformed into groups of batches.
|
|
|
|
// A request set is transformed into groups of batches.
|
|
|
|
// Each batch group maps to a forwarded JSON-RPC batch request (subject to maxUpstreamBatchSize constraints)
|
|
|
|
// Each batch group maps to a forwarded JSON-RPC batch request (subject to maxUpstreamBatchSize constraints)
|
|
|
|
// A groupID is used to decouple Requests that have duplicate ID so they're not part of the same batch that's
|
|
|
|
// A groupID is used to decouple Requests that have duplicate ID so they're not part of the same batch that's
|
|
|
@ -420,6 +449,10 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
|
|
|
|
backendGroup string
|
|
|
|
backendGroup string
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Retrieve info from header
|
|
|
|
|
|
|
|
origin := r.Header.Get("Origin")
|
|
|
|
|
|
|
|
userAgent := r.Header.Get("User-Agent")
|
|
|
|
|
|
|
|
|
|
|
|
responses := make([]*RPCRes, len(reqs))
|
|
|
|
responses := make([]*RPCRes, len(reqs))
|
|
|
|
batches := make(map[batchGroup][]batchElem)
|
|
|
|
batches := make(map[batchGroup][]batchElem)
|
|
|
|
ids := make(map[string]int, len(reqs))
|
|
|
|
ids := make(map[string]int, len(reqs))
|
|
|
@ -432,6 +465,15 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
log.Debug(
|
|
|
|
|
|
|
|
"received RPC method",
|
|
|
|
|
|
|
|
"req_id", GetReqID(ctx),
|
|
|
|
|
|
|
|
"method", parsedReq.Method,
|
|
|
|
|
|
|
|
"user_agent", userAgent,
|
|
|
|
|
|
|
|
"origin", origin,
|
|
|
|
|
|
|
|
"remote_ip", xff,
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
// Simple health check
|
|
|
|
// Simple health check
|
|
|
|
if len(reqs) == 1 && parsedReq.Method == proxydHealthzMethod {
|
|
|
|
if len(reqs) == 1 && parsedReq.Method == proxydHealthzMethod {
|
|
|
|
res := &RPCRes{
|
|
|
|
res := &RPCRes{
|
|
|
@ -463,6 +505,9 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
|
|
|
|
"source", "rpc",
|
|
|
|
"source", "rpc",
|
|
|
|
"req_id", GetReqID(ctx),
|
|
|
|
"req_id", GetReqID(ctx),
|
|
|
|
"method", parsedReq.Method,
|
|
|
|
"method", parsedReq.Method,
|
|
|
|
|
|
|
|
"user_agent", userAgent,
|
|
|
|
|
|
|
|
"origin", origin,
|
|
|
|
|
|
|
|
"remote_ip", xff,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
RecordRPCError(ctx, BackendProxyd, MethodUnknown, ErrMethodNotWhitelisted)
|
|
|
|
RecordRPCError(ctx, BackendProxyd, MethodUnknown, ErrMethodNotWhitelisted)
|
|
|
|
responses[i] = NewRPCErrorRes(parsedReq.ID, ErrMethodNotWhitelisted)
|
|
|
|
responses[i] = NewRPCErrorRes(parsedReq.ID, ErrMethodNotWhitelisted)
|
|
|
@ -479,6 +524,9 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
|
|
|
|
"source", "rpc",
|
|
|
|
"source", "rpc",
|
|
|
|
"req_id", GetReqID(ctx),
|
|
|
|
"req_id", GetReqID(ctx),
|
|
|
|
"method", parsedReq.Method,
|
|
|
|
"method", parsedReq.Method,
|
|
|
|
|
|
|
|
"user_agent", userAgent,
|
|
|
|
|
|
|
|
"origin", origin,
|
|
|
|
|
|
|
|
"remote_ip", xff,
|
|
|
|
)
|
|
|
|
)
|
|
|
|
RecordRPCError(ctx, BackendProxyd, parsedReq.Method, ErrOverRateLimit)
|
|
|
|
RecordRPCError(ctx, BackendProxyd, parsedReq.Method, ErrOverRateLimit)
|
|
|
|
responses[i] = NewRPCErrorRes(parsedReq.ID, ErrOverRateLimit)
|
|
|
|
responses[i] = NewRPCErrorRes(parsedReq.ID, ErrOverRateLimit)
|
|
|
@ -583,12 +631,31 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
|
|
|
|
return responses, cached, servedByString, nil
|
|
|
|
return responses, cached, servedByString, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) {
|
|
|
|
func (s *Server) GetWSHandle(fromRpc bool) ReqHandle {
|
|
|
|
|
|
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
|
|
|
s.HandleWS(w, r, fromRpc)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request, fromRpc bool) {
|
|
|
|
ctx := s.populateContext(w, r)
|
|
|
|
ctx := s.populateContext(w, r)
|
|
|
|
if ctx == nil {
|
|
|
|
if ctx == nil {
|
|
|
|
return
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Handle upgrade header request
|
|
|
|
|
|
|
|
upgrade := false
|
|
|
|
|
|
|
|
for _, header := range r.Header["Upgrade"] {
|
|
|
|
|
|
|
|
if header == "websocket" {
|
|
|
|
|
|
|
|
upgrade = true
|
|
|
|
|
|
|
|
break
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// Filter out non websocket requests
|
|
|
|
|
|
|
|
if fromRpc && !upgrade {
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
log.Info("received WS connection", "req_id", GetReqID(ctx))
|
|
|
|
log.Info("received WS connection", "req_id", GetReqID(ctx))
|
|
|
|
|
|
|
|
|
|
|
|
clientConn, err := s.upgrader.Upgrade(w, r, nil)
|
|
|
|
clientConn, err := s.upgrader.Upgrade(w, r, nil)
|
|
|
|