Include execution tipset in GetMsgInfo
This commit updates the GetMsgInfo to include the execution tipset of the message (if it exists). This allows us to call GetTipSetByCid instead of GetTipsetByHeight which should be considerably faster and help speed up both StateSearcMessage and StateWaitForMessage calls.
This commit is contained in:
parent
9d44c88cbb
commit
61ca075d1a
@ -24,18 +24,19 @@ type MsgInfo struct {
|
||||
|
||||
// MsgIndex is the interface to the message index
|
||||
type MsgIndex interface {
|
||||
// GetMsgInfo retrieves the message metadata through the index.
|
||||
// GetMsgInfo looks up the message in index and retrieves its metadata and execution
|
||||
// tipset cid.
|
||||
// The lookup is done using the onchain message Cid; that is the signed message Cid
|
||||
// for SECP messages and unsigned message Cid for BLS messages.
|
||||
GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error)
|
||||
GetMsgInfo(ctx context.Context, mCid cid.Cid) (MsgInfo, cid.Cid, error)
|
||||
// Close closes the index
|
||||
Close() error
|
||||
}
|
||||
|
||||
type dummyMsgIndex struct{}
|
||||
|
||||
func (dummyMsgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
|
||||
return MsgInfo{}, ErrNotFound
|
||||
func (dummyMsgIndex) GetMsgInfo(ctx context.Context, mCid cid.Cid) (MsgInfo, cid.Cid, error) {
|
||||
return MsgInfo{}, cid.Undef, ErrNotFound
|
||||
}
|
||||
|
||||
func (dummyMsgIndex) Close() error {
|
||||
|
||||
@ -30,8 +30,8 @@ var dbDefs = []string{
|
||||
tipset_cid VARCHAR(80) NOT NULL,
|
||||
epoch INTEGER NOT NULL
|
||||
)`,
|
||||
`CREATE INDEX IF NOT EXISTS tipset_cids ON messages (tipset_cid)
|
||||
`,
|
||||
`CREATE INDEX IF NOT EXISTS tipset_cids ON messages (tipset_cid)`,
|
||||
`CREATE INDEX IF NOT EXISTS tipset_epochs ON messages (epoch)`,
|
||||
`CREATE TABLE IF NOT EXISTS _meta (
|
||||
version UINT64 NOT NULL UNIQUE
|
||||
)`,
|
||||
@ -44,6 +44,8 @@ const (
|
||||
dbqGetMessageInfo = "SELECT tipset_cid, epoch FROM messages WHERE cid = ?"
|
||||
dbqInsertMessage = "INSERT INTO messages VALUES (?, ?, ?)"
|
||||
dbqDeleteTipsetMessages = "DELETE FROM messages WHERE tipset_cid = ?"
|
||||
dbqGetTipsetByEpoch = "SELECT tipset_cid FROM messages WHERE epoch = ? LIMIT 1"
|
||||
|
||||
// reconciliation
|
||||
dbqCountMessages = "SELECT COUNT(*) FROM messages"
|
||||
dbqMinEpoch = "SELECT MIN(epoch) FROM messages"
|
||||
@ -75,6 +77,7 @@ type msgIndex struct {
|
||||
|
||||
db *sql.DB
|
||||
selectMsgStmt *sql.Stmt
|
||||
selectTipsetStmt *sql.Stmt
|
||||
insertMsgStmt *sql.Stmt
|
||||
deleteTipSetStmt *sql.Stmt
|
||||
|
||||
@ -350,6 +353,12 @@ func (x *msgIndex) prepareStatements() error {
|
||||
}
|
||||
x.selectMsgStmt = stmt
|
||||
|
||||
stmt, err = x.db.Prepare(dbqGetTipsetByEpoch)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("prepare selectTipsetStmt: %w", err)
|
||||
}
|
||||
x.selectTipsetStmt = stmt
|
||||
|
||||
stmt, err = x.db.Prepare(dbqInsertMessage)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("prepare insertMsgStmt: %w", err)
|
||||
@ -485,40 +494,56 @@ func (x *msgIndex) doApply(ctx context.Context, tx *sql.Tx, ts *types.TipSet) er
|
||||
}
|
||||
|
||||
// interface
|
||||
func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
|
||||
func (x *msgIndex) GetMsgInfo(ctx context.Context, mCid cid.Cid) (MsgInfo, cid.Cid, error) {
|
||||
x.closeLk.RLock()
|
||||
defer x.closeLk.RUnlock()
|
||||
|
||||
if x.closed {
|
||||
return MsgInfo{}, ErrClosed
|
||||
return MsgInfo{}, cid.Undef, ErrClosed
|
||||
}
|
||||
|
||||
var (
|
||||
tipset string
|
||||
epoch int64
|
||||
)
|
||||
|
||||
key := m.String()
|
||||
row := x.selectMsgStmt.QueryRow(key)
|
||||
err := row.Scan(&tipset, &epoch)
|
||||
switch {
|
||||
case err == sql.ErrNoRows:
|
||||
return MsgInfo{}, ErrNotFound
|
||||
|
||||
case err != nil:
|
||||
return MsgInfo{}, xerrors.Errorf("error querying msgindex database: %w", err)
|
||||
}
|
||||
|
||||
tipsetCid, err := cid.Decode(tipset)
|
||||
// fetch message from index (if it exists)
|
||||
//
|
||||
var tipset string
|
||||
var epoch int64
|
||||
err := x.selectMsgStmt.QueryRow(mCid.String()).Scan(&tipset, &epoch)
|
||||
if err != nil {
|
||||
return MsgInfo{}, xerrors.Errorf("error decoding tipset cid: %w", err)
|
||||
if err == sql.ErrNoRows {
|
||||
// mCid not in index, its fine
|
||||
return MsgInfo{}, cid.Undef, ErrNotFound
|
||||
}
|
||||
|
||||
return MsgInfo{}, cid.Undef, xerrors.Errorf("error querying msgindex database: %w", err)
|
||||
}
|
||||
tsCid, err := cid.Decode(tipset)
|
||||
if err != nil {
|
||||
return MsgInfo{}, cid.Undef, xerrors.Errorf("error decoding tipset cid: %w", err)
|
||||
}
|
||||
|
||||
return MsgInfo{
|
||||
Message: m,
|
||||
TipSet: tipsetCid,
|
||||
msgInfo := MsgInfo{
|
||||
Message: mCid,
|
||||
TipSet: tsCid,
|
||||
Epoch: abi.ChainEpoch(epoch),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// fetch execution tipset of message (if it exists)
|
||||
//
|
||||
var xTipset string
|
||||
err = x.selectTipsetStmt.QueryRow(epoch + 1).Scan(&xTipset)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
// execution tipset not in index, its fine
|
||||
return msgInfo, cid.Undef, nil
|
||||
}
|
||||
|
||||
return MsgInfo{}, cid.Undef, xerrors.Errorf("error querying for execution tipset: %w", err)
|
||||
}
|
||||
xtsCid, err := cid.Decode(xTipset)
|
||||
if err != nil {
|
||||
return MsgInfo{}, cid.Undef, xerrors.Errorf("error decoding execution tipset cid: %w", err)
|
||||
}
|
||||
|
||||
return msgInfo, xtsCid, nil
|
||||
}
|
||||
|
||||
func (x *msgIndex) Close() error {
|
||||
|
||||
@ -157,7 +157,7 @@ func verifyIndex(t *testing.T, cs *mockChainStore, msgIndex MsgIndex) {
|
||||
msgs, err := cs.MessagesForTipset(context.Background(), ts)
|
||||
require.NoError(t, err)
|
||||
for _, m := range msgs {
|
||||
minfo, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
|
||||
minfo, _, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tsCid, minfo.TipSet)
|
||||
require.Equal(t, ts.Height(), minfo.Epoch)
|
||||
@ -174,7 +174,7 @@ func verifyMissing(t *testing.T, cs *mockChainStore, msgIndex MsgIndex, missing
|
||||
msgs, err := cs.MessagesForTipset(context.Background(), ts)
|
||||
require.NoError(t, err)
|
||||
for _, m := range msgs {
|
||||
_, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
|
||||
_, _, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
|
||||
require.Equal(t, ErrNotFound, err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -190,7 +190,7 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, head *types.TipSet
|
||||
}
|
||||
|
||||
func (sm *StateManager) searchForIndexedMsg(ctx context.Context, mcid cid.Cid, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
|
||||
minfo, err := sm.msgIndex.GetMsgInfo(ctx, mcid)
|
||||
minfo, xtsCid, err := sm.msgIndex.GetMsgInfo(ctx, mcid)
|
||||
if err != nil {
|
||||
return nil, nil, cid.Undef, xerrors.Errorf("error looking up message in index: %w", err)
|
||||
}
|
||||
@ -203,10 +203,18 @@ func (sm *StateManager) searchForIndexedMsg(ctx context.Context, mcid cid.Cid, m
|
||||
}
|
||||
|
||||
// now get the execution tipset
|
||||
// TODO optimization: the index should have it implicitly so we can return it in the msginfo.
|
||||
xts, err := sm.cs.GetTipsetByHeight(ctx, minfo.Epoch+1, curTs, false)
|
||||
if err != nil {
|
||||
return nil, nil, cid.Undef, xerrors.Errorf("error looking up execution tipset: %w", err)
|
||||
var xts *types.TipSet = nil
|
||||
if xtsCid != cid.Undef {
|
||||
// lookup by cid which is faster
|
||||
xts, err = sm.cs.GetTipSetByCid(ctx, xtsCid)
|
||||
if err != nil {
|
||||
return nil, nil, cid.Undef, xerrors.Errorf("error calling GetTipSetByCid: %w", err)
|
||||
}
|
||||
} else {
|
||||
xts, err = sm.cs.GetTipsetByHeight(ctx, minfo.Epoch+1, curTs, false)
|
||||
if err != nil {
|
||||
return nil, nil, cid.Undef, xerrors.Errorf("error calling GetTipsetByHeight: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// check that the parent of the execution index is indeed the inclusion tipset
|
||||
|
||||
Loading…
Reference in New Issue
Block a user