From 6443afa2bb133f45a791a2277dd8cad88d6324bf Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Wed, 10 Apr 2024 00:32:02 +0400 Subject: [PATCH] fix: api: fix EthSubscribe tipsets off by one (#11858) Eth subscribe tipsets API should only return tipsets that have been executed. We do this by only returning the parent tipset of the latest tipset received by ETH Subscribe from it's TipSetFilter subscription. Closes #11807 Subsumes #11816 --- itests/eth_filter_test.go | 35 +++++++++++++++++++++++++++++++++++ node/impl/full/eth_events.go | 25 +++++++++++++++++++++---- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/itests/eth_filter_test.go b/itests/eth_filter_test.go index 9212e60fc..d77a0ce14 100644 --- a/itests/eth_filter_test.go +++ b/itests/eth_filter_test.go @@ -137,6 +137,41 @@ func TestEthNewPendingTransactionFilter(t *testing.T) { } } +func TestEthNewHeadsSubSimple(t *testing.T) { + require := require.New(t) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + kit.QuietAllLogsExcept("events", "messagepool") + + client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.WithEthRPC()) + ens.InterconnectAll().BeginMining(10 * time.Millisecond) + + // install filter + subId, err := client.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newHeads"})).Assert(require.NoError)) + require.NoError(err) + + err = client.EthSubRouter.AddSub(ctx, subId, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error { + rs := *resp + block, ok := rs.Result.(map[string]interface{}) + require.True(ok) + blockNumber, ok := block["number"].(string) + require.True(ok) + + blk, err := client.EthGetBlockByNumber(ctx, blockNumber, false) + require.NoError(err) + require.NotNil(blk) + fmt.Printf("block: %v\n", blk) + // block hashes should match + require.Equal(block["hash"], blk.Hash.String()) + + return nil + }) + require.NoError(err) + time.Sleep(2 * time.Second) +} + func TestEthNewPendingTransactionSub(t *testing.T) { require := require.New(t) diff --git a/node/impl/full/eth_events.go b/node/impl/full/eth_events.go index 063590d8d..7baba1e81 100644 --- a/node/impl/full/eth_events.go +++ b/node/impl/full/eth_events.go @@ -254,6 +254,8 @@ type ethSubscription struct { sendQueueLen int toSend *queue.Queue[[]byte] sendCond chan struct{} + + lastSentTipset *types.TipSetKey } func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) { @@ -341,12 +343,27 @@ func (e *ethSubscription) start(ctx context.Context) { e.send(ctx, r) } case *types.TipSet: - ev, err := newEthBlockFromFilecoinTipSet(ctx, vt, true, e.Chain, e.StateAPI) - if err != nil { - break + // Skip processing for tipset at epoch 0 as it has no parent + if vt.Height() == 0 { + continue + } + // Check if the parent has already been processed + parentTipSetKey := vt.Parents() + if e.lastSentTipset != nil && (*e.lastSentTipset) == parentTipSetKey { + continue + } + parentTipSet, loadErr := e.Chain.LoadTipSet(ctx, parentTipSetKey) + if loadErr != nil { + log.Warnw("failed to load parent tipset", "tipset", parentTipSetKey, "error", loadErr) + continue + } + ethBlock, ethBlockErr := newEthBlockFromFilecoinTipSet(ctx, parentTipSet, true, e.Chain, e.StateAPI) + if ethBlockErr != nil { + continue } - e.send(ctx, ev) + e.send(ctx, ethBlock) + e.lastSentTipset = &parentTipSetKey case *types.SignedMessage: // mpool txid evs, err := ethFilterResultFromMessages([]*types.SignedMessage{vt}) if err != nil {