From 56df3a68924dd2e656c0b476cc784f3b390efc4f Mon Sep 17 00:00:00 2001
From: Felipe Andrade
Date: Thu, 19 Oct 2023 12:48:03 -0700
Subject: [PATCH] add x-served-by header

---
 proxyd/proxyd/backend.go | 11 ++++++-----
 proxyd/proxyd/config.go  |  2 ++
 proxyd/proxyd/proxyd.go  |  1 +
 proxyd/proxyd/server.go  | 34 ++++++++++++++++++++++++++--------
 4 files changed, 35 insertions(+), 13 deletions(-)

diff --git a/proxyd/proxyd/backend.go b/proxyd/proxyd/backend.go
index 24b2897..cd57979 100644
--- a/proxyd/proxyd/backend.go
+++ b/proxyd/proxyd/backend.go
@@ -665,9 +665,9 @@ type BackendGroup struct {
 	Consensus *ConsensusPoller
 }
 
-func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
+func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) {
 	if len(rpcReqs) == 0 {
-		return nil, nil
+		return nil, "", nil
 	}
 
 	backends := bg.Backends
@@ -731,7 +731,7 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
 		if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
 			errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) ||
 			errors.Is(err, ErrMethodNotWhitelisted) {
-			return nil, err
+			return nil, "", err
 		}
 		if errors.Is(err, ErrBackendOffline) {
 			log.Warn(
@@ -773,11 +773,12 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
 			}
 		}
 
-		return res, nil
+		servedBy := fmt.Sprintf("%s/%s", bg.Name, back.Name)
+		return res, servedBy, nil
 	}
 
 	RecordUnserviceableRequest(ctx, RPCRequestSourceHTTP)
-	return nil, ErrNoBackends
+	return nil, "", ErrNoBackends
 }
 
 func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {
diff --git a/proxyd/proxyd/config.go b/proxyd/proxyd/config.go
index 14e3ece..561c04e 100644
--- a/proxyd/proxyd/config.go
+++ b/proxyd/proxyd/config.go
@@ -24,6 +24,8 @@ type ServerConfig struct {
 
 	EnableRequestLog     bool `toml:"enable_request_log"`
 	MaxRequestBodyLogLen int  `toml:"max_request_body_log_len"`
+
+	EnableXServedByHeader bool `toml:"enable_served_by_header"`
 }
 
 type CacheConfig struct {
diff --git a/proxyd/proxyd/proxyd.go b/proxyd/proxyd/proxyd.go
index 853a38a..84051ab 100644
--- a/proxyd/proxyd/proxyd.go
+++ b/proxyd/proxyd/proxyd.go
@@ -235,6 +235,7 @@ func Start(config *Config) (*Server, func(), error) {
 		resolvedAuth,
 		secondsToDuration(config.Server.TimeoutSeconds),
 		config.Server.MaxUpstreamBatchSize,
+		config.Server.EnableXServedByHeader,
 		rpcCache,
 		config.RateLimit,
 		config.SenderRateLimit,
diff --git a/proxyd/proxyd/server.go b/proxyd/proxyd/server.go
index c03d6de..4f165f0 100644
--- a/proxyd/proxyd/server.go
+++ b/proxyd/proxyd/server.go
@@ -60,6 +60,7 @@ type Server struct {
 	timeout              time.Duration
 	maxUpstreamBatchSize int
 	maxBatchSize         int
+	enableServedByHeader bool
 	upgrader             *websocket.Upgrader
 	mainLim              FrontendRateLimiter
 	overrideLims         map[string]FrontendRateLimiter
@@ -85,6 +86,7 @@ func NewServer(
 	authenticatedPaths map[string]string,
 	timeout time.Duration,
 	maxUpstreamBatchSize int,
+	enableServedByHeader bool,
 	cache RPCCache,
 	rateLimitConfig RateLimitConfig,
 	senderRateLimitConfig SenderRateLimitConfig,
@@ -175,6 +177,7 @@ func NewServer(
 		authenticatedPaths:   authenticatedPaths,
 		timeout:              timeout,
 		maxUpstreamBatchSize: maxUpstreamBatchSize,
+		enableServedByHeader: enableServedByHeader,
 		cache:                cache,
 		enableRequestLog:     enableRequestLog,
 		maxRequestBodyLogLen: maxRequestBodyLogLen,
@@ -354,7 +357,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
 			return
 		}
 
-		batchRes, batchContainsCached, err := s.handleBatchRPC(ctx, reqs, isLimited, true)
+		batchRes, batchContainsCached, servedBy, err := s.handleBatchRPC(ctx, reqs, isLimited, true)
 		if err == context.DeadlineExceeded {
 			writeRPCError(ctx, w, nil, ErrGatewayTimeout)
 			return
@@ -368,14 +371,16 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
 			writeRPCError(ctx, w, nil, ErrInternal)
 			return
 		}
-
+		if s.enableServedByHeader {
+			w.Header().Set("x-served-by", servedBy)
+		}
 		setCacheHeader(w, batchContainsCached)
 		writeBatchRPCRes(ctx, w, batchRes)
 		return
 	}
 
 	rawBody := json.RawMessage(body)
-	backendRes, cached, err := s.handleBatchRPC(ctx, []json.RawMessage{rawBody}, isLimited, false)
+	backendRes, cached, servedBy, err := s.handleBatchRPC(ctx, []json.RawMessage{rawBody}, isLimited, false)
 	if err != nil {
 		if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
 			errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) {
@@ -385,11 +390,14 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
 		writeRPCError(ctx, w, nil, ErrInternal)
 		return
 	}
+	if s.enableServedByHeader {
+		w.Header().Set("x-served-by", servedBy)
+	}
 	setCacheHeader(w, cached)
 	writeRPCRes(ctx, w, backendRes[0])
 }
 
-func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isLimited limiterFunc, isBatch bool) ([]*RPCRes, bool, error) {
+func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isLimited limiterFunc, isBatch bool) ([]*RPCRes, bool, string, error) {
 	// A request set is transformed into groups of batches.
 	// Each batch group maps to a forwarded JSON-RPC batch request (subject to maxUpstreamBatchSize constraints)
 	// A groupID is used to decouple Requests that have duplicate ID so they're not part of the same batch that's
@@ -475,6 +483,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 		batches[batchGroup] = append(batches[batchGroup], batchElem{parsedReq, i})
 	}
 
+	servedBy := make(map[string]bool)
 	var cached bool
 	for group, batch := range batches {
 		var cacheMisses []batchElem
@@ -499,17 +508,18 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 					"batch_index", i,
 				)
 				batchRPCShortCircuitsTotal.Inc()
-				return nil, false, context.DeadlineExceeded
+				return nil, false, "", context.DeadlineExceeded
 			}
 
 			start := i * s.maxUpstreamBatchSize
 			end := int(math.Min(float64(start+s.maxUpstreamBatchSize), float64(len(cacheMisses))))
 			elems := cacheMisses[start:end]
-			res, err := s.BackendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch)
+			res, sb, err := s.BackendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch)
+			servedBy[sb] = true
 			if err != nil {
 				if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
 					errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) {
-					return nil, false, err
+					return nil, false, "", err
 				}
 				log.Error(
 					"error forwarding RPC batch",
@@ -541,7 +551,15 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 		}
 	}
 
-	return responses, cached, nil
+	servedByString := ""
+	for sb := range servedBy {
+		if servedByString != "" {
+			servedByString += ", "
+		}
+		servedByString += sb
+	}
+
+	return responses, cached, servedByString, nil
 }
 
 func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) {