From 59640a8b224730a744f91fed03451532282131f3 Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Thu, 23 Mar 2023 11:33:17 +0000 Subject: [PATCH 1/3] Populate the index on snapshot import Fixes: https://github.com/filecoin-project/lotus/issues/10537 --- chain/index/msgindex.go | 62 +++++++++++++++++++++++++++++++++++++++++ cmd/lotus/daemon.go | 7 +++++ 2 files changed, 69 insertions(+) diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index d5c6a252e..71a0a8602 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -169,6 +169,68 @@ func NewMsgIndex(lctx context.Context, basePath string, cs ChainStore) (MsgIndex return msgIndex, nil } +func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) error { + err := os.MkdirAll(basePath, 0755) + if err != nil { + return xerrors.Errorf("error creating msgindex base directory: %w", err) + } + + dbPath := path.Join(basePath, dbName) + if _, err := os.Stat(dbPath); err == nil { + return xerrors.Errorf("msgindex already exists at %s", dbPath) + } + + db, err := sql.Open("sqlite3", dbPath) + if err != nil { + return xerrors.Errorf("error opening msgindex database: %w", err) + } + defer db.Close() + + if err := prepareDB(db); err != nil { + return xerrors.Errorf("error creating msgindex database: %w", err) + } + + insertStmt, err := db.Prepare(dbqInsertMessage) + if err != nil { + return xerrors.Errorf("prepare insertMsgStmt: %w", err) + } + defer insertStmt.Close() + + curTs := cs.GetHeaviestTipSet() + startHeight := curTs.Height() + for curTs != nil { + tscid, err := curTs.Key().Cid() + if err != nil { + return xerrors.Errorf("error computing tipset cid: %w", err) + } + + tskey := tscid.String() + epoch := int64(curTs.Height()) + + //log.Infof("epoch %d-%d, populating msgindex with tipset %s", curTs.Height(), startHeight-curTs.Height(), tskey) + + msgs, err := cs.MessagesForTipset(lctx, curTs) + if err != nil { + log.Infof("stopping import after %d tipsets", startHeight-curTs.Height()) + break + } + + for _, msg := range msgs { + key := msg.Cid().String() + if _, err := insertStmt.Exec(key, tskey, epoch); err != nil { + return xerrors.Errorf("error inserting message: %w", err) + } + } + + curTs, err = cs.GetTipSetFromKey(lctx, curTs.Parents()) + if err != nil { + return xerrors.Errorf("error walking chain: %w", err) + } + } + + return nil +} + // init utilities func prepareDB(db *sql.DB) error { for _, stmt := range dbDefs { diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 585d4b2ce..0fc96775d 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -13,6 +13,7 @@ import ( "io/ioutil" "net/http" "os" + "path" "runtime/pprof" "strings" @@ -558,5 +559,11 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool) return err } + log.Info("populating message index...") + if err := index.PopulateAfterSnapshot(ctx, path.Join(lr.Path(), "sqlite"), cst); err != nil { + return err + } + log.Info("populating message index done") + return nil } From 48c57d394be4c80f3db63793bdb85fa8b68cbd22 Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Fri, 24 Mar 2023 15:36:31 +0000 Subject: [PATCH 2/3] Improve performance when populating message indax --- chain/index/msgindex.go | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index 71a0a8602..477738338 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -190,9 +190,20 @@ func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) return xerrors.Errorf("error creating msgindex database: %w", err) } - insertStmt, err := db.Prepare(dbqInsertMessage) + tx, err := db.Begin() if err != nil { - return xerrors.Errorf("prepare insertMsgStmt: %w", err) + return xerrors.Errorf("error when starting transaction: %w", err) + } + + rollback := func() { + if err := tx.Rollback(); err != nil { + log.Errorf("error in rollback: %s", err) + } + } + + insertStmt, err := tx.Prepare(dbqInsertMessage) + if err != nil { + return xerrors.Errorf("error preparing insertMsgStmt: %w", err) } defer insertStmt.Close() @@ -201,14 +212,13 @@ func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) for curTs != nil { tscid, err := curTs.Key().Cid() if err != nil { + rollback() return xerrors.Errorf("error computing tipset cid: %w", err) } tskey := tscid.String() epoch := int64(curTs.Height()) - //log.Infof("epoch %d-%d, populating msgindex with tipset %s", curTs.Height(), startHeight-curTs.Height(), tskey) - msgs, err := cs.MessagesForTipset(lctx, curTs) if err != nil { log.Infof("stopping import after %d tipsets", startHeight-curTs.Height()) @@ -218,16 +228,23 @@ func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) for _, msg := range msgs { key := msg.Cid().String() if _, err := insertStmt.Exec(key, tskey, epoch); err != nil { + rollback() return xerrors.Errorf("error inserting message: %w", err) } } curTs, err = cs.GetTipSetFromKey(lctx, curTs.Parents()) if err != nil { + rollback() return xerrors.Errorf("error walking chain: %w", err) } } + err = tx.Commit() + if err != nil { + return xerrors.Errorf("error commiting transaction: %w", err) + } + return nil } From b8137f6b4dbd5239d78c5cfd5fcf0b8eba5ed62a Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Fri, 24 Mar 2023 16:44:26 +0000 Subject: [PATCH 3/3] Delete existing message index when loading from snapshot --- chain/index/msgindex.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index 477738338..89c2bdc9f 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -176,8 +176,12 @@ func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) } dbPath := path.Join(basePath, dbName) + + // if a database already exists, we try to delete it and create a new one if _, err := os.Stat(dbPath); err == nil { - return xerrors.Errorf("msgindex already exists at %s", dbPath) + if err = os.Remove(dbPath); err != nil { + return xerrors.Errorf("msgindex already exists at %s and can't be deleted", dbPath) + } } db, err := sql.Open("sqlite3", dbPath)