From 59640a8b224730a744f91fed03451532282131f3 Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Thu, 23 Mar 2023 11:33:17 +0000 Subject: [PATCH 1/6] Populate the index on snapshot import Fixes: https://github.com/filecoin-project/lotus/issues/10537 --- chain/index/msgindex.go | 62 +++++++++++++++++++++++++++++++++++++++++ cmd/lotus/daemon.go | 7 +++++ 2 files changed, 69 insertions(+) diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index d5c6a252e..71a0a8602 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -169,6 +169,68 @@ func NewMsgIndex(lctx context.Context, basePath string, cs ChainStore) (MsgIndex return msgIndex, nil } +func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) error { + err := os.MkdirAll(basePath, 0755) + if err != nil { + return xerrors.Errorf("error creating msgindex base directory: %w", err) + } + + dbPath := path.Join(basePath, dbName) + if _, err := os.Stat(dbPath); err == nil { + return xerrors.Errorf("msgindex already exists at %s", dbPath) + } + + db, err := sql.Open("sqlite3", dbPath) + if err != nil { + return xerrors.Errorf("error opening msgindex database: %w", err) + } + defer db.Close() + + if err := prepareDB(db); err != nil { + return xerrors.Errorf("error creating msgindex database: %w", err) + } + + insertStmt, err := db.Prepare(dbqInsertMessage) + if err != nil { + return xerrors.Errorf("prepare insertMsgStmt: %w", err) + } + defer insertStmt.Close() + + curTs := cs.GetHeaviestTipSet() + startHeight := curTs.Height() + for curTs != nil { + tscid, err := curTs.Key().Cid() + if err != nil { + return xerrors.Errorf("error computing tipset cid: %w", err) + } + + tskey := tscid.String() + epoch := int64(curTs.Height()) + + //log.Infof("epoch %d-%d, populating msgindex with tipset %s", curTs.Height(), startHeight-curTs.Height(), tskey) + + msgs, err := cs.MessagesForTipset(lctx, curTs) + if err != nil { + log.Infof("stopping import after %d tipsets", startHeight-curTs.Height()) + break + } + + for _, msg := range msgs { + key := msg.Cid().String() + if _, err := insertStmt.Exec(key, tskey, epoch); err != nil { + return xerrors.Errorf("error inserting message: %w", err) + } + } + + curTs, err = cs.GetTipSetFromKey(lctx, curTs.Parents()) + if err != nil { + return xerrors.Errorf("error walking chain: %w", err) + } + } + + return nil +} + // init utilities func prepareDB(db *sql.DB) error { for _, stmt := range dbDefs { diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 585d4b2ce..0fc96775d 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -13,6 +13,7 @@ import ( "io/ioutil" "net/http" "os" + "path" "runtime/pprof" "strings" @@ -558,5 +559,11 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool) return err } + log.Info("populating message index...") + if err := index.PopulateAfterSnapshot(ctx, path.Join(lr.Path(), "sqlite"), cst); err != nil { + return err + } + log.Info("populating message index done") + return nil } From 48c57d394be4c80f3db63793bdb85fa8b68cbd22 Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Fri, 24 Mar 2023 15:36:31 +0000 Subject: [PATCH 2/6] Improve performance when populating message indax --- chain/index/msgindex.go | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index 71a0a8602..477738338 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -190,9 +190,20 @@ func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) return xerrors.Errorf("error creating msgindex database: %w", err) } - insertStmt, err := db.Prepare(dbqInsertMessage) + tx, err := db.Begin() if err != nil { - return xerrors.Errorf("prepare insertMsgStmt: %w", err) + return xerrors.Errorf("error when starting transaction: %w", err) + } + + rollback := func() { + if err := tx.Rollback(); err != nil { + log.Errorf("error in rollback: %s", err) + } + } + + insertStmt, err := tx.Prepare(dbqInsertMessage) + if err != nil { + return xerrors.Errorf("error preparing insertMsgStmt: %w", err) } defer insertStmt.Close() @@ -201,14 +212,13 @@ func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) for curTs != nil { tscid, err := curTs.Key().Cid() if err != nil { + rollback() return xerrors.Errorf("error computing tipset cid: %w", err) } tskey := tscid.String() epoch := int64(curTs.Height()) - //log.Infof("epoch %d-%d, populating msgindex with tipset %s", curTs.Height(), startHeight-curTs.Height(), tskey) - msgs, err := cs.MessagesForTipset(lctx, curTs) if err != nil { log.Infof("stopping import after %d tipsets", startHeight-curTs.Height()) @@ -218,16 +228,23 @@ func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) for _, msg := range msgs { key := msg.Cid().String() if _, err := insertStmt.Exec(key, tskey, epoch); err != nil { + rollback() return xerrors.Errorf("error inserting message: %w", err) } } curTs, err = cs.GetTipSetFromKey(lctx, curTs.Parents()) if err != nil { + rollback() return xerrors.Errorf("error walking chain: %w", err) } } + err = tx.Commit() + if err != nil { + return xerrors.Errorf("error commiting transaction: %w", err) + } + return nil } From b8137f6b4dbd5239d78c5cfd5fcf0b8eba5ed62a Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Fri, 24 Mar 2023 16:44:26 +0000 Subject: [PATCH 3/6] Delete existing message index when loading from snapshot --- chain/index/msgindex.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index 477738338..89c2bdc9f 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -176,8 +176,12 @@ func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) } dbPath := path.Join(basePath, dbName) + + // if a database already exists, we try to delete it and create a new one if _, err := os.Stat(dbPath); err == nil { - return xerrors.Errorf("msgindex already exists at %s", dbPath) + if err = os.Remove(dbPath); err != nil { + return xerrors.Errorf("msgindex already exists at %s and can't be deleted", dbPath) + } } db, err := sql.Open("sqlite3", dbPath) From 90171c8babee8c6003aa85b00aec2f3f3780342c Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Sat, 25 Mar 2023 11:17:54 +0000 Subject: [PATCH 4/6] Addressing lint errors --- chain/index/msgindex.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index 89c2bdc9f..42a536c16 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -188,7 +188,11 @@ func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) if err != nil { return xerrors.Errorf("error opening msgindex database: %w", err) } - defer db.Close() + defer func() { + if err := db.Close(); err != nil { + log.Errorf("error closing msgindex database: %s", err) + } + }() if err := prepareDB(db); err != nil { return xerrors.Errorf("error creating msgindex database: %w", err) @@ -207,9 +211,13 @@ func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) insertStmt, err := tx.Prepare(dbqInsertMessage) if err != nil { - return xerrors.Errorf("error preparing insertMsgStmt: %w", err) + return xerrors.Errorf("error preparing insertStmt: %w", err) } - defer insertStmt.Close() + defer func() { + if err := insertStmt.Close(); err != nil { + log.Errorf("error closing insert statement: %s", err) + } + }() curTs := cs.GetHeaviestTipSet() startHeight := curTs.Height() @@ -246,7 +254,7 @@ func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) err = tx.Commit() if err != nil { - return xerrors.Errorf("error commiting transaction: %w", err) + return xerrors.Errorf("error committing transaction: %w", err) } return nil From 83e2408f819451eb59efdfe6cea15b0aa010d4f6 Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Tue, 28 Mar 2023 16:28:47 +0200 Subject: [PATCH 5/6] Only populate message index if config EnableMsgIndex is set --- cmd/lotus/daemon.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 0fc96775d..704d9b470 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -48,6 +48,7 @@ import ( "github.com/filecoin-project/lotus/lib/ulimit" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/testing" @@ -559,11 +560,23 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool) return err } - log.Info("populating message index...") - if err := index.PopulateAfterSnapshot(ctx, path.Join(lr.Path(), "sqlite"), cst); err != nil { + // populate the message index if user has EnableMsgIndex enabled + // + c, err := lr.Config() + if err != nil { return err } - log.Info("populating message index done") + cfg, ok := c.(*config.FullNode) + if !ok { + return xerrors.Errorf("invalid config for repo, got: %T", c) + } + if cfg.Index.EnableMsgIndex { + log.Info("populating message index...") + if err := index.PopulateAfterSnapshot(ctx, path.Join(lr.Path(), "sqlite"), cst); err != nil { + return err + } + log.Info("populating message index done") + } return nil } From ecd13079e7347ee9bf59fb186af675674db51d3f Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Tue, 28 Mar 2023 17:08:53 +0200 Subject: [PATCH 6/6] Address review comments --- chain/index/msgindex.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index 42a536c16..39ba487f2 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -211,13 +211,9 @@ func PopulateAfterSnapshot(lctx context.Context, basePath string, cs ChainStore) insertStmt, err := tx.Prepare(dbqInsertMessage) if err != nil { + rollback() return xerrors.Errorf("error preparing insertStmt: %w", err) } - defer func() { - if err := insertStmt.Close(); err != nil { - log.Errorf("error closing insert statement: %s", err) - } - }() curTs := cs.GetHeaviestTipSet() startHeight := curTs.Height()