From 34f8e16c114bf7f3c7035705e9f78577ab809b4c Mon Sep 17 00:00:00 2001 From: Elizabeth Engelman Date: Tue, 2 Jul 2019 07:31:26 -0500 Subject: [PATCH] Add a Subscribe method to rpc client interface --- pkg/core/rpc_client.go | 2 ++ pkg/fakes/mock_rpc_client.go | 54 +++++++++++++++++++++++++++-------- pkg/geth/client/rpc_client.go | 16 +++++++++++ 3 files changed, 60 insertions(+), 12 deletions(-) diff --git a/pkg/core/rpc_client.go b/pkg/core/rpc_client.go index 1f52b27a..a89ca230 100644 --- a/pkg/core/rpc_client.go +++ b/pkg/core/rpc_client.go @@ -18,6 +18,7 @@ package core import ( "context" + "github.com/ethereum/go-ethereum/rpc" "github.com/vulcanize/vulcanizedb/pkg/geth/client" ) @@ -27,4 +28,5 @@ type RpcClient interface { BatchCall(batch []client.BatchElem) error IpcPath() string SupportedModules() (map[string]string, error) + Subscribe(namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) } diff --git a/pkg/fakes/mock_rpc_client.go b/pkg/fakes/mock_rpc_client.go index fcf224cc..2adc6a62 100644 --- a/pkg/fakes/mock_rpc_client.go +++ b/pkg/fakes/mock_rpc_client.go @@ -18,10 +18,13 @@ package fakes import ( "context" + "errors" + "github.com/ethereum/go-ethereum/statediff" "math/big" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rpc" . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/pkg/core" @@ -29,18 +32,45 @@ import ( ) type MockRpcClient struct { - callContextErr error - ipcPath string - nodeType core.NodeType - passedContext context.Context - passedMethod string - passedResult interface{} - passedBatch []client.BatchElem - lengthOfBatch int - returnPOAHeader core.POAHeader - returnPOAHeaders []core.POAHeader - returnPOWHeaders []*types.Header - supportedModules map[string]string + callContextErr error + ipcPath string + nodeType core.NodeType + passedContext context.Context + passedMethod string + passedResult interface{} + passedBatch []client.BatchElem + passedNamespace string + passedPayloadChan chan statediff.Payload + passedSubscribeArgs []interface{} + lengthOfBatch int + returnPOAHeader core.POAHeader + returnPOAHeaders []core.POAHeader + returnPOWHeaders []*types.Header + supportedModules map[string]string +} + +func (client *MockRpcClient) Subscribe(namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) { + client.passedNamespace = namespace + + passedPayloadChan, ok := payloadChan.(chan statediff.Payload) + if !ok { + return nil, errors.New("passed in channel is not of the correct type") + } + client.passedPayloadChan = passedPayloadChan + + for _, arg := range args { + client.passedSubscribeArgs = append(client.passedSubscribeArgs, arg) + } + + subscription := rpc.ClientSubscription{} + return &subscription, nil +} + + +func (client *MockRpcClient) AssertSubscribeCalledWith(namespace string, payloadChan chan statediff.Payload, args []interface{}) { + Expect(client.passedNamespace).To(Equal(namespace)) + Expect(client.passedPayloadChan).To(Equal(payloadChan)) + Expect(client.passedSubscribeArgs).To(Equal(args)) } func NewMockRpcClient() *MockRpcClient { diff --git a/pkg/geth/client/rpc_client.go b/pkg/geth/client/rpc_client.go index b80a746d..bdcf49f4 100644 --- a/pkg/geth/client/rpc_client.go +++ b/pkg/geth/client/rpc_client.go @@ -18,6 +18,9 @@ package client import ( "context" + "errors" + "reflect" + "github.com/ethereum/go-ethereum/rpc" ) @@ -73,3 +76,16 @@ func (client RpcClient) BatchCall(batch []BatchElem) error { } return client.client.BatchCall(rpcBatch) } + +// Subscribe subscribes to an rpc "namespace_subscribe" subscription with the given channel +// The first argument needs to be the method we wish to invoke +func (client RpcClient) Subscribe(namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) { + chanVal := reflect.ValueOf(payloadChan) + if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 { + return nil, errors.New("second argument to Subscribe must be a writable channel") + } + if chanVal.IsNil() { + return nil, errors.New("channel given to Subscribe must not be nil") + } + return client.client.Subscribe(context.Background(), namespace, payloadChan, args...) +} \ No newline at end of file