diff --git a/proxyd/proxyd/backend.go b/proxyd/proxyd/backend.go index 8445af3..f50926b 100644 --- a/proxyd/proxyd/backend.go +++ b/proxyd/proxyd/backend.go @@ -202,6 +202,12 @@ func WithProxydIP(ip string) BackendOpt { } } +func WithMaxDegradedLatencyThreshold(maxDegradedLatencyThreshold time.Duration) BackendOpt { + return func(b *Backend) { + b.maxDegradedLatencyThreshold = maxDegradedLatencyThreshold + } +} + func WithMaxLatencyThreshold(maxLatencyThreshold time.Duration) BackendOpt { return func(b *Backend) { b.maxLatencyThreshold = maxLatencyThreshold diff --git a/proxyd/proxyd/config.go b/proxyd/proxyd/config.go index 94c1f83..0222a83 100644 --- a/proxyd/proxyd/config.go +++ b/proxyd/proxyd/config.go @@ -71,10 +71,13 @@ func (t *TOMLDuration) UnmarshalText(b []byte) error { } type BackendOptions struct { - ResponseTimeoutSeconds int `toml:"response_timeout_seconds"` - MaxResponseSizeBytes int64 `toml:"max_response_size_bytes"` - MaxRetries int `toml:"max_retries"` - OutOfServiceSeconds int `toml:"out_of_service_seconds"` + ResponseTimeoutSeconds int `toml:"response_timeout_seconds"` + MaxResponseSizeBytes int64 `toml:"max_response_size_bytes"` + MaxRetries int `toml:"max_retries"` + OutOfServiceSeconds int `toml:"out_of_service_seconds"` + MaxDegradedLatencyThreshold TOMLDuration `toml:"max_degraded_latency_threshold"` + MaxLatencyThreshold TOMLDuration `toml:"max_latency_threshold"` + MaxErrorRateThreshold float64 `toml:"max_error_rate_threshold"` } type BackendConfig struct { @@ -94,9 +97,14 @@ type BackendConfig struct { type BackendsConfig map[string]*BackendConfig type BackendGroupConfig struct { - Backends []string `toml:"backends"` - ConsensusAware bool `toml:"consensus_aware"` - ConsensusAsyncHandler string `toml:"consensus_handler"` + Backends []string `toml:"backends"` + + ConsensusAware bool `toml:"consensus_aware"` + ConsensusAsyncHandler string `toml:"consensus_handler"` + + ConsensusBanPeriod TOMLDuration `toml:"consensus_ban_period"` + ConsensusMaxUpdateThreshold TOMLDuration `toml:"consensus_max_update_threshold"` + ConsensusMinPeerCount int `toml:"consensus_min_peer_count"` } type BackendGroupsConfig map[string]*BackendGroupConfig diff --git a/proxyd/proxyd/example.config.toml b/proxyd/proxyd/example.config.toml index 16408e8..053b5fd 100644 --- a/proxyd/proxyd/example.config.toml +++ b/proxyd/proxyd/example.config.toml @@ -44,6 +44,12 @@ max_response_size_bytes = 5242880 max_retries = 3 # Number of seconds to wait before trying an unhealthy backend again. out_of_service_seconds = 600 +# Maximum latency accepted to serve requests, default 10s +max_latency_threshold = "30s" +# Maximum latency accepted to serve requests before degraded, default 5s +max_degraded_latency_threshold = "10s" +# Maximum error rate accepted to serve requests, default 0.5 (i.e. 50%) +max_error_rate_threshold = 0.3 [backends] # A map of backends by name. @@ -78,6 +84,14 @@ max_ws_conns = 1 [backend_groups] [backend_groups.main] backends = ["infura"] +# Enable consensus awareness for backend group, making it act as a load balancer, default false +# consensus_aware = true +# Period in which the backend wont serve requests if banned, default 5m +# consensus_ban_period = "1m" +# Maximum delay for update the backend, default 30s +# consensus_max_update_threshold = "20s" +# Minimum peer count, default 3 +# consensus_min_peer_count = 4 [backend_groups.alchemy] backends = ["alchemy"] diff --git a/proxyd/proxyd/integration_tests/testdata/consensus.toml b/proxyd/proxyd/integration_tests/testdata/consensus.toml index bc9b43e..d26b9dc 100644 --- a/proxyd/proxyd/integration_tests/testdata/consensus.toml +++ b/proxyd/proxyd/integration_tests/testdata/consensus.toml @@ -16,6 +16,9 @@ rpc_url = "$NODE2_URL" backends = ["node1", "node2"] consensus_aware = true consensus_handler = "noop" # allow more control over the consensus poller for tests +consensus_ban_period = "1m" +consensus_max_update_threshold = "2m" +consensus_min_peer_count = 4 [rpc_method_mappings] eth_call = "node" diff --git a/proxyd/proxyd/proxyd.go b/proxyd/proxyd/proxyd.go index 5a34fa6..f682272 100644 --- a/proxyd/proxyd/proxyd.go +++ b/proxyd/proxyd/proxyd.go @@ -123,6 +123,15 @@ func Start(config *Config) (*Server, func(), error) { if config.BackendOptions.OutOfServiceSeconds != 0 { opts = append(opts, WithOutOfServiceDuration(secondsToDuration(config.BackendOptions.OutOfServiceSeconds))) } + if config.BackendOptions.MaxDegradedLatencyThreshold > 0 { + opts = append(opts, WithMaxDegradedLatencyThreshold(time.Duration(config.BackendOptions.MaxDegradedLatencyThreshold))) + } + if config.BackendOptions.MaxLatencyThreshold > 0 { + opts = append(opts, WithMaxLatencyThreshold(time.Duration(config.BackendOptions.MaxLatencyThreshold))) + } + if config.BackendOptions.MaxErrorRateThreshold > 0 { + opts = append(opts, WithMaxErrorRateThreshold(config.BackendOptions.MaxErrorRateThreshold)) + } if cfg.MaxRPS != 0 { opts = append(opts, WithMaxRPS(cfg.MaxRPS)) } @@ -148,6 +157,7 @@ func Start(config *Config) (*Server, func(), error) { opts = append(opts, WithStrippedTrailingXFF()) } opts = append(opts, WithProxydIP(os.Getenv("PROXYD_IP"))) + back := NewBackend(name, rpcURL, wsURL, lim, rpcRequestSemaphore, opts...) backendNames = append(backendNames, name) backendsByName[name] = back @@ -302,14 +312,25 @@ func Start(config *Config) (*Server, func(), error) { } for bgName, bg := range backendGroups { - if config.BackendGroups[bgName].ConsensusAware { + bgcfg := config.BackendGroups[bgName] + if bgcfg.ConsensusAware { log.Info("creating poller for consensus aware backend_group", "name", bgName) copts := make([]ConsensusOpt, 0) - if config.BackendGroups[bgName].ConsensusAsyncHandler == "noop" { + if bgcfg.ConsensusAsyncHandler == "noop" { copts = append(copts, WithAsyncHandler(NewNoopAsyncHandler())) } + if bgcfg.ConsensusBanPeriod > 0 { + copts = append(copts, WithBanPeriod(time.Duration(bgcfg.ConsensusBanPeriod))) + } + if bgcfg.ConsensusMaxUpdateThreshold > 0 { + copts = append(copts, WithMaxUpdateThreshold(time.Duration(bgcfg.ConsensusMaxUpdateThreshold))) + } + if bgcfg.ConsensusMinPeerCount > 0 { + copts = append(copts, WithMinPeerCount(uint64(bgcfg.ConsensusMinPeerCount))) + } + cp := NewConsensusPoller(bg, copts...) bg.Consensus = cp }