Merge pull request #2463 from fjl/rpc-context-key
rpc: remove NotifierContextKey
This commit is contained in:
commit
a6ca8fd268
@ -603,7 +603,7 @@ type NewBlocksArgs struct {
|
|||||||
// NewBlocks triggers a new block event each time a block is appended to the chain. It accepts an argument which allows
|
// NewBlocks triggers a new block event each time a block is appended to the chain. It accepts an argument which allows
|
||||||
// the caller to specify whether the output should contain transactions and in what format.
|
// the caller to specify whether the output should contain transactions and in what format.
|
||||||
func (s *PublicBlockChainAPI) NewBlocks(ctx context.Context, args NewBlocksArgs) (rpc.Subscription, error) {
|
func (s *PublicBlockChainAPI) NewBlocks(ctx context.Context, args NewBlocksArgs) (rpc.Subscription, error) {
|
||||||
notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier)
|
notifier, supported := rpc.NotifierFromContext(ctx)
|
||||||
if !supported {
|
if !supported {
|
||||||
return nil, rpc.ErrNotificationsUnsupported
|
return nil, rpc.ErrNotificationsUnsupported
|
||||||
}
|
}
|
||||||
@ -1345,7 +1345,7 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() []*RPCTransaction {
|
|||||||
// NewPendingTransaction creates a subscription that is triggered each time a transaction enters the transaction pool
|
// NewPendingTransaction creates a subscription that is triggered each time a transaction enters the transaction pool
|
||||||
// and is send from one of the transactions this nodes manages.
|
// and is send from one of the transactions this nodes manages.
|
||||||
func (s *PublicTransactionPoolAPI) NewPendingTransactions(ctx context.Context) (rpc.Subscription, error) {
|
func (s *PublicTransactionPoolAPI) NewPendingTransactions(ctx context.Context) (rpc.Subscription, error) {
|
||||||
notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier)
|
notifier, supported := rpc.NotifierFromContext(ctx)
|
||||||
if !supported {
|
if !supported {
|
||||||
return nil, rpc.ErrNotificationsUnsupported
|
return nil, rpc.ErrNotificationsUnsupported
|
||||||
}
|
}
|
||||||
|
@ -85,7 +85,7 @@ type SyncingResult struct {
|
|||||||
|
|
||||||
// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
|
// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
|
||||||
func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (rpc.Subscription, error) {
|
func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (rpc.Subscription, error) {
|
||||||
notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier)
|
notifier, supported := rpc.NotifierFromContext(ctx)
|
||||||
if !supported {
|
if !supported {
|
||||||
return nil, rpc.ErrNotificationsUnsupported
|
return nil, rpc.ErrNotificationsUnsupported
|
||||||
}
|
}
|
||||||
|
@ -234,7 +234,7 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) {
|
func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) {
|
||||||
notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier)
|
notifier, supported := rpc.NotifierFromContext(ctx)
|
||||||
if !supported {
|
if !supported {
|
||||||
return nil, rpc.ErrNotificationsUnsupported
|
return nil, rpc.ErrNotificationsUnsupported
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
|
|
||||||
"github.com/ethereum/go-ethereum/logger"
|
"github.com/ethereum/go-ethereum/logger"
|
||||||
"github.com/ethereum/go-ethereum/logger/glog"
|
"github.com/ethereum/go-ethereum/logger/glog"
|
||||||
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -62,6 +63,14 @@ type Notifier interface {
|
|||||||
Unsubscribe(id string) error
|
Unsubscribe(id string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type notifierKey struct{}
|
||||||
|
|
||||||
|
// NotifierFromContext returns the Notifier value stored in ctx, if any.
|
||||||
|
func NotifierFromContext(ctx context.Context) (Notifier, bool) {
|
||||||
|
n, ok := ctx.Value(notifierKey{}).(Notifier)
|
||||||
|
return n, ok
|
||||||
|
}
|
||||||
|
|
||||||
// Subscription defines the interface for objects that can notify subscribers
|
// Subscription defines the interface for objects that can notify subscribers
|
||||||
type Subscription interface {
|
type Subscription interface {
|
||||||
// Inform client of an event
|
// Inform client of an event
|
||||||
|
@ -36,7 +36,7 @@ func (s *NotificationTestService) Unsubscribe(subid string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) {
|
func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) {
|
||||||
notifier, supported := ctx.Value(NotifierContextKey).(Notifier)
|
notifier, supported := NotifierFromContext(ctx)
|
||||||
if !supported {
|
if !supported {
|
||||||
return nil, ErrNotificationsUnsupported
|
return nil, ErrNotificationsUnsupported
|
||||||
}
|
}
|
||||||
|
@ -32,9 +32,6 @@ import (
|
|||||||
const (
|
const (
|
||||||
stopPendingRequestTimeout = 3 * time.Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped
|
stopPendingRequestTimeout = 3 * time.Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped
|
||||||
|
|
||||||
// NotifierContextKey is the key where the notifier associated with the codec is stored in the context
|
|
||||||
NotifierContextKey = 1
|
|
||||||
|
|
||||||
notificationBufferSize = 10000 // max buffered notifications before codec is closed
|
notificationBufferSize = 10000 // max buffered notifications before codec is closed
|
||||||
|
|
||||||
DefaultIPCApis = "admin,eth,debug,miner,net,shh,txpool,personal,web3"
|
DefaultIPCApis = "admin,eth,debug,miner,net,shh,txpool,personal,web3"
|
||||||
@ -171,7 +168,7 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecO
|
|||||||
// to send notification to clients. It is thight to the codec/connection. If the
|
// to send notification to clients. It is thight to the codec/connection. If the
|
||||||
// connection is closed the notifier will stop and cancels all active subscriptions.
|
// connection is closed the notifier will stop and cancels all active subscriptions.
|
||||||
if options&OptionSubscriptions == OptionSubscriptions {
|
if options&OptionSubscriptions == OptionSubscriptions {
|
||||||
ctx = context.WithValue(ctx, NotifierContextKey, newBufferedNotifier(codec, notificationBufferSize))
|
ctx = context.WithValue(ctx, notifierKey{}, newBufferedNotifier(codec, notificationBufferSize))
|
||||||
}
|
}
|
||||||
s.codecsMu.Lock()
|
s.codecsMu.Lock()
|
||||||
if atomic.LoadInt32(&s.run) != 1 { // server stopped
|
if atomic.LoadInt32(&s.run) != 1 { // server stopped
|
||||||
@ -275,7 +272,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
|
|||||||
|
|
||||||
if req.isUnsubscribe { // cancel subscription, first param must be the subscription id
|
if req.isUnsubscribe { // cancel subscription, first param must be the subscription id
|
||||||
if len(req.args) >= 1 && req.args[0].Kind() == reflect.String {
|
if len(req.args) >= 1 && req.args[0].Kind() == reflect.String {
|
||||||
notifier, supported := ctx.Value(NotifierContextKey).(*bufferedNotifier)
|
notifier, supported := NotifierFromContext(ctx)
|
||||||
if !supported { // interface doesn't support subscriptions (e.g. http)
|
if !supported { // interface doesn't support subscriptions (e.g. http)
|
||||||
return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
|
return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
|
||||||
}
|
}
|
||||||
@ -298,8 +295,8 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
|
|||||||
|
|
||||||
// active the subscription after the sub id was successful sent to the client
|
// active the subscription after the sub id was successful sent to the client
|
||||||
activateSub := func() {
|
activateSub := func() {
|
||||||
notifier, _ := ctx.Value(NotifierContextKey).(*bufferedNotifier)
|
notifier, _ := NotifierFromContext(ctx)
|
||||||
notifier.activate(subid)
|
notifier.(*bufferedNotifier).activate(subid)
|
||||||
}
|
}
|
||||||
|
|
||||||
return codec.CreateResponse(req.id, subid), activateSub
|
return codec.CreateResponse(req.id, subid), activateSub
|
||||||
|
Loading…
Reference in New Issue
Block a user