From 61ca075d1a849f20092a9a5b64d4b7ab040ded89 Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Thu, 6 Apr 2023 12:45:22 +0200 Subject: [PATCH] Include execution tipset in `GetMsgInfo` This commit updates the GetMsgInfo to include the execution tipset of the message (if it exists). This allows us to call GetTipSetByCid instead of GetTipsetByHeight which should be considerably faster and help speed up both StateSearcMessage and StateWaitForMessage calls. --- chain/index/interface.go | 9 +++-- chain/index/msgindex.go | 77 ++++++++++++++++++++++++------------ chain/index/msgindex_test.go | 4 +- chain/stmgr/searchwait.go | 18 ++++++--- 4 files changed, 71 insertions(+), 37 deletions(-) diff --git a/chain/index/interface.go b/chain/index/interface.go index f875a94bf..239abf8fa 100644 --- a/chain/index/interface.go +++ b/chain/index/interface.go @@ -24,18 +24,19 @@ type MsgInfo struct { // MsgIndex is the interface to the message index type MsgIndex interface { - // GetMsgInfo retrieves the message metadata through the index. + // GetMsgInfo looks up the message in index and retrieves its metadata and execution + // tipset cid. // The lookup is done using the onchain message Cid; that is the signed message Cid // for SECP messages and unsigned message Cid for BLS messages. - GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) + GetMsgInfo(ctx context.Context, mCid cid.Cid) (MsgInfo, cid.Cid, error) // Close closes the index Close() error } type dummyMsgIndex struct{} -func (dummyMsgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) { - return MsgInfo{}, ErrNotFound +func (dummyMsgIndex) GetMsgInfo(ctx context.Context, mCid cid.Cid) (MsgInfo, cid.Cid, error) { + return MsgInfo{}, cid.Undef, ErrNotFound } func (dummyMsgIndex) Close() error { diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index 39ba487f2..5d6c5cd88 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -30,8 +30,8 @@ var dbDefs = []string{ tipset_cid VARCHAR(80) NOT NULL, epoch INTEGER NOT NULL )`, - `CREATE INDEX IF NOT EXISTS tipset_cids ON messages (tipset_cid) - `, + `CREATE INDEX IF NOT EXISTS tipset_cids ON messages (tipset_cid)`, + `CREATE INDEX IF NOT EXISTS tipset_epochs ON messages (epoch)`, `CREATE TABLE IF NOT EXISTS _meta ( version UINT64 NOT NULL UNIQUE )`, @@ -44,6 +44,8 @@ const ( dbqGetMessageInfo = "SELECT tipset_cid, epoch FROM messages WHERE cid = ?" dbqInsertMessage = "INSERT INTO messages VALUES (?, ?, ?)" dbqDeleteTipsetMessages = "DELETE FROM messages WHERE tipset_cid = ?" + dbqGetTipsetByEpoch = "SELECT tipset_cid FROM messages WHERE epoch = ? LIMIT 1" + // reconciliation dbqCountMessages = "SELECT COUNT(*) FROM messages" dbqMinEpoch = "SELECT MIN(epoch) FROM messages" @@ -75,6 +77,7 @@ type msgIndex struct { db *sql.DB selectMsgStmt *sql.Stmt + selectTipsetStmt *sql.Stmt insertMsgStmt *sql.Stmt deleteTipSetStmt *sql.Stmt @@ -350,6 +353,12 @@ func (x *msgIndex) prepareStatements() error { } x.selectMsgStmt = stmt + stmt, err = x.db.Prepare(dbqGetTipsetByEpoch) + if err != nil { + return xerrors.Errorf("prepare selectTipsetStmt: %w", err) + } + x.selectTipsetStmt = stmt + stmt, err = x.db.Prepare(dbqInsertMessage) if err != nil { return xerrors.Errorf("prepare insertMsgStmt: %w", err) @@ -485,40 +494,56 @@ func (x *msgIndex) doApply(ctx context.Context, tx *sql.Tx, ts *types.TipSet) er } // interface -func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) { +func (x *msgIndex) GetMsgInfo(ctx context.Context, mCid cid.Cid) (MsgInfo, cid.Cid, error) { x.closeLk.RLock() defer x.closeLk.RUnlock() if x.closed { - return MsgInfo{}, ErrClosed + return MsgInfo{}, cid.Undef, ErrClosed } - var ( - tipset string - epoch int64 - ) - - key := m.String() - row := x.selectMsgStmt.QueryRow(key) - err := row.Scan(&tipset, &epoch) - switch { - case err == sql.ErrNoRows: - return MsgInfo{}, ErrNotFound - - case err != nil: - return MsgInfo{}, xerrors.Errorf("error querying msgindex database: %w", err) - } - - tipsetCid, err := cid.Decode(tipset) + // fetch message from index (if it exists) + // + var tipset string + var epoch int64 + err := x.selectMsgStmt.QueryRow(mCid.String()).Scan(&tipset, &epoch) if err != nil { - return MsgInfo{}, xerrors.Errorf("error decoding tipset cid: %w", err) + if err == sql.ErrNoRows { + // mCid not in index, its fine + return MsgInfo{}, cid.Undef, ErrNotFound + } + + return MsgInfo{}, cid.Undef, xerrors.Errorf("error querying msgindex database: %w", err) + } + tsCid, err := cid.Decode(tipset) + if err != nil { + return MsgInfo{}, cid.Undef, xerrors.Errorf("error decoding tipset cid: %w", err) } - return MsgInfo{ - Message: m, - TipSet: tipsetCid, + msgInfo := MsgInfo{ + Message: mCid, + TipSet: tsCid, Epoch: abi.ChainEpoch(epoch), - }, nil + } + + // fetch execution tipset of message (if it exists) + // + var xTipset string + err = x.selectTipsetStmt.QueryRow(epoch + 1).Scan(&xTipset) + if err != nil { + if err == sql.ErrNoRows { + // execution tipset not in index, its fine + return msgInfo, cid.Undef, nil + } + + return MsgInfo{}, cid.Undef, xerrors.Errorf("error querying for execution tipset: %w", err) + } + xtsCid, err := cid.Decode(xTipset) + if err != nil { + return MsgInfo{}, cid.Undef, xerrors.Errorf("error decoding execution tipset cid: %w", err) + } + + return msgInfo, xtsCid, nil } func (x *msgIndex) Close() error { diff --git a/chain/index/msgindex_test.go b/chain/index/msgindex_test.go index 24f9b845f..11095855e 100644 --- a/chain/index/msgindex_test.go +++ b/chain/index/msgindex_test.go @@ -157,7 +157,7 @@ func verifyIndex(t *testing.T, cs *mockChainStore, msgIndex MsgIndex) { msgs, err := cs.MessagesForTipset(context.Background(), ts) require.NoError(t, err) for _, m := range msgs { - minfo, err := msgIndex.GetMsgInfo(context.Background(), m.Cid()) + minfo, _, err := msgIndex.GetMsgInfo(context.Background(), m.Cid()) require.NoError(t, err) require.Equal(t, tsCid, minfo.TipSet) require.Equal(t, ts.Height(), minfo.Epoch) @@ -174,7 +174,7 @@ func verifyMissing(t *testing.T, cs *mockChainStore, msgIndex MsgIndex, missing msgs, err := cs.MessagesForTipset(context.Background(), ts) require.NoError(t, err) for _, m := range msgs { - _, err := msgIndex.GetMsgInfo(context.Background(), m.Cid()) + _, _, err := msgIndex.GetMsgInfo(context.Background(), m.Cid()) require.Equal(t, ErrNotFound, err) } } diff --git a/chain/stmgr/searchwait.go b/chain/stmgr/searchwait.go index 356ace23c..2749d3682 100644 --- a/chain/stmgr/searchwait.go +++ b/chain/stmgr/searchwait.go @@ -190,7 +190,7 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, head *types.TipSet } func (sm *StateManager) searchForIndexedMsg(ctx context.Context, mcid cid.Cid, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) { - minfo, err := sm.msgIndex.GetMsgInfo(ctx, mcid) + minfo, xtsCid, err := sm.msgIndex.GetMsgInfo(ctx, mcid) if err != nil { return nil, nil, cid.Undef, xerrors.Errorf("error looking up message in index: %w", err) } @@ -203,10 +203,18 @@ func (sm *StateManager) searchForIndexedMsg(ctx context.Context, mcid cid.Cid, m } // now get the execution tipset - // TODO optimization: the index should have it implicitly so we can return it in the msginfo. - xts, err := sm.cs.GetTipsetByHeight(ctx, minfo.Epoch+1, curTs, false) - if err != nil { - return nil, nil, cid.Undef, xerrors.Errorf("error looking up execution tipset: %w", err) + var xts *types.TipSet = nil + if xtsCid != cid.Undef { + // lookup by cid which is faster + xts, err = sm.cs.GetTipSetByCid(ctx, xtsCid) + if err != nil { + return nil, nil, cid.Undef, xerrors.Errorf("error calling GetTipSetByCid: %w", err) + } + } else { + xts, err = sm.cs.GetTipsetByHeight(ctx, minfo.Epoch+1, curTs, false) + if err != nil { + return nil, nil, cid.Undef, xerrors.Errorf("error calling GetTipsetByHeight: %w", err) + } } // check that the parent of the execution index is indeed the inclusion tipset