feat(proxyd): customise poller interval and enable cross origin check on web socket upgrader (#10107)

* feat: poller interval arg

* fix: websocket check origin

* fix: check origin behind flag

* fix: rename poller default interval
This commit is contained in:
tdot 2024-04-11 16:07:16 -06:00 committed by GitHub
parent 3a1b2633bb
commit 094edc4fc6
3 changed files with 27 additions and 6 deletions

@ -26,6 +26,7 @@ type ServerConfig struct {
MaxRequestBodyLogLen int `toml:"max_request_body_log_len"` MaxRequestBodyLogLen int `toml:"max_request_body_log_len"`
EnablePprof bool `toml:"enable_pprof"` EnablePprof bool `toml:"enable_pprof"`
EnableXServedByHeader bool `toml:"enable_served_by_header"` EnableXServedByHeader bool `toml:"enable_served_by_header"`
AllowAllOrigins bool `toml:"allow_all_origins"`
} }
type CacheConfig struct { type CacheConfig struct {
@ -113,6 +114,7 @@ type BackendGroupConfig struct {
ConsensusAware bool `toml:"consensus_aware"` ConsensusAware bool `toml:"consensus_aware"`
ConsensusAsyncHandler string `toml:"consensus_handler"` ConsensusAsyncHandler string `toml:"consensus_handler"`
ConsensusPollerInterval TOMLDuration `toml:"consensus_poller_interval"`
ConsensusBanPeriod TOMLDuration `toml:"consensus_ban_period"` ConsensusBanPeriod TOMLDuration `toml:"consensus_ban_period"`
ConsensusMaxUpdateThreshold TOMLDuration `toml:"consensus_max_update_threshold"` ConsensusMaxUpdateThreshold TOMLDuration `toml:"consensus_max_update_threshold"`

@ -14,7 +14,7 @@ import (
) )
const ( const (
PollerInterval = 1 * time.Second DefaultPollerInterval = 1 * time.Second
) )
type OnConsensusBroken func() type OnConsensusBroken func()
@ -40,6 +40,7 @@ type ConsensusPoller struct {
maxUpdateThreshold time.Duration maxUpdateThreshold time.Duration
maxBlockLag uint64 maxBlockLag uint64
maxBlockRange uint64 maxBlockRange uint64
interval time.Duration
} }
type backendState struct { type backendState struct {
@ -125,7 +126,7 @@ func (ah *PollerAsyncHandler) Init() {
for _, be := range ah.cp.backendGroup.Backends { for _, be := range ah.cp.backendGroup.Backends {
go func(be *Backend) { go func(be *Backend) {
for { for {
timer := time.NewTimer(PollerInterval) timer := time.NewTimer(ah.cp.interval)
ah.cp.UpdateBackend(ah.ctx, be) ah.cp.UpdateBackend(ah.ctx, be)
select { select {
@ -141,7 +142,7 @@ func (ah *PollerAsyncHandler) Init() {
// create the group consensus poller // create the group consensus poller
go func() { go func() {
for { for {
timer := time.NewTimer(PollerInterval) timer := time.NewTimer(ah.cp.interval)
ah.cp.UpdateBackendGroupConsensus(ah.ctx) ah.cp.UpdateBackendGroupConsensus(ah.ctx)
select { select {
@ -215,6 +216,12 @@ func WithMinPeerCount(minPeerCount uint64) ConsensusOpt {
} }
} }
func WithPollerInterval(interval time.Duration) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.interval = interval
}
}
func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller { func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller {
ctx, cancelFunc := context.WithCancel(context.Background()) ctx, cancelFunc := context.WithCancel(context.Background())
@ -230,6 +237,7 @@ func NewConsensusPoller(bg *BackendGroup, opts ...ConsensusOpt) *ConsensusPoller
maxUpdateThreshold: 30 * time.Second, maxUpdateThreshold: 30 * time.Second,
maxBlockLag: 8, // 8*12 seconds = 96 seconds ~ 1.6 minutes maxBlockLag: 8, // 8*12 seconds = 96 seconds ~ 1.6 minutes
minPeerCount: 3, minPeerCount: 3,
interval: DefaultPollerInterval,
} }
for _, opt := range opts { for _, opt := range opts {

@ -272,6 +272,14 @@ func Start(config *Config) (*Server, func(), error) {
return nil, nil, fmt.Errorf("error creating server: %w", err) return nil, nil, fmt.Errorf("error creating server: %w", err)
} }
// Enable to support browser websocket connections.
// See https://pkg.go.dev/github.com/gorilla/websocket#hdr-Origin_Considerations
if config.Server.AllowAllOrigins {
srv.upgrader.CheckOrigin = func(r *http.Request) bool {
return true
}
}
if config.Metrics.Enabled { if config.Metrics.Enabled {
addr := fmt.Sprintf("%s:%d", config.Metrics.Host, config.Metrics.Port) addr := fmt.Sprintf("%s:%d", config.Metrics.Host, config.Metrics.Port)
log.Info("starting metrics server", "addr", addr) log.Info("starting metrics server", "addr", addr)
@ -338,6 +346,9 @@ func Start(config *Config) (*Server, func(), error) {
if bgcfg.ConsensusMaxBlockRange > 0 { if bgcfg.ConsensusMaxBlockRange > 0 {
copts = append(copts, WithMaxBlockRange(bgcfg.ConsensusMaxBlockRange)) copts = append(copts, WithMaxBlockRange(bgcfg.ConsensusMaxBlockRange))
} }
if bgcfg.ConsensusPollerInterval > 0 {
copts = append(copts, WithPollerInterval(time.Duration(bgcfg.ConsensusPollerInterval)))
}
var tracker ConsensusTracker var tracker ConsensusTracker
if bgcfg.ConsensusHA { if bgcfg.ConsensusHA {
@ -349,7 +360,7 @@ func Start(config *Config) (*Server, func(), error) {
topts = append(topts, WithLockPeriod(time.Duration(bgcfg.ConsensusHALockPeriod))) topts = append(topts, WithLockPeriod(time.Duration(bgcfg.ConsensusHALockPeriod)))
} }
if bgcfg.ConsensusHAHeartbeatInterval > 0 { if bgcfg.ConsensusHAHeartbeatInterval > 0 {
topts = append(topts, WithLockPeriod(time.Duration(bgcfg.ConsensusHAHeartbeatInterval))) topts = append(topts, WithHeartbeatInterval(time.Duration(bgcfg.ConsensusHAHeartbeatInterval)))
} }
consensusHARedisClient, err := NewRedisClient(bgcfg.ConsensusHARedis.URL) consensusHARedisClient, err := NewRedisClient(bgcfg.ConsensusHARedis.URL)
if err != nil { if err != nil {