fix: ETH getLogs: fix slowness at head and ignore null blocks (#12207)

* fix get logs slowness and handling of null blocks
This commit is contained in:
Aarsh Shah 2024-07-11 15:16:34 +04:00
parent ff0e889b58
commit 44f5ce043f
3 changed files with 32 additions and 22 deletions

View File

@ -698,11 +698,12 @@ func (ei *EventIndex) GetMaxHeightInIndex(ctx context.Context) (uint64, error) {
return maxHeight, err return maxHeight, err
} }
func (ei *EventIndex) IsHeightProcessed(ctx context.Context, height uint64) (bool, error) { func (ei *EventIndex) IsHeightPast(ctx context.Context, height uint64) (bool, error) {
row := ei.stmtIsHeightProcessed.QueryRowContext(ctx, height) maxHeight, err := ei.GetMaxHeightInIndex(ctx)
var exists bool if err != nil {
err := row.Scan(&exists) return false, err
return exists, err }
return height <= maxHeight, nil
} }
func (ei *EventIndex) IsTipsetProcessed(ctx context.Context, tipsetKeyCid []byte) (bool, error) { func (ei *EventIndex) IsTipsetProcessed(ctx context.Context, tipsetKeyCid []byte) (bool, error) {

View File

@ -94,17 +94,17 @@ func TestEventIndexPrefillFilter(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(14000), mh) require.Equal(t, uint64(14000), mh)
b, err := ei.IsHeightProcessed(context.Background(), 14000) b, err := ei.IsHeightPast(context.Background(), 14000)
require.NoError(t, err) require.NoError(t, err)
require.True(t, b) require.True(t, b)
b, err = ei.IsHeightProcessed(context.Background(), 14001) b, err = ei.IsHeightPast(context.Background(), 14001)
require.NoError(t, err) require.NoError(t, err)
require.False(t, b) require.False(t, b)
b, err = ei.IsHeightProcessed(context.Background(), 13000) b, err = ei.IsHeightPast(context.Background(), 13000)
require.NoError(t, err) require.NoError(t, err)
require.False(t, b) require.True(t, b)
tsKey := events14000.msgTs.Key() tsKey := events14000.msgTs.Key()
tsKeyCid, err := tsKey.Cid() tsKeyCid, err := tsKey.Cid()

View File

@ -255,7 +255,6 @@ func (a *EthModule) EthGetBlockByNumber(ctx context.Context, blkParam string, fu
func (a *EthModule) EthGetTransactionByHash(ctx context.Context, txHash *ethtypes.EthHash) (*ethtypes.EthTx, error) { func (a *EthModule) EthGetTransactionByHash(ctx context.Context, txHash *ethtypes.EthHash) (*ethtypes.EthTx, error) {
return a.EthGetTransactionByHashLimited(ctx, txHash, api.LookbackNoLimit) return a.EthGetTransactionByHashLimited(ctx, txHash, api.LookbackNoLimit)
} }
func (a *EthModule) EthGetTransactionByHashLimited(ctx context.Context, txHash *ethtypes.EthHash, limit abi.ChainEpoch) (*ethtypes.EthTx, error) { func (a *EthModule) EthGetTransactionByHashLimited(ctx context.Context, txHash *ethtypes.EthHash, limit abi.ChainEpoch) (*ethtypes.EthTx, error) {
@ -1276,9 +1275,17 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E
if pf.tipsetCid == cid.Undef { if pf.tipsetCid == cid.Undef {
maxHeight := pf.maxHeight maxHeight := pf.maxHeight
if maxHeight == -1 { if maxHeight == -1 {
maxHeight = e.Chain.GetHeaviestTipSet().Height() // heaviest tipset doesn't have events because its messages haven't been executed yet
maxHeight = e.Chain.GetHeaviestTipSet().Height() - 1
} }
if maxHeight > e.Chain.GetHeaviestTipSet().Height() {
if maxHeight < 0 {
return nil, xerrors.Errorf("maxHeight requested is less than 0")
}
// we can't return events for the heaviest tipset as the transactions in that tipset will be executed
// in the next non null tipset (because of Filecoin's "deferred execution" model)
if maxHeight > e.Chain.GetHeaviestTipSet().Height()-1 {
return nil, xerrors.Errorf("maxHeight requested is greater than the heaviest tipset") return nil, xerrors.Errorf("maxHeight requested is greater than the heaviest tipset")
} }
@ -1286,13 +1293,13 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E
if err != nil { if err != nil {
return nil, err return nil, err
} }
// TODO: Ideally we should also check that events for the epoch at `pf.minheight` have been indexed
// should also have the minHeight in the filter indexed // However, it is currently tricky to check/guarantee this for two reasons:
if b, err := e.EventFilterManager.EventIndex.IsHeightProcessed(ctx, uint64(pf.minHeight)); err != nil { // a) Event Index is not aware of null-blocks. This means that the Event Index wont be able to say whether the block at
return nil, xerrors.Errorf("failed to check if event index has events for the minHeight: %w", err) // `pf.minheight` is a null block or whether it has no events
} else if !b { // b) There can be holes in the index where events at certain epoch simply haven't been indexed because of edge cases around
return nil, xerrors.Errorf("event index does not have event for epoch %d", pf.minHeight) // node restarts while indexing. This needs a long term "auto-repair"/"automated-backfilling" implementation in the index
} // So, for now, the best we can do is ensure that the event index has evenets for events at height >= `pf.maxHeight`
} else { } else {
ts, err := e.Chain.GetTipSetByCid(ctx, pf.tipsetCid) ts, err := e.Chain.GetTipSetByCid(ctx, pf.tipsetCid)
if err != nil { if err != nil {
@ -1324,6 +1331,8 @@ func (e *EthEventHandler) EthGetLogs(ctx context.Context, filterSpec *ethtypes.E
return ethFilterResultFromEvents(ctx, ces, e.SubManager.StateAPI) return ethFilterResultFromEvents(ctx, ces, e.SubManager.StateAPI)
} }
// note that we can have null blocks at the given height and the event Index is not null block aware
// so, what we do here is wait till we see the event index contain a block at a height greater than the given height
func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi.ChainEpoch) error { func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi.ChainEpoch) error {
ei := e.EventFilterManager.EventIndex ei := e.EventFilterManager.EventIndex
if height > e.Chain.GetHeaviestTipSet().Height() { if height > e.Chain.GetHeaviestTipSet().Height() {
@ -1334,7 +1343,7 @@ func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi
defer cancel() defer cancel()
// if the height we're interested in has already been indexed -> there's nothing to do here // if the height we're interested in has already been indexed -> there's nothing to do here
if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil { if b, err := ei.IsHeightPast(ctx, uint64(height)); err != nil {
return xerrors.Errorf("failed to check if event index has events for given height: %w", err) return xerrors.Errorf("failed to check if event index has events for given height: %w", err)
} else if b { } else if b {
return nil return nil
@ -1345,7 +1354,7 @@ func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi
defer unSubscribeF() defer unSubscribeF()
// it could be that the event index was update while the subscription was being processed -> check if index has what we need now // it could be that the event index was update while the subscription was being processed -> check if index has what we need now
if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil { if b, err := ei.IsHeightPast(ctx, uint64(height)); err != nil {
return xerrors.Errorf("failed to check if event index has events for given height: %w", err) return xerrors.Errorf("failed to check if event index has events for given height: %w", err)
} else if b { } else if b {
return nil return nil
@ -1354,7 +1363,7 @@ func (e *EthEventHandler) waitForHeightProcessed(ctx context.Context, height abi
for { for {
select { select {
case <-subCh: case <-subCh:
if b, err := ei.IsHeightProcessed(ctx, uint64(height)); err != nil { if b, err := ei.IsHeightPast(ctx, uint64(height)); err != nil {
return xerrors.Errorf("failed to check if event index has events for given height: %w", err) return xerrors.Errorf("failed to check if event index has events for given height: %w", err)
} else if b { } else if b {
return nil return nil