diff --git a/chain/index/interface.go b/chain/index/interface.go index 3cb6794b9..a6c07d4be 100644 --- a/chain/index/interface.go +++ b/chain/index/interface.go @@ -2,19 +2,22 @@ package index import ( "context" + "errors" "github.com/filecoin-project/go-state-types/abi" "github.com/ipfs/go-cid" ) +var ErrNotFound = errors.New("message not found") + // MsgInfo is the Message metadata the index tracks. type MsgInfo struct { // the message this record refers to Message cid.Cid + // the tipset where this messages was executed + Tipset cid.Cid // the epoch whre this message was executed Epoch abi.ChainEpoch - // the tipset where this messages executed - Tipset cid.Cid // the canonical execution order of the message in the tipset Index int } diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index a48d6deef..07ff75869 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -9,13 +9,14 @@ import ( "path" "time" - "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" _ "github.com/mattn/go-sqlite3" "golang.org/x/xerrors" + "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" ) // chain store interface; we could use store.ChainStore directly, @@ -29,7 +30,9 @@ var _ ChainStore = (*store.ChainStore)(nil) type msgIndex struct { cs ChainStore - db *sql.DB + db *sql.DB + selectMsgStmt *sql.Stmt + deleteTipSetStmt *sql.Stmt } var _ MsgIndex = (*msgIndex)(nil) @@ -117,8 +120,13 @@ opendb: // init utilities func createTables(db *sql.DB) error { - // TODO - return errors.New("TODO: index.createTables") + // Just a single table for now; ghetto, but this an index so we denormalize to avoid joins. + if _, err := db.Exec("CREATE TABLE Messages (cid VARCHAR(80) PRIMARY KEY, tipset VARCHAR(80), xepoch INTEGER, xindex INTEGER)"); err != nil { + return err + } + + // TODO Should we add an index for tipset to speed up deletion on revert? + return nil } func reconcileIndex(db *sql.DB, cs ChainStore) error { @@ -127,8 +135,20 @@ func reconcileIndex(db *sql.DB, cs ChainStore) error { } func (x *msgIndex) prepareStatements() error { - // TODO - return errors.New("TODO: msgIndex.prepareStatements") + stmt, err := x.db.Prepare("SELECT (tipset, xepoch, xindex) FROM Messages WHERE cid = ?") + if err != nil { + return err + } + x.selectMsgStmt = stmt + + stmt, err = x.db.Prepare("DELETE FROM Messages WHERE tipset = ?") + if err != nil { + return err + } + x.deleteTipSetStmt = stmt + + // TODO reconciliation stmts + return nil } // head change notifee @@ -139,8 +159,34 @@ func (x *msgIndex) onHeadChange(rev, app []*types.TipSet) error { // interface func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) { - // TODO - return MsgInfo{}, errors.New("TODO: msgIndex.GetMsgInfo") + var ( + tipset string + epoch int64 + index int64 + ) + + key := m.String() + row := x.selectMsgStmt.QueryRow(key) + err := row.Scan(&tipset, &epoch, &index) + switch { + case err == sql.ErrNoRows: + return MsgInfo{}, ErrNotFound + + case err != nil: + return MsgInfo{}, xerrors.Errorf("error querying msgindex database: %w", err) + } + + tipsetCid, err := cid.Decode(tipset) + if err != nil { + return MsgInfo{}, xerrors.Errorf("error decoding tipset cid: %w", err) + } + + return MsgInfo{ + Message: m, + Tipset: tipsetCid, + Epoch: abi.ChainEpoch(epoch), + Index: int(index), + }, nil } func (x *msgIndex) Close() error {