add x-served-by header
parent c82a9a08de
commit 56df3a6892
@@ -665,9 +665,9 @@ type BackendGroup struct {
 	Consensus *ConsensusPoller
 }
 
-func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
+func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) {
 	if len(rpcReqs) == 0 {
-		return nil, nil
+		return nil, "", nil
 	}
 
 	backends := bg.Backends
@@ -731,7 +731,7 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
 		if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
 			errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) ||
 			errors.Is(err, ErrMethodNotWhitelisted) {
-			return nil, err
+			return nil, "", err
 		}
 		if errors.Is(err, ErrBackendOffline) {
 			log.Warn(
@@ -773,11 +773,12 @@ func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch
 			}
 		}
 
-		return res, nil
+		servedBy := fmt.Sprintf("%s/%s", bg.Name, back.Name)
+		return res, servedBy, nil
 	}
 
 	RecordUnserviceableRequest(ctx, RPCRequestSourceHTTP)
-	return nil, ErrNoBackends
+	return nil, "", ErrNoBackends
 }
 
 func (bg *BackendGroup) ProxyWS(ctx context.Context, clientConn *websocket.Conn, methodWhitelist *StringSet) (*WSProxier, error) {
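Forward now reports which backend served the request alongside the responses, formatted as "<backend group name>/<backend name>". A minimal sketch of the resulting value, with hypothetical group and backend names:

package main

import "fmt"

func main() {
	// Mirrors the fmt.Sprintf call added to Forward above.
	// "main" and "backend-1" are illustrative names, not from this commit.
	groupName, backendName := "main", "backend-1"
	fmt.Printf("%s/%s\n", groupName, backendName) // prints "main/backend-1"
}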
@@ -24,6 +24,8 @@ type ServerConfig struct {
 
 	EnableRequestLog     bool `toml:"enable_request_log"`
 	MaxRequestBodyLogLen int  `toml:"max_request_body_log_len"`
+
+	EnableXServedByHeader bool `toml:"enable_served_by_header"`
 }
 
 type CacheConfig struct {
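For reference, a sketch of how the new toggle might look in a proxyd TOML config file, assuming ServerConfig maps to a [server] table; the neighboring keys come from the tags in the same struct and the values are illustrative:

[server]
enable_request_log = true
max_request_body_log_len = 2000
enable_served_by_header = true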
@@ -235,6 +235,7 @@ func Start(config *Config) (*Server, func(), error) {
 		resolvedAuth,
 		secondsToDuration(config.Server.TimeoutSeconds),
 		config.Server.MaxUpstreamBatchSize,
+		config.Server.EnableXServedByHeader,
 		rpcCache,
 		config.RateLimit,
 		config.SenderRateLimit,
@@ -60,6 +60,7 @@ type Server struct {
 	timeout              time.Duration
 	maxUpstreamBatchSize int
 	maxBatchSize         int
+	enableServedByHeader bool
 	upgrader             *websocket.Upgrader
 	mainLim              FrontendRateLimiter
 	overrideLims         map[string]FrontendRateLimiter
@@ -85,6 +86,7 @@ func NewServer(
 	authenticatedPaths map[string]string,
 	timeout time.Duration,
 	maxUpstreamBatchSize int,
+	enableServedByHeader bool,
 	cache RPCCache,
 	rateLimitConfig RateLimitConfig,
 	senderRateLimitConfig SenderRateLimitConfig,
@@ -175,6 +177,7 @@ func NewServer(
 		authenticatedPaths:   authenticatedPaths,
 		timeout:              timeout,
 		maxUpstreamBatchSize: maxUpstreamBatchSize,
+		enableServedByHeader: enableServedByHeader,
 		cache:                cache,
 		enableRequestLog:     enableRequestLog,
 		maxRequestBodyLogLen: maxRequestBodyLogLen,
@@ -354,7 +357,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
 			return
 		}
 
-		batchRes, batchContainsCached, err := s.handleBatchRPC(ctx, reqs, isLimited, true)
+		batchRes, batchContainsCached, servedBy, err := s.handleBatchRPC(ctx, reqs, isLimited, true)
 		if err == context.DeadlineExceeded {
 			writeRPCError(ctx, w, nil, ErrGatewayTimeout)
 			return
@@ -368,14 +371,16 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
 			writeRPCError(ctx, w, nil, ErrInternal)
 			return
 		}
-
+		if s.enableServedByHeader {
+			w.Header().Set("x-served-by", servedBy)
+		}
 		setCacheHeader(w, batchContainsCached)
 		writeBatchRPCRes(ctx, w, batchRes)
 		return
 	}
 
 	rawBody := json.RawMessage(body)
-	backendRes, cached, err := s.handleBatchRPC(ctx, []json.RawMessage{rawBody}, isLimited, false)
+	backendRes, cached, servedBy, err := s.handleBatchRPC(ctx, []json.RawMessage{rawBody}, isLimited, false)
 	if err != nil {
 		if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
 			errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) {
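With the flag enabled, a client can observe the header on any successful response. A hedged smoke-test sketch, assuming a proxyd instance listening on localhost:8545 with enable_served_by_header = true:

package main

import (
	"fmt"
	"log"
	"net/http"
	"strings"
)

func main() {
	body := `{"jsonrpc":"2.0","method":"eth_chainId","params":[],"id":1}`
	resp, err := http.Post("http://localhost:8545", "application/json", strings.NewReader(body))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// Expected format per the Forward change: "<backend group>/<backend>".
	fmt.Println(resp.Header.Get("x-served-by"))
}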
@@ -385,11 +390,14 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
 		writeRPCError(ctx, w, nil, ErrInternal)
 		return
 	}
+	if s.enableServedByHeader {
+		w.Header().Set("x-served-by", servedBy)
+	}
 	setCacheHeader(w, cached)
 	writeRPCRes(ctx, w, backendRes[0])
 }
 
-func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isLimited limiterFunc, isBatch bool) ([]*RPCRes, bool, error) {
+func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isLimited limiterFunc, isBatch bool) ([]*RPCRes, bool, string, error) {
 	// A request set is transformed into groups of batches.
 	// Each batch group maps to a forwarded JSON-RPC batch request (subject to maxUpstreamBatchSize constraints)
 	// A groupID is used to decouple Requests that have duplicate ID so they're not part of the same batch that's
@@ -475,6 +483,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 		batches[batchGroup] = append(batches[batchGroup], batchElem{parsedReq, i})
 	}
 
+	servedBy := make(map[string]bool, 0)
 	var cached bool
 	for group, batch := range batches {
 		var cacheMisses []batchElem
@@ -499,17 +508,18 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 					"batch_index", i,
 				)
 				batchRPCShortCircuitsTotal.Inc()
-				return nil, false, context.DeadlineExceeded
+				return nil, false, "", context.DeadlineExceeded
 			}
 
 			start := i * s.maxUpstreamBatchSize
 			end := int(math.Min(float64(start+s.maxUpstreamBatchSize), float64(len(cacheMisses))))
 			elems := cacheMisses[start:end]
-			res, err := s.BackendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch)
+			res, sb, err := s.BackendGroups[group.backendGroup].Forward(ctx, createBatchRequest(elems), isBatch)
+			servedBy[sb] = true
 			if err != nil {
 				if errors.Is(err, ErrConsensusGetReceiptsCantBeBatched) ||
 					errors.Is(err, ErrConsensusGetReceiptsInvalidTarget) {
-					return nil, false, err
+					return nil, false, "", err
 				}
 				log.Error(
 					"error forwarding RPC batch",
@@ -541,7 +551,15 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 		}
 	}
 
-	return responses, cached, nil
+	servedByString := ""
+	for sb := range servedBy {
+		if servedByString != "" {
+			servedByString += ", "
+		}
+		servedByString += sb
+	}
+
+	return responses, cached, servedByString, nil
 }
 
 func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) {
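One design note on the hunk above: because servedBy is a map, the comma-separated header value is assembled in Go's randomized map iteration order, so a batch served by multiple backends can produce a differently ordered header on each request. A minimal sketch of a deterministic alternative, sorting the keys before joining (backend names hypothetical):

package main

import (
	"fmt"
	"sort"
	"strings"
)

func main() {
	// Same de-duplication as handleBatchRPC: one entry per serving backend.
	servedBy := map[string]bool{
		"main/backend-2": true,
		"main/backend-1": true,
	}

	keys := make([]string, 0, len(servedBy))
	for sb := range servedBy {
		keys = append(keys, sb)
	}
	sort.Strings(keys) // stable order regardless of map iteration
	fmt.Println(strings.Join(keys, ", ")) // main/backend-1, main/backend-2
}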