From e365135cdd776a9a311d23235e78c02fd001a8dd Mon Sep 17 00:00:00 2001 From: vyzo Date: Mon, 13 Mar 2023 16:51:21 +0200 Subject: [PATCH] lotus-shed tools for msgindex --- cmd/lotus-shed/main.go | 1 + cmd/lotus-shed/msgindex.go | 205 +++++++++++++++++++++++++++++++++++++ 2 files changed, 206 insertions(+) create mode 100644 cmd/lotus-shed/msgindex.go diff --git a/cmd/lotus-shed/main.go b/cmd/lotus-shed/main.go index 19072dd71..736d874e3 100644 --- a/cmd/lotus-shed/main.go +++ b/cmd/lotus-shed/main.go @@ -83,6 +83,7 @@ func main() { invariantsCmd, gasTraceCmd, replayOfflineCmd, + msgindexCmd, } app := &cli.App{ diff --git a/cmd/lotus-shed/msgindex.go b/cmd/lotus-shed/msgindex.go new file mode 100644 index 000000000..c3f288422 --- /dev/null +++ b/cmd/lotus-shed/msgindex.go @@ -0,0 +1,205 @@ +package main + +import ( + "database/sql" + "fmt" + "path" + + _ "github.com/mattn/go-sqlite3" + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-state-types/abi" + lcli "github.com/filecoin-project/lotus/cli" + "github.com/ipfs/go-cid" +) + +var msgindexCmd = &cli.Command{ + Name: "msgindex", + Usage: "Tools for managing the message index", + Subcommands: []*cli.Command{ + msgindexBackfillCmd, + msgindexPruneCmd, + }, +} + +var msgindexBackfillCmd = &cli.Command{ + Name: "backfill", + Usage: "Backfill the message index for a number of epochs starting from a specified height", + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "from", + Value: 0, + Usage: "height to start the backfill; uses the current head if omitted", + }, + &cli.IntFlag{ + Name: "epochs", + Value: 1800, + Usage: "number of epochs to backfill; defaults to 1800 (2 finalities)", + }, + &cli.StringFlag{ + Name: "repo", + Value: "~/.lotus", + Usage: "path to the repo", + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := lcli.GetFullNodeAPI(cctx) + if err != nil { + return err + } + + defer closer() + ctx := lcli.ReqContext(cctx) + + curTs, err := api.ChainHead(ctx) + if err != nil { + return err + } + + startHeight := int64(cctx.Int("from")) + if startHeight == 0 { + startHeight = int64(curTs.Height()) - 1 + } + epochs := cctx.Int("epochs") + + dbPath := path.Join(cctx.String("repo"), "sqlite", "msgindex.db") + db, err := sql.Open("sqlite3", dbPath) + if err != nil { + return err + } + + defer func() { + err := db.Close() + if err != nil { + fmt.Printf("ERROR: closing db: %s", err) + } + }() + + tx, err := db.Begin() + if err != nil { + return err + } + + insertStmt, err := tx.Prepare("INSERT INTO messages VALUES (?, ?, ?)") + insertMsg := func(cid, tsCid cid.Cid, epoch abi.ChainEpoch) error { + key := cid.String() + tskey := tsCid.String() + if _, err := insertStmt.Exec(key, tskey, int64(epoch)); err != nil { + return err + } + + return nil + } + rollback := func() { + if err := tx.Rollback(); err != nil { + fmt.Printf("ERROR: rollback: %s", err) + } + } + + for i := 0; i < epochs; i++ { + epoch := abi.ChainEpoch(startHeight - int64(i)) + + ts, err := api.ChainGetTipSetByHeight(ctx, epoch, curTs.Key()) + if err != nil { + rollback() + return err + } + + tsCid, err := ts.Key().Cid() + if err != nil { + rollback() + return err + } + + msgs, err := api.ChainGetMessagesInTipset(ctx, ts.Key()) + if err != nil { + rollback() + return err + } + + for _, msg := range msgs { + if err := insertMsg(msg.Cid, tsCid, epoch); err != nil { + rollback() + return err + } + } + } + + if err := tx.Commit(); err != nil { + return err + } + + return nil + }, +} + +var msgindexPruneCmd = &cli.Command{ + Name: "prune", + Usage: "Prune the message index for messages included before a given epoch", + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "from", + Usage: "height to start the prune; if negative it indicates epochs from current head", + }, + &cli.StringFlag{ + Name: "repo", + Value: "~/.lotus", + Usage: "path to the repo", + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := lcli.GetFullNodeAPI(cctx) + if err != nil { + return err + } + + defer closer() + ctx := lcli.ReqContext(cctx) + + startHeight := int64(cctx.Int("from")) + if startHeight < 0 { + curTs, err := api.ChainHead(ctx) + if err != nil { + return err + } + + startHeight += int64(curTs.Height()) + + if startHeight < 0 { + return xerrors.Errorf("bogus start height %d", startHeight) + } + } + + dbPath := path.Join(cctx.String("repo"), "sqlite", "msgindex.db") + db, err := sql.Open("sqlite3", dbPath) + if err != nil { + return err + } + + defer func() { + err := db.Close() + if err != nil { + fmt.Printf("ERROR: closing db: %s", err) + } + }() + + tx, err := db.Begin() + if err != nil { + return err + } + + if _, err := tx.Exec("DELETE FROM messages WHERE epoch < ?", int64(startHeight)); err != nil { + if err := tx.Rollback(); err != nil { + fmt.Printf("ERROR: rollback: %s", err) + } + return err + } + + if err := tx.Commit(); err != nil { + return err + } + + return nil + }, +}