From df6dfdf8a984bf0ad8a65927f54c5107af4b2e1b Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 13 Mar 2023 05:51:24 +0200 Subject: [PATCH] refactor database - drop the execution index; we don't need it - it is inclusion tipset - use MessagesForTipset - hoist db sql stuffs on top for clarity - add index for tipset on messages --- chain/index/interface.go | 6 +- chain/index/msgindex.go | 130 ++++++++++++++++++++------------------- 2 files changed, 70 insertions(+), 66 deletions(-) diff --git a/chain/index/interface.go b/chain/index/interface.go index d212eba88..ff46ecad7 100644 --- a/chain/index/interface.go +++ b/chain/index/interface.go @@ -16,12 +16,10 @@ var ErrClosed = errors.New("index closed") type MsgInfo struct { // the message this record refers to Message cid.Cid - // the tipset where this messages was executed + // the tipset where this messages was included TipSet cid.Cid - // the epoch whre this message was executed + // the epoch whre this message was included Epoch abi.ChainEpoch - // the canonical execution order of the message in the tipset - Index int } // MsgIndex is the interface to the message index diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index 78add0f79..d2686133c 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -23,9 +23,36 @@ import ( var log = logging.Logger("msgindex") -var ( - dbName = "msgindex.db" +var dbName = "msgindex.db" +var dbDefs = []string{ + `CREATE TABLE IF NOT EXISTS messages ( + cid VARCHAR(80) PRIMARY KEY, + tipset_cid VARCHAR(80) NOT NULL, + epoch INTEGER NOT NULL + )`, + `CREATE INDEX IF NOT EXISTS tipset_cids ON messages (tipset_cid) + `, + `CREATE TABLE IF NOT EXISTS _meta ( + version UINT64 NOT NULL UNIQUE + )`, + `INSERT OR IGNORE INTO _meta (version) VALUES (1)`, +} +var dbPragmas = []string{} +const ( + // prepared stmts + dbqGetMessageInfo = "SELECT tipset_cid, epoch FROM messages WHERE cid = ?" + dbqInsertMessage = "INSERT INTO messages VALUES (?, ?, ?)" + dbqDeleteTipsetMessages = "DELETE FROM messages WHERE tipset_cid = ?" + // reconciliation + dbqCountMessages = "SELECT COUNT(*) FROM messages" + dbqMinEpoch = "SELECT MIN(epoch) FROM messages" + dbqCountTipsetMessages = "SELECT COUNT(*) FROM messages WHERE tipset_cid = ?" + dbqDeleteMessagesByEpoch = "DELETE FROM messages WHERE epoch >= ?" +) + +// coalescer configuration (TODO: use observer instead) +var ( coalesceMinDelay = 100 * time.Millisecond coalesceMaxDelay = time.Second coalesceMergeInterval = 100 * time.Millisecond @@ -35,7 +62,7 @@ var ( // but this simplifies unit testing. type ChainStore interface { SubscribeHeadChanges(f store.ReorgNotifee) - MessagesForBlock(ctx context.Context, b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) + MessagesForTipset(ctx context.Context, ts *types.TipSet) ([]types.ChainMsg, error) GetHeaviestTipSet() *types.TipSet GetTipSetFromKey(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) } @@ -69,8 +96,8 @@ type headChange struct { func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) { var ( - mkdb bool dbPath string + exists bool err error ) @@ -82,8 +109,10 @@ func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) { dbPath = path.Join(basePath, dbName) _, err = os.Stat(dbPath) switch { + case err == nil: + exists = true + case errors.Is(err, fs.ErrNotExist): - mkdb = true case err != nil: return nil, xerrors.Errorf("error stating msgindex database: %w", err) @@ -96,16 +125,13 @@ func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) { return nil, xerrors.Errorf("error opening msgindex database: %w", err) } - if mkdb { - err = createTables(db) - if err != nil { - return nil, xerrors.Errorf("error creating msgindex database: %w", err) - } + if err := prepareDB(db); err != nil { + return nil, xerrors.Errorf("error creating msgindex database: %w", err) + } - // TODO we may consider populating the index in this case. - } else { - err = reconcileIndex(db, cs) - if err != nil { + // TODO we may consider populating the index when first creating the db + if exists { + if err := reconcileIndex(db, cs); err != nil { return nil, xerrors.Errorf("error reconciling msgindex database: %w", err) } } @@ -144,13 +170,19 @@ func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) { } // init utilities -func createTables(db *sql.DB) error { - // Just a single table for now; ghetto, but this an index so we denormalize to avoid joins. - if _, err := db.Exec("CREATE TABLE Messages (cid VARCHAR(80) PRIMARY KEY, tipset VARCHAR(80), xepoch INTEGER, xindex INTEGER)"); err != nil { - return err +func prepareDB(db *sql.DB) error { + for _, stmt := range dbDefs { + if _, err := db.Exec(stmt); err != nil { + return xerrors.Errorf("error executing sql statement '%s': %w", stmt, err) + } + } + + for _, stmt := range dbPragmas { + if _, err := db.Exec(stmt); err != nil { + return xerrors.Errorf("error executing sql statement '%s': %w", stmt, err) + } } - // TODO Should we add an index for tipset to speed up deletion on revert? return nil } @@ -166,7 +198,7 @@ func reconcileIndex(db *sql.DB, cs ChainStore) error { // 5. If the walk ends in the boundary epoch, then delete everything. // - row := db.QueryRow("SELECT COUNT(*) FROM Messages") + row := db.QueryRow(dbqCountMessages) var result int64 if err := row.Scan(&result); err != nil { @@ -177,14 +209,14 @@ func reconcileIndex(db *sql.DB, cs ChainStore) error { return nil } - row = db.QueryRow("SELECT MIN(xepoch) FROM Messages") + row = db.QueryRow(dbqMinEpoch) if err := row.Scan(&result); err != nil { return xerrors.Errorf("error finding boundary epoch: %w", err) } boundaryEpoch := abi.ChainEpoch(result) - countMsgsStmt, err := db.Prepare("SELECT COUNT(*) FROM Messages WHERE tipset = ?") + countMsgsStmt, err := db.Prepare(dbqCountTipsetMessages) if err != nil { return xerrors.Errorf("error preparing statement: %w", err) } @@ -217,7 +249,7 @@ func reconcileIndex(db *sql.DB, cs ChainStore) error { } // delete everything above the minEpoch - if _, err = db.Exec("DELETE FROM Messages WHERE xepoch >= ?", int64(boundaryEpoch)); err != nil { + if _, err = db.Exec(dbqDeleteMessagesByEpoch, int64(boundaryEpoch)); err != nil { return xerrors.Errorf("error deleting stale reorged out message: %w", err) } @@ -225,19 +257,19 @@ func reconcileIndex(db *sql.DB, cs ChainStore) error { } func (x *msgIndex) prepareStatements() error { - stmt, err := x.db.Prepare("SELECT tipset, xepoch, xindex FROM Messages WHERE cid = ?") + stmt, err := x.db.Prepare(dbqGetMessageInfo) if err != nil { return xerrors.Errorf("prepare selectMsgStmt: %w", err) } x.selectMsgStmt = stmt - stmt, err = x.db.Prepare("INSERT INTO Messages VALUES (?, ?, ?, ?)") + stmt, err = x.db.Prepare(dbqInsertMessage) if err != nil { return xerrors.Errorf("prepare insertMsgStmt: %w", err) } x.insertMsgStmt = stmt - stmt, err = x.db.Prepare("DELETE FROM Messages WHERE tipset = ?") + stmt, err = x.db.Prepare(dbqDeleteTipsetMessages) if err != nil { return xerrors.Errorf("prepare deleteTipSetStmt: %w", err) } @@ -335,42 +367,18 @@ func (x *msgIndex) doApply(ctx context.Context, tx *sql.Tx, ts *types.TipSet) er } tskey := tscid.String() - xepoch := int64(ts.Height()) - var xindex int64 + epoch := int64(ts.Height()) - seen := make(map[string]struct{}) - insert := func(key string) error { - if _, ok := seen[key]; ok { - return nil - } - - if _, err := tx.Stmt(x.insertMsgStmt).Exec(key, tskey, xepoch, xindex); err != nil { - return err - } - seen[key] = struct{}{} - xindex++ - - return nil + msgs, err := x.cs.MessagesForTipset(ctx, ts) + if err != nil { + return xerrors.Errorf("error retrieving messages for tipset %s: %w", ts, err) } - for _, blk := range ts.Blocks() { - bmsgs, smsgs, err := x.cs.MessagesForBlock(ctx, blk) - if err != nil { - return xerrors.Errorf("error retrieving messages for block %s in %s: %w", blk.Cid(), ts, err) - } - - for _, m := range bmsgs { - key := m.Cid().String() - if err := insert(key); err != nil { - return err - } - } - - for _, m := range smsgs { - key := m.Cid().String() - if err := insert(key); err != nil { - return err - } + insertStmt := tx.Stmt(x.insertMsgStmt) + for _, msg := range msgs { + key := msg.Cid().String() + if _, err := insertStmt.Exec(key, tskey, epoch); err != nil { + return xerrors.Errorf("error inserting message: %w", err) } } @@ -389,12 +397,11 @@ func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) { var ( tipset string epoch int64 - index int64 ) key := m.String() row := x.selectMsgStmt.QueryRow(key) - err := row.Scan(&tipset, &epoch, &index) + err := row.Scan(&tipset, &epoch) switch { case err == sql.ErrNoRows: return MsgInfo{}, ErrNotFound @@ -412,7 +419,6 @@ func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) { Message: m, TipSet: tipsetCid, Epoch: abi.ChainEpoch(epoch), - Index: int(index), }, nil }