node: shut down all node-related HTTP servers gracefully (#20956)

Rather than just closing the underlying network listener to stop our
HTTP servers, use the graceful shutdown procedure, waiting for any
in-process requests to finish.
This commit is contained in:
Steven E. Harris 2020-04-27 05:16:00 -04:00 committed by GitHub
parent a070e23178
commit 40283d0522
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 48 additions and 42 deletions

@ -592,15 +592,16 @@ func signer(c *cli.Context) error {
// start http server // start http server
httpEndpoint := fmt.Sprintf("%s:%d", c.GlobalString(utils.RPCListenAddrFlag.Name), c.Int(rpcPortFlag.Name)) httpEndpoint := fmt.Sprintf("%s:%d", c.GlobalString(utils.RPCListenAddrFlag.Name), c.Int(rpcPortFlag.Name))
listener, err := node.StartHTTPEndpoint(httpEndpoint, rpc.DefaultHTTPTimeouts, handler) httpServer, addr, err := node.StartHTTPEndpoint(httpEndpoint, rpc.DefaultHTTPTimeouts, handler)
if err != nil { if err != nil {
utils.Fatalf("Could not start RPC api: %v", err) utils.Fatalf("Could not start RPC api: %v", err)
} }
extapiURL = fmt.Sprintf("http://%v/", listener.Addr()) extapiURL = fmt.Sprintf("http://%v/", addr)
log.Info("HTTP endpoint opened", "url", extapiURL) log.Info("HTTP endpoint opened", "url", extapiURL)
defer func() { defer func() {
listener.Close() // Don't bother imposing a timeout here.
httpServer.Shutdown(context.Background())
log.Info("HTTP endpoint closed", "url", extapiURL) log.Info("HTTP endpoint closed", "url", extapiURL)
}() }()
} }

@ -905,7 +905,7 @@ func retesteth(ctx *cli.Context) error {
IdleTimeout: 120 * time.Second, IdleTimeout: 120 * time.Second,
} }
httpEndpoint := fmt.Sprintf("%s:%d", ctx.GlobalString(utils.RPCListenAddrFlag.Name), ctx.Int(rpcPortFlag.Name)) httpEndpoint := fmt.Sprintf("%s:%d", ctx.GlobalString(utils.RPCListenAddrFlag.Name), ctx.Int(rpcPortFlag.Name))
listener, err := node.StartHTTPEndpoint(httpEndpoint, RetestethHTTPTimeouts, handler) httpServer, _, err := node.StartHTTPEndpoint(httpEndpoint, RetestethHTTPTimeouts, handler)
if err != nil { if err != nil {
utils.Fatalf("Could not start RPC api: %v", err) utils.Fatalf("Could not start RPC api: %v", err)
} }
@ -913,7 +913,8 @@ func retesteth(ctx *cli.Context) error {
log.Info("HTTP endpoint opened", "url", extapiURL) log.Info("HTTP endpoint opened", "url", extapiURL)
defer func() { defer func() {
listener.Close() // Don't bother imposing a timeout here.
httpServer.Shutdown(context.Background())
log.Info("HTTP endpoint closed", "url", httpEndpoint) log.Info("HTTP endpoint closed", "url", httpEndpoint)
}() }()

@ -26,14 +26,14 @@ import (
) )
// StartHTTPEndpoint starts the HTTP RPC endpoint. // StartHTTPEndpoint starts the HTTP RPC endpoint.
func StartHTTPEndpoint(endpoint string, timeouts rpc.HTTPTimeouts, handler http.Handler) (net.Listener, error) { func StartHTTPEndpoint(endpoint string, timeouts rpc.HTTPTimeouts, handler http.Handler) (*http.Server, net.Addr, error) {
// start the HTTP listener // start the HTTP listener
var ( var (
listener net.Listener listener net.Listener
err error err error
) )
if listener, err = net.Listen("tcp", endpoint); err != nil { if listener, err = net.Listen("tcp", endpoint); err != nil {
return nil, err return nil, nil, err
} }
// make sure timeout values are meaningful // make sure timeout values are meaningful
CheckTimeouts(&timeouts) CheckTimeouts(&timeouts)
@ -45,22 +45,22 @@ func StartHTTPEndpoint(endpoint string, timeouts rpc.HTTPTimeouts, handler http.
IdleTimeout: timeouts.IdleTimeout, IdleTimeout: timeouts.IdleTimeout,
} }
go httpSrv.Serve(listener) go httpSrv.Serve(listener)
return listener, err return httpSrv, listener.Addr(), err
} }
// startWSEndpoint starts a websocket endpoint. // startWSEndpoint starts a websocket endpoint.
func startWSEndpoint(endpoint string, handler http.Handler) (net.Listener, error) { func startWSEndpoint(endpoint string, handler http.Handler) (*http.Server, net.Addr, error) {
// start the HTTP listener // start the HTTP listener
var ( var (
listener net.Listener listener net.Listener
err error err error
) )
if listener, err = net.Listen("tcp", endpoint); err != nil { if listener, err = net.Listen("tcp", endpoint); err != nil {
return nil, err return nil, nil, err
} }
wsSrv := &http.Server{Handler: handler} wsSrv := &http.Server{Handler: handler}
go wsSrv.Serve(listener) go wsSrv.Serve(listener)
return listener, err return wsSrv, listener.Addr(), err
} }
// checkModuleAvailability checks that all names given in modules are actually // checkModuleAvailability checks that all names given in modules are actually

@ -17,9 +17,11 @@
package node package node
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"net" "net"
"net/http"
"os" "os"
"path/filepath" "path/filepath"
"reflect" "reflect"
@ -59,14 +61,16 @@ type Node struct {
ipcListener net.Listener // IPC RPC listener socket to serve API requests ipcListener net.Listener // IPC RPC listener socket to serve API requests
ipcHandler *rpc.Server // IPC RPC request handler to process the API requests ipcHandler *rpc.Server // IPC RPC request handler to process the API requests
httpEndpoint string // HTTP endpoint (interface + port) to listen at (empty = HTTP disabled) httpEndpoint string // HTTP endpoint (interface + port) to listen at (empty = HTTP disabled)
httpWhitelist []string // HTTP RPC modules to allow through this endpoint httpWhitelist []string // HTTP RPC modules to allow through this endpoint
httpListener net.Listener // HTTP RPC listener socket to server API requests httpListenerAddr net.Addr // Address of HTTP RPC listener socket serving API requests
httpHandler *rpc.Server // HTTP RPC request handler to process the API requests httpServer *http.Server // HTTP RPC HTTP server
httpHandler *rpc.Server // HTTP RPC request handler to process the API requests
wsEndpoint string // Websocket endpoint (interface + port) to listen at (empty = websocket disabled) wsEndpoint string // WebSocket endpoint (interface + port) to listen at (empty = WebSocket disabled)
wsListener net.Listener // Websocket RPC listener socket to server API requests wsListenerAddr net.Addr // Address of WebSocket RPC listener socket serving API requests
wsHandler *rpc.Server // Websocket RPC request handler to process the API requests wsHTTPServer *http.Server // WebSocket RPC HTTP server
wsHandler *rpc.Server // WebSocket RPC request handler to process the API requests
stop chan struct{} // Channel to wait for termination notifications stop chan struct{} // Channel to wait for termination notifications
lock sync.RWMutex lock sync.RWMutex
@ -375,23 +379,24 @@ func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors
return err return err
} }
handler := NewHTTPHandlerStack(srv, cors, vhosts) handler := NewHTTPHandlerStack(srv, cors, vhosts)
// wrap handler in websocket handler only if websocket port is the same as http rpc // wrap handler in WebSocket handler only if WebSocket port is the same as http rpc
if n.httpEndpoint == n.wsEndpoint { if n.httpEndpoint == n.wsEndpoint {
handler = NewWebsocketUpgradeHandler(handler, srv.WebsocketHandler(wsOrigins)) handler = NewWebsocketUpgradeHandler(handler, srv.WebsocketHandler(wsOrigins))
} }
listener, err := StartHTTPEndpoint(endpoint, timeouts, handler) httpServer, addr, err := StartHTTPEndpoint(endpoint, timeouts, handler)
if err != nil { if err != nil {
return err return err
} }
n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%v/", listener.Addr()), n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%v/", addr),
"cors", strings.Join(cors, ","), "cors", strings.Join(cors, ","),
"vhosts", strings.Join(vhosts, ",")) "vhosts", strings.Join(vhosts, ","))
if n.httpEndpoint == n.wsEndpoint { if n.httpEndpoint == n.wsEndpoint {
n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%v", listener.Addr())) n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%v", addr))
} }
// All listeners booted successfully // All listeners booted successfully
n.httpEndpoint = endpoint n.httpEndpoint = endpoint
n.httpListener = listener n.httpListenerAddr = addr
n.httpServer = httpServer
n.httpHandler = srv n.httpHandler = srv
return nil return nil
@ -399,11 +404,10 @@ func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors
// stopHTTP terminates the HTTP RPC endpoint. // stopHTTP terminates the HTTP RPC endpoint.
func (n *Node) stopHTTP() { func (n *Node) stopHTTP() {
if n.httpListener != nil { if n.httpServer != nil {
url := fmt.Sprintf("http://%v/", n.httpListener.Addr()) // Don't bother imposing a timeout here.
n.httpListener.Close() n.httpServer.Shutdown(context.Background())
n.httpListener = nil n.log.Info("HTTP endpoint closed", "url", fmt.Sprintf("http://%v/", n.httpListenerAddr))
n.log.Info("HTTP endpoint closed", "url", url)
} }
if n.httpHandler != nil { if n.httpHandler != nil {
n.httpHandler.Stop() n.httpHandler.Stop()
@ -411,7 +415,7 @@ func (n *Node) stopHTTP() {
} }
} }
// startWS initializes and starts the websocket RPC endpoint. // startWS initializes and starts the WebSocket RPC endpoint.
func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) error { func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) error {
// Short circuit if the WS endpoint isn't being exposed // Short circuit if the WS endpoint isn't being exposed
if endpoint == "" { if endpoint == "" {
@ -424,26 +428,26 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
if err != nil { if err != nil {
return err return err
} }
listener, err := startWSEndpoint(endpoint, handler) httpServer, addr, err := startWSEndpoint(endpoint, handler)
if err != nil { if err != nil {
return err return err
} }
n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%s", listener.Addr())) n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%v", addr))
// All listeners booted successfully // All listeners booted successfully
n.wsEndpoint = endpoint n.wsEndpoint = endpoint
n.wsListener = listener n.wsListenerAddr = addr
n.wsHTTPServer = httpServer
n.wsHandler = srv n.wsHandler = srv
return nil return nil
} }
// stopWS terminates the websocket RPC endpoint. // stopWS terminates the WebSocket RPC endpoint.
func (n *Node) stopWS() { func (n *Node) stopWS() {
if n.wsListener != nil { if n.wsHTTPServer != nil {
n.wsListener.Close() // Don't bother imposing a timeout here.
n.wsListener = nil n.wsHTTPServer.Shutdown(context.Background())
n.log.Info("WebSocket endpoint closed", "url", fmt.Sprintf("ws://%v", n.wsListenerAddr))
n.log.Info("WebSocket endpoint closed", "url", fmt.Sprintf("ws://%s", n.wsEndpoint))
} }
if n.wsHandler != nil { if n.wsHandler != nil {
n.wsHandler.Stop() n.wsHandler.Stop()
@ -607,8 +611,8 @@ func (n *Node) HTTPEndpoint() string {
n.lock.Lock() n.lock.Lock()
defer n.lock.Unlock() defer n.lock.Unlock()
if n.httpListener != nil { if n.httpListenerAddr != nil {
return n.httpListener.Addr().String() return n.httpListenerAddr.String()
} }
return n.httpEndpoint return n.httpEndpoint
} }
@ -618,8 +622,8 @@ func (n *Node) WSEndpoint() string {
n.lock.Lock() n.lock.Lock()
defer n.lock.Unlock() defer n.lock.Unlock()
if n.wsListener != nil { if n.wsListenerAddr != nil {
return n.wsListener.Addr().String() return n.wsListenerAddr.String()
} }
return n.wsEndpoint return n.wsEndpoint
} }