Add a Subscribe method to rpc client interface

This commit is contained in:
Elizabeth Engelman 2019-07-02 07:31:26 -05:00
parent 36533f7c3f
commit 34f8e16c11
3 changed files with 60 additions and 12 deletions

View File

@ -18,6 +18,7 @@ package core
import ( import (
"context" "context"
"github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/vulcanizedb/pkg/geth/client" "github.com/vulcanize/vulcanizedb/pkg/geth/client"
) )
@ -27,4 +28,5 @@ type RpcClient interface {
BatchCall(batch []client.BatchElem) error BatchCall(batch []client.BatchElem) error
IpcPath() string IpcPath() string
SupportedModules() (map[string]string, error) SupportedModules() (map[string]string, error)
Subscribe(namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error)
} }

View File

@ -18,10 +18,13 @@ package fakes
import ( import (
"context" "context"
"errors"
"github.com/ethereum/go-ethereum/statediff"
"math/big" "math/big"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/core"
@ -29,18 +32,45 @@ import (
) )
type MockRpcClient struct { type MockRpcClient struct {
callContextErr error callContextErr error
ipcPath string ipcPath string
nodeType core.NodeType nodeType core.NodeType
passedContext context.Context passedContext context.Context
passedMethod string passedMethod string
passedResult interface{} passedResult interface{}
passedBatch []client.BatchElem passedBatch []client.BatchElem
lengthOfBatch int passedNamespace string
returnPOAHeader core.POAHeader passedPayloadChan chan statediff.Payload
returnPOAHeaders []core.POAHeader passedSubscribeArgs []interface{}
returnPOWHeaders []*types.Header lengthOfBatch int
supportedModules map[string]string returnPOAHeader core.POAHeader
returnPOAHeaders []core.POAHeader
returnPOWHeaders []*types.Header
supportedModules map[string]string
}
func (client *MockRpcClient) Subscribe(namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) {
client.passedNamespace = namespace
passedPayloadChan, ok := payloadChan.(chan statediff.Payload)
if !ok {
return nil, errors.New("passed in channel is not of the correct type")
}
client.passedPayloadChan = passedPayloadChan
for _, arg := range args {
client.passedSubscribeArgs = append(client.passedSubscribeArgs, arg)
}
subscription := rpc.ClientSubscription{}
return &subscription, nil
}
func (client *MockRpcClient) AssertSubscribeCalledWith(namespace string, payloadChan chan statediff.Payload, args []interface{}) {
Expect(client.passedNamespace).To(Equal(namespace))
Expect(client.passedPayloadChan).To(Equal(payloadChan))
Expect(client.passedSubscribeArgs).To(Equal(args))
} }
func NewMockRpcClient() *MockRpcClient { func NewMockRpcClient() *MockRpcClient {

View File

@ -18,6 +18,9 @@ package client
import ( import (
"context" "context"
"errors"
"reflect"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
) )
@ -73,3 +76,16 @@ func (client RpcClient) BatchCall(batch []BatchElem) error {
} }
return client.client.BatchCall(rpcBatch) return client.client.BatchCall(rpcBatch)
} }
// Subscribe subscribes to an rpc "namespace_subscribe" subscription with the given channel
// The first argument needs to be the method we wish to invoke
func (client RpcClient) Subscribe(namespace string, payloadChan interface{}, args ...interface{}) (*rpc.ClientSubscription, error) {
chanVal := reflect.ValueOf(payloadChan)
if chanVal.Kind() != reflect.Chan || chanVal.Type().ChanDir()&reflect.SendDir == 0 {
return nil, errors.New("second argument to Subscribe must be a writable channel")
}
if chanVal.IsNil() {
return nil, errors.New("channel given to Subscribe must not be nil")
}
return client.client.Subscribe(context.Background(), namespace, payloadChan, args...)
}