From f7328c5ecbd1076582a71ef7bf436485f3868b1f Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Tue, 29 Mar 2016 15:07:40 +0200 Subject: [PATCH] rpc: add pub/sub support --- eth/api.go | 212 +++++++++++++++++++--------- eth/backend.go | 2 +- eth/downloader/api.go | 77 ++++++++--- eth/filters/api.go | 79 +++++++++-- node/node.go | 2 +- rpc/doc.go | 28 +--- rpc/http.go | 2 +- rpc/inproc.go | 2 +- rpc/json.go | 24 ++-- rpc/notification.go | 288 +++++++++++++++++++++++++++++++++++++++ rpc/notification_test.go | 119 ++++++++++++++++ rpc/server.go | 230 +++++++++++++++---------------- rpc/server_test.go | 16 ++- rpc/types.go | 54 +------- rpc/utils.go | 26 +++- rpc/websocket.go | 3 +- 16 files changed, 849 insertions(+), 315 deletions(-) create mode 100644 rpc/notification.go create mode 100644 rpc/notification_test.go diff --git a/eth/api.go b/eth/api.go index a257639ba1..676191fc23 100644 --- a/eth/api.go +++ b/eth/api.go @@ -28,6 +28,8 @@ import ( "sync" "time" + "golang.org/x/net/context" + "github.com/ethereum/ethash" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" @@ -457,16 +459,46 @@ func (s *PrivateAccountAPI) LockAccount(addr common.Address) bool { // It offers only methods that operate on public data that is freely available to anyone. type PublicBlockChainAPI struct { config *core.ChainConfig - bc *core.BlockChain - chainDb ethdb.Database - eventMux *event.TypeMux - am *accounts.Manager - miner *miner.Miner + bc *core.BlockChain + chainDb ethdb.Database + eventMux *event.TypeMux + muNewBlockSubscriptions sync.Mutex // protects newBlocksSubscriptions + newBlockSubscriptions map[string]func(core.ChainEvent) error // callbacks for new block subscriptions + am *accounts.Manager + miner *miner.Miner } // NewPublicBlockChainAPI creates a new Etheruem blockchain API. func NewPublicBlockChainAPI(config *core.ChainConfig, bc *core.BlockChain, m *miner.Miner, chainDb ethdb.Database, eventMux *event.TypeMux, am *accounts.Manager) *PublicBlockChainAPI { - return &PublicBlockChainAPI{config: config, bc: bc, miner: m, chainDb: chainDb, eventMux: eventMux, am: am} + api := &PublicBlockChainAPI{ + config: config, + bc: bc, + miner: m, + chainDb: chainDb, + eventMux: eventMux, + am: am, + newBlockSubscriptions: make(map[string]func(core.ChainEvent) error), + } + + go api.subscriptionLoop() + + return api +} + +// subscriptionLoop reads events from the global event mux and creates notifications for the matched subscriptions. +func (s *PublicBlockChainAPI) subscriptionLoop() { + sub := s.eventMux.Subscribe(core.ChainEvent{}) + for event := range sub.Chan() { + if chainEvent, ok := event.Data.(core.ChainEvent); ok { + s.muNewBlockSubscriptions.Lock() + for id, notifyOf := range s.newBlockSubscriptions { + if notifyOf(chainEvent) == rpc.ErrNotificationNotFound { + delete(s.newBlockSubscriptions, id) + } + } + s.muNewBlockSubscriptions.Unlock() + } + } } // BlockNumber returns the block number of the chain head. @@ -564,20 +596,36 @@ type NewBlocksArgs struct { // NewBlocks triggers a new block event each time a block is appended to the chain. It accepts an argument which allows // the caller to specify whether the output should contain transactions and in what format. -func (s *PublicBlockChainAPI) NewBlocks(args NewBlocksArgs) (rpc.Subscription, error) { - sub := s.eventMux.Subscribe(core.ChainEvent{}) - - output := func(rawBlock interface{}) interface{} { - if event, ok := rawBlock.(core.ChainEvent); ok { - notification, err := s.rpcOutputBlock(event.Block, args.IncludeTransactions, args.TransactionDetails) - if err == nil { - return notification - } - } - return rawBlock +func (s *PublicBlockChainAPI) NewBlocks(ctx context.Context, args NewBlocksArgs) (rpc.Subscription, error) { + notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier) + if !supported { + return nil, rpc.ErrNotificationsUnsupported } - return rpc.NewSubscriptionWithOutputFormat(sub, output), nil + // create a subscription that will remove itself when unsubscribed/cancelled + subscription, err := notifier.NewSubscription(func(subId string) { + s.muNewBlockSubscriptions.Lock() + delete(s.newBlockSubscriptions, subId) + s.muNewBlockSubscriptions.Unlock() + }) + + if err != nil { + return nil, err + } + + // add a callback that is called on chain events which will format the block and notify the client + s.muNewBlockSubscriptions.Lock() + s.newBlockSubscriptions[subscription.ID()] = func(e core.ChainEvent) error { + if notification, err := s.rpcOutputBlock(e.Block, args.IncludeTransactions, args.TransactionDetails); err == nil { + return subscription.Notify(notification) + } else { + glog.V(logger.Warn).Info("unable to format block %v\n", err) + } + return nil + } + s.muNewBlockSubscriptions.Unlock() + + return subscription, nil } // GetCode returns the code stored at the given address in the state for the given block number. @@ -821,26 +869,75 @@ func newRPCTransaction(b *types.Block, txHash common.Hash) (*RPCTransaction, err // PublicTransactionPoolAPI exposes methods for the RPC interface type PublicTransactionPoolAPI struct { - eventMux *event.TypeMux - chainDb ethdb.Database - gpo *GasPriceOracle - bc *core.BlockChain - miner *miner.Miner - am *accounts.Manager - txPool *core.TxPool - txMu sync.Mutex + eventMux *event.TypeMux + chainDb ethdb.Database + gpo *GasPriceOracle + bc *core.BlockChain + miner *miner.Miner + am *accounts.Manager + txPool *core.TxPool + txMu sync.Mutex + muPendingTxSubs sync.Mutex + pendingTxSubs map[string]rpc.Subscription } // NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool. func NewPublicTransactionPoolAPI(e *Ethereum) *PublicTransactionPoolAPI { - return &PublicTransactionPoolAPI{ - eventMux: e.EventMux(), - gpo: NewGasPriceOracle(e), - chainDb: e.ChainDb(), - bc: e.BlockChain(), - am: e.AccountManager(), - txPool: e.TxPool(), - miner: e.Miner(), + api := &PublicTransactionPoolAPI{ + eventMux: e.EventMux(), + gpo: NewGasPriceOracle(e), + chainDb: e.ChainDb(), + bc: e.BlockChain(), + am: e.AccountManager(), + txPool: e.TxPool(), + miner: e.Miner(), + pendingTxSubs: make(map[string]rpc.Subscription), + } + + go api.subscriptionLoop() + + return api +} + +// subscriptionLoop listens for events on the global event mux and creates notifications for subscriptions. +func (s *PublicTransactionPoolAPI) subscriptionLoop() { + sub := s.eventMux.Subscribe(core.TxPreEvent{}) + accountTimeout := time.NewTicker(10 * time.Second) + + // only publish pending tx signed by one of the accounts in the node + accountSet := set.New() + accounts, _ := s.am.Accounts() + for _, acc := range accounts { + accountSet.Add(acc.Address) + } + + for { + select { + case event := <-sub.Chan(): + if event == nil { + continue + } + tx := event.Data.(core.TxPreEvent) + if from, err := tx.Tx.FromFrontier(); err == nil { + if accountSet.Has(from) { + s.muPendingTxSubs.Lock() + for id, sub := range s.pendingTxSubs { + if sub.Notify(tx.Tx.Hash()) == rpc.ErrNotificationNotFound { + delete(s.pendingTxSubs, id) + } + } + s.muPendingTxSubs.Unlock() + } + } + case <-accountTimeout.C: + // refresh account list when accounts are added/removed from the node. + if accounts, err := s.am.Accounts(); err == nil { + accountSet.Clear() + for _, acc := range accounts { + accountSet.Add(acc.Address) + } + } + } } } @@ -1275,40 +1372,27 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() ([]*RPCTransaction, err // NewPendingTransaction creates a subscription that is triggered each time a transaction enters the transaction pool // and is send from one of the transactions this nodes manages. -func (s *PublicTransactionPoolAPI) NewPendingTransactions() (rpc.Subscription, error) { - sub := s.eventMux.Subscribe(core.TxPreEvent{}) +func (s *PublicTransactionPoolAPI) NewPendingTransactions(ctx context.Context) (rpc.Subscription, error) { + notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + + subscription, err := notifier.NewSubscription(func(id string) { + s.muPendingTxSubs.Lock() + delete(s.pendingTxSubs, id) + s.muPendingTxSubs.Unlock() + }) - accounts, err := s.am.Accounts() if err != nil { - return rpc.Subscription{}, err - } - accountSet := set.New() - for _, account := range accounts { - accountSet.Add(account.Address) - } - accountSetLastUpdates := time.Now() - - output := func(transaction interface{}) interface{} { - if time.Since(accountSetLastUpdates) > (time.Duration(2) * time.Second) { - if accounts, err = s.am.Accounts(); err != nil { - accountSet.Clear() - for _, account := range accounts { - accountSet.Add(account.Address) - } - accountSetLastUpdates = time.Now() - } - } - - tx := transaction.(core.TxPreEvent) - if from, err := tx.Tx.FromFrontier(); err == nil { - if accountSet.Has(from) { - return tx.Tx.Hash() - } - } - return nil + return nil, err } - return rpc.NewSubscriptionWithOutputFormat(sub, output), nil + s.muPendingTxSubs.Lock() + s.pendingTxSubs[subscription.ID()] = subscription + s.muPendingTxSubs.Unlock() + + return subscription, nil } // Resend accepts an existing transaction and a new gas price and limit. It will remove the given transaction from the diff --git a/eth/backend.go b/eth/backend.go index 3c3440a53d..d4f72515e1 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -310,7 +310,7 @@ func (s *Ethereum) APIs() []rpc.API { }, { Namespace: "eth", Version: "1.0", - Service: downloader.NewPublicDownloaderAPI(s.Downloader()), + Service: downloader.NewPublicDownloaderAPI(s.Downloader(), s.EventMux()), Public: true, }, { Namespace: "miner", diff --git a/eth/downloader/api.go b/eth/downloader/api.go index 13d0ed46ef..576b33f1d8 100644 --- a/eth/downloader/api.go +++ b/eth/downloader/api.go @@ -17,18 +17,55 @@ package downloader import ( + "sync" + + "golang.org/x/net/context" + + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" ) // PublicDownloaderAPI provides an API which gives information about the current synchronisation status. // It offers only methods that operates on data that can be available to anyone without security risks. type PublicDownloaderAPI struct { - d *Downloader + d *Downloader + mux *event.TypeMux + muSyncSubscriptions sync.Mutex + syncSubscriptions map[string]rpc.Subscription } // NewPublicDownloaderAPI create a new PublicDownloaderAPI. -func NewPublicDownloaderAPI(d *Downloader) *PublicDownloaderAPI { - return &PublicDownloaderAPI{d} +func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI { + api := &PublicDownloaderAPI{d: d, mux: m, syncSubscriptions: make(map[string]rpc.Subscription)} + + go api.run() + + return api +} + +func (api *PublicDownloaderAPI) run() { + sub := api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{}) + + for event := range sub.Chan() { + var notification interface{} + + switch event.Data.(type) { + case StartEvent: + result := &SyncingResult{Syncing: true} + result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = api.d.Progress() + notification = result + case DoneEvent, FailedEvent: + notification = false + } + + api.muSyncSubscriptions.Lock() + for id, sub := range api.syncSubscriptions { + if sub.Notify(notification) == rpc.ErrNotificationNotFound { + delete(api.syncSubscriptions, id) + } + } + api.muSyncSubscriptions.Unlock() + } } // Progress gives progress indications when the node is synchronising with the Ethereum network. @@ -47,19 +84,25 @@ type SyncingResult struct { } // Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished. -func (s *PublicDownloaderAPI) Syncing() (rpc.Subscription, error) { - sub := s.d.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{}) - - output := func(event interface{}) interface{} { - switch event.(type) { - case StartEvent: - result := &SyncingResult{Syncing: true} - result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = s.d.Progress() - return result - case DoneEvent, FailedEvent: - return false - } - return nil +func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (rpc.Subscription, error) { + notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier) + if !supported { + return nil, rpc.ErrNotificationsUnsupported } - return rpc.NewSubscriptionWithOutputFormat(sub, output), nil + + subscription, err := notifier.NewSubscription(func(id string) { + api.muSyncSubscriptions.Lock() + delete(api.syncSubscriptions, id) + api.muSyncSubscriptions.Unlock() + }) + + if err != nil { + return nil, err + } + + api.muSyncSubscriptions.Lock() + api.syncSubscriptions[subscription.ID()] = subscription + api.muSyncSubscriptions.Unlock() + + return subscription, nil } diff --git a/eth/filters/api.go b/eth/filters/api.go index e6a1ce3abf..9566603638 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -17,15 +17,13 @@ package filters import ( - "sync" - "time" - "crypto/rand" "encoding/hex" - "errors" - "encoding/json" + "errors" "fmt" + "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -33,6 +31,8 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" + + "golang.org/x/net/context" ) var ( @@ -202,7 +202,7 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) { } // newLogFilter creates a new log filter. -func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) (int, error) { +func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) { s.logMu.Lock() defer s.logMu.Unlock() @@ -219,17 +219,70 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo filter.SetAddresses(addresses) filter.SetTopics(topics) filter.LogCallback = func(log *vm.Log, removed bool) { - s.logMu.Lock() - defer s.logMu.Unlock() - - if queue := s.logQueue[id]; queue != nil { - queue.add(vmlog{log, removed}) + if callback != nil { + callback(log, removed) + } else { + s.logMu.Lock() + defer s.logMu.Unlock() + if queue := s.logQueue[id]; queue != nil { + queue.add(vmlog{log, removed}) + } } } return id, nil } +func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) { + notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + + var ( + externalId string + subscription rpc.Subscription + err error + ) + + if externalId, err = newFilterId(); err != nil { + return nil, err + } + + // uninstall filter when subscription is unsubscribed/cancelled + if subscription, err = notifier.NewSubscription(func(string) { + s.UninstallFilter(externalId) + }); err != nil { + return nil, err + } + + notifySubscriber := func(log *vm.Log, removed bool) { + rpcLog := toRPCLogs(vm.Logs{log}, removed) + if err := subscription.Notify(rpcLog); err != nil { + subscription.Cancel() + } + } + + // from and to block number are not used since subscriptions don't allow you to travel to "time" + var id int + if len(args.Addresses) > 0 { + id, err = s.newLogFilter(-1, -1, args.Addresses, args.Topics, notifySubscriber) + } else { + id, err = s.newLogFilter(-1, -1, nil, args.Topics, notifySubscriber) + } + + if err != nil { + subscription.Cancel() + return nil, err + } + + s.filterMapMu.Lock() + s.filterMapping[externalId] = id + s.filterMapMu.Unlock() + + return subscription, err +} + // NewFilterArgs represents a request to create a new filter. type NewFilterArgs struct { FromBlock rpc.BlockNumber @@ -364,9 +417,9 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) { var id int if len(args.Addresses) > 0 { - id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics) + id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics, nil) } else { - id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics) + id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics, nil) } if err != nil { return "", err diff --git a/node/node.go b/node/node.go index 7d3a10874c..62cc3895b2 100644 --- a/node/node.go +++ b/node/node.go @@ -303,7 +303,7 @@ func (n *Node) startIPC(apis []rpc.API) error { glog.V(logger.Error).Infof("IPC accept failed: %v", err) continue } - go handler.ServeCodec(rpc.NewJSONCodec(conn)) + go handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation | rpc.OptionSubscriptions) } }() // All listeners booted successfully diff --git a/rpc/doc.go b/rpc/doc.go index a2506ad580..c9dba3270f 100644 --- a/rpc/doc.go +++ b/rpc/doc.go @@ -68,35 +68,19 @@ The package also supports the publish subscribe pattern through the use of subsc A method that is considered eligible for notifications must satisfy the following criteria: - object must be exported - method must be exported + - first method argument type must be context.Context - method argument(s) must be exported or builtin types - method must return the tuple Subscription, error - An example method: - func (s *BlockChainService) Head() (Subscription, error) { - sub := s.bc.eventMux.Subscribe(ChainHeadEvent{}) - return v2.NewSubscription(sub), nil - } - -This method will push all raised ChainHeadEvents to subscribed clients. If the client is only -interested in every N'th block it is possible to add a criteria. - - func (s *BlockChainService) HeadFiltered(nth uint64) (Subscription, error) { - sub := s.bc.eventMux.Subscribe(ChainHeadEvent{}) - - criteria := func(event interface{}) bool { - chainHeadEvent := event.(ChainHeadEvent) - if chainHeadEvent.Block.NumberU64() % nth == 0 { - return true - } - return false - } - - return v2.NewSubscriptionFiltered(sub, criteria), nil + func (s *BlockChainService) NewBlocks(ctx context.Context) (Subscription, error) { + ... } Subscriptions are deleted when: - the user sends an unsubscribe request - - the connection which was used to create the subscription is closed + - the connection which was used to create the subscription is closed. This can be initiated + by the client and server. The server will close the connection on an write error or when + the queue of buffered notifications gets too big. */ package rpc diff --git a/rpc/http.go b/rpc/http.go index af3d29014c..dd1ec2c018 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -126,7 +126,7 @@ func newJSONHTTPHandler(srv *Server) http.HandlerFunc { // a single request. codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w}) defer codec.Close() - srv.ServeSingleRequest(codec) + srv.ServeSingleRequest(codec, OptionMethodInvocation) } } diff --git a/rpc/inproc.go b/rpc/inproc.go index 3cfbea71cd..250f5c7872 100644 --- a/rpc/inproc.go +++ b/rpc/inproc.go @@ -39,7 +39,7 @@ func (c *inProcClient) Close() { // RPC server. func NewInProcRPCClient(handler *Server) Client { p1, p2 := net.Pipe() - go handler.ServeCodec(NewJSONCodec(p1)) + go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions) return &inProcClient{handler, p2, json.NewEncoder(p2), json.NewDecoder(p2)} } diff --git a/rpc/json.go b/rpc/json.go index 1ed943c004..a0bfcac049 100644 --- a/rpc/json.go +++ b/rpc/json.go @@ -22,7 +22,7 @@ import ( "io" "reflect" "strings" - "sync/atomic" + "sync" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" @@ -81,19 +81,20 @@ type jsonNotification struct { // jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has support for parsing arguments // and serializing (result) objects. type jsonCodec struct { - closed chan interface{} - isClosed int32 - d *json.Decoder - e *json.Encoder - req JSONRequest - rw io.ReadWriteCloser + closed chan interface{} + closer sync.Once + d *json.Decoder + muEncoder sync.Mutex + e *json.Encoder + req JSONRequest + rw io.ReadWriteCloser } // NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0 func NewJSONCodec(rwc io.ReadWriteCloser) ServerCodec { d := json.NewDecoder(rwc) d.UseNumber() - return &jsonCodec{closed: make(chan interface{}), d: d, e: json.NewEncoder(rwc), rw: rwc, isClosed: 0} + return &jsonCodec{closed: make(chan interface{}), d: d, e: json.NewEncoder(rwc), rw: rwc} } // isBatch returns true when the first non-whitespace characters is '[' @@ -326,15 +327,18 @@ func (c *jsonCodec) CreateNotification(subid string, event interface{}) interfac // Write message to client func (c *jsonCodec) Write(res interface{}) error { + c.muEncoder.Lock() + defer c.muEncoder.Unlock() + return c.e.Encode(res) } // Close the underlying connection func (c *jsonCodec) Close() { - if atomic.CompareAndSwapInt32(&c.isClosed, 0, 1) { + c.closer.Do(func() { close(c.closed) c.rw.Close() - } + }) } // Closed returns a channel which will be closed when Close is called diff --git a/rpc/notification.go b/rpc/notification.go new file mode 100644 index 0000000000..146d785c90 --- /dev/null +++ b/rpc/notification.go @@ -0,0 +1,288 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rpc + +import ( + "errors" + "sync" + "time" + + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" +) + +var ( + // ErrNotificationsUnsupported is returned when the connection doesn't support notifications + ErrNotificationsUnsupported = errors.New("notifications not supported") + + // ErrNotificationNotFound is returned when the notification for the given id is not found + ErrNotificationNotFound = errors.New("notification not found") + + // errNotifierStopped is returned when the notifier is stopped (e.g. codec is closed) + errNotifierStopped = errors.New("unable to send notification") + + // errNotificationQueueFull is returns when there are too many notifications in the queue + errNotificationQueueFull = errors.New("too many pending notifications") +) + +// unsubSignal is a signal that the subscription is unsubscribed. It is used to flush buffered +// notifications that might be pending in the internal queue. +var unsubSignal = new(struct{}) + +// UnsubscribeCallback defines a callback that is called when a subcription ends. +// It receives the subscription id as argument. +type UnsubscribeCallback func(id string) + +// notification is a helper object that holds event data for a subscription +type notification struct { + sub *bufferedSubscription // subscription id + data interface{} // event data +} + +// A Notifier type describes the interface for objects that can send create subscriptions +type Notifier interface { + // Create a new subscription. The given callback is called when this subscription + // is cancelled (e.g. client send an unsubscribe, connection closed). + NewSubscription(UnsubscribeCallback) (Subscription, error) + // Cancel subscription + Unsubscribe(id string) error +} + +// Subscription defines the interface for objects that can notify subscribers +type Subscription interface { + // Inform client of an event + Notify(data interface{}) error + // Unique identifier + ID() string + // Cancel subscription + Cancel() error +} + +// bufferedSubscription is a subscription that uses a bufferedNotifier to send +// notifications to subscribers. +type bufferedSubscription struct { + id string + unsubOnce sync.Once // call unsub method once + unsub UnsubscribeCallback // called on Unsubscribed + notifier *bufferedNotifier // forward notifications to + pending chan interface{} // closed when active + flushed chan interface{} // closed when all buffered notifications are send + lastNotification time.Time // last time a notification was send +} + +// ID returns the subscription identifier that the client uses to refer to this instance. +func (s *bufferedSubscription) ID() string { + return s.id +} + +// Cancel informs the notifier that this subscription is cancelled by the API +func (s *bufferedSubscription) Cancel() error { + return s.notifier.Unsubscribe(s.id) +} + +// Notify the subscriber of a particular event. +func (s *bufferedSubscription) Notify(data interface{}) error { + return s.notifier.send(s.id, data) +} + +// bufferedNotifier is a notifier that queues notifications in an internal queue and +// send them as fast as possible to the client from this queue. It will stop if the +// queue grows past a given size. +type bufferedNotifier struct { + codec ServerCodec // underlying connection + mu sync.Mutex // guard internal state + subscriptions map[string]*bufferedSubscription // keep track of subscriptions associated with codec + queueSize int // max number of items in queue + queue chan *notification // notification queue + stopped bool // indication if this notifier is ordered to stop +} + +// newBufferedNotifier returns a notifier that queues notifications in an internal queue +// from which notifications are send as fast as possible to the client. If the queue size +// limit is reached (client is unable to keep up) it will stop and closes the codec. +func newBufferedNotifier(codec ServerCodec, size int) *bufferedNotifier { + notifier := &bufferedNotifier{ + codec: codec, + subscriptions: make(map[string]*bufferedSubscription), + queue: make(chan *notification, size), + queueSize: size, + } + + go notifier.run() + + return notifier +} + +// NewSubscription creates a new subscription that forwards events to this instance internal +// queue. The given callback is called when the subscription is unsubscribed/cancelled. +func (n *bufferedNotifier) NewSubscription(callback UnsubscribeCallback) (Subscription, error) { + id, err := newSubscriptionID() + if err != nil { + return nil, err + } + + n.mu.Lock() + defer n.mu.Unlock() + + if n.stopped { + return nil, errNotifierStopped + } + + sub := &bufferedSubscription{ + id: id, + unsub: callback, + notifier: n, + pending: make(chan interface{}), + flushed: make(chan interface{}), + lastNotification: time.Now(), + } + + n.subscriptions[id] = sub + + return sub, nil +} + +// Remove the given subscription. If subscription is not found notificationNotFoundErr is returned. +func (n *bufferedNotifier) Unsubscribe(subid string) error { + n.mu.Lock() + sub, found := n.subscriptions[subid] + n.mu.Unlock() + + if found { + // send the unsubscribe signal, this will cause the notifier not to accept new events + // for this subscription and will close the flushed channel after the last (buffered) + // notification was send to the client. + if err := n.send(subid, unsubSignal); err != nil { + return err + } + + // wait for confirmation that all (buffered) events are send for this subscription. + // this ensures that the unsubscribe method response is not send before all buffered + // events for this subscription are send. + <-sub.flushed + + return nil + } + + return ErrNotificationNotFound +} + +// Send enques the given data for the subscription with public ID on the internal queue. t returns +// an error when the notifier is stopped or the queue is full. If data is the unsubscribe signal it +// will remove the subscription with the given id from the subscription collection. +func (n *bufferedNotifier) send(id string, data interface{}) error { + n.mu.Lock() + defer n.mu.Unlock() + + if n.stopped { + return errNotifierStopped + } + + var ( + subscription *bufferedSubscription + found bool + ) + + // check if subscription is associated with this connection, it might be cancelled + // (subscribe/connection closed) + if subscription, found = n.subscriptions[id]; !found { + glog.V(logger.Error).Infof("received notification for unknown subscription %s\n", id) + return ErrNotificationNotFound + } + + // received the unsubscribe signal. Add it to the queue to make sure any pending notifications + // for this subscription are send. When the run loop receives this singal it will signal that + // all pending subscriptions are flushed and that the confirmation of the unsubscribe can be + // send to the user. Remove the subscriptions to make sure new notifications are not accepted. + if data == unsubSignal { + delete(n.subscriptions, id) + if subscription.unsub != nil { + subscription.unsubOnce.Do(func() { subscription.unsub(id) }) + } + } + + subscription.lastNotification = time.Now() + + if len(n.queue) >= n.queueSize { + glog.V(logger.Warn).Infoln("too many buffered notifications -> close connection") + n.codec.Close() + return errNotificationQueueFull + } + + n.queue <- ¬ification{subscription, data} + return nil +} + +// run reads notifications from the internal queue and sends them to the client. In case of an +// error, or when the codec is closed it will cancel all active subscriptions and returns. +func (n *bufferedNotifier) run() { + defer func() { + n.mu.Lock() + defer n.mu.Unlock() + + n.stopped = true + close(n.queue) + + // on exit call unsubscribe callback + for id, sub := range n.subscriptions { + if sub.unsub != nil { + sub.unsubOnce.Do(func() { sub.unsub(id) }) + } + close(sub.flushed) + delete(n.subscriptions, id) + } + }() + + for { + select { + case notification := <-n.queue: + // It can happen that an event is raised before the RPC server was able to send the sub + // id to the client. Therefore subscriptions are marked as pending until the sub id was + // send. The RPC server will activate the subscription by closing the pending chan. + <-notification.sub.pending + + if notification.data == unsubSignal { + // unsubSignal is the last accepted message for this subscription. Raise the signal + // that all buffered notifications are sent by closing the flushed channel. This + // indicates that the response for the unsubscribe can be send to the client. + close(notification.sub.flushed) + } else { + msg := n.codec.CreateNotification(notification.sub.id, notification.data) + if err := n.codec.Write(msg); err != nil { + n.codec.Close() + // unable to send notification to client, unsubscribe all subscriptions + glog.V(logger.Warn).Infof("unable to send notification - %v\n", err) + return + } + } + case <-n.codec.Closed(): // connection was closed + glog.V(logger.Debug).Infoln("codec closed, stop subscriptions") + return + } + } +} + +// Marks the subscription as active. This will causes the notifications for this subscription to be +// forwarded to the client. +func (n *bufferedNotifier) activate(subid string) { + n.mu.Lock() + defer n.mu.Unlock() + + if sub, found := n.subscriptions[subid]; found { + close(sub.pending) + } +} diff --git a/rpc/notification_test.go b/rpc/notification_test.go new file mode 100644 index 0000000000..8d2add81c9 --- /dev/null +++ b/rpc/notification_test.go @@ -0,0 +1,119 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . +package rpc + +import ( + "encoding/json" + "net" + "testing" + "time" + + "golang.org/x/net/context" +) + +type NotificationTestService struct{} + +var ( + unsubCallbackCalled = false +) + +func (s *NotificationTestService) Unsubscribe(subid string) { + unsubCallbackCalled = true +} + +func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) { + notifier, supported := ctx.Value(NotifierContextKey).(Notifier) + if !supported { + return nil, ErrNotificationsUnsupported + } + + // by explicitly creating an subscription we make sure that the subscription id is send back to the client + // before the first subscription.Notify is called. Otherwise the events might be send before the response + // for the eth_subscribe method. + subscription, err := notifier.NewSubscription(s.Unsubscribe) + if err != nil { + return nil, err + } + + go func() { + for i := 0; i < n; i++ { + if err := subscription.Notify(val + i); err != nil { + return + } + } + }() + + return subscription, nil +} + +func TestNotifications(t *testing.T) { + server := NewServer() + service := &NotificationTestService{} + + if err := server.RegisterName("eth", service); err != nil { + t.Fatalf("unable to register test service %v", err) + } + + clientConn, serverConn := net.Pipe() + + go server.ServeCodec(NewJSONCodec(serverConn), OptionMethodInvocation|OptionSubscriptions) + + out := json.NewEncoder(clientConn) + in := json.NewDecoder(clientConn) + + n := 5 + val := 12345 + request := map[string]interface{}{ + "id": 1, + "method": "eth_subscribe", + "version": "2.0", + "params": []interface{}{"someSubscription", n, val}, + } + + // create subscription + if err := out.Encode(request); err != nil { + t.Fatal(err) + } + + var subid string + response := JSONSuccessResponse{Result: subid} + if err := in.Decode(&response); err != nil { + t.Fatal(err) + } + + var ok bool + if subid, ok = response.Result.(string); !ok { + t.Fatalf("expected subscription id, got %T", response.Result) + } + + for i := 0; i < n; i++ { + var notification jsonNotification + if err := in.Decode(¬ification); err != nil { + t.Fatalf("%v", err) + } + + if int(notification.Params.Result.(float64)) != val+i { + t.Fatalf("expected %d, got %d", val+i, notification.Params.Result) + } + } + + clientConn.Close() // causes notification unsubscribe callback to be called + time.Sleep(1 * time.Second) + + if !unsubCallbackCalled { + t.Error("unsubscribe callback not called after closing connection") + } +} diff --git a/rpc/server.go b/rpc/server.go index 22448f8e34..cf90eba02a 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -23,7 +23,6 @@ import ( "sync/atomic" "time" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "golang.org/x/net/context" @@ -33,10 +32,26 @@ import ( const ( stopPendingRequestTimeout = 3 * time.Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped + // NotifierContextKey is the key where the notifier associated with the codec is stored in the context + NotifierContextKey = 1 + + notificationBufferSize = 10000 // max buffered notifications before codec is closed + DefaultIPCApis = "admin,eth,debug,miner,net,shh,txpool,personal,web3" DefaultHTTPApis = "eth,net,web3" ) +// CodecOption specifies which type of messages this codec supports +type CodecOption int + +const ( + // OptionMethodInvocation is an indication that the codec supports RPC method calls + OptionMethodInvocation CodecOption = 1 << iota + + // OptionSubscriptions is an indication that the codec suports RPC notifications + OptionSubscriptions = 1 << iota // support pub sub +) + // NewServer will create a new server instance with no registered handlers. func NewServer() *Server { server := &Server{ @@ -63,7 +78,7 @@ type RPCService struct { // Modules returns the list of RPC services with their version number func (s *RPCService) Modules() map[string]string { modules := make(map[string]string) - for name, _ := range s.server.services { + for name := range s.server.services { modules[name] = "1.0" } return modules @@ -92,7 +107,7 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error { if regsvc, present := s.services[name]; present { methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ) if len(methods) == 0 && len(subscriptions) == 0 { - return fmt.Errorf("Service doesn't have any suitable methods/subscriptions to expose") + return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr) } for _, m := range methods { @@ -109,7 +124,7 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error { svc.callbacks, svc.subscriptions = suitableCallbacks(rcvrVal, svc.typ) if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 { - return fmt.Errorf("Service doesn't have any suitable methods/subscriptions to expose") + return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr) } s.services[svc.name] = svc @@ -117,12 +132,23 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error { return nil } +// hasOption returns true if option is included in options, otherwise false +func hasOption(option CodecOption, options []CodecOption) bool { + for _, o := range options { + if option == o { + return true + } + } + return false +} + // serveRequest will reads requests from the codec, calls the RPC callback and // writes the response to the given codec. +// // If singleShot is true it will process a single request, otherwise it will handle // requests until the codec returns an error when reading a request (in most cases // an EOF). It executes requests in parallel when singleShot is false. -func (s *Server) serveRequest(codec ServerCodec, singleShot bool) error { +func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecOption) error { defer func() { if err := recover(); err != nil { const size = 64 << 10 @@ -141,6 +167,12 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // if the codec supports notification include a notifier that callbacks can use + // to send notification to clients. It is thight to the codec/connection. If the + // connection is closed the notifier will stop and cancels all active subscriptions. + if options&OptionSubscriptions == OptionSubscriptions { + ctx = context.WithValue(ctx, NotifierContextKey, newBufferedNotifier(codec, notificationBufferSize)) + } s.codecsMu.Lock() if atomic.LoadInt32(&s.run) != 1 { // server stopped s.codecsMu.Unlock() @@ -193,20 +225,16 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool) error { // ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the // response back using the given codec. It will block until the codec is closed or the server is // stopped. In either case the codec is closed. -// -// This server will: -// 1. allow for asynchronous and parallel request execution -// 2. supports notifications (pub/sub) -// 3. supports request batches -func (s *Server) ServeCodec(codec ServerCodec) { +func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) { defer codec.Close() - s.serveRequest(codec, false) + s.serveRequest(codec, false, options) } // ServeSingleRequest reads and processes a single RPC request from the given codec. It will not -// close the codec unless a non-recoverable error has occurred. -func (s *Server) ServeSingleRequest(codec ServerCodec) { - s.serveRequest(codec, true) +// close the codec unless a non-recoverable error has occurred. Note, this method will return after +// a single request has been processed! +func (s *Server) ServeSingleRequest(codec ServerCodec, options CodecOption) { + s.serveRequest(codec, true, options) } // Stop will stop reading new requests, wait for stopPendingRequestTimeout to allow pending requests to finish, @@ -225,122 +253,64 @@ func (s *Server) Stop() { } } -// sendNotification will create a notification from the given event by serializing member fields of the event. -// It will then send the notification to the client, when it fails the codec is closed. When the event has multiple -// fields an array of values is returned. -func sendNotification(codec ServerCodec, subid string, event interface{}) { - notification := codec.CreateNotification(subid, event) - - if err := codec.Write(notification); err != nil { - codec.Close() - } -} - -// createSubscription will register a new subscription and waits for raised events. When an event is raised it will: -// 1. test if the event is raised matches the criteria the user has (optionally) specified -// 2. create a notification of the event and send it the client when it matches the criteria -// It will unsubscribe the subscription when the socket is closed or the subscription is unsubscribed by the user. -func (s *Server) createSubscription(c ServerCodec, req *serverRequest) (string, error) { - args := []reflect.Value{req.callb.rcvr} - if len(req.args) > 0 { - args = append(args, req.args...) - } - - subid, err := newSubscriptionId() - if err != nil { - return "", err - } - +// createSubscription will call the subscription callback and returns the subscription id or error. +func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (string, error) { + // subscription have as first argument the context following optional arguments + args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)} + args = append(args, req.args...) reply := req.callb.method.Func.Call(args) - if reply[1].IsNil() { // no error - if subscription, ok := reply[0].Interface().(Subscription); ok { - s.muSubcriptions.Lock() - s.subscriptions[subid] = subscription - s.muSubcriptions.Unlock() - go func() { - cases := []reflect.SelectCase{ - reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(subscription.Chan())}, // new event - reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c.Closed())}, // connection closed - } - - for { - idx, notification, recvOk := reflect.Select(cases) - switch idx { - case 0: // new event, or channel closed - if recvOk { // send notification - if event, ok := notification.Interface().(*event.Event); ok { - if subscription.match == nil || subscription.match(event.Data) { - sendNotification(c, subid, subscription.format(event.Data)) - } - } - } else { // user send an eth_unsubscribe request - return - } - case 1: // connection closed - s.unsubscribe(subid) - return - } - } - }() - } else { // unable to create subscription - s.muSubcriptions.Lock() - delete(s.subscriptions, subid) - s.muSubcriptions.Unlock() - } - } else { - return "", fmt.Errorf("Unable to create subscription") + if !reply[1].IsNil() { // subscription creation failed + return "", reply[1].Interface().(error) } - return subid, nil -} - -// unsubscribe calls the Unsubscribe method on the subscription and removes a subscription from the subscription -// registry. -func (s *Server) unsubscribe(subid string) bool { - s.muSubcriptions.Lock() - defer s.muSubcriptions.Unlock() - if sub, ok := s.subscriptions[subid]; ok { - sub.Unsubscribe() - delete(s.subscriptions, subid) - return true - } - return false + return reply[0].Interface().(Subscription).ID(), nil } // handle executes a request and returns the response from the callback. -func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) interface{} { +func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) { if req.err != nil { - return codec.CreateErrorResponse(&req.id, req.err) + return codec.CreateErrorResponse(&req.id, req.err), nil } - if req.isUnsubscribe { // first param must be the subscription id + if req.isUnsubscribe { // cancel subscription, first param must be the subscription id if len(req.args) >= 1 && req.args[0].Kind() == reflect.String { - subid := req.args[0].String() - if s.unsubscribe(subid) { - return codec.CreateResponse(req.id, true) - } else { - return codec.CreateErrorResponse(&req.id, - &callbackError{fmt.Sprintf("subscription '%s' not found", subid)}) + notifier, supported := ctx.Value(NotifierContextKey).(*bufferedNotifier) + if !supported { // interface doesn't support subscriptions (e.g. http) + return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil } + + subid := req.args[0].String() + if err := notifier.Unsubscribe(subid); err != nil { + return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil + } + + return codec.CreateResponse(req.id, true), nil } - return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as argument"}) + return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as first argument"}), nil } if req.callb.isSubscribe { - subid, err := s.createSubscription(codec, req) + subid, err := s.createSubscription(ctx, codec, req) if err != nil { - return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}) + return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil } - return codec.CreateResponse(req.id, subid) + + // active the subscription after the sub id was successful sent to the client + activateSub := func() { + notifier, _ := ctx.Value(NotifierContextKey).(*bufferedNotifier) + notifier.activate(subid) + } + + return codec.CreateResponse(req.id, subid), activateSub } - // regular RPC call + // regular RPC call, prepare arguments if len(req.args) != len(req.callb.argTypes) { rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d", req.svcname, serviceMethodSeparator, req.callb.method.Name, len(req.callb.argTypes), len(req.args))} - return codec.CreateErrorResponse(&req.id, rpcErr) + return codec.CreateErrorResponse(&req.id, rpcErr), nil } arguments := []reflect.Value{req.callb.rcvr} @@ -351,45 +321,56 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque arguments = append(arguments, req.args...) } + // execute RPC method and return result reply := req.callb.method.Func.Call(arguments) - if len(reply) == 0 { - return codec.CreateResponse(req.id, nil) + return codec.CreateResponse(req.id, nil), nil } if req.callb.errPos >= 0 { // test if method returned an error if !reply[req.callb.errPos].IsNil() { e := reply[req.callb.errPos].Interface().(error) res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()}) - return res + return res, nil } } - return codec.CreateResponse(req.id, reply[0].Interface()) + return codec.CreateResponse(req.id, reply[0].Interface()), nil } // exec executes the given request and writes the result back using the codec. func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) { var response interface{} + var callback func() if req.err != nil { response = codec.CreateErrorResponse(&req.id, req.err) } else { - response = s.handle(ctx, codec, req) + response, callback = s.handle(ctx, codec, req) } + if err := codec.Write(response); err != nil { glog.V(logger.Error).Infof("%v\n", err) codec.Close() } + + // when request was a subscribe request this allows these subscriptions to be actived + if callback != nil { + callback() + } } -// execBatch executes the given requests and writes the result back using the codec. It will only write the response -// back when the last request is processed. +// execBatch executes the given requests and writes the result back using the codec. +// It will only write the response back when the last request is processed. func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*serverRequest) { responses := make([]interface{}, len(requests)) + var callbacks []func() for i, req := range requests { if req.err != nil { responses[i] = codec.CreateErrorResponse(&req.id, req.err) } else { - responses[i] = s.handle(ctx, codec, req) + var callback func() + if responses[i], callback = s.handle(ctx, codec, req); callback != nil { + callbacks = append(callbacks, callback) + } } } @@ -397,11 +378,16 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s glog.V(logger.Error).Infof("%v\n", err) codec.Close() } + + // when request holds one of more subscribe requests this allows these subscriptions to be actived + for _, c := range callbacks { + c() + } } -// readRequest requests the next (batch) request from the codec. It will return the collection of requests, an -// indication if the request was a batch, the invalid request identifier and an error when the request could not be -// read/parsed. +// readRequest requests the next (batch) request from the codec. It will return the collection +// of requests, an indication if the request was a batch, the invalid request identifier and an +// error when the request could not be read/parsed. func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCError) { reqs, batch, err := codec.ReadRequestHeaders() if err != nil { @@ -417,7 +403,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro if r.isPubSub && r.method == unsubscribeMethod { requests[i] = &serverRequest{id: r.id, isUnsubscribe: true} - argTypes := []reflect.Type{reflect.TypeOf("")} + argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil { requests[i].args = args } else { @@ -426,12 +412,12 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro continue } - if svc, ok = s.services[r.service]; !ok { + if svc, ok = s.services[r.service]; !ok { // rpc method isn't available requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}} continue } - if r.isPubSub { // eth_subscribe + if r.isPubSub { // eth_subscribe, r.method contains the subscription method name if callb, ok := svc.subscriptions[r.method]; ok { requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb} if r.params != nil && len(callb.argTypes) > 0 { @@ -449,7 +435,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro continue } - if callb, ok := svc.callbacks[r.method]; ok { + if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC method requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb} if r.params != nil && len(callb.argTypes) > 0 { if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil { diff --git a/rpc/server_test.go b/rpc/server_test.go index 5b91fe42ac..c60db38df2 100644 --- a/rpc/server_test.go +++ b/rpc/server_test.go @@ -65,8 +65,12 @@ func (s *Service) InvalidRets3() (string, string, error) { return "", "", nil } -func (s *Service) Subscription() (Subscription, error) { - return NewSubscription(nil), nil +func (s *Service) Subscription(ctx context.Context) (Subscription, error) { + return nil, nil +} + +func (s *Service) SubsriptionWithArgs(ctx context.Context, a, b int) (Subscription, error) { + return nil, nil } func TestServerRegisterName(t *testing.T) { @@ -90,8 +94,8 @@ func TestServerRegisterName(t *testing.T) { t.Errorf("Expected 4 callbacks for service 'calc', got %d", len(svc.callbacks)) } - if len(svc.subscriptions) != 1 { - t.Errorf("Expected 1 subscription for service 'calc', got %d", len(svc.subscriptions)) + if len(svc.subscriptions) != 2 { + t.Errorf("Expected 2 subscriptions for service 'calc', got %d", len(svc.subscriptions)) } } @@ -229,7 +233,7 @@ func TestServerMethodExecution(t *testing.T) { input, _ := json.Marshal(&req) codec := &ServerTestCodec{input: input, closer: make(chan interface{})} - go server.ServeCodec(codec) + go server.ServeCodec(codec, OptionMethodInvocation) <-codec.closer @@ -259,7 +263,7 @@ func TestServerMethodWithCtx(t *testing.T) { input, _ := json.Marshal(&req) codec := &ServerTestCodec{input: input, closer: make(chan interface{})} - go server.ServeCodec(codec) + go server.ServeCodec(codec, OptionMethodInvocation) <-codec.closer diff --git a/rpc/types.go b/rpc/types.go index f268d84db2..596fdf2649 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -24,7 +24,6 @@ import ( "strings" "sync" - "github.com/ethereum/go-ethereum/event" "gopkg.in/fatih/set.v0" ) @@ -66,10 +65,10 @@ type serverRequest struct { err RPCError } -type serviceRegistry map[string]*service // collection of services -type callbacks map[string]*callback // collection of RPC callbacks -type subscriptions map[string]*callback // collection of subscription callbacks -type subscriptionRegistry map[string]Subscription // collection of subscriptions +type serviceRegistry map[string]*service // collection of services +type callbacks map[string]*callback // collection of RPC callbacks +type subscriptions map[string]*callback // collection of subscription callbacks +type subscriptionRegistry map[string]*callback // collection of subscription callbacks // Server represents a RPC server type Server struct { @@ -123,51 +122,6 @@ type ServerCodec interface { Closed() <-chan interface{} } -// SubscriptionMatcher returns true if the given value matches the criteria specified by the user -type SubscriptionMatcher func(interface{}) bool - -// SubscriptionOutputFormat accepts event data and has the ability to format the data before it is send to the client -type SubscriptionOutputFormat func(interface{}) interface{} - -// defaultSubscriptionOutputFormatter returns data and is used as default output format for notifications -func defaultSubscriptionOutputFormatter(data interface{}) interface{} { - return data -} - -// Subscription is used by the server to send notifications to the client -type Subscription struct { - sub event.Subscription - match SubscriptionMatcher - format SubscriptionOutputFormat -} - -// NewSubscription create a new RPC subscription -func NewSubscription(sub event.Subscription) Subscription { - return Subscription{sub, nil, defaultSubscriptionOutputFormatter} -} - -// NewSubscriptionWithOutputFormat create a new RPC subscription which a custom notification output format -func NewSubscriptionWithOutputFormat(sub event.Subscription, formatter SubscriptionOutputFormat) Subscription { - return Subscription{sub, nil, formatter} -} - -// NewSubscriptionFiltered will create a new subscription. For each raised event the given matcher is -// called. If it returns true the event is send as notification to the client, otherwise it is ignored. -func NewSubscriptionFiltered(sub event.Subscription, match SubscriptionMatcher) Subscription { - return Subscription{sub, match, defaultSubscriptionOutputFormatter} -} - -// Chan returns the channel where new events will be published. It's up the user to call the matcher to -// determine if the events are interesting for the client. -func (s *Subscription) Chan() <-chan *event.Event { - return s.sub.Chan() -} - -// Unsubscribe will end the subscription and closes the event channel -func (s *Subscription) Unsubscribe() { - s.sub.Unsubscribe() -} - // HexNumber serializes a number to hex format using the "%#x" format type HexNumber big.Int diff --git a/rpc/utils.go b/rpc/utils.go index fa114284dd..d43c504957 100644 --- a/rpc/utils.go +++ b/rpc/utils.go @@ -45,6 +45,16 @@ func isExportedOrBuiltinType(t reflect.Type) bool { return isExported(t.Name()) || t.PkgPath() == "" } +var contextType = reflect.TypeOf((*context.Context)(nil)).Elem() + +// isContextType returns an indication if the given t is of context.Context or *context.Context type +func isContextType(t reflect.Type) bool { + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + return t == contextType +} + var errorType = reflect.TypeOf((*error)(nil)).Elem() // Implements this type the error interface @@ -57,6 +67,7 @@ func isErrorType(t reflect.Type) bool { var subscriptionType = reflect.TypeOf((*Subscription)(nil)).Elem() +// isSubscriptionType returns an indication if the given t is of Subscription or *Subscription type func isSubscriptionType(t reflect.Type) bool { for t.Kind() == reflect.Ptr { t = t.Elem() @@ -64,12 +75,17 @@ func isSubscriptionType(t reflect.Type) bool { return t == subscriptionType } -// isPubSub tests whether the given method return the pair (v2.Subscription, error) +// isPubSub tests whether the given method has as as first argument a context.Context +// and returns the pair (Subscription, error) func isPubSub(methodType reflect.Type) bool { - if methodType.NumOut() != 2 { + // numIn(0) is the receiver type + if methodType.NumIn() < 2 || methodType.NumOut() != 2 { return false } - return isSubscriptionType(methodType.Out(0)) && isErrorType(methodType.Out(1)) + + return isContextType(methodType.In(1)) && + isSubscriptionType(methodType.Out(0)) && + isErrorType(methodType.Out(1)) } // formatName will convert to first character to lower case @@ -110,8 +126,6 @@ func isBlockNumber(t reflect.Type) bool { return t == blockNumberType } -var contextType = reflect.TypeOf(new(context.Context)).Elem() - // suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria // for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server // documentation for a summary of these criteria. @@ -205,7 +219,7 @@ METHODS: return callbacks, subscriptions } -func newSubscriptionId() (string, error) { +func newSubscriptionID() (string, error) { var subid [16]byte n, _ := rand.Read(subid[:]) if n != 16 { diff --git a/rpc/websocket.go b/rpc/websocket.go index 92615494ea..499eedabe6 100644 --- a/rpc/websocket.go +++ b/rpc/websocket.go @@ -93,7 +93,8 @@ func NewWSServer(cors string, handler *Server) *http.Server { Handler: websocket.Server{ Handshake: wsHandshakeValidator(strings.Split(cors, ",")), Handler: func(conn *websocket.Conn) { - handler.ServeCodec(NewJSONCodec(&wsReaderWriterCloser{conn})) + handler.ServeCodec(NewJSONCodec(&wsReaderWriterCloser{conn}), + OptionMethodInvocation|OptionSubscriptions) }, }, }