206 lines
4.1 KiB
Go
206 lines
4.1 KiB
Go
|
package main
|
||
|
|
||
|
import (
|
||
|
"database/sql"
|
||
|
"fmt"
|
||
|
"path"
|
||
|
|
||
|
_ "github.com/mattn/go-sqlite3"
|
||
|
"github.com/urfave/cli/v2"
|
||
|
"golang.org/x/xerrors"
|
||
|
|
||
|
"github.com/filecoin-project/go-state-types/abi"
|
||
|
lcli "github.com/filecoin-project/lotus/cli"
|
||
|
"github.com/ipfs/go-cid"
|
||
|
)
|
||
|
|
||
|
var msgindexCmd = &cli.Command{
|
||
|
Name: "msgindex",
|
||
|
Usage: "Tools for managing the message index",
|
||
|
Subcommands: []*cli.Command{
|
||
|
msgindexBackfillCmd,
|
||
|
msgindexPruneCmd,
|
||
|
},
|
||
|
}
|
||
|
|
||
|
var msgindexBackfillCmd = &cli.Command{
|
||
|
Name: "backfill",
|
||
|
Usage: "Backfill the message index for a number of epochs starting from a specified height",
|
||
|
Flags: []cli.Flag{
|
||
|
&cli.IntFlag{
|
||
|
Name: "from",
|
||
|
Value: 0,
|
||
|
Usage: "height to start the backfill; uses the current head if omitted",
|
||
|
},
|
||
|
&cli.IntFlag{
|
||
|
Name: "epochs",
|
||
|
Value: 1800,
|
||
|
Usage: "number of epochs to backfill; defaults to 1800 (2 finalities)",
|
||
|
},
|
||
|
&cli.StringFlag{
|
||
|
Name: "repo",
|
||
|
Value: "~/.lotus",
|
||
|
Usage: "path to the repo",
|
||
|
},
|
||
|
},
|
||
|
Action: func(cctx *cli.Context) error {
|
||
|
api, closer, err := lcli.GetFullNodeAPI(cctx)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
defer closer()
|
||
|
ctx := lcli.ReqContext(cctx)
|
||
|
|
||
|
curTs, err := api.ChainHead(ctx)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
startHeight := int64(cctx.Int("from"))
|
||
|
if startHeight == 0 {
|
||
|
startHeight = int64(curTs.Height()) - 1
|
||
|
}
|
||
|
epochs := cctx.Int("epochs")
|
||
|
|
||
|
dbPath := path.Join(cctx.String("repo"), "sqlite", "msgindex.db")
|
||
|
db, err := sql.Open("sqlite3", dbPath)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
defer func() {
|
||
|
err := db.Close()
|
||
|
if err != nil {
|
||
|
fmt.Printf("ERROR: closing db: %s", err)
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
tx, err := db.Begin()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
insertStmt, err := tx.Prepare("INSERT INTO messages VALUES (?, ?, ?)")
|
||
|
insertMsg := func(cid, tsCid cid.Cid, epoch abi.ChainEpoch) error {
|
||
|
key := cid.String()
|
||
|
tskey := tsCid.String()
|
||
|
if _, err := insertStmt.Exec(key, tskey, int64(epoch)); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
rollback := func() {
|
||
|
if err := tx.Rollback(); err != nil {
|
||
|
fmt.Printf("ERROR: rollback: %s", err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
for i := 0; i < epochs; i++ {
|
||
|
epoch := abi.ChainEpoch(startHeight - int64(i))
|
||
|
|
||
|
ts, err := api.ChainGetTipSetByHeight(ctx, epoch, curTs.Key())
|
||
|
if err != nil {
|
||
|
rollback()
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
tsCid, err := ts.Key().Cid()
|
||
|
if err != nil {
|
||
|
rollback()
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
msgs, err := api.ChainGetMessagesInTipset(ctx, ts.Key())
|
||
|
if err != nil {
|
||
|
rollback()
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
for _, msg := range msgs {
|
||
|
if err := insertMsg(msg.Cid, tsCid, epoch); err != nil {
|
||
|
rollback()
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if err := tx.Commit(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
},
|
||
|
}
|
||
|
|
||
|
var msgindexPruneCmd = &cli.Command{
|
||
|
Name: "prune",
|
||
|
Usage: "Prune the message index for messages included before a given epoch",
|
||
|
Flags: []cli.Flag{
|
||
|
&cli.IntFlag{
|
||
|
Name: "from",
|
||
|
Usage: "height to start the prune; if negative it indicates epochs from current head",
|
||
|
},
|
||
|
&cli.StringFlag{
|
||
|
Name: "repo",
|
||
|
Value: "~/.lotus",
|
||
|
Usage: "path to the repo",
|
||
|
},
|
||
|
},
|
||
|
Action: func(cctx *cli.Context) error {
|
||
|
api, closer, err := lcli.GetFullNodeAPI(cctx)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
defer closer()
|
||
|
ctx := lcli.ReqContext(cctx)
|
||
|
|
||
|
startHeight := int64(cctx.Int("from"))
|
||
|
if startHeight < 0 {
|
||
|
curTs, err := api.ChainHead(ctx)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
startHeight += int64(curTs.Height())
|
||
|
|
||
|
if startHeight < 0 {
|
||
|
return xerrors.Errorf("bogus start height %d", startHeight)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
dbPath := path.Join(cctx.String("repo"), "sqlite", "msgindex.db")
|
||
|
db, err := sql.Open("sqlite3", dbPath)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
defer func() {
|
||
|
err := db.Close()
|
||
|
if err != nil {
|
||
|
fmt.Printf("ERROR: closing db: %s", err)
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
tx, err := db.Begin()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if _, err := tx.Exec("DELETE FROM messages WHERE epoch < ?", int64(startHeight)); err != nil {
|
||
|
if err := tx.Rollback(); err != nil {
|
||
|
fmt.Printf("ERROR: rollback: %s", err)
|
||
|
}
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if err := tx.Commit(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
},
|
||
|
}
|