diff --git a/itests/eth_filter_test.go b/itests/eth_filter_test.go index ea8124df2..59e0e6337 100644 --- a/itests/eth_filter_test.go +++ b/itests/eth_filter_test.go @@ -608,19 +608,13 @@ func TestEthSubscribeLogs(t *testing.T) { var elogs []*ethtypes.EthLog for resp := range responseCh { - rlist, ok := resp.Result.([]interface{}) - require.True(ok, "expected subscription result to be []interface{}, but was %T", resp.Result) + rmap, ok := resp.Result.(map[string]interface{}) + require.True(ok, "expected subscription result entry to be map[string]interface{}, but was %T", resp.Result) - for _, rentry := range rlist { - rmap, ok := rentry.(map[string]interface{}) - require.True(ok, "expected subscription result entry to be map[string]interface{}, but was %T", resp.Result) - - elog, err := ParseEthLog(rmap) - require.NoError(err) - - elogs = append(elogs, elog) - } + elog, err := ParseEthLog(rmap) + require.NoError(err) + elogs = append(elogs, elog) } AssertEthLogs(t, elogs, tc.expected, messages) }) @@ -2012,23 +2006,16 @@ func AssertEthLogs(t *testing.T, actual []*ethtypes.EthLog, expected []ExpectedE func parseEthLogsFromSubscriptionResponses(subResponses []ethtypes.EthSubscriptionResponse) ([]*ethtypes.EthLog, error) { elogs := make([]*ethtypes.EthLog, 0, len(subResponses)) for i := range subResponses { - rlist, ok := subResponses[i].Result.([]interface{}) + rmap, ok := subResponses[i].Result.(map[string]interface{}) if !ok { - return nil, xerrors.Errorf("expected subscription result to be []interface{}, but was %T", subResponses[i].Result) + return nil, xerrors.Errorf("expected subscription result entry to be map[string]interface{}, but was %T", subResponses[i].Result) } - for _, r := range rlist { - rmap, ok := r.(map[string]interface{}) - if !ok { - return nil, xerrors.Errorf("expected subscription result entry to be map[string]interface{}, but was %T", r) - } - - elog, err := ParseEthLog(rmap) - if err != nil { - return nil, err - } - elogs = append(elogs, elog) + elog, err := ParseEthLog(rmap) + if err != nil { + return nil, err } + elogs = append(elogs, elog) } return elogs, nil diff --git a/node/impl/full/eth.go b/node/impl/full/eth.go index dde87d1b2..e4557a363 100644 --- a/node/impl/full/eth.go +++ b/node/impl/full/eth.go @@ -1517,45 +1517,50 @@ func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) { e.filters = append(e.filters, f) } +func (e *ethSubscription) send(ctx context.Context, v interface{}) { + resp := ethtypes.EthSubscriptionResponse{ + SubscriptionID: e.id, + Result: v, + } + + outParam, err := json.Marshal(resp) + if err != nil { + log.Warnw("marshaling subscription response", "sub", e.id, "error", err) + return + } + + if err := e.out(ctx, outParam); err != nil { + log.Warnw("sending subscription response", "sub", e.id, "error", err) + return + } +} + func (e *ethSubscription) start(ctx context.Context) { for { select { case <-ctx.Done(): return case v := <-e.in: - resp := ethtypes.EthSubscriptionResponse{ - SubscriptionID: e.id, - } - - var err error switch vt := v.(type) { case *filter.CollectedEvent: - resp.Result, err = ethFilterResultFromEvents([]*filter.CollectedEvent{vt}, e.StateAPI) + evs, err := ethFilterResultFromEvents([]*filter.CollectedEvent{vt}, e.StateAPI) + if err != nil { + continue + } + + for _, r := range evs.Results { + e.send(ctx, r) + } case *types.TipSet: - eb, err := newEthBlockFromFilecoinTipSet(ctx, vt, true, e.Chain, e.StateAPI) + ev, err := newEthBlockFromFilecoinTipSet(ctx, vt, true, e.Chain, e.StateAPI) if err != nil { break } - resp.Result = eb + e.send(ctx, ev) default: log.Warnf("unexpected subscription value type: %T", vt) } - - if err != nil { - continue - } - - outParam, err := json.Marshal(resp) - if err != nil { - log.Warnw("marshaling subscription response", "sub", e.id, "error", err) - continue - } - - if err := e.out(ctx, outParam); err != nil { - log.Warnw("sending subscription response", "sub", e.id, "error", err) - continue - } } } }