Fix signature of EthSubscribe

This commit is contained in:
Ian Davis 2022-11-16 12:16:19 +00:00
parent 8134d2f05b
commit 314fb31886
5 changed files with 30 additions and 41 deletions

View File

@ -824,7 +824,7 @@ type FullNode interface {
// - logs: notify new event logs that match a criteria // - logs: notify new event logs that match a criteria
// params contains additional parameters used with the log event type // params contains additional parameters used with the log event type
// The client will receive a stream of EthSubscriptionResponse values until EthUnsubscribe is called. // The client will receive a stream of EthSubscriptionResponse values until EthUnsubscribe is called.
EthSubscribe(ctx context.Context, eventTypes []string, params EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) //perm:write EthSubscribe(ctx context.Context, eventType string, params *EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) //perm:write
// Unsubscribe from a websocket subscription // Unsubscribe from a websocket subscription
EthUnsubscribe(ctx context.Context, id EthSubscriptionID) (bool, error) //perm:write EthUnsubscribe(ctx context.Context, id EthSubscriptionID) (bool, error) //perm:write

View File

@ -1342,7 +1342,7 @@ func (mr *MockFullNodeMockRecorder) EthSendRawTransaction(arg0, arg1 interface{}
} }
// EthSubscribe mocks base method. // EthSubscribe mocks base method.
func (m *MockFullNode) EthSubscribe(arg0 context.Context, arg1 []string, arg2 api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) { func (m *MockFullNode) EthSubscribe(arg0 context.Context, arg1 string, arg2 *api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "EthSubscribe", arg0, arg1, arg2) ret := m.ctrl.Call(m, "EthSubscribe", arg0, arg1, arg2)
ret0, _ := ret[0].(<-chan api.EthSubscriptionResponse) ret0, _ := ret[0].(<-chan api.EthSubscriptionResponse)

View File

@ -275,7 +275,7 @@ type FullNodeStruct struct {
EthSendRawTransaction func(p0 context.Context, p1 EthBytes) (EthHash, error) `perm:"read"` EthSendRawTransaction func(p0 context.Context, p1 EthBytes) (EthHash, error) `perm:"read"`
EthSubscribe func(p0 context.Context, p1 []string, p2 EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) `perm:"write"` EthSubscribe func(p0 context.Context, p1 string, p2 *EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) `perm:"write"`
EthUninstallFilter func(p0 context.Context, p1 EthFilterID) (bool, error) `perm:"write"` EthUninstallFilter func(p0 context.Context, p1 EthFilterID) (bool, error) `perm:"write"`
@ -2166,14 +2166,14 @@ func (s *FullNodeStub) EthSendRawTransaction(p0 context.Context, p1 EthBytes) (E
return *new(EthHash), ErrNotSupported return *new(EthHash), ErrNotSupported
} }
func (s *FullNodeStruct) EthSubscribe(p0 context.Context, p1 []string, p2 EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) { func (s *FullNodeStruct) EthSubscribe(p0 context.Context, p1 string, p2 *EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) {
if s.Internal.EthSubscribe == nil { if s.Internal.EthSubscribe == nil {
return nil, ErrNotSupported return nil, ErrNotSupported
} }
return s.Internal.EthSubscribe(p0, p1, p2) return s.Internal.EthSubscribe(p0, p1, p2)
} }
func (s *FullNodeStub) EthSubscribe(p0 context.Context, p1 []string, p2 EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) { func (s *FullNodeStub) EthSubscribe(p0 context.Context, p1 string, p2 *EthSubscriptionParams) (<-chan EthSubscriptionResponse, error) {
return nil, ErrNotSupported return nil, ErrNotSupported
} }

View File

@ -2796,9 +2796,7 @@ Perms: write
Inputs: Inputs:
```json ```json
[ [
[ "string value",
"string value"
],
{ {
"topics": [ "topics": [
[ [

View File

@ -70,7 +70,7 @@ type EthEventAPI interface {
EthNewBlockFilter(ctx context.Context) (api.EthFilterID, error) EthNewBlockFilter(ctx context.Context) (api.EthFilterID, error)
EthNewPendingTransactionFilter(ctx context.Context) (api.EthFilterID, error) EthNewPendingTransactionFilter(ctx context.Context) (api.EthFilterID, error)
EthUninstallFilter(ctx context.Context, id api.EthFilterID) (bool, error) EthUninstallFilter(ctx context.Context, id api.EthFilterID) (bool, error)
EthSubscribe(ctx context.Context, eventTypes []string, params api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) EthSubscribe(ctx context.Context, eventType string, params *api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error)
EthUnsubscribe(ctx context.Context, id api.EthSubscriptionID) (bool, error) EthUnsubscribe(ctx context.Context, id api.EthSubscriptionID) (bool, error)
} }
@ -1135,28 +1135,16 @@ const (
EthSubscribeEventTypeLogs = "logs" EthSubscribeEventTypeLogs = "logs"
) )
func (e *EthEvent) EthSubscribe(ctx context.Context, eventTypes []string, params api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) { func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *api.EthSubscriptionParams) (<-chan api.EthSubscriptionResponse, error) {
// Note that go-jsonrpc will set the method field of the response to "xrpc.ch.val" but the ethereum api expects the name of the // Note that go-jsonrpc will set the method field of the response to "xrpc.ch.val" but the ethereum api expects the name of the
// method to be "eth_subscription". This probably doesn't matter in practice. // method to be "eth_subscription". This probably doesn't matter in practice.
// Validate event types and parameters first
for _, et := range eventTypes {
switch et {
case EthSubscribeEventTypeHeads:
case EthSubscribeEventTypeLogs:
default:
return nil, xerrors.Errorf("unsupported event type: %s", et)
}
}
sub, err := e.SubManager.StartSubscription(ctx) sub, err := e.SubManager.StartSubscription(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, et := range eventTypes { switch eventType {
switch et {
case EthSubscribeEventTypeHeads: case EthSubscribeEventTypeHeads:
f, err := e.TipSetFilterManager.Install(ctx) f, err := e.TipSetFilterManager.Install(ctx)
if err != nil { if err != nil {
@ -1168,6 +1156,7 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventTypes []string, params
case EthSubscribeEventTypeLogs: case EthSubscribeEventTypeLogs:
keys := map[string][][]byte{} keys := map[string][][]byte{}
if params != nil {
for idx, vals := range params.Topics { for idx, vals := range params.Topics {
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4 // Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4
key := fmt.Sprintf("topic%d", idx+1) key := fmt.Sprintf("topic%d", idx+1)
@ -1177,6 +1166,7 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventTypes []string, params
} }
keys[key] = keyvals keys[key] = keyvals
} }
}
f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, []address.Address{}, keys) f, err := e.EventFilterManager.Install(ctx, -1, -1, cid.Undef, []address.Address{}, keys)
if err != nil { if err != nil {
@ -1185,7 +1175,8 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventTypes []string, params
return nil, err return nil, err
} }
sub.addFilter(ctx, f) sub.addFilter(ctx, f)
} default:
return nil, xerrors.Errorf("unsupported event type: %s", eventType)
} }
return sub.out, nil return sub.out, nil