Merge pull request #13885 from bas-vk/rpc_generic_pubsub

rpc: support subscriptions under custom namespaces
This commit is contained in:
Péter Szilágyi 2017-05-03 11:41:07 +03:00 committed by GitHub
commit a8eafcdc0e
6 changed files with 217 additions and 55 deletions

@ -27,6 +27,7 @@ import (
"net/url" "net/url"
"reflect" "reflect"
"strconv" "strconv"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -373,14 +374,14 @@ func (c *Client) EthSubscribe(ctx context.Context, channel interface{}, args ...
return nil, ErrNotificationsUnsupported return nil, ErrNotificationsUnsupported
} }
msg, err := c.newMessage(subscribeMethod, args...) msg, err := c.newMessage("eth"+subscribeMethodSuffix, args...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
op := &requestOp{ op := &requestOp{
ids: []json.RawMessage{msg.ID}, ids: []json.RawMessage{msg.ID},
resp: make(chan *jsonrpcMessage), resp: make(chan *jsonrpcMessage),
sub: newClientSubscription(c, chanVal), sub: newClientSubscription(c, "eth", chanVal),
} }
// Send the subscription request. // Send the subscription request.
@ -575,7 +576,7 @@ func (c *Client) closeRequestOps(err error) {
} }
func (c *Client) handleNotification(msg *jsonrpcMessage) { func (c *Client) handleNotification(msg *jsonrpcMessage) {
if msg.Method != notificationMethod { if !strings.HasSuffix(msg.Method, notificationMethodSuffix) {
log.Debug(fmt.Sprint("dropping non-subscription message: ", msg)) log.Debug(fmt.Sprint("dropping non-subscription message: ", msg))
return return
} }
@ -653,11 +654,12 @@ func (c *Client) read(conn net.Conn) error {
// A ClientSubscription represents a subscription established through EthSubscribe. // A ClientSubscription represents a subscription established through EthSubscribe.
type ClientSubscription struct { type ClientSubscription struct {
client *Client client *Client
etype reflect.Type etype reflect.Type
channel reflect.Value channel reflect.Value
subid string namespace string
in chan json.RawMessage subid string
in chan json.RawMessage
quitOnce sync.Once // ensures quit is closed once quitOnce sync.Once // ensures quit is closed once
quit chan struct{} // quit is closed when the subscription exits quit chan struct{} // quit is closed when the subscription exits
@ -665,14 +667,15 @@ type ClientSubscription struct {
err chan error err chan error
} }
func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription { func newClientSubscription(c *Client, namespace string, channel reflect.Value) *ClientSubscription {
sub := &ClientSubscription{ sub := &ClientSubscription{
client: c, client: c,
etype: channel.Type().Elem(), namespace: namespace,
channel: channel, etype: channel.Type().Elem(),
quit: make(chan struct{}), channel: channel,
err: make(chan error, 1), quit: make(chan struct{}),
in: make(chan json.RawMessage), err: make(chan error, 1),
in: make(chan json.RawMessage),
} }
return sub return sub
} }
@ -774,5 +777,5 @@ func (sub *ClientSubscription) unmarshal(result json.RawMessage) (interface{}, e
func (sub *ClientSubscription) requestUnsubscribe() error { func (sub *ClientSubscription) requestUnsubscribe() error {
var result interface{} var result interface{}
return sub.client.Call(&result, unsubscribeMethod, sub.subid) return sub.client.Call(&result, sub.namespace+unsubscribeMethodSuffix, sub.subid)
} }

@ -30,11 +30,11 @@ import (
) )
const ( const (
jsonrpcVersion = "2.0" jsonrpcVersion = "2.0"
serviceMethodSeparator = "_" serviceMethodSeparator = "_"
subscribeMethod = "eth_subscribe" subscribeMethodSuffix = "_subscribe"
unsubscribeMethod = "eth_unsubscribe" unsubscribeMethodSuffix = "_unsubscribe"
notificationMethod = "eth_subscription" notificationMethodSuffix = "_subscription"
) )
type jsonRequest struct { type jsonRequest struct {
@ -164,7 +164,7 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) {
} }
// subscribe are special, they will always use `subscribeMethod` as first param in the payload // subscribe are special, they will always use `subscribeMethod` as first param in the payload
if in.Method == subscribeMethod { if strings.HasSuffix(in.Method, subscribeMethodSuffix) {
reqs := []rpcRequest{{id: &in.Id, isPubSub: true}} reqs := []rpcRequest{{id: &in.Id, isPubSub: true}}
if len(in.Payload) > 0 { if len(in.Payload) > 0 {
// first param must be subscription name // first param must be subscription name
@ -174,17 +174,16 @@ func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error) {
return nil, false, &invalidRequestError{"Unable to parse subscription request"} return nil, false, &invalidRequestError{"Unable to parse subscription request"}
} }
// all subscriptions are made on the eth service reqs[0].service, reqs[0].method = strings.TrimSuffix(in.Method, subscribeMethodSuffix), subscribeMethod[0]
reqs[0].service, reqs[0].method = "eth", subscribeMethod[0]
reqs[0].params = in.Payload reqs[0].params = in.Payload
return reqs, false, nil return reqs, false, nil
} }
return nil, false, &invalidRequestError{"Unable to parse subscription request"} return nil, false, &invalidRequestError{"Unable to parse subscription request"}
} }
if in.Method == unsubscribeMethod { if strings.HasSuffix(in.Method, unsubscribeMethodSuffix) {
return []rpcRequest{{id: &in.Id, isPubSub: true, return []rpcRequest{{id: &in.Id, isPubSub: true,
method: unsubscribeMethod, params: in.Payload}}, false, nil method: in.Method, params: in.Payload}}, false, nil
} }
elems := strings.Split(in.Method, serviceMethodSeparator) elems := strings.Split(in.Method, serviceMethodSeparator)
@ -216,8 +215,8 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error)
id := &in[i].Id id := &in[i].Id
// subscribe are special, they will always use `subscribeMethod` as first param in the payload // subscribe are special, they will always use `subscriptionMethod` as first param in the payload
if r.Method == subscribeMethod { if strings.HasSuffix(r.Method, subscribeMethodSuffix) {
requests[i] = rpcRequest{id: id, isPubSub: true} requests[i] = rpcRequest{id: id, isPubSub: true}
if len(r.Payload) > 0 { if len(r.Payload) > 0 {
// first param must be subscription name // first param must be subscription name
@ -227,8 +226,7 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error)
return nil, false, &invalidRequestError{"Unable to parse subscription request"} return nil, false, &invalidRequestError{"Unable to parse subscription request"}
} }
// all subscriptions are made on the eth service requests[i].service, requests[i].method = strings.TrimSuffix(r.Method, subscribeMethodSuffix), subscribeMethod[0]
requests[i].service, requests[i].method = "eth", subscribeMethod[0]
requests[i].params = r.Payload requests[i].params = r.Payload
continue continue
} }
@ -236,8 +234,8 @@ func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, Error)
return nil, true, &invalidRequestError{"Unable to parse (un)subscribe request arguments"} return nil, true, &invalidRequestError{"Unable to parse (un)subscribe request arguments"}
} }
if r.Method == unsubscribeMethod { if strings.HasSuffix(r.Method, unsubscribeMethodSuffix) {
requests[i] = rpcRequest{id: id, isPubSub: true, method: unsubscribeMethod, params: r.Payload} requests[i] = rpcRequest{id: id, isPubSub: true, method: r.Method, params: r.Payload}
continue continue
} }
@ -325,13 +323,13 @@ func (c *jsonCodec) CreateErrorResponseWithInfo(id interface{}, err Error, info
} }
// CreateNotification will create a JSON-RPC notification with the given subscription id and event as params. // CreateNotification will create a JSON-RPC notification with the given subscription id and event as params.
func (c *jsonCodec) CreateNotification(subid string, event interface{}) interface{} { func (c *jsonCodec) CreateNotification(subid, namespace string, event interface{}) interface{} {
if isHexNum(reflect.TypeOf(event)) { if isHexNum(reflect.TypeOf(event)) {
return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod, return &jsonNotification{Version: jsonrpcVersion, Method: namespace + notificationMethodSuffix,
Params: jsonSubscription{Subscription: subid, Result: fmt.Sprintf(`%#x`, event)}} Params: jsonSubscription{Subscription: subid, Result: fmt.Sprintf(`%#x`, event)}}
} }
return &jsonNotification{Version: jsonrpcVersion, Method: notificationMethod, return &jsonNotification{Version: jsonrpcVersion, Method: namespace + notificationMethodSuffix,
Params: jsonSubscription{Subscription: subid, Result: event}} Params: jsonSubscription{Subscription: subid, Result: event}}
} }

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"runtime" "runtime"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -96,32 +97,30 @@ func (s *Server) RegisterName(name string, rcvr interface{}) error {
return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name()) return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name())
} }
methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)
// already a previous service register under given sname, merge methods/subscriptions // already a previous service register under given sname, merge methods/subscriptions
if regsvc, present := s.services[name]; present { if regsvc, present := s.services[name]; present {
methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)
if len(methods) == 0 && len(subscriptions) == 0 { if len(methods) == 0 && len(subscriptions) == 0 {
return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr) return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
} }
for _, m := range methods { for _, m := range methods {
regsvc.callbacks[formatName(m.method.Name)] = m regsvc.callbacks[formatName(m.method.Name)] = m
} }
for _, s := range subscriptions { for _, s := range subscriptions {
regsvc.subscriptions[formatName(s.method.Name)] = s regsvc.subscriptions[formatName(s.method.Name)] = s
} }
return nil return nil
} }
svc.name = name svc.name = name
svc.callbacks, svc.subscriptions = suitableCallbacks(rcvrVal, svc.typ) svc.callbacks, svc.subscriptions = methods, subscriptions
if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 { if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 {
return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr) return fmt.Errorf("Service %T doesn't have any suitable methods/subscriptions to expose", rcvr)
} }
s.services[svc.name] = svc s.services[svc.name] = svc
return nil return nil
} }
@ -303,7 +302,7 @@ func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverReque
// active the subscription after the sub id was successfully sent to the client // active the subscription after the sub id was successfully sent to the client
activateSub := func() { activateSub := func() {
notifier, _ := NotifierFromContext(ctx) notifier, _ := NotifierFromContext(ctx)
notifier.activate(subid) notifier.activate(subid, req.svcname)
} }
return codec.CreateResponse(req.id, subid), activateSub return codec.CreateResponse(req.id, subid), activateSub
@ -383,7 +382,7 @@ func (s *Server) execBatch(ctx context.Context, codec ServerCodec, requests []*s
codec.Close() codec.Close()
} }
// when request holds one of more subscribe requests this allows these subscriptions to be actived // when request holds one of more subscribe requests this allows these subscriptions to be activated
for _, c := range callbacks { for _, c := range callbacks {
c() c()
} }
@ -410,7 +409,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error)
continue continue
} }
if r.isPubSub && r.method == unsubscribeMethod { if r.isPubSub && strings.HasSuffix(r.method, unsubscribeMethodSuffix) {
requests[i] = &serverRequest{id: r.id, isUnsubscribe: true} requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}
argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg argTypes := []reflect.Type{reflect.TypeOf("")} // expect subscription id as first arg
if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil { if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
@ -439,7 +438,7 @@ func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, Error)
} }
} }
} else { } else {
requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{subscribeMethod, r.method}} requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.method, r.method}}
} }
continue continue
} }

@ -35,8 +35,9 @@ type ID string
// a Subscription is created by a notifier and tight to that notifier. The client can use // a Subscription is created by a notifier and tight to that notifier. The client can use
// this subscription to wait for an unsubscribe request for the client, see Err(). // this subscription to wait for an unsubscribe request for the client, see Err().
type Subscription struct { type Subscription struct {
ID ID ID ID
err chan error // closed on unsubscribe namespace string
err chan error // closed on unsubscribe
} }
// Err returns a channel that is closed when the client send an unsubscribe request. // Err returns a channel that is closed when the client send an unsubscribe request.
@ -78,7 +79,7 @@ func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
// are dropped until the subscription is marked as active. This is done // are dropped until the subscription is marked as active. This is done
// by the RPC server after the subscription ID is send to the client. // by the RPC server after the subscription ID is send to the client.
func (n *Notifier) CreateSubscription() *Subscription { func (n *Notifier) CreateSubscription() *Subscription {
s := &Subscription{NewID(), make(chan error)} s := &Subscription{ID: NewID(), err: make(chan error)}
n.subMu.Lock() n.subMu.Lock()
n.inactive[s.ID] = s n.inactive[s.ID] = s
n.subMu.Unlock() n.subMu.Unlock()
@ -91,9 +92,9 @@ func (n *Notifier) Notify(id ID, data interface{}) error {
n.subMu.RLock() n.subMu.RLock()
defer n.subMu.RUnlock() defer n.subMu.RUnlock()
_, active := n.active[id] sub, active := n.active[id]
if active { if active {
notification := n.codec.CreateNotification(string(id), data) notification := n.codec.CreateNotification(string(id), sub.namespace, data)
if err := n.codec.Write(notification); err != nil { if err := n.codec.Write(notification); err != nil {
n.codec.Close() n.codec.Close()
return err return err
@ -124,10 +125,11 @@ func (n *Notifier) unsubscribe(id ID) error {
// notifications are dropped. This method is called by the RPC server after // notifications are dropped. This method is called by the RPC server after
// the subscription ID was sent to client. This prevents notifications being // the subscription ID was sent to client. This prevents notifications being
// send to the client before the subscription ID is send to the client. // send to the client before the subscription ID is send to the client.
func (n *Notifier) activate(id ID) { func (n *Notifier) activate(id ID, namespace string) {
n.subMu.Lock() n.subMu.Lock()
defer n.subMu.Unlock() defer n.subMu.Unlock()
if sub, found := n.inactive[id]; found { if sub, found := n.inactive[id]; found {
sub.namespace = namespace
n.active[id] = sub n.active[id] = sub
delete(n.inactive, id) delete(n.inactive, id)
} }

@ -19,6 +19,7 @@ package rpc
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"net" "net"
"sync" "sync"
"testing" "testing"
@ -162,3 +163,162 @@ func TestNotifications(t *testing.T) {
t.Error("unsubscribe callback not called after closing connection") t.Error("unsubscribe callback not called after closing connection")
} }
} }
func waitForMessages(t *testing.T, in *json.Decoder, successes chan<- jsonSuccessResponse,
failures chan<- jsonErrResponse, notifications chan<- jsonNotification) {
// read and parse server messages
for {
var rmsg json.RawMessage
if err := in.Decode(&rmsg); err != nil {
return
}
var responses []map[string]interface{}
if rmsg[0] == '[' {
if err := json.Unmarshal(rmsg, &responses); err != nil {
t.Fatalf("Received invalid message: %s", rmsg)
}
} else {
var msg map[string]interface{}
if err := json.Unmarshal(rmsg, &msg); err != nil {
t.Fatalf("Received invalid message: %s", rmsg)
}
responses = append(responses, msg)
}
for _, msg := range responses {
// determine what kind of msg was received and broadcast
// it to over the corresponding channel
if _, found := msg["result"]; found {
successes <- jsonSuccessResponse{
Version: msg["jsonrpc"].(string),
Id: msg["id"],
Result: msg["result"],
}
continue
}
if _, found := msg["error"]; found {
params := msg["params"].(map[string]interface{})
failures <- jsonErrResponse{
Version: msg["jsonrpc"].(string),
Id: msg["id"],
Error: jsonError{int(params["subscription"].(float64)), params["message"].(string), params["data"]},
}
continue
}
if _, found := msg["params"]; found {
params := msg["params"].(map[string]interface{})
notifications <- jsonNotification{
Version: msg["jsonrpc"].(string),
Method: msg["method"].(string),
Params: jsonSubscription{params["subscription"].(string), params["result"]},
}
continue
}
t.Fatalf("Received invalid message: %s", msg)
}
}
}
// TestSubscriptionMultipleNamespaces ensures that subscriptions can exists
// for multiple different namespaces.
func TestSubscriptionMultipleNamespaces(t *testing.T) {
var (
namespaces = []string{"eth", "shh", "bzz"}
server = NewServer()
service = NotificationTestService{}
clientConn, serverConn = net.Pipe()
out = json.NewEncoder(clientConn)
in = json.NewDecoder(clientConn)
successes = make(chan jsonSuccessResponse)
failures = make(chan jsonErrResponse)
notifications = make(chan jsonNotification)
)
// setup and start server
for _, namespace := range namespaces {
if err := server.RegisterName(namespace, &service); err != nil {
t.Fatalf("unable to register test service %v", err)
}
}
go server.ServeCodec(NewJSONCodec(serverConn), OptionMethodInvocation|OptionSubscriptions)
defer server.Stop()
// wait for message and write them to the given channels
go waitForMessages(t, in, successes, failures, notifications)
// create subscriptions one by one
n := 3
for i, namespace := range namespaces {
request := map[string]interface{}{
"id": i,
"method": fmt.Sprintf("%s_subscribe", namespace),
"version": "2.0",
"params": []interface{}{"someSubscription", n, i},
}
if err := out.Encode(&request); err != nil {
t.Fatalf("Could not create subscription: %v", err)
}
}
// create all subscriptions in 1 batch
var requests []interface{}
for i, namespace := range namespaces {
requests = append(requests, map[string]interface{}{
"id": i,
"method": fmt.Sprintf("%s_subscribe", namespace),
"version": "2.0",
"params": []interface{}{"someSubscription", n, i},
})
}
if err := out.Encode(&requests); err != nil {
t.Fatalf("Could not create subscription in batch form: %v", err)
}
timeout := time.After(30 * time.Second)
subids := make(map[string]string, 2*len(namespaces))
count := make(map[string]int, 2*len(namespaces))
for {
done := true
for id, _ := range count {
if count, found := count[id]; !found || count < (2*n) {
done = false
}
}
if done && len(count) == len(namespaces) {
break
}
select {
case suc := <-successes: // subscription created
subids[namespaces[int(suc.Id.(float64))]] = suc.Result.(string)
case failure := <-failures:
t.Errorf("received error: %v", failure.Error)
case notification := <-notifications:
if cnt, found := count[notification.Params.Subscription]; found {
count[notification.Params.Subscription] = cnt + 1
} else {
count[notification.Params.Subscription] = 1
}
case <-timeout:
for _, namespace := range namespaces {
subid, found := subids[namespace]
if !found {
t.Errorf("Subscription for '%s' not created", namespace)
continue
}
if count, found := count[subid]; !found || count < n {
t.Errorf("Didn't receive all notifications (%d<%d) in time for namespace '%s'", count, n, namespace)
}
}
return
}
}
}

@ -104,17 +104,17 @@ type ServerCodec interface {
// Read next request // Read next request
ReadRequestHeaders() ([]rpcRequest, bool, Error) ReadRequestHeaders() ([]rpcRequest, bool, Error)
// Parse request argument to the given types // Parse request argument to the given types
ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, Error) ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, Error)
// Assemble success response, expects response id and payload // Assemble success response, expects response id and payload
CreateResponse(interface{}, interface{}) interface{} CreateResponse(id interface{}, reply interface{}) interface{}
// Assemble error response, expects response id and error // Assemble error response, expects response id and error
CreateErrorResponse(interface{}, Error) interface{} CreateErrorResponse(id interface{}, err Error) interface{}
// Assemble error response with extra information about the error through info // Assemble error response with extra information about the error through info
CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{} CreateErrorResponseWithInfo(id interface{}, err Error, info interface{}) interface{}
// Create notification response // Create notification response
CreateNotification(string, interface{}) interface{} CreateNotification(id, namespace string, event interface{}) interface{}
// Write msg to client. // Write msg to client.
Write(interface{}) error Write(msg interface{}) error
// Close underlying data stream // Close underlying data stream
Close() Close()
// Closed when underlying connection is closed // Closed when underlying connection is closed