Merge pull request #10941 from filecoin-project/fix-msgindex-backfill
fix: improve perf of msgindex backfill
This commit is contained in:
commit
cc66654a6c
@ -37,7 +37,17 @@ var dbDefs = []string{
|
||||
)`,
|
||||
`INSERT OR IGNORE INTO _meta (version) VALUES (1)`,
|
||||
}
|
||||
var dbPragmas = []string{}
|
||||
|
||||
var dbPragmas = []string{
|
||||
"PRAGMA synchronous = normal",
|
||||
"PRAGMA temp_store = memory",
|
||||
"PRAGMA mmap_size = 30000000000",
|
||||
"PRAGMA page_size = 32768",
|
||||
"PRAGMA auto_vacuum = NONE",
|
||||
"PRAGMA automatic_index = OFF",
|
||||
"PRAGMA journal_mode = WAL",
|
||||
"PRAGMA read_uncommitted = ON",
|
||||
}
|
||||
|
||||
const (
|
||||
// prepared stmts
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"path"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"github.com/mitchellh/go-homedir"
|
||||
"github.com/urfave/cli/v2"
|
||||
@ -83,63 +82,50 @@ var msgindexBackfillCmd = &cli.Command{
|
||||
}
|
||||
}()
|
||||
|
||||
tx, err := db.Begin()
|
||||
insertStmt, err := db.Prepare("INSERT OR IGNORE INTO messages (cid, tipset_cid, epoch) VALUES (?, ?, ?)")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
insertStmt, err := tx.Prepare("INSERT INTO messages VALUES (?, ?, ?)")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
insertMsg := func(cid, tsCid cid.Cid, epoch abi.ChainEpoch) error {
|
||||
key := cid.String()
|
||||
tskey := tsCid.String()
|
||||
if _, err := insertStmt.Exec(key, tskey, int64(epoch)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
rollback := func() {
|
||||
if err := tx.Rollback(); err != nil {
|
||||
fmt.Printf("ERROR: rollback: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
var nrRowsAffected int64
|
||||
for i := 0; i < epochs; i++ {
|
||||
epoch := abi.ChainEpoch(startHeight - int64(i))
|
||||
|
||||
if i%100 == 0 {
|
||||
log.Infof("%d/%d processing epoch:%d, nrRowsAffected:%d", i, epochs, epoch, nrRowsAffected)
|
||||
}
|
||||
|
||||
ts, err := api.ChainGetTipSetByHeight(ctx, epoch, curTs.Key())
|
||||
if err != nil {
|
||||
rollback()
|
||||
return err
|
||||
return fmt.Errorf("failed to get tipset at epoch %d: %w", epoch, err)
|
||||
}
|
||||
|
||||
tsCid, err := ts.Key().Cid()
|
||||
if err != nil {
|
||||
rollback()
|
||||
return err
|
||||
return fmt.Errorf("failed to get tipset cid at epoch %d: %w", epoch, err)
|
||||
}
|
||||
|
||||
msgs, err := api.ChainGetMessagesInTipset(ctx, ts.Key())
|
||||
if err != nil {
|
||||
rollback()
|
||||
return err
|
||||
return fmt.Errorf("failed to get messages in tipset at epoch %d: %w", epoch, err)
|
||||
}
|
||||
|
||||
for _, msg := range msgs {
|
||||
if err := insertMsg(msg.Cid, tsCid, epoch); err != nil {
|
||||
rollback()
|
||||
return err
|
||||
key := msg.Cid.String()
|
||||
tskey := tsCid.String()
|
||||
res, err := insertStmt.Exec(key, tskey, int64(epoch))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to insert message cid %s in tipset %s at epoch %d: %w", key, tskey, epoch, err)
|
||||
}
|
||||
rowsAffected, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get rows affected for message cid %s in tipset %s at epoch %d: %w", key, tskey, epoch, err)
|
||||
}
|
||||
nrRowsAffected += rowsAffected
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Done backfilling, nrRowsAffected:%d", nrRowsAffected)
|
||||
|
||||
return nil
|
||||
},
|
||||
|
Loading…
Reference in New Issue
Block a user