bsc/rpc/handler.go

424 lines
12 KiB
Go
Raw Permalink Normal View History

2019-07-22 12:17:27 +03:00
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rpc
import (
"context"
"encoding/json"
"reflect"
"strconv"
"strings"
"sync"
"time"
[R4R] performance improvement in many aspects (#257) * focus on performance improvement in many aspects. 1. Do BlockBody verification concurrently; 2. Do calculation of intermediate root concurrently; 3. Preload accounts before processing blocks; 4. Make the snapshot layers configurable. 5. Reuse some object to reduce GC. add * rlp: improve decoder stream implementation (#22858) This commit makes various cleanup changes to rlp.Stream. * rlp: shrink Stream struct This removes a lot of unused padding space in Stream by reordering the fields. The size of Stream changes from 120 bytes to 88 bytes. Stream instances are internally cached and reused using sync.Pool, so this does not improve performance. * rlp: simplify list stack The list stack kept track of the size of the current list context as well as the current offset into it. The size had to be stored in the stack in order to subtract it from the remaining bytes of any enclosing list in ListEnd. It seems that this can be implemented in a simpler way: just subtract the size from the enclosing list context in List instead. * rlp: use atomic.Value for type cache (#22902) All encoding/decoding operations read the type cache to find the writer/decoder function responsible for a type. When analyzing CPU profiles of geth during sync, I found that the use of sync.RWMutex in cache lookups appears in the profiles. It seems we are running into CPU cache contention problems when package rlp is heavily used on all CPU cores during sync. This change makes it use atomic.Value + a writer lock instead of sync.RWMutex. In the common case where the typeinfo entry is present in the cache, we simply fetch the map and lookup the type. * rlp: optimize byte array handling (#22924) This change improves the performance of encoding/decoding [N]byte. name old time/op new time/op delta DecodeByteArrayStruct-8 336ns ± 0% 246ns ± 0% -26.98% (p=0.000 n=9+10) EncodeByteArrayStruct-8 225ns ± 1% 148ns ± 1% -34.12% (p=0.000 n=10+10) name old alloc/op new alloc/op delta DecodeByteArrayStruct-8 120B ± 0% 48B ± 0% -60.00% (p=0.000 n=10+10) EncodeByteArrayStruct-8 0.00B 0.00B ~ (all equal) * rlp: optimize big.Int decoding for size <= 32 bytes (#22927) This change grows the static integer buffer in Stream to 32 bytes, making it possible to decode 256bit integers without allocating a temporary buffer. In the recent commit 088da24, Stream struct size decreased from 120 bytes down to 88 bytes. This commit grows the struct to 112 bytes again, but the size change will not degrade performance because Stream instances are internally cached in sync.Pool. name old time/op new time/op delta DecodeBigInts-8 12.2µs ± 0% 8.6µs ± 4% -29.58% (p=0.000 n=9+10) name old speed new speed delta DecodeBigInts-8 230MB/s ± 0% 326MB/s ± 4% +42.04% (p=0.000 n=9+10) * eth/protocols/eth, les: avoid Raw() when decoding HashOrNumber (#22841) Getting the raw value is not necessary to decode this type, and decoding it directly from the stream is faster. * fix testcase * debug no lazy * fix can not repair * address comments Co-authored-by: Felix Lange <fjl@twurst.com>
2021-07-29 17:16:53 +08:00
"github.com/ethereum/go-ethereum/common/gopool"
"github.com/ethereum/go-ethereum/log"
)
// handler handles JSON-RPC messages. There is one handler per connection. Note that
// handler is not safe for concurrent use. Message handling never blocks indefinitely
// because RPCs are processed on background goroutines launched by handler.
//
// The entry points for incoming messages are:
//
// h.handleMsg(message)
// h.handleBatch(message)
//
// Outgoing calls use the requestOp struct. Register the request before sending it
// on the connection:
//
// op := &requestOp{ids: ...}
// h.addRequestOp(op)
//
// Now send the request, then wait for the reply to be delivered through handleMsg:
//
// if err := op.wait(...); err != nil {
// h.removeRequestOp(op) // timeout, etc.
// }
//
type handler struct {
reg *serviceRegistry
unsubscribeCb *callback
idgen func() ID // subscription ID generator
respWait map[string]*requestOp // active client requests
clientSubs map[string]*ClientSubscription // active client subscriptions
callWG sync.WaitGroup // pending call goroutines
rootCtx context.Context // canceled by close()
cancelRoot func() // cancel function for rootCtx
conn jsonWriter // where responses will be sent
log log.Logger
allowSubscribe bool
subLock sync.Mutex
serverSubs map[ID]*Subscription
}
type callProc struct {
ctx context.Context
notifiers []*Notifier
}
func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler {
rootCtx, cancelRoot := context.WithCancel(connCtx)
h := &handler{
reg: reg,
idgen: idgen,
conn: conn,
respWait: make(map[string]*requestOp),
clientSubs: make(map[string]*ClientSubscription),
rootCtx: rootCtx,
cancelRoot: cancelRoot,
allowSubscribe: true,
serverSubs: make(map[ID]*Subscription),
log: log.Root(),
}
if conn.remoteAddr() != "" {
h.log = h.log.New("conn", conn.remoteAddr())
}
h.unsubscribeCb = newCallback(reflect.Value{}, reflect.ValueOf(h.unsubscribe))
return h
}
// handleBatch executes all messages in a batch and returns the responses.
2021-01-10 20:34:56 +08:00
func (h *handler) handleBatch(ctx context.Context, msgs []*jsonrpcMessage) {
// Emit error response for empty batches:
if len(msgs) == 0 {
h.startCallProc(func(cp *callProc) {
h.conn.writeJSON(cp.ctx, errorMessage(&invalidRequestError{"empty batch"}))
})
return
}
// Handle non-call messages first:
calls := make([]*jsonrpcMessage, 0, len(msgs))
for _, msg := range msgs {
if handled := h.handleImmediate(msg); !handled {
calls = append(calls, msg)
}
}
if len(calls) == 0 {
return
}
// Process calls on a goroutine because they may block indefinitely:
h.startCallProc(func(cp *callProc) {
answers := make([]*jsonrpcMessage, 0, len(msgs))
for _, msg := range calls {
2021-01-10 20:34:56 +08:00
if answer := h.handleCallMsg(cp, ctx, msg); answer != nil {
answers = append(answers, answer)
}
}
h.addSubscriptions(cp.notifiers)
if len(answers) > 0 {
h.conn.writeJSON(cp.ctx, answers)
}
for _, n := range cp.notifiers {
n.activate()
}
})
}
// handleMsg handles a single message.
2021-01-10 20:34:56 +08:00
func (h *handler) handleMsg(ctx context.Context, msg *jsonrpcMessage) {
if ok := h.handleImmediate(msg); ok {
return
}
h.startCallProc(func(cp *callProc) {
2021-01-10 20:34:56 +08:00
answer := h.handleCallMsg(cp, ctx, msg)
h.addSubscriptions(cp.notifiers)
if answer != nil {
h.conn.writeJSON(cp.ctx, answer)
}
for _, n := range cp.notifiers {
n.activate()
}
})
}
// close cancels all requests except for inflightReq and waits for
// call goroutines to shut down.
func (h *handler) close(err error, inflightReq *requestOp) {
h.cancelAllRequests(err, inflightReq)
h.callWG.Wait()
h.cancelRoot()
h.cancelServerSubscriptions(err)
}
// addRequestOp registers a request operation.
func (h *handler) addRequestOp(op *requestOp) {
for _, id := range op.ids {
h.respWait[string(id)] = op
}
}
// removeRequestOps stops waiting for the given request IDs.
func (h *handler) removeRequestOp(op *requestOp) {
for _, id := range op.ids {
delete(h.respWait, string(id))
}
}
// cancelAllRequests unblocks and removes pending requests and active subscriptions.
func (h *handler) cancelAllRequests(err error, inflightReq *requestOp) {
didClose := make(map[*requestOp]bool)
if inflightReq != nil {
didClose[inflightReq] = true
}
for id, op := range h.respWait {
// Remove the op so that later calls will not close op.resp again.
delete(h.respWait, id)
if !didClose[op] {
op.err = err
close(op.resp)
didClose[op] = true
}
}
for id, sub := range h.clientSubs {
delete(h.clientSubs, id)
sub.close(err)
}
}
func (h *handler) addSubscriptions(nn []*Notifier) {
h.subLock.Lock()
defer h.subLock.Unlock()
for _, n := range nn {
if sub := n.takeSubscription(); sub != nil {
h.serverSubs[sub.ID] = sub
}
}
}
// cancelServerSubscriptions removes all subscriptions and closes their error channels.
func (h *handler) cancelServerSubscriptions(err error) {
h.subLock.Lock()
defer h.subLock.Unlock()
for id, s := range h.serverSubs {
s.err <- err
close(s.err)
delete(h.serverSubs, id)
}
}
// startCallProc runs fn in a new goroutine and starts tracking it in the h.calls wait group.
func (h *handler) startCallProc(fn func(*callProc)) {
h.callWG.Add(1)
[R4R] performance improvement in many aspects (#257) * focus on performance improvement in many aspects. 1. Do BlockBody verification concurrently; 2. Do calculation of intermediate root concurrently; 3. Preload accounts before processing blocks; 4. Make the snapshot layers configurable. 5. Reuse some object to reduce GC. add * rlp: improve decoder stream implementation (#22858) This commit makes various cleanup changes to rlp.Stream. * rlp: shrink Stream struct This removes a lot of unused padding space in Stream by reordering the fields. The size of Stream changes from 120 bytes to 88 bytes. Stream instances are internally cached and reused using sync.Pool, so this does not improve performance. * rlp: simplify list stack The list stack kept track of the size of the current list context as well as the current offset into it. The size had to be stored in the stack in order to subtract it from the remaining bytes of any enclosing list in ListEnd. It seems that this can be implemented in a simpler way: just subtract the size from the enclosing list context in List instead. * rlp: use atomic.Value for type cache (#22902) All encoding/decoding operations read the type cache to find the writer/decoder function responsible for a type. When analyzing CPU profiles of geth during sync, I found that the use of sync.RWMutex in cache lookups appears in the profiles. It seems we are running into CPU cache contention problems when package rlp is heavily used on all CPU cores during sync. This change makes it use atomic.Value + a writer lock instead of sync.RWMutex. In the common case where the typeinfo entry is present in the cache, we simply fetch the map and lookup the type. * rlp: optimize byte array handling (#22924) This change improves the performance of encoding/decoding [N]byte. name old time/op new time/op delta DecodeByteArrayStruct-8 336ns ± 0% 246ns ± 0% -26.98% (p=0.000 n=9+10) EncodeByteArrayStruct-8 225ns ± 1% 148ns ± 1% -34.12% (p=0.000 n=10+10) name old alloc/op new alloc/op delta DecodeByteArrayStruct-8 120B ± 0% 48B ± 0% -60.00% (p=0.000 n=10+10) EncodeByteArrayStruct-8 0.00B 0.00B ~ (all equal) * rlp: optimize big.Int decoding for size <= 32 bytes (#22927) This change grows the static integer buffer in Stream to 32 bytes, making it possible to decode 256bit integers without allocating a temporary buffer. In the recent commit 088da24, Stream struct size decreased from 120 bytes down to 88 bytes. This commit grows the struct to 112 bytes again, but the size change will not degrade performance because Stream instances are internally cached in sync.Pool. name old time/op new time/op delta DecodeBigInts-8 12.2µs ± 0% 8.6µs ± 4% -29.58% (p=0.000 n=9+10) name old speed new speed delta DecodeBigInts-8 230MB/s ± 0% 326MB/s ± 4% +42.04% (p=0.000 n=9+10) * eth/protocols/eth, les: avoid Raw() when decoding HashOrNumber (#22841) Getting the raw value is not necessary to decode this type, and decoding it directly from the stream is faster. * fix testcase * debug no lazy * fix can not repair * address comments Co-authored-by: Felix Lange <fjl@twurst.com>
2021-07-29 17:16:53 +08:00
gopool.Submit(func() {
ctx, cancel := context.WithCancel(h.rootCtx)
defer h.callWG.Done()
defer cancel()
fn(&callProc{ctx: ctx})
[R4R] performance improvement in many aspects (#257) * focus on performance improvement in many aspects. 1. Do BlockBody verification concurrently; 2. Do calculation of intermediate root concurrently; 3. Preload accounts before processing blocks; 4. Make the snapshot layers configurable. 5. Reuse some object to reduce GC. add * rlp: improve decoder stream implementation (#22858) This commit makes various cleanup changes to rlp.Stream. * rlp: shrink Stream struct This removes a lot of unused padding space in Stream by reordering the fields. The size of Stream changes from 120 bytes to 88 bytes. Stream instances are internally cached and reused using sync.Pool, so this does not improve performance. * rlp: simplify list stack The list stack kept track of the size of the current list context as well as the current offset into it. The size had to be stored in the stack in order to subtract it from the remaining bytes of any enclosing list in ListEnd. It seems that this can be implemented in a simpler way: just subtract the size from the enclosing list context in List instead. * rlp: use atomic.Value for type cache (#22902) All encoding/decoding operations read the type cache to find the writer/decoder function responsible for a type. When analyzing CPU profiles of geth during sync, I found that the use of sync.RWMutex in cache lookups appears in the profiles. It seems we are running into CPU cache contention problems when package rlp is heavily used on all CPU cores during sync. This change makes it use atomic.Value + a writer lock instead of sync.RWMutex. In the common case where the typeinfo entry is present in the cache, we simply fetch the map and lookup the type. * rlp: optimize byte array handling (#22924) This change improves the performance of encoding/decoding [N]byte. name old time/op new time/op delta DecodeByteArrayStruct-8 336ns ± 0% 246ns ± 0% -26.98% (p=0.000 n=9+10) EncodeByteArrayStruct-8 225ns ± 1% 148ns ± 1% -34.12% (p=0.000 n=10+10) name old alloc/op new alloc/op delta DecodeByteArrayStruct-8 120B ± 0% 48B ± 0% -60.00% (p=0.000 n=10+10) EncodeByteArrayStruct-8 0.00B 0.00B ~ (all equal) * rlp: optimize big.Int decoding for size <= 32 bytes (#22927) This change grows the static integer buffer in Stream to 32 bytes, making it possible to decode 256bit integers without allocating a temporary buffer. In the recent commit 088da24, Stream struct size decreased from 120 bytes down to 88 bytes. This commit grows the struct to 112 bytes again, but the size change will not degrade performance because Stream instances are internally cached in sync.Pool. name old time/op new time/op delta DecodeBigInts-8 12.2µs ± 0% 8.6µs ± 4% -29.58% (p=0.000 n=9+10) name old speed new speed delta DecodeBigInts-8 230MB/s ± 0% 326MB/s ± 4% +42.04% (p=0.000 n=9+10) * eth/protocols/eth, les: avoid Raw() when decoding HashOrNumber (#22841) Getting the raw value is not necessary to decode this type, and decoding it directly from the stream is faster. * fix testcase * debug no lazy * fix can not repair * address comments Co-authored-by: Felix Lange <fjl@twurst.com>
2021-07-29 17:16:53 +08:00
})
}
// handleImmediate executes non-call messages. It returns false if the message is a
// call or requires a reply.
func (h *handler) handleImmediate(msg *jsonrpcMessage) bool {
start := time.Now()
switch {
case msg.isNotification():
if strings.HasSuffix(msg.Method, notificationMethodSuffix) {
h.handleSubscriptionResult(msg)
return true
}
return false
case msg.isResponse():
h.handleResponse(msg)
h.log.Trace("Handled RPC response", "reqid", idForLog{msg.ID}, "t", time.Since(start))
return true
default:
return false
}
}
// handleSubscriptionResult processes subscription notifications.
func (h *handler) handleSubscriptionResult(msg *jsonrpcMessage) {
var result subscriptionResult
if err := json.Unmarshal(msg.Params, &result); err != nil {
h.log.Debug("Dropping invalid subscription message")
return
}
if h.clientSubs[result.ID] != nil {
h.clientSubs[result.ID].deliver(result.Result)
}
}
// handleResponse processes method call responses.
func (h *handler) handleResponse(msg *jsonrpcMessage) {
op := h.respWait[string(msg.ID)]
if op == nil {
h.log.Debug("Unsolicited RPC response", "reqid", idForLog{msg.ID})
return
}
delete(h.respWait, string(msg.ID))
// For normal responses, just forward the reply to Call/BatchCall.
if op.sub == nil {
op.resp <- msg
return
}
// For subscription responses, start the subscription if the server
// indicates success. EthSubscribe gets unblocked in either case through
// the op.resp channel.
defer close(op.resp)
if msg.Error != nil {
op.err = msg.Error
return
}
if op.err = json.Unmarshal(msg.Result, &op.sub.subid); op.err == nil {
go op.sub.run()
h.clientSubs[op.sub.subid] = op.sub
}
}
// handleCallMsg executes a call message and returns the answer.
2021-01-10 20:34:56 +08:00
func (h *handler) handleCallMsg(ctx *callProc, reqCtx context.Context, msg *jsonrpcMessage) *jsonrpcMessage {
start := time.Now()
switch {
case msg.isNotification():
h.handleCall(ctx, msg)
h.log.Debug("Served "+msg.Method, "t", time.Since(start))
return nil
case msg.isCall():
resp := h.handleCall(ctx, msg)
internal/ethapi: return revert reason for eth_call (#21083) * internal/ethapi: return revert reason for eth_call * internal/ethapi: moved revert reason logic to doCall * accounts/abi/bind/backends: added revert reason logic to simulated backend * internal/ethapi: fixed linting error * internal/ethapi: check if require reason can be unpacked * internal/ethapi: better error logic * internal/ethapi: simplify logic * internal/ethapi: return vmError() * internal/ethapi: move handling of revert out of docall * graphql: removed revert logic until spec change * rpc: internal/ethapi: added custom error types * graphql: use returndata instead of return Return() checks if there is an error. If an error is found, we return nil. For most use cases it can be beneficial to return the output even if there was an error. This code should be changed anyway once the spec supports error reasons in graphql responses * accounts/abi/bind/backends: added tests for revert reason * internal/ethapi: add errorCode to revert error * internal/ethapi: add errorCode of 3 to revertError * internal/ethapi: unified estimateGasErrors, simplified logic * internal/ethapi: unified handling of errors in DoEstimateGas * rpc: print error data field * accounts/abi/bind/backends: unify simulatedBackend and RPC * internal/ethapi: added binary data to revertError data * internal/ethapi: refactored unpacking logic into newRevertError * accounts/abi/bind/backends: fix EstimateGas * accounts, console, internal, rpc: minor error interface cleanups * Revert "accounts, console, internal, rpc: minor error interface cleanups" This reverts commit 2d3ef53c5304e429a04983210a417c1f4e0dafb7. * re-apply the good parts of 2d3ef53c53 * rpc: add test for returning server error data from client Co-authored-by: rjl493456442 <garyrong0905@gmail.com> Co-authored-by: Péter Szilágyi <peterke@gmail.com> Co-authored-by: Felix Lange <fjl@twurst.com>
2020-06-08 10:09:49 +02:00
var ctx []interface{}
ctx = append(ctx, "reqid", idForLog{msg.ID}, "t", time.Since(start))
if resp.Error != nil {
2021-01-10 20:34:56 +08:00
xForward := reqCtx.Value("X-Forwarded-For")
h.log.Warn("Served "+msg.Method, "reqid", idForLog{msg.ID}, "t", time.Since(start), "err", resp.Error.Message, "X-Forwarded-For", xForward)
2021-04-16 12:45:26 +08:00
internal/ethapi: return revert reason for eth_call (#21083) * internal/ethapi: return revert reason for eth_call * internal/ethapi: moved revert reason logic to doCall * accounts/abi/bind/backends: added revert reason logic to simulated backend * internal/ethapi: fixed linting error * internal/ethapi: check if require reason can be unpacked * internal/ethapi: better error logic * internal/ethapi: simplify logic * internal/ethapi: return vmError() * internal/ethapi: move handling of revert out of docall * graphql: removed revert logic until spec change * rpc: internal/ethapi: added custom error types * graphql: use returndata instead of return Return() checks if there is an error. If an error is found, we return nil. For most use cases it can be beneficial to return the output even if there was an error. This code should be changed anyway once the spec supports error reasons in graphql responses * accounts/abi/bind/backends: added tests for revert reason * internal/ethapi: add errorCode to revert error * internal/ethapi: add errorCode of 3 to revertError * internal/ethapi: unified estimateGasErrors, simplified logic * internal/ethapi: unified handling of errors in DoEstimateGas * rpc: print error data field * accounts/abi/bind/backends: unify simulatedBackend and RPC * internal/ethapi: added binary data to revertError data * internal/ethapi: refactored unpacking logic into newRevertError * accounts/abi/bind/backends: fix EstimateGas * accounts, console, internal, rpc: minor error interface cleanups * Revert "accounts, console, internal, rpc: minor error interface cleanups" This reverts commit 2d3ef53c5304e429a04983210a417c1f4e0dafb7. * re-apply the good parts of 2d3ef53c53 * rpc: add test for returning server error data from client Co-authored-by: rjl493456442 <garyrong0905@gmail.com> Co-authored-by: Péter Szilágyi <peterke@gmail.com> Co-authored-by: Felix Lange <fjl@twurst.com>
2020-06-08 10:09:49 +02:00
ctx = append(ctx, "err", resp.Error.Message)
if resp.Error.Data != nil {
ctx = append(ctx, "errdata", resp.Error.Data)
}
h.log.Warn("Served "+msg.Method, ctx...)
} else {
internal/ethapi: return revert reason for eth_call (#21083) * internal/ethapi: return revert reason for eth_call * internal/ethapi: moved revert reason logic to doCall * accounts/abi/bind/backends: added revert reason logic to simulated backend * internal/ethapi: fixed linting error * internal/ethapi: check if require reason can be unpacked * internal/ethapi: better error logic * internal/ethapi: simplify logic * internal/ethapi: return vmError() * internal/ethapi: move handling of revert out of docall * graphql: removed revert logic until spec change * rpc: internal/ethapi: added custom error types * graphql: use returndata instead of return Return() checks if there is an error. If an error is found, we return nil. For most use cases it can be beneficial to return the output even if there was an error. This code should be changed anyway once the spec supports error reasons in graphql responses * accounts/abi/bind/backends: added tests for revert reason * internal/ethapi: add errorCode to revert error * internal/ethapi: add errorCode of 3 to revertError * internal/ethapi: unified estimateGasErrors, simplified logic * internal/ethapi: unified handling of errors in DoEstimateGas * rpc: print error data field * accounts/abi/bind/backends: unify simulatedBackend and RPC * internal/ethapi: added binary data to revertError data * internal/ethapi: refactored unpacking logic into newRevertError * accounts/abi/bind/backends: fix EstimateGas * accounts, console, internal, rpc: minor error interface cleanups * Revert "accounts, console, internal, rpc: minor error interface cleanups" This reverts commit 2d3ef53c5304e429a04983210a417c1f4e0dafb7. * re-apply the good parts of 2d3ef53c53 * rpc: add test for returning server error data from client Co-authored-by: rjl493456442 <garyrong0905@gmail.com> Co-authored-by: Péter Szilágyi <peterke@gmail.com> Co-authored-by: Felix Lange <fjl@twurst.com>
2020-06-08 10:09:49 +02:00
h.log.Debug("Served "+msg.Method, ctx...)
}
return resp
case msg.hasValidID():
return msg.errorResponse(&invalidRequestError{"invalid request"})
default:
return errorMessage(&invalidRequestError{"invalid request"})
}
}
// handleCall processes method calls.
func (h *handler) handleCall(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
if msg.isSubscribe() {
return h.handleSubscribe(cp, msg)
}
var callb *callback
if msg.isUnsubscribe() {
callb = h.unsubscribeCb
} else {
callb = h.reg.callback(msg.Method)
}
if callb == nil {
return msg.errorResponse(&methodNotFoundError{method: msg.Method})
}
args, err := parsePositionalArguments(msg.Params, callb.argTypes)
if err != nil {
return msg.errorResponse(&invalidParamsError{err.Error()})
}
start := time.Now()
answer := h.runMethod(cp.ctx, msg, callb, args)
// Collect the statistics for RPC calls if metrics is enabled.
// We only care about pure rpc call. Filter out subscription.
if callb != h.unsubscribeCb {
rpcRequestGauge.Inc(1)
if answer.Error != nil {
failedReqeustGauge.Inc(1)
} else {
successfulRequestGauge.Inc(1)
}
2021-03-22 14:57:57 +08:00
RpcServingTimer.UpdateSince(start)
2021-01-10 22:01:20 +08:00
newRPCRequestGauge(msg.Method).Inc(1)
newRPCServingTimer(msg.Method, answer.Error == nil).UpdateSince(start)
}
return answer
}
// handleSubscribe processes *_subscribe method calls.
func (h *handler) handleSubscribe(cp *callProc, msg *jsonrpcMessage) *jsonrpcMessage {
if !h.allowSubscribe {
return msg.errorResponse(ErrNotificationsUnsupported)
}
// Subscription method name is first argument.
name, err := parseSubscriptionName(msg.Params)
if err != nil {
return msg.errorResponse(&invalidParamsError{err.Error()})
}
namespace := msg.namespace()
callb := h.reg.subscription(namespace, name)
if callb == nil {
return msg.errorResponse(&subscriptionNotFoundError{namespace, name})
}
// Parse subscription name arg too, but remove it before calling the callback.
argTypes := append([]reflect.Type{stringType}, callb.argTypes...)
args, err := parsePositionalArguments(msg.Params, argTypes)
if err != nil {
return msg.errorResponse(&invalidParamsError{err.Error()})
}
args = args[1:]
// Install notifier in context so the subscription handler can find it.
n := &Notifier{h: h, namespace: namespace}
cp.notifiers = append(cp.notifiers, n)
ctx := context.WithValue(cp.ctx, notifierKey{}, n)
return h.runMethod(ctx, msg, callb, args)
}
// runMethod runs the Go callback for an RPC method.
func (h *handler) runMethod(ctx context.Context, msg *jsonrpcMessage, callb *callback, args []reflect.Value) *jsonrpcMessage {
result, err := callb.call(ctx, msg.Method, args)
if err != nil {
return msg.errorResponse(err)
}
return msg.response(result)
}
// unsubscribe is the callback function for all *_unsubscribe calls.
func (h *handler) unsubscribe(ctx context.Context, id ID) (bool, error) {
h.subLock.Lock()
defer h.subLock.Unlock()
s := h.serverSubs[id]
if s == nil {
return false, ErrSubscriptionNotFound
}
close(s.err)
delete(h.serverSubs, id)
return true, nil
}
type idForLog struct{ json.RawMessage }
func (id idForLog) String() string {
if s, err := strconv.Unquote(string(id.RawMessage)); err == nil {
return s
}
return string(id.RawMessage)
}