From 59640a8b224730a744f91fed03451532282131f3 Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Thu, 23 Mar 2023 11:33:17 +0000 Subject: [PATCH] Populate the index on snapshot import Fixes: https://github.com/filecoin-project/lotus/issues/10537 --- chain/index/msgindex.go | 62 +++++++++++++++++++++++++++++++++++++++++ cmd/lotus/daemon.go | 7 +++++ 2 files changed, 69 insertions(+) diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index d5c6a252e..71a0a8602 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -169,6 +169,68 @@ func NewMsgIndex(lctx context.Context, basePath string, cs ChainStore) (MsgIndex return msgIndex, nil } +func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) error { + err := os.MkdirAll(basePath, 0755) + if err != nil { + return xerrors.Errorf("error creating msgindex base directory: %w", err) + } + + dbPath := path.Join(basePath, dbName) + if _, err := os.Stat(dbPath); err == nil { + return xerrors.Errorf("msgindex already exists at %s", dbPath) + } + + db, err := sql.Open("sqlite3", dbPath) + if err != nil { + return xerrors.Errorf("error opening msgindex database: %w", err) + } + defer db.Close() + + if err := prepareDB(db); err != nil { + return xerrors.Errorf("error creating msgindex database: %w", err) + } + + insertStmt, err := db.Prepare(dbqInsertMessage) + if err != nil { + return xerrors.Errorf("prepare insertMsgStmt: %w", err) + } + defer insertStmt.Close() + + curTs := cs.GetHeaviestTipSet() + startHeight := curTs.Height() + for curTs != nil { + tscid, err := curTs.Key().Cid() + if err != nil { + return xerrors.Errorf("error computing tipset cid: %w", err) + } + + tskey := tscid.String() + epoch := int64(curTs.Height()) + + //log.Infof("epoch %d-%d, populating msgindex with tipset %s", curTs.Height(), startHeight-curTs.Height(), tskey) + + msgs, err := cs.MessagesForTipset(lctx, curTs) + if err != nil { + log.Infof("stopping import after %d tipsets", startHeight-curTs.Height()) + break + } + + for _, msg := range msgs { + key := msg.Cid().String() + if _, err := insertStmt.Exec(key, tskey, epoch); err != nil { + return xerrors.Errorf("error inserting message: %w", err) + } + } + + curTs, err = cs.GetTipSetFromKey(lctx, curTs.Parents()) + if err != nil { + return xerrors.Errorf("error walking chain: %w", err) + } + } + + return nil +} + // init utilities func prepareDB(db *sql.DB) error { for _, stmt := range dbDefs { diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 585d4b2ce..0fc96775d 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -13,6 +13,7 @@ import ( "io/ioutil" "net/http" "os" + "path" "runtime/pprof" "strings" @@ -558,5 +559,11 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool) return err } + log.Info("populating message index...") + if err := index.PopulateAfterSnapshot(ctx, path.Join(lr.Path(), "sqlite"), cst); err != nil { + return err + } + log.Info("populating message index done") + return nil }