diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index d5c6a252e..39ba487f2 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -169,6 +169,93 @@ func NewMsgIndex(lctx context.Context, basePath string, cs ChainStore) (MsgIndex return msgIndex, nil } +func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) error { + err := os.MkdirAll(basePath, 0755) + if err != nil { + return xerrors.Errorf("error creating msgindex base directory: %w", err) + } + + dbPath := path.Join(basePath, dbName) + + // if a database already exists, we try to delete it and create a new one + if _, err := os.Stat(dbPath); err == nil { + if err = os.Remove(dbPath); err != nil { + return xerrors.Errorf("msgindex already exists at %s and can't be deleted", dbPath) + } + } + + db, err := sql.Open("sqlite3", dbPath) + if err != nil { + return xerrors.Errorf("error opening msgindex database: %w", err) + } + defer func() { + if err := db.Close(); err != nil { + log.Errorf("error closing msgindex database: %s", err) + } + }() + + if err := prepareDB(db); err != nil { + return xerrors.Errorf("error creating msgindex database: %w", err) + } + + tx, err := db.Begin() + if err != nil { + return xerrors.Errorf("error when starting transaction: %w", err) + } + + rollback := func() { + if err := tx.Rollback(); err != nil { + log.Errorf("error in rollback: %s", err) + } + } + + insertStmt, err := tx.Prepare(dbqInsertMessage) + if err != nil { + rollback() + return xerrors.Errorf("error preparing insertStmt: %w", err) + } + + curTs := cs.GetHeaviestTipSet() + startHeight := curTs.Height() + for curTs != nil { + tscid, err := curTs.Key().Cid() + if err != nil { + rollback() + return xerrors.Errorf("error computing tipset cid: %w", err) + } + + tskey := tscid.String() + epoch := int64(curTs.Height()) + + msgs, err := cs.MessagesForTipset(lctx, curTs) + if err != nil { + log.Infof("stopping import after %d tipsets", startHeight-curTs.Height()) + break + } + + for _, msg := range msgs { + key := msg.Cid().String() + if _, err := insertStmt.Exec(key, tskey, epoch); err != nil { + rollback() + return xerrors.Errorf("error inserting message: %w", err) + } + } + + curTs, err = cs.GetTipSetFromKey(lctx, curTs.Parents()) + if err != nil { + rollback() + return xerrors.Errorf("error walking chain: %w", err) + } + } + + err = tx.Commit() + if err != nil { + return xerrors.Errorf("error committing transaction: %w", err) + } + + return nil +} + // init utilities func prepareDB(db *sql.DB) error { for _, stmt := range dbDefs { diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 585d4b2ce..704d9b470 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -13,6 +13,7 @@ import ( "io/ioutil" "net/http" "os" + "path" "runtime/pprof" "strings" @@ -47,6 +48,7 @@ import ( "github.com/filecoin-project/lotus/lib/ulimit" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/testing" @@ -558,5 +560,23 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool) return err } + // populate the message index if user has EnableMsgIndex enabled + // + c, err := lr.Config() + if err != nil { + return err + } + cfg, ok := c.(*config.FullNode) + if !ok { + return xerrors.Errorf("invalid config for repo, got: %T", c) + } + if cfg.Index.EnableMsgIndex { + log.Info("populating message index...") + if err := index.PopulateAfterSnapshot(ctx, path.Join(lr.Path(), "sqlite"), cst); err != nil { + return err + } + log.Info("populating message index done") + } + return nil }