rpc: add pub/sub support

This commit is contained in:
Bas van Kervel 2016-03-29 15:07:40 +02:00
parent fb578f4550
commit f7328c5ecb
16 changed files with 849 additions and 315 deletions

@ -28,6 +28,8 @@ import (
"sync" "sync"
"time" "time"
"golang.org/x/net/context"
"github.com/ethereum/ethash" "github.com/ethereum/ethash"
"github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -457,16 +459,46 @@ func (s *PrivateAccountAPI) LockAccount(addr common.Address) bool {
// It offers only methods that operate on public data that is freely available to anyone. // It offers only methods that operate on public data that is freely available to anyone.
type PublicBlockChainAPI struct { type PublicBlockChainAPI struct {
config *core.ChainConfig config *core.ChainConfig
bc *core.BlockChain bc *core.BlockChain
chainDb ethdb.Database chainDb ethdb.Database
eventMux *event.TypeMux eventMux *event.TypeMux
am *accounts.Manager muNewBlockSubscriptions sync.Mutex // protects newBlocksSubscriptions
miner *miner.Miner newBlockSubscriptions map[string]func(core.ChainEvent) error // callbacks for new block subscriptions
am *accounts.Manager
miner *miner.Miner
} }
// NewPublicBlockChainAPI creates a new Etheruem blockchain API. // NewPublicBlockChainAPI creates a new Etheruem blockchain API.
func NewPublicBlockChainAPI(config *core.ChainConfig, bc *core.BlockChain, m *miner.Miner, chainDb ethdb.Database, eventMux *event.TypeMux, am *accounts.Manager) *PublicBlockChainAPI { func NewPublicBlockChainAPI(config *core.ChainConfig, bc *core.BlockChain, m *miner.Miner, chainDb ethdb.Database, eventMux *event.TypeMux, am *accounts.Manager) *PublicBlockChainAPI {
return &PublicBlockChainAPI{config: config, bc: bc, miner: m, chainDb: chainDb, eventMux: eventMux, am: am} api := &PublicBlockChainAPI{
config: config,
bc: bc,
miner: m,
chainDb: chainDb,
eventMux: eventMux,
am: am,
newBlockSubscriptions: make(map[string]func(core.ChainEvent) error),
}
go api.subscriptionLoop()
return api
}
// subscriptionLoop reads events from the global event mux and creates notifications for the matched subscriptions.
func (s *PublicBlockChainAPI) subscriptionLoop() {
sub := s.eventMux.Subscribe(core.ChainEvent{})
for event := range sub.Chan() {
if chainEvent, ok := event.Data.(core.ChainEvent); ok {
s.muNewBlockSubscriptions.Lock()
for id, notifyOf := range s.newBlockSubscriptions {
if notifyOf(chainEvent) == rpc.ErrNotificationNotFound {
delete(s.newBlockSubscriptions, id)
}
}
s.muNewBlockSubscriptions.Unlock()
}
}
} }
// BlockNumber returns the block number of the chain head. // BlockNumber returns the block number of the chain head.
@ -564,20 +596,36 @@ type NewBlocksArgs struct {
// NewBlocks triggers a new block event each time a block is appended to the chain. It accepts an argument which allows // NewBlocks triggers a new block event each time a block is appended to the chain. It accepts an argument which allows
// the caller to specify whether the output should contain transactions and in what format. // the caller to specify whether the output should contain transactions and in what format.
func (s *PublicBlockChainAPI) NewBlocks(args NewBlocksArgs) (rpc.Subscription, error) { func (s *PublicBlockChainAPI) NewBlocks(ctx context.Context, args NewBlocksArgs) (rpc.Subscription, error) {
sub := s.eventMux.Subscribe(core.ChainEvent{}) notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier)
if !supported {
output := func(rawBlock interface{}) interface{} { return nil, rpc.ErrNotificationsUnsupported
if event, ok := rawBlock.(core.ChainEvent); ok {
notification, err := s.rpcOutputBlock(event.Block, args.IncludeTransactions, args.TransactionDetails)
if err == nil {
return notification
}
}
return rawBlock
} }
return rpc.NewSubscriptionWithOutputFormat(sub, output), nil // create a subscription that will remove itself when unsubscribed/cancelled
subscription, err := notifier.NewSubscription(func(subId string) {
s.muNewBlockSubscriptions.Lock()
delete(s.newBlockSubscriptions, subId)
s.muNewBlockSubscriptions.Unlock()
})
if err != nil {
return nil, err
}
// add a callback that is called on chain events which will format the block and notify the client
s.muNewBlockSubscriptions.Lock()
s.newBlockSubscriptions[subscription.ID()] = func(e core.ChainEvent) error {
if notification, err := s.rpcOutputBlock(e.Block, args.IncludeTransactions, args.TransactionDetails); err == nil {
return subscription.Notify(notification)
} else {
glog.V(logger.Warn).Info("unable to format block %v\n", err)
}
return nil
}
s.muNewBlockSubscriptions.Unlock()
return subscription, nil
} }
// GetCode returns the code stored at the given address in the state for the given block number. // GetCode returns the code stored at the given address in the state for the given block number.
@ -821,26 +869,75 @@ func newRPCTransaction(b *types.Block, txHash common.Hash) (*RPCTransaction, err
// PublicTransactionPoolAPI exposes methods for the RPC interface // PublicTransactionPoolAPI exposes methods for the RPC interface
type PublicTransactionPoolAPI struct { type PublicTransactionPoolAPI struct {
eventMux *event.TypeMux eventMux *event.TypeMux
chainDb ethdb.Database chainDb ethdb.Database
gpo *GasPriceOracle gpo *GasPriceOracle
bc *core.BlockChain bc *core.BlockChain
miner *miner.Miner miner *miner.Miner
am *accounts.Manager am *accounts.Manager
txPool *core.TxPool txPool *core.TxPool
txMu sync.Mutex txMu sync.Mutex
muPendingTxSubs sync.Mutex
pendingTxSubs map[string]rpc.Subscription
} }
// NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool. // NewPublicTransactionPoolAPI creates a new RPC service with methods specific for the transaction pool.
func NewPublicTransactionPoolAPI(e *Ethereum) *PublicTransactionPoolAPI { func NewPublicTransactionPoolAPI(e *Ethereum) *PublicTransactionPoolAPI {
return &PublicTransactionPoolAPI{ api := &PublicTransactionPoolAPI{
eventMux: e.EventMux(), eventMux: e.EventMux(),
gpo: NewGasPriceOracle(e), gpo: NewGasPriceOracle(e),
chainDb: e.ChainDb(), chainDb: e.ChainDb(),
bc: e.BlockChain(), bc: e.BlockChain(),
am: e.AccountManager(), am: e.AccountManager(),
txPool: e.TxPool(), txPool: e.TxPool(),
miner: e.Miner(), miner: e.Miner(),
pendingTxSubs: make(map[string]rpc.Subscription),
}
go api.subscriptionLoop()
return api
}
// subscriptionLoop listens for events on the global event mux and creates notifications for subscriptions.
func (s *PublicTransactionPoolAPI) subscriptionLoop() {
sub := s.eventMux.Subscribe(core.TxPreEvent{})
accountTimeout := time.NewTicker(10 * time.Second)
// only publish pending tx signed by one of the accounts in the node
accountSet := set.New()
accounts, _ := s.am.Accounts()
for _, acc := range accounts {
accountSet.Add(acc.Address)
}
for {
select {
case event := <-sub.Chan():
if event == nil {
continue
}
tx := event.Data.(core.TxPreEvent)
if from, err := tx.Tx.FromFrontier(); err == nil {
if accountSet.Has(from) {
s.muPendingTxSubs.Lock()
for id, sub := range s.pendingTxSubs {
if sub.Notify(tx.Tx.Hash()) == rpc.ErrNotificationNotFound {
delete(s.pendingTxSubs, id)
}
}
s.muPendingTxSubs.Unlock()
}
}
case <-accountTimeout.C:
// refresh account list when accounts are added/removed from the node.
if accounts, err := s.am.Accounts(); err == nil {
accountSet.Clear()
for _, acc := range accounts {
accountSet.Add(acc.Address)
}
}
}
} }
} }
@ -1275,40 +1372,27 @@ func (s *PublicTransactionPoolAPI) PendingTransactions() ([]*RPCTransaction, err
// NewPendingTransaction creates a subscription that is triggered each time a transaction enters the transaction pool // NewPendingTransaction creates a subscription that is triggered each time a transaction enters the transaction pool
// and is send from one of the transactions this nodes manages. // and is send from one of the transactions this nodes manages.
func (s *PublicTransactionPoolAPI) NewPendingTransactions() (rpc.Subscription, error) { func (s *PublicTransactionPoolAPI) NewPendingTransactions(ctx context.Context) (rpc.Subscription, error) {
sub := s.eventMux.Subscribe(core.TxPreEvent{}) notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier)
if !supported {
return nil, rpc.ErrNotificationsUnsupported
}
subscription, err := notifier.NewSubscription(func(id string) {
s.muPendingTxSubs.Lock()
delete(s.pendingTxSubs, id)
s.muPendingTxSubs.Unlock()
})
accounts, err := s.am.Accounts()
if err != nil { if err != nil {
return rpc.Subscription{}, err return nil, err
}
accountSet := set.New()
for _, account := range accounts {
accountSet.Add(account.Address)
}
accountSetLastUpdates := time.Now()
output := func(transaction interface{}) interface{} {
if time.Since(accountSetLastUpdates) > (time.Duration(2) * time.Second) {
if accounts, err = s.am.Accounts(); err != nil {
accountSet.Clear()
for _, account := range accounts {
accountSet.Add(account.Address)
}
accountSetLastUpdates = time.Now()
}
}
tx := transaction.(core.TxPreEvent)
if from, err := tx.Tx.FromFrontier(); err == nil {
if accountSet.Has(from) {
return tx.Tx.Hash()
}
}
return nil
} }
return rpc.NewSubscriptionWithOutputFormat(sub, output), nil s.muPendingTxSubs.Lock()
s.pendingTxSubs[subscription.ID()] = subscription
s.muPendingTxSubs.Unlock()
return subscription, nil
} }
// Resend accepts an existing transaction and a new gas price and limit. It will remove the given transaction from the // Resend accepts an existing transaction and a new gas price and limit. It will remove the given transaction from the

@ -310,7 +310,7 @@ func (s *Ethereum) APIs() []rpc.API {
}, { }, {
Namespace: "eth", Namespace: "eth",
Version: "1.0", Version: "1.0",
Service: downloader.NewPublicDownloaderAPI(s.Downloader()), Service: downloader.NewPublicDownloaderAPI(s.Downloader(), s.EventMux()),
Public: true, Public: true,
}, { }, {
Namespace: "miner", Namespace: "miner",

@ -17,18 +17,55 @@
package downloader package downloader
import ( import (
"sync"
"golang.org/x/net/context"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
) )
// PublicDownloaderAPI provides an API which gives information about the current synchronisation status. // PublicDownloaderAPI provides an API which gives information about the current synchronisation status.
// It offers only methods that operates on data that can be available to anyone without security risks. // It offers only methods that operates on data that can be available to anyone without security risks.
type PublicDownloaderAPI struct { type PublicDownloaderAPI struct {
d *Downloader d *Downloader
mux *event.TypeMux
muSyncSubscriptions sync.Mutex
syncSubscriptions map[string]rpc.Subscription
} }
// NewPublicDownloaderAPI create a new PublicDownloaderAPI. // NewPublicDownloaderAPI create a new PublicDownloaderAPI.
func NewPublicDownloaderAPI(d *Downloader) *PublicDownloaderAPI { func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI {
return &PublicDownloaderAPI{d} api := &PublicDownloaderAPI{d: d, mux: m, syncSubscriptions: make(map[string]rpc.Subscription)}
go api.run()
return api
}
func (api *PublicDownloaderAPI) run() {
sub := api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
for event := range sub.Chan() {
var notification interface{}
switch event.Data.(type) {
case StartEvent:
result := &SyncingResult{Syncing: true}
result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = api.d.Progress()
notification = result
case DoneEvent, FailedEvent:
notification = false
}
api.muSyncSubscriptions.Lock()
for id, sub := range api.syncSubscriptions {
if sub.Notify(notification) == rpc.ErrNotificationNotFound {
delete(api.syncSubscriptions, id)
}
}
api.muSyncSubscriptions.Unlock()
}
} }
// Progress gives progress indications when the node is synchronising with the Ethereum network. // Progress gives progress indications when the node is synchronising with the Ethereum network.
@ -47,19 +84,25 @@ type SyncingResult struct {
} }
// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished. // Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
func (s *PublicDownloaderAPI) Syncing() (rpc.Subscription, error) { func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (rpc.Subscription, error) {
sub := s.d.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{}) notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier)
if !supported {
output := func(event interface{}) interface{} { return nil, rpc.ErrNotificationsUnsupported
switch event.(type) {
case StartEvent:
result := &SyncingResult{Syncing: true}
result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = s.d.Progress()
return result
case DoneEvent, FailedEvent:
return false
}
return nil
} }
return rpc.NewSubscriptionWithOutputFormat(sub, output), nil
subscription, err := notifier.NewSubscription(func(id string) {
api.muSyncSubscriptions.Lock()
delete(api.syncSubscriptions, id)
api.muSyncSubscriptions.Unlock()
})
if err != nil {
return nil, err
}
api.muSyncSubscriptions.Lock()
api.syncSubscriptions[subscription.ID()] = subscription
api.muSyncSubscriptions.Unlock()
return subscription, nil
} }

@ -17,15 +17,13 @@
package filters package filters
import ( import (
"sync"
"time"
"crypto/rand" "crypto/rand"
"encoding/hex" "encoding/hex"
"errors"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -33,6 +31,8 @@ import (
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"golang.org/x/net/context"
) )
var ( var (
@ -202,7 +202,7 @@ func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
} }
// newLogFilter creates a new log filter. // newLogFilter creates a new log filter.
func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) (int, error) { func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) {
s.logMu.Lock() s.logMu.Lock()
defer s.logMu.Unlock() defer s.logMu.Unlock()
@ -219,17 +219,70 @@ func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []commo
filter.SetAddresses(addresses) filter.SetAddresses(addresses)
filter.SetTopics(topics) filter.SetTopics(topics)
filter.LogCallback = func(log *vm.Log, removed bool) { filter.LogCallback = func(log *vm.Log, removed bool) {
s.logMu.Lock() if callback != nil {
defer s.logMu.Unlock() callback(log, removed)
} else {
if queue := s.logQueue[id]; queue != nil { s.logMu.Lock()
queue.add(vmlog{log, removed}) defer s.logMu.Unlock()
if queue := s.logQueue[id]; queue != nil {
queue.add(vmlog{log, removed})
}
} }
} }
return id, nil return id, nil
} }
func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) {
notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier)
if !supported {
return nil, rpc.ErrNotificationsUnsupported
}
var (
externalId string
subscription rpc.Subscription
err error
)
if externalId, err = newFilterId(); err != nil {
return nil, err
}
// uninstall filter when subscription is unsubscribed/cancelled
if subscription, err = notifier.NewSubscription(func(string) {
s.UninstallFilter(externalId)
}); err != nil {
return nil, err
}
notifySubscriber := func(log *vm.Log, removed bool) {
rpcLog := toRPCLogs(vm.Logs{log}, removed)
if err := subscription.Notify(rpcLog); err != nil {
subscription.Cancel()
}
}
// from and to block number are not used since subscriptions don't allow you to travel to "time"
var id int
if len(args.Addresses) > 0 {
id, err = s.newLogFilter(-1, -1, args.Addresses, args.Topics, notifySubscriber)
} else {
id, err = s.newLogFilter(-1, -1, nil, args.Topics, notifySubscriber)
}
if err != nil {
subscription.Cancel()
return nil, err
}
s.filterMapMu.Lock()
s.filterMapping[externalId] = id
s.filterMapMu.Unlock()
return subscription, err
}
// NewFilterArgs represents a request to create a new filter. // NewFilterArgs represents a request to create a new filter.
type NewFilterArgs struct { type NewFilterArgs struct {
FromBlock rpc.BlockNumber FromBlock rpc.BlockNumber
@ -364,9 +417,9 @@ func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
var id int var id int
if len(args.Addresses) > 0 { if len(args.Addresses) > 0 {
id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics) id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics, nil)
} else { } else {
id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics) id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics, nil)
} }
if err != nil { if err != nil {
return "", err return "", err

@ -303,7 +303,7 @@ func (n *Node) startIPC(apis []rpc.API) error {
glog.V(logger.Error).Infof("IPC accept failed: %v", err) glog.V(logger.Error).Infof("IPC accept failed: %v", err)
continue continue
} }
go handler.ServeCodec(rpc.NewJSONCodec(conn)) go handler.ServeCodec(rpc.NewJSONCodec(conn), rpc.OptionMethodInvocation | rpc.OptionSubscriptions)
} }
}() }()
// All listeners booted successfully // All listeners booted successfully

@ -68,35 +68,19 @@ The package also supports the publish subscribe pattern through the use of subsc
A method that is considered eligible for notifications must satisfy the following criteria: A method that is considered eligible for notifications must satisfy the following criteria:
- object must be exported - object must be exported
- method must be exported - method must be exported
- first method argument type must be context.Context
- method argument(s) must be exported or builtin types - method argument(s) must be exported or builtin types
- method must return the tuple Subscription, error - method must return the tuple Subscription, error
An example method: An example method:
func (s *BlockChainService) Head() (Subscription, error) { func (s *BlockChainService) NewBlocks(ctx context.Context) (Subscription, error) {
sub := s.bc.eventMux.Subscribe(ChainHeadEvent{}) ...
return v2.NewSubscription(sub), nil
}
This method will push all raised ChainHeadEvents to subscribed clients. If the client is only
interested in every N'th block it is possible to add a criteria.
func (s *BlockChainService) HeadFiltered(nth uint64) (Subscription, error) {
sub := s.bc.eventMux.Subscribe(ChainHeadEvent{})
criteria := func(event interface{}) bool {
chainHeadEvent := event.(ChainHeadEvent)
if chainHeadEvent.Block.NumberU64() % nth == 0 {
return true
}
return false
}
return v2.NewSubscriptionFiltered(sub, criteria), nil
} }
Subscriptions are deleted when: Subscriptions are deleted when:
- the user sends an unsubscribe request - the user sends an unsubscribe request
- the connection which was used to create the subscription is closed - the connection which was used to create the subscription is closed. This can be initiated
by the client and server. The server will close the connection on an write error or when
the queue of buffered notifications gets too big.
*/ */
package rpc package rpc

@ -126,7 +126,7 @@ func newJSONHTTPHandler(srv *Server) http.HandlerFunc {
// a single request. // a single request.
codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w}) codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w})
defer codec.Close() defer codec.Close()
srv.ServeSingleRequest(codec) srv.ServeSingleRequest(codec, OptionMethodInvocation)
} }
} }

@ -39,7 +39,7 @@ func (c *inProcClient) Close() {
// RPC server. // RPC server.
func NewInProcRPCClient(handler *Server) Client { func NewInProcRPCClient(handler *Server) Client {
p1, p2 := net.Pipe() p1, p2 := net.Pipe()
go handler.ServeCodec(NewJSONCodec(p1)) go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions)
return &inProcClient{handler, p2, json.NewEncoder(p2), json.NewDecoder(p2)} return &inProcClient{handler, p2, json.NewEncoder(p2), json.NewDecoder(p2)}
} }

@ -22,7 +22,7 @@ import (
"io" "io"
"reflect" "reflect"
"strings" "strings"
"sync/atomic" "sync"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
@ -81,19 +81,20 @@ type jsonNotification struct {
// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has support for parsing arguments // jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has support for parsing arguments
// and serializing (result) objects. // and serializing (result) objects.
type jsonCodec struct { type jsonCodec struct {
closed chan interface{} closed chan interface{}
isClosed int32 closer sync.Once
d *json.Decoder d *json.Decoder
e *json.Encoder muEncoder sync.Mutex
req JSONRequest e *json.Encoder
rw io.ReadWriteCloser req JSONRequest
rw io.ReadWriteCloser
} }
// NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0 // NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0
func NewJSONCodec(rwc io.ReadWriteCloser) ServerCodec { func NewJSONCodec(rwc io.ReadWriteCloser) ServerCodec {
d := json.NewDecoder(rwc) d := json.NewDecoder(rwc)
d.UseNumber() d.UseNumber()
return &jsonCodec{closed: make(chan interface{}), d: d, e: json.NewEncoder(rwc), rw: rwc, isClosed: 0} return &jsonCodec{closed: make(chan interface{}), d: d, e: json.NewEncoder(rwc), rw: rwc}
} }
// isBatch returns true when the first non-whitespace characters is '[' // isBatch returns true when the first non-whitespace characters is '['
@ -326,15 +327,18 @@ func (c *jsonCodec) CreateNotification(subid string, event interface{}) interfac
// Write message to client // Write message to client
func (c *jsonCodec) Write(res interface{}) error { func (c *jsonCodec) Write(res interface{}) error {
c.muEncoder.Lock()
defer c.muEncoder.Unlock()
return c.e.Encode(res) return c.e.Encode(res)
} }
// Close the underlying connection // Close the underlying connection
func (c *jsonCodec) Close() { func (c *jsonCodec) Close() {
if atomic.CompareAndSwapInt32(&c.isClosed, 0, 1) { c.closer.Do(func() {
close(c.closed) close(c.closed)
c.rw.Close() c.rw.Close()
} })
} }
// Closed returns a channel which will be closed when Close is called // Closed returns a channel which will be closed when Close is called

288
rpc/notification.go Normal file

@ -0,0 +1,288 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rpc
import (
"errors"
"sync"
"time"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
)
var (
// ErrNotificationsUnsupported is returned when the connection doesn't support notifications
ErrNotificationsUnsupported = errors.New("notifications not supported")
// ErrNotificationNotFound is returned when the notification for the given id is not found
ErrNotificationNotFound = errors.New("notification not found")
// errNotifierStopped is returned when the notifier is stopped (e.g. codec is closed)
errNotifierStopped = errors.New("unable to send notification")
// errNotificationQueueFull is returns when there are too many notifications in the queue
errNotificationQueueFull = errors.New("too many pending notifications")
)
// unsubSignal is a signal that the subscription is unsubscribed. It is used to flush buffered
// notifications that might be pending in the internal queue.
var unsubSignal = new(struct{})
// UnsubscribeCallback defines a callback that is called when a subcription ends.
// It receives the subscription id as argument.
type UnsubscribeCallback func(id string)
// notification is a helper object that holds event data for a subscription
type notification struct {
sub *bufferedSubscription // subscription id
data interface{} // event data
}
// A Notifier type describes the interface for objects that can send create subscriptions
type Notifier interface {
// Create a new subscription. The given callback is called when this subscription
// is cancelled (e.g. client send an unsubscribe, connection closed).
NewSubscription(UnsubscribeCallback) (Subscription, error)
// Cancel subscription
Unsubscribe(id string) error
}
// Subscription defines the interface for objects that can notify subscribers
type Subscription interface {
// Inform client of an event
Notify(data interface{}) error
// Unique identifier
ID() string
// Cancel subscription
Cancel() error
}
// bufferedSubscription is a subscription that uses a bufferedNotifier to send
// notifications to subscribers.
type bufferedSubscription struct {
id string
unsubOnce sync.Once // call unsub method once
unsub UnsubscribeCallback // called on Unsubscribed
notifier *bufferedNotifier // forward notifications to
pending chan interface{} // closed when active
flushed chan interface{} // closed when all buffered notifications are send
lastNotification time.Time // last time a notification was send
}
// ID returns the subscription identifier that the client uses to refer to this instance.
func (s *bufferedSubscription) ID() string {
return s.id
}
// Cancel informs the notifier that this subscription is cancelled by the API
func (s *bufferedSubscription) Cancel() error {
return s.notifier.Unsubscribe(s.id)
}
// Notify the subscriber of a particular event.
func (s *bufferedSubscription) Notify(data interface{}) error {
return s.notifier.send(s.id, data)
}
// bufferedNotifier is a notifier that queues notifications in an internal queue and
// send them as fast as possible to the client from this queue. It will stop if the
// queue grows past a given size.
type bufferedNotifier struct {
codec ServerCodec // underlying connection
mu sync.Mutex // guard internal state
subscriptions map[string]*bufferedSubscription // keep track of subscriptions associated with codec
queueSize int // max number of items in queue
queue chan *notification // notification queue
stopped bool // indication if this notifier is ordered to stop
}
// newBufferedNotifier returns a notifier that queues notifications in an internal queue
// from which notifications are send as fast as possible to the client. If the queue size
// limit is reached (client is unable to keep up) it will stop and closes the codec.
func newBufferedNotifier(codec ServerCodec, size int) *bufferedNotifier {
notifier := &bufferedNotifier{
codec: codec,
subscriptions: make(map[string]*bufferedSubscription),
queue: make(chan *notification, size),
queueSize: size,
}
go notifier.run()
return notifier
}
// NewSubscription creates a new subscription that forwards events to this instance internal
// queue. The given callback is called when the subscription is unsubscribed/cancelled.
func (n *bufferedNotifier) NewSubscription(callback UnsubscribeCallback) (Subscription, error) {
id, err := newSubscriptionID()
if err != nil {
return nil, err
}
n.mu.Lock()
defer n.mu.Unlock()
if n.stopped {
return nil, errNotifierStopped
}
sub := &bufferedSubscription{
id: id,
unsub: callback,
notifier: n,
pending: make(chan interface{}),
flushed: make(chan interface{}),
lastNotification: time.Now(),
}
n.subscriptions[id] = sub
return sub, nil
}
// Remove the given subscription. If subscription is not found notificationNotFoundErr is returned.
func (n *bufferedNotifier) Unsubscribe(subid string) error {
n.mu.Lock()
sub, found := n.subscriptions[subid]
n.mu.Unlock()
if found {
// send the unsubscribe signal, this will cause the notifier not to accept new events
// for this subscription and will close the flushed channel after the last (buffered)
// notification was send to the client.
if err := n.send(subid, unsubSignal); err != nil {
return err
}
// wait for confirmation that all (buffered) events are send for this subscription.
// this ensures that the unsubscribe method response is not send before all buffered
// events for this subscription are send.
<-sub.flushed
return nil
}
return ErrNotificationNotFound
}
// Send enques the given data for the subscription with public ID on the internal queue. t returns
// an error when the notifier is stopped or the queue is full. If data is the unsubscribe signal it
// will remove the subscription with the given id from the subscription collection.
func (n *bufferedNotifier) send(id string, data interface{}) error {
n.mu.Lock()
defer n.mu.Unlock()
if n.stopped {
return errNotifierStopped
}
var (
subscription *bufferedSubscription
found bool
)
// check if subscription is associated with this connection, it might be cancelled
// (subscribe/connection closed)
if subscription, found = n.subscriptions[id]; !found {
glog.V(logger.Error).Infof("received notification for unknown subscription %s\n", id)
return ErrNotificationNotFound
}
// received the unsubscribe signal. Add it to the queue to make sure any pending notifications
// for this subscription are send. When the run loop receives this singal it will signal that
// all pending subscriptions are flushed and that the confirmation of the unsubscribe can be
// send to the user. Remove the subscriptions to make sure new notifications are not accepted.
if data == unsubSignal {
delete(n.subscriptions, id)
if subscription.unsub != nil {
subscription.unsubOnce.Do(func() { subscription.unsub(id) })
}
}
subscription.lastNotification = time.Now()
if len(n.queue) >= n.queueSize {
glog.V(logger.Warn).Infoln("too many buffered notifications -> close connection")
n.codec.Close()
return errNotificationQueueFull
}
n.queue <- &notification{subscription, data}
return nil
}
// run reads notifications from the internal queue and sends them to the client. In case of an
// error, or when the codec is closed it will cancel all active subscriptions and returns.
func (n *bufferedNotifier) run() {
defer func() {
n.mu.Lock()
defer n.mu.Unlock()
n.stopped = true
close(n.queue)
// on exit call unsubscribe callback
for id, sub := range n.subscriptions {
if sub.unsub != nil {
sub.unsubOnce.Do(func() { sub.unsub(id) })
}
close(sub.flushed)
delete(n.subscriptions, id)
}
}()
for {
select {
case notification := <-n.queue:
// It can happen that an event is raised before the RPC server was able to send the sub
// id to the client. Therefore subscriptions are marked as pending until the sub id was
// send. The RPC server will activate the subscription by closing the pending chan.
<-notification.sub.pending
if notification.data == unsubSignal {
// unsubSignal is the last accepted message for this subscription. Raise the signal
// that all buffered notifications are sent by closing the flushed channel. This
// indicates that the response for the unsubscribe can be send to the client.
close(notification.sub.flushed)
} else {
msg := n.codec.CreateNotification(notification.sub.id, notification.data)
if err := n.codec.Write(msg); err != nil {
n.codec.Close()
// unable to send notification to client, unsubscribe all subscriptions
glog.V(logger.Warn).Infof("unable to send notification - %v\n", err)
return
}
}
case <-n.codec.Closed(): // connection was closed
glog.V(logger.Debug).Infoln("codec closed, stop subscriptions")
return
}
}
}
// Marks the subscription as active. This will causes the notifications for this subscription to be
// forwarded to the client.
func (n *bufferedNotifier) activate(subid string) {
n.mu.Lock()
defer n.mu.Unlock()
if sub, found := n.subscriptions[subid]; found {
close(sub.pending)
}
}

119
rpc/notification_test.go Normal file

@ -0,0 +1,119 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rpc
import (
"encoding/json"
"net"
"testing"
"time"
"golang.org/x/net/context"
)
type NotificationTestService struct{}
var (
unsubCallbackCalled = false
)
func (s *NotificationTestService) Unsubscribe(subid string) {
unsubCallbackCalled = true
}
func (s *NotificationTestService) SomeSubscription(ctx context.Context, n, val int) (Subscription, error) {
notifier, supported := ctx.Value(NotifierContextKey).(Notifier)
if !supported {
return nil, ErrNotificationsUnsupported
}
// by explicitly creating an subscription we make sure that the subscription id is send back to the client
// before the first subscription.Notify is called. Otherwise the events might be send before the response
// for the eth_subscribe method.
subscription, err := notifier.NewSubscription(s.Unsubscribe)
if err != nil {
return nil, err
}
go func() {
for i := 0; i < n; i++ {
if err := subscription.Notify(val + i); err != nil {
return
}
}
}()
return subscription, nil
}
func TestNotifications(t *testing.T) {
server := NewServer()
service := &NotificationTestService{}
if err := server.RegisterName("eth", service); err != nil {
t.Fatalf("unable to register test service %v", err)
}
clientConn, serverConn := net.Pipe()
go server.ServeCodec(NewJSONCodec(serverConn), OptionMethodInvocation|OptionSubscriptions)
out := json.NewEncoder(clientConn)
in := json.NewDecoder(clientConn)
n := 5
val := 12345
request := map[string]interface{}{
"id": 1,
"method": "eth_subscribe",
"version": "2.0",
"params": []interface{}{"someSubscription", n, val},
}
// create subscription
if err := out.Encode(request); err != nil {
t.Fatal(err)
}
var subid string
response := JSONSuccessResponse{Result: subid}
if err := in.Decode(&response); err != nil {
t.Fatal(err)
}
var ok bool
if subid, ok = response.Result.(string); !ok {
t.Fatalf("expected subscription id, got %T", response.Result)
}
for i := 0; i < n; i++ {
var notification jsonNotification
if err := in.Decode(&notification); err != nil {
t.Fatalf("%v", err)
}
if int(notification.Params.Result.(float64)) != val+i {
t.Fatalf("expected %d, got %d", val+i, notification.Params.Result)
}
}
clientConn.Close() // causes notification unsubscribe callback to be called
time.Sleep(1 * time.Second)
if !unsubCallbackCalled {
t.Error("unsubscribe callback not called after closing connection")
}
}

@ -23,7 +23,6 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -33,10 +32,26 @@ import (
const ( const (
stopPendingRequestTimeout = 3 * time.Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped stopPendingRequestTimeout = 3 * time.Second // give pending requests stopPendingRequestTimeout the time to finish when the server is stopped
// NotifierContextKey is the key where the notifier associated with the codec is stored in the context
NotifierContextKey = 1
notificationBufferSize = 10000 // max buffered notifications before codec is closed
DefaultIPCApis = "admin,eth,debug,miner,net,shh,txpool,personal,web3" DefaultIPCApis = "admin,eth,debug,miner,net,shh,txpool,personal,web3"
DefaultHTTPApis = "eth,net,web3" DefaultHTTPApis = "eth,net,web3"
) )
// CodecOption specifies which type of messages this codec supports
type CodecOption int
const (
// OptionMethodInvocation is an indication that the codec supports RPC method calls
OptionMethodInvocation CodecOption = 1 << iota
// OptionSubscriptions is an indication that the codec suports RPC notifications
OptionSubscriptions = 1 << iota // support pub sub
)
// NewServer will create a new server instance with no registered handlers. // NewServer will create a new server instance with no registered handlers.
func NewServer() *Server { func NewServer() *Server {
server := &Server{ server := &Server{
@ -63,7 +78,7 @@ type RPCService struct {
// Modules returns the list of RPC services with their version number // Modules returns the list of RPC services with their version number
func (s *RPCService) Modules() map[string]string { func (s *RPCService) Modules() map[string]string {
modules := make(map[string]string) modules := make(map[string]string)
for name, _ := range s.server.services { for name := range s.server.services {
modules[name] = "1.0" modules[name] = "1.0"
} }
return modules return modules
@ -92,7 +107,7 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
if regsvc, present := s.services[name]; present { if regsvc, present := s.services[name]; present {
methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ) methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)
if len(methods) == 0 && len(subscriptions) == 0 { if len(methods) == 0 && len(subscriptions) == 0 {
return fmt.Errorf("Service doesn't have any suitable methods/subscriptions to expose") return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
} }
for _, m := range methods { for _, m := range methods {
@ -109,7 +124,7 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
svc.callbacks, svc.subscriptions = suitableCallbacks(rcvrVal, svc.typ) svc.callbacks, svc.subscriptions = suitableCallbacks(rcvrVal, svc.typ)
if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 { if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 {
return fmt.Errorf("Service doesn't have any suitable methods/subscriptions to expose") return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
} }
s.services[svc.name] = svc s.services[svc.name] = svc
@ -117,12 +132,23 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
return nil return nil
} }
// hasOption returns true if option is included in options, otherwise false
func hasOption(option CodecOption, options []CodecOption) bool {
for _, o := range options {
if option == o {
return true
}
}
return false
}
// serveRequest will reads requests from the codec, calls the RPC callback and // serveRequest will reads requests from the codec, calls the RPC callback and
// writes the response to the given codec. // writes the response to the given codec.
//
// If singleShot is true it will process a single request, otherwise it will handle // If singleShot is true it will process a single request, otherwise it will handle
// requests until the codec returns an error when reading a request (in most cases // requests until the codec returns an error when reading a request (in most cases
// an EOF). It executes requests in parallel when singleShot is false. // an EOF). It executes requests in parallel when singleShot is false.
func (s *Server) serveRequest(codec ServerCodec, singleShot bool) error { func (s *Server) serveRequest(codec ServerCodec, singleShot bool, options CodecOption) error {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
const size = 64 << 10 const size = 64 << 10
@ -141,6 +167,12 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool) error {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
// if the codec supports notification include a notifier that callbacks can use
// to send notification to clients. It is thight to the codec/connection. If the
// connection is closed the notifier will stop and cancels all active subscriptions.
if options&OptionSubscriptions == OptionSubscriptions {
ctx = context.WithValue(ctx, NotifierContextKey, newBufferedNotifier(codec, notificationBufferSize))
}
s.codecsMu.Lock() s.codecsMu.Lock()
if atomic.LoadInt32(&s.run) != 1 { // server stopped if atomic.LoadInt32(&s.run) != 1 { // server stopped
s.codecsMu.Unlock() s.codecsMu.Unlock()
@ -193,20 +225,16 @@ func (s *Server) serveRequest(codec ServerCodec, singleShot bool) error {
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the // ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the
// response back using the given codec. It will block until the codec is closed or the server is // response back using the given codec. It will block until the codec is closed or the server is
// stopped. In either case the codec is closed. // stopped. In either case the codec is closed.
// func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
// This server will:
// 1. allow for asynchronous and parallel request execution
// 2. supports notifications (pub/sub)
// 3. supports request batches
func (s *Server) ServeCodec(codec ServerCodec) {
defer codec.Close() defer codec.Close()
s.serveRequest(codec, false) s.serveRequest(codec, false, options)
} }
// ServeSingleRequest reads and processes a single RPC request from the given codec. It will not // ServeSingleRequest reads and processes a single RPC request from the given codec. It will not
// close the codec unless a non-recoverable error has occurred. // close the codec unless a non-recoverable error has occurred. Note, this method will return after
func (s *Server) ServeSingleRequest(codec ServerCodec) { // a single request has been processed!
s.serveRequest(codec, true) func (s *Server) ServeSingleRequest(codec ServerCodec, options CodecOption) {
s.serveRequest(codec, true, options)
} }
// Stop will stop reading new requests, wait for stopPendingRequestTimeout to allow pending requests to finish, // Stop will stop reading new requests, wait for stopPendingRequestTimeout to allow pending requests to finish,
@ -225,122 +253,64 @@ func (s *Server) Stop() {
} }
} }
// sendNotification will create a notification from the given event by serializing member fields of the event. // createSubscription will call the subscription callback and returns the subscription id or error.
// It will then send the notification to the client, when it fails the codec is closed. When the event has multiple func (s *Server) createSubscription(ctx context.Context, c ServerCodec, req *serverRequest) (string, error) {
// fields an array of values is returned. // subscription have as first argument the context following optional arguments
func sendNotification(codec ServerCodec, subid string, event interface{}) { args := []reflect.Value{req.callb.rcvr, reflect.ValueOf(ctx)}
notification := codec.CreateNotification(subid, event) args = append(args, req.args...)
if err := codec.Write(notification); err != nil {
codec.Close()
}
}
// createSubscription will register a new subscription and waits for raised events. When an event is raised it will:
// 1. test if the event is raised matches the criteria the user has (optionally) specified
// 2. create a notification of the event and send it the client when it matches the criteria
// It will unsubscribe the subscription when the socket is closed or the subscription is unsubscribed by the user.
func (s *Server) createSubscription(c ServerCodec, req *serverRequest) (string, error) {
args := []reflect.Value{req.callb.rcvr}
if len(req.args) > 0 {
args = append(args, req.args...)
}
subid, err := newSubscriptionId()
if err != nil {
return "", err
}
reply := req.callb.method.Func.Call(args) reply := req.callb.method.Func.Call(args)
if reply[1].IsNil() { // no error if !reply[1].IsNil() { // subscription creation failed
if subscription, ok := reply[0].Interface().(Subscription); ok { return "", reply[1].Interface().(error)
s.muSubcriptions.Lock()
s.subscriptions[subid] = subscription
s.muSubcriptions.Unlock()
go func() {
cases := []reflect.SelectCase{
reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(subscription.Chan())}, // new event
reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c.Closed())}, // connection closed
}
for {
idx, notification, recvOk := reflect.Select(cases)
switch idx {
case 0: // new event, or channel closed
if recvOk { // send notification
if event, ok := notification.Interface().(*event.Event); ok {
if subscription.match == nil || subscription.match(event.Data) {
sendNotification(c, subid, subscription.format(event.Data))
}
}
} else { // user send an eth_unsubscribe request
return
}
case 1: // connection closed
s.unsubscribe(subid)
return
}
}
}()
} else { // unable to create subscription
s.muSubcriptions.Lock()
delete(s.subscriptions, subid)
s.muSubcriptions.Unlock()
}
} else {
return "", fmt.Errorf("Unable to create subscription")
} }
return subid, nil return reply[0].Interface().(Subscription).ID(), nil
}
// unsubscribe calls the Unsubscribe method on the subscription and removes a subscription from the subscription
// registry.
func (s *Server) unsubscribe(subid string) bool {
s.muSubcriptions.Lock()
defer s.muSubcriptions.Unlock()
if sub, ok := s.subscriptions[subid]; ok {
sub.Unsubscribe()
delete(s.subscriptions, subid)
return true
}
return false
} }
// handle executes a request and returns the response from the callback. // handle executes a request and returns the response from the callback.
func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) interface{} { func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {
if req.err != nil { if req.err != nil {
return codec.CreateErrorResponse(&req.id, req.err) return codec.CreateErrorResponse(&req.id, req.err), nil
} }
if req.isUnsubscribe { // first param must be the subscription id if req.isUnsubscribe { // cancel subscription, first param must be the subscription id
if len(req.args) >= 1 && req.args[0].Kind() == reflect.String { if len(req.args) >= 1 && req.args[0].Kind() == reflect.String {
subid := req.args[0].String() notifier, supported := ctx.Value(NotifierContextKey).(*bufferedNotifier)
if s.unsubscribe(subid) { if !supported { // interface doesn't support subscriptions (e.g. http)
return codec.CreateResponse(req.id, true) return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
} else {
return codec.CreateErrorResponse(&req.id,
&callbackError{fmt.Sprintf("subscription '%s' not found", subid)})
} }
subid := req.args[0].String()
if err := notifier.Unsubscribe(subid); err != nil {
return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
}
return codec.CreateResponse(req.id, true), nil
} }
return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as argument"}) return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as first argument"}), nil
} }
if req.callb.isSubscribe { if req.callb.isSubscribe {
subid, err := s.createSubscription(codec, req) subid, err := s.createSubscription(ctx, codec, req)
if err != nil { if err != nil {
return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}) return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
} }
return codec.CreateResponse(req.id, subid)
// active the subscription after the sub id was successful sent to the client
activateSub := func() {
notifier, _ := ctx.Value(NotifierContextKey).(*bufferedNotifier)
notifier.activate(subid)
}
return codec.CreateResponse(req.id, subid), activateSub
} }
// regular RPC call // regular RPC call, prepare arguments
if len(req.args) != len(req.callb.argTypes) { if len(req.args) != len(req.callb.argTypes) {
rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d", rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d",
req.svcname, serviceMethodSeparator, req.callb.method.Name, req.svcname, serviceMethodSeparator, req.callb.method.Name,
len(req.callb.argTypes), len(req.args))} len(req.callb.argTypes), len(req.args))}
return codec.CreateErrorResponse(&req.id, rpcErr) return codec.CreateErrorResponse(&req.id, rpcErr), nil
} }
arguments := []reflect.Value{req.callb.rcvr} arguments := []reflect.Value{req.callb.rcvr}
@ -351,45 +321,56 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
arguments = append(arguments, req.args...) arguments = append(arguments, req.args...)
} }
// execute RPC method and return result
reply := req.callb.method.Func.Call(arguments) reply := req.callb.method.Func.Call(arguments)
if len(reply) == 0 { if len(reply) == 0 {
return codec.CreateResponse(req.id, nil) return codec.CreateResponse(req.id, nil), nil
} }
if req.callb.errPos >= 0 { // test if method returned an error if req.callb.errPos >= 0 { // test if method returned an error
if !reply[req.callb.errPos].IsNil() { if !reply[req.callb.errPos].IsNil() {
e := reply[req.callb.errPos].Interface().(error) e := reply[req.callb.errPos].Interface().(error)
res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()}) res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()})
return res return res, nil
} }
} }
return codec.CreateResponse(req.id, reply[0].Interface()) return codec.CreateResponse(req.id, reply[0].Interface()), nil
} }
// exec executes the given request and writes the result back using the codec. // exec executes the given request and writes the result back using the codec.
func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) { func (s *Server) exec(ctx context.Context, codec ServerCodec, req *serverRequest) {
var response interface{} var response interface{}
var callback func()
if req.err != nil { if req.err != nil {
response = codec.CreateErrorResponse(&req.id, req.err) response = codec.CreateErrorResponse(&req.id, req.err)
} else { } else {
response = s.handle(ctx, codec, req) response, callback = s.handle(ctx, codec, req)
} }
if err := codec.Write(response); err != nil { if err := codec.Write(response); err != nil {
glog.V(logger.Error).Infof("%v\n", err) glog.V(logger.Error).Infof("%v\n", err)
codec.Close() codec.Close()
} }
// when request was a subscribe request this allows these subscriptions to be actived
if callback != nil {
callback()
}
} }
// execBatch executes the given requests and writes the result back using the codec. It will only write the response // execBatch executes the given requests and writes the result back using the codec.
// back when the last request is processed. // It will only write the response back when the last request is processed.
func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*serverRequest) { func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*serverRequest) {
responses := make([]interface{}, len(requests)) responses := make([]interface{}, len(requests))
var callbacks []func()
for i, req := range requests { for i, req := range requests {
if req.err != nil { if req.err != nil {
responses[i] = codec.CreateErrorResponse(&req.id, req.err) responses[i] = codec.CreateErrorResponse(&req.id, req.err)
} else { } else {
responses[i] = s.handle(ctx, codec, req) var callback func()
if responses[i], callback = s.handle(ctx, codec, req); callback != nil {
callbacks = append(callbacks, callback)
}
} }
} }
@ -397,11 +378,16 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s
glog.V(logger.Error).Infof("%v\n", err) glog.V(logger.Error).Infof("%v\n", err)
codec.Close() codec.Close()
} }
// when request holds one of more subscribe requests this allows these subscriptions to be actived
for _, c := range callbacks {
c()
}
} }
// readRequest requests the next (batch) request from the codec. It will return the collection of requests, an // readRequest requests the next (batch) request from the codec. It will return the collection
// indication if the request was a batch, the invalid request identifier and an error when the request could not be // of requests, an indication if the request was a batch, the invalid request identifier and an
// read/parsed. // error when the request could not be read/parsed.
func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCError) { func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCError) {
reqs, batch, err := codec.ReadRequestHeaders() reqs, batch, err := codec.ReadRequestHeaders()
if err != nil { if err != nil {
@ -417,7 +403,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro
if r.isPubSub && r.method == unsubscribeMethod { if r.isPubSub && r.method == unsubscribeMethod {
requests[i] = &serverRequest{id: r.id, isUnsubscribe: true} requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}
argTypes := []reflect.Type{reflect.TypeOf("")} argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg
if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil { if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
requests[i].args = args requests[i].args = args
} else { } else {
@ -426,12 +412,12 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro
continue continue
} }
if svc, ok = s.services[r.service]; !ok { if svc, ok = s.services[r.service]; !ok { // rpc method isn't available
requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}} requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
continue continue
} }
if r.isPubSub { // eth_subscribe if r.isPubSub { // eth_subscribe, r.method contains the subscription method name
if callb, ok := svc.subscriptions[r.method]; ok { if callb, ok := svc.subscriptions[r.method]; ok {
requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb} requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
if r.params != nil && len(callb.argTypes) > 0 { if r.params != nil && len(callb.argTypes) > 0 {
@ -449,7 +435,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCErro
continue continue
} }
if callb, ok := svc.callbacks[r.method]; ok { if callb, ok := svc.callbacks[r.method]; ok { // lookup RPC method
requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb} requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
if r.params != nil && len(callb.argTypes) > 0 { if r.params != nil && len(callb.argTypes) > 0 {
if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil { if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil {

@ -65,8 +65,12 @@ func (s *Service) InvalidRets3() (string, string, error) {
return "", "", nil return "", "", nil
} }
func (s *Service) Subscription() (Subscription, error) { func (s *Service) Subscription(ctx context.Context) (Subscription, error) {
return NewSubscription(nil), nil return nil, nil
}
func (s *Service) SubsriptionWithArgs(ctx context.Context, a, b int) (Subscription, error) {
return nil, nil
} }
func TestServerRegisterName(t *testing.T) { func TestServerRegisterName(t *testing.T) {
@ -90,8 +94,8 @@ func TestServerRegisterName(t *testing.T) {
t.Errorf("Expected 4 callbacks for service 'calc', got %d", len(svc.callbacks)) t.Errorf("Expected 4 callbacks for service 'calc', got %d", len(svc.callbacks))
} }
if len(svc.subscriptions) != 1 { if len(svc.subscriptions) != 2 {
t.Errorf("Expected 1 subscription for service 'calc', got %d", len(svc.subscriptions)) t.Errorf("Expected 2 subscriptions for service 'calc', got %d", len(svc.subscriptions))
} }
} }
@ -229,7 +233,7 @@ func TestServerMethodExecution(t *testing.T) {
input, _ := json.Marshal(&req) input, _ := json.Marshal(&req)
codec := &ServerTestCodec{input: input, closer: make(chan interface{})} codec := &ServerTestCodec{input: input, closer: make(chan interface{})}
go server.ServeCodec(codec) go server.ServeCodec(codec, OptionMethodInvocation)
<-codec.closer <-codec.closer
@ -259,7 +263,7 @@ func TestServerMethodWithCtx(t *testing.T) {
input, _ := json.Marshal(&req) input, _ := json.Marshal(&req)
codec := &ServerTestCodec{input: input, closer: make(chan interface{})} codec := &ServerTestCodec{input: input, closer: make(chan interface{})}
go server.ServeCodec(codec) go server.ServeCodec(codec, OptionMethodInvocation)
<-codec.closer <-codec.closer

@ -24,7 +24,6 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/ethereum/go-ethereum/event"
"gopkg.in/fatih/set.v0" "gopkg.in/fatih/set.v0"
) )
@ -66,10 +65,10 @@ type serverRequest struct {
err RPCError err RPCError
} }
type serviceRegistry map[string]*service // collection of services type serviceRegistry map[string]*service // collection of services
type callbacks map[string]*callback // collection of RPC callbacks type callbacks map[string]*callback // collection of RPC callbacks
type subscriptions map[string]*callback // collection of subscription callbacks type subscriptions map[string]*callback // collection of subscription callbacks
type subscriptionRegistry map[string]Subscription // collection of subscriptions type subscriptionRegistry map[string]*callback // collection of subscription callbacks
// Server represents a RPC server // Server represents a RPC server
type Server struct { type Server struct {
@ -123,51 +122,6 @@ type ServerCodec interface {
Closed() <-chan interface{} Closed() <-chan interface{}
} }
// SubscriptionMatcher returns true if the given value matches the criteria specified by the user
type SubscriptionMatcher func(interface{}) bool
// SubscriptionOutputFormat accepts event data and has the ability to format the data before it is send to the client
type SubscriptionOutputFormat func(interface{}) interface{}
// defaultSubscriptionOutputFormatter returns data and is used as default output format for notifications
func defaultSubscriptionOutputFormatter(data interface{}) interface{} {
return data
}
// Subscription is used by the server to send notifications to the client
type Subscription struct {
sub event.Subscription
match SubscriptionMatcher
format SubscriptionOutputFormat
}
// NewSubscription create a new RPC subscription
func NewSubscription(sub event.Subscription) Subscription {
return Subscription{sub, nil, defaultSubscriptionOutputFormatter}
}
// NewSubscriptionWithOutputFormat create a new RPC subscription which a custom notification output format
func NewSubscriptionWithOutputFormat(sub event.Subscription, formatter SubscriptionOutputFormat) Subscription {
return Subscription{sub, nil, formatter}
}
// NewSubscriptionFiltered will create a new subscription. For each raised event the given matcher is
// called. If it returns true the event is send as notification to the client, otherwise it is ignored.
func NewSubscriptionFiltered(sub event.Subscription, match SubscriptionMatcher) Subscription {
return Subscription{sub, match, defaultSubscriptionOutputFormatter}
}
// Chan returns the channel where new events will be published. It's up the user to call the matcher to
// determine if the events are interesting for the client.
func (s *Subscription) Chan() <-chan *event.Event {
return s.sub.Chan()
}
// Unsubscribe will end the subscription and closes the event channel
func (s *Subscription) Unsubscribe() {
s.sub.Unsubscribe()
}
// HexNumber serializes a number to hex format using the "%#x" format // HexNumber serializes a number to hex format using the "%#x" format
type HexNumber big.Int type HexNumber big.Int

@ -45,6 +45,16 @@ func isExportedOrBuiltinType(t reflect.Type) bool {
return isExported(t.Name()) || t.PkgPath() == "" return isExported(t.Name()) || t.PkgPath() == ""
} }
var contextType = reflect.TypeOf((*context.Context)(nil)).Elem()
// isContextType returns an indication if the given t is of context.Context or *context.Context type
func isContextType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t == contextType
}
var errorType = reflect.TypeOf((*error)(nil)).Elem() var errorType = reflect.TypeOf((*error)(nil)).Elem()
// Implements this type the error interface // Implements this type the error interface
@ -57,6 +67,7 @@ func isErrorType(t reflect.Type) bool {
var subscriptionType = reflect.TypeOf((*Subscription)(nil)).Elem() var subscriptionType = reflect.TypeOf((*Subscription)(nil)).Elem()
// isSubscriptionType returns an indication if the given t is of Subscription or *Subscription type
func isSubscriptionType(t reflect.Type) bool { func isSubscriptionType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr { for t.Kind() == reflect.Ptr {
t = t.Elem() t = t.Elem()
@ -64,12 +75,17 @@ func isSubscriptionType(t reflect.Type) bool {
return t == subscriptionType return t == subscriptionType
} }
// isPubSub tests whether the given method return the pair (v2.Subscription, error) // isPubSub tests whether the given method has as as first argument a context.Context
// and returns the pair (Subscription, error)
func isPubSub(methodType reflect.Type) bool { func isPubSub(methodType reflect.Type) bool {
if methodType.NumOut() != 2 { // numIn(0) is the receiver type
if methodType.NumIn() < 2 || methodType.NumOut() != 2 {
return false return false
} }
return isSubscriptionType(methodType.Out(0)) && isErrorType(methodType.Out(1))
return isContextType(methodType.In(1)) &&
isSubscriptionType(methodType.Out(0)) &&
isErrorType(methodType.Out(1))
} }
// formatName will convert to first character to lower case // formatName will convert to first character to lower case
@ -110,8 +126,6 @@ func isBlockNumber(t reflect.Type) bool {
return t == blockNumberType return t == blockNumberType
} }
var contextType = reflect.TypeOf(new(context.Context)).Elem()
// suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria // suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria
// for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server // for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server
// documentation for a summary of these criteria. // documentation for a summary of these criteria.
@ -205,7 +219,7 @@ METHODS:
return callbacks, subscriptions return callbacks, subscriptions
} }
func newSubscriptionId() (string, error) { func newSubscriptionID() (string, error) {
var subid [16]byte var subid [16]byte
n, _ := rand.Read(subid[:]) n, _ := rand.Read(subid[:])
if n != 16 { if n != 16 {

@ -93,7 +93,8 @@ func NewWSServer(cors string, handler *Server) *http.Server {
Handler: websocket.Server{ Handler: websocket.Server{
Handshake: wsHandshakeValidator(strings.Split(cors, ",")), Handshake: wsHandshakeValidator(strings.Split(cors, ",")),
Handler: func(conn *websocket.Conn) { Handler: func(conn *websocket.Conn) {
handler.ServeCodec(NewJSONCodec(&wsReaderWriterCloser{conn})) handler.ServeCodec(NewJSONCodec(&wsReaderWriterCloser{conn}),
OptionMethodInvocation|OptionSubscriptions)
}, },
}, },
} }