Merge pull request #10211 from filecoin-project/fix/ethsub-noarray
fix: ethrpc: Don't send sub notifs in array
This commit is contained in:
commit
030b40b2a0
@ -608,11 +608,7 @@ func TestEthSubscribeLogs(t *testing.T) {
|
|||||||
|
|
||||||
var elogs []*ethtypes.EthLog
|
var elogs []*ethtypes.EthLog
|
||||||
for resp := range responseCh {
|
for resp := range responseCh {
|
||||||
rlist, ok := resp.Result.([]interface{})
|
rmap, ok := resp.Result.(map[string]interface{})
|
||||||
require.True(ok, "expected subscription result to be []interface{}, but was %T", resp.Result)
|
|
||||||
|
|
||||||
for _, rentry := range rlist {
|
|
||||||
rmap, ok := rentry.(map[string]interface{})
|
|
||||||
require.True(ok, "expected subscription result entry to be map[string]interface{}, but was %T", resp.Result)
|
require.True(ok, "expected subscription result entry to be map[string]interface{}, but was %T", resp.Result)
|
||||||
|
|
||||||
elog, err := ParseEthLog(rmap)
|
elog, err := ParseEthLog(rmap)
|
||||||
@ -620,8 +616,6 @@ func TestEthSubscribeLogs(t *testing.T) {
|
|||||||
|
|
||||||
elogs = append(elogs, elog)
|
elogs = append(elogs, elog)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
|
||||||
AssertEthLogs(t, elogs, tc.expected, messages)
|
AssertEthLogs(t, elogs, tc.expected, messages)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -2012,15 +2006,9 @@ func AssertEthLogs(t *testing.T, actual []*ethtypes.EthLog, expected []ExpectedE
|
|||||||
func parseEthLogsFromSubscriptionResponses(subResponses []ethtypes.EthSubscriptionResponse) ([]*ethtypes.EthLog, error) {
|
func parseEthLogsFromSubscriptionResponses(subResponses []ethtypes.EthSubscriptionResponse) ([]*ethtypes.EthLog, error) {
|
||||||
elogs := make([]*ethtypes.EthLog, 0, len(subResponses))
|
elogs := make([]*ethtypes.EthLog, 0, len(subResponses))
|
||||||
for i := range subResponses {
|
for i := range subResponses {
|
||||||
rlist, ok := subResponses[i].Result.([]interface{})
|
rmap, ok := subResponses[i].Result.(map[string]interface{})
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, xerrors.Errorf("expected subscription result to be []interface{}, but was %T", subResponses[i].Result)
|
return nil, xerrors.Errorf("expected subscription result entry to be map[string]interface{}, but was %T", subResponses[i].Result)
|
||||||
}
|
|
||||||
|
|
||||||
for _, r := range rlist {
|
|
||||||
rmap, ok := r.(map[string]interface{})
|
|
||||||
if !ok {
|
|
||||||
return nil, xerrors.Errorf("expected subscription result entry to be map[string]interface{}, but was %T", r)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
elog, err := ParseEthLog(rmap)
|
elog, err := ParseEthLog(rmap)
|
||||||
@ -2029,7 +2017,6 @@ func parseEthLogsFromSubscriptionResponses(subResponses []ethtypes.EthSubscripti
|
|||||||
}
|
}
|
||||||
elogs = append(elogs, elog)
|
elogs = append(elogs, elog)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return elogs, nil
|
return elogs, nil
|
||||||
}
|
}
|
||||||
|
@ -1517,45 +1517,50 @@ func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) {
|
|||||||
e.filters = append(e.filters, f)
|
e.filters = append(e.filters, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e *ethSubscription) send(ctx context.Context, v interface{}) {
|
||||||
|
resp := ethtypes.EthSubscriptionResponse{
|
||||||
|
SubscriptionID: e.id,
|
||||||
|
Result: v,
|
||||||
|
}
|
||||||
|
|
||||||
|
outParam, err := json.Marshal(resp)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnw("marshaling subscription response", "sub", e.id, "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := e.out(ctx, outParam); err != nil {
|
||||||
|
log.Warnw("sending subscription response", "sub", e.id, "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (e *ethSubscription) start(ctx context.Context) {
|
func (e *ethSubscription) start(ctx context.Context) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case v := <-e.in:
|
case v := <-e.in:
|
||||||
resp := ethtypes.EthSubscriptionResponse{
|
|
||||||
SubscriptionID: e.id,
|
|
||||||
}
|
|
||||||
|
|
||||||
var err error
|
|
||||||
switch vt := v.(type) {
|
switch vt := v.(type) {
|
||||||
case *filter.CollectedEvent:
|
case *filter.CollectedEvent:
|
||||||
resp.Result, err = ethFilterResultFromEvents([]*filter.CollectedEvent{vt}, e.StateAPI)
|
evs, err := ethFilterResultFromEvents([]*filter.CollectedEvent{vt}, e.StateAPI)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, r := range evs.Results {
|
||||||
|
e.send(ctx, r)
|
||||||
|
}
|
||||||
case *types.TipSet:
|
case *types.TipSet:
|
||||||
eb, err := newEthBlockFromFilecoinTipSet(ctx, vt, true, e.Chain, e.StateAPI)
|
ev, err := newEthBlockFromFilecoinTipSet(ctx, vt, true, e.Chain, e.StateAPI)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
resp.Result = eb
|
e.send(ctx, ev)
|
||||||
default:
|
default:
|
||||||
log.Warnf("unexpected subscription value type: %T", vt)
|
log.Warnf("unexpected subscription value type: %T", vt)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
outParam, err := json.Marshal(resp)
|
|
||||||
if err != nil {
|
|
||||||
log.Warnw("marshaling subscription response", "sub", e.id, "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := e.out(ctx, outParam); err != nil {
|
|
||||||
log.Warnw("sending subscription response", "sub", e.id, "error", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user