fix: api: fix EthSubscribe tipsets off by one (#11858)

Eth subscribe tipsets API should only return tipsets that have been executed.

We do this by only returning the parent tipset of the latest tipset received by ETH Subscribe from it's TipSetFilter subscription.

Closes #11807
Subsumes #11816
This commit is contained in:
Aarsh Shah 2024-04-10 00:32:02 +04:00 committed by GitHub
parent 8206954bb0
commit 6443afa2bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 56 additions and 4 deletions

View File

@ -137,6 +137,41 @@ func TestEthNewPendingTransactionFilter(t *testing.T) {
} }
} }
func TestEthNewHeadsSubSimple(t *testing.T) {
require := require.New(t)
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
kit.QuietAllLogsExcept("events", "messagepool")
client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.WithEthRPC())
ens.InterconnectAll().BeginMining(10 * time.Millisecond)
// install filter
subId, err := client.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newHeads"})).Assert(require.NoError))
require.NoError(err)
err = client.EthSubRouter.AddSub(ctx, subId, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error {
rs := *resp
block, ok := rs.Result.(map[string]interface{})
require.True(ok)
blockNumber, ok := block["number"].(string)
require.True(ok)
blk, err := client.EthGetBlockByNumber(ctx, blockNumber, false)
require.NoError(err)
require.NotNil(blk)
fmt.Printf("block: %v\n", blk)
// block hashes should match
require.Equal(block["hash"], blk.Hash.String())
return nil
})
require.NoError(err)
time.Sleep(2 * time.Second)
}
func TestEthNewPendingTransactionSub(t *testing.T) { func TestEthNewPendingTransactionSub(t *testing.T) {
require := require.New(t) require := require.New(t)

View File

@ -254,6 +254,8 @@ type ethSubscription struct {
sendQueueLen int sendQueueLen int
toSend *queue.Queue[[]byte] toSend *queue.Queue[[]byte]
sendCond chan struct{} sendCond chan struct{}
lastSentTipset *types.TipSetKey
} }
func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) { func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) {
@ -341,12 +343,27 @@ func (e *ethSubscription) start(ctx context.Context) {
e.send(ctx, r) e.send(ctx, r)
} }
case *types.TipSet: case *types.TipSet:
ev, err := newEthBlockFromFilecoinTipSet(ctx, vt, true, e.Chain, e.StateAPI) // Skip processing for tipset at epoch 0 as it has no parent
if err != nil { if vt.Height() == 0 {
break continue
}
// Check if the parent has already been processed
parentTipSetKey := vt.Parents()
if e.lastSentTipset != nil && (*e.lastSentTipset) == parentTipSetKey {
continue
}
parentTipSet, loadErr := e.Chain.LoadTipSet(ctx, parentTipSetKey)
if loadErr != nil {
log.Warnw("failed to load parent tipset", "tipset", parentTipSetKey, "error", loadErr)
continue
}
ethBlock, ethBlockErr := newEthBlockFromFilecoinTipSet(ctx, parentTipSet, true, e.Chain, e.StateAPI)
if ethBlockErr != nil {
continue
} }
e.send(ctx, ev) e.send(ctx, ethBlock)
e.lastSentTipset = &parentTipSetKey
case *types.SignedMessage: // mpool txid case *types.SignedMessage: // mpool txid
evs, err := ethFilterResultFromMessages([]*types.SignedMessage{vt}) evs, err := ethFilterResultFromMessages([]*types.SignedMessage{vt})
if err != nil { if err != nil {