From 6bc04c8060d59099f5d4eadf7b1ad716f8799b33 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 2 May 2024 10:54:38 +1000 Subject: [PATCH] feat: cli,events: speed up backfill with temporary index --- chain/events/filter/index.go | 2 ++ cmd/lotus-shed/indexes.go | 57 +++++++++++++++++++++++++++++++++--- 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index 9bf7213c8..98cc54d20 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -32,6 +32,8 @@ var pragmas = []string{ "PRAGMA read_uncommitted = ON", } +// Any changes to this schema should be matched for the `lotus-shed indexes backfill-events` command + var ddls = []string{ `CREATE TABLE IF NOT EXISTS event ( id INTEGER PRIMARY KEY, diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index 12ebe0082..ed10ab215 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -7,6 +7,7 @@ import ( "path" "path/filepath" "strings" + "time" "github.com/mitchellh/go-homedir" "github.com/urfave/cli/v2" @@ -54,6 +55,16 @@ var backfillEventsCmd = &cli.Command{ Value: 2000, Usage: "the number of epochs to backfill", }, + &cli.BoolFlag{ + Name: "temporary-index", + Value: false, + Usage: "use a temporary index to speed up the backfill process", + }, + &cli.BoolFlag{ + Name: "vacuum", + Value: false, + Usage: "run VACUUM on the database after backfilling is complete; this will reclaim space from deleted rows, but may take a long time", + }, }, Action: func(cctx *cli.Context) error { srv, err := lcli.GetFullNodeServices(cctx) @@ -92,8 +103,12 @@ var backfillEventsCmd = &cli.Command{ return err } + log.Infof( + "WARNING: If this command is run against a node that is currently collecting events with DisableHistoricFilterAPI=false, " + + "it may cause the node to fail to record recent events due to the need to obtain an exclusive lock on the database for writes.") + dbPath := path.Join(basePath, "sqlite", "events.db") - db, err := sql.Open("sqlite3", dbPath) + db, err := sql.Open("sqlite3", dbPath+"?_txlock=immediate") if err != nil { return err } @@ -105,6 +120,14 @@ var backfillEventsCmd = &cli.Command{ } }() + if cctx.Bool("temporary-index") { + log.Info("creating temporary index (tmp_event_backfill_index) on event table to speed up backfill") + _, err := db.Exec("CREATE INDEX IF NOT EXISTS tmp_event_backfill_index ON event (height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted);") + if err != nil { + return err + } + } + addressLookups := make(map[abi.ActorID]address.Address) // TODO: We don't need this address resolution anymore once https://github.com/filecoin-project/lotus/issues/11594 lands @@ -134,9 +157,19 @@ var backfillEventsCmd = &cli.Command{ var totalEntriesAffected int64 processHeight := func(ctx context.Context, cnt int, msgs []lapi.Message, receipts []*types.MessageReceipt) error { - tx, err := db.BeginTx(ctx, nil) - if err != nil { - return fmt.Errorf("failed to start transaction: %w", err) + var tx *sql.Tx + for { + var err error + tx, err = db.BeginTx(ctx, nil) + if err != nil { + if err.Error() == "database is locked" { + log.Warnf("database is locked, retrying in 200ms") + time.Sleep(200 * time.Millisecond) + continue + } + return err + } + break } defer tx.Rollback() //nolint:errcheck @@ -312,6 +345,22 @@ var backfillEventsCmd = &cli.Command{ log.Infof("backfilling events complete, totalEventsAffected:%d, totalEntriesAffected:%d", totalEventsAffected, totalEntriesAffected) + if cctx.Bool("temporary-index") { + log.Info("dropping temporary index (tmp_event_backfill_index) on event table") + _, err := db.Exec("DROP INDEX IF EXISTS tmp_event_backfill_index;") + if err != nil { + fmt.Printf("ERROR: dropping index: %s", err) + } + } + + if cctx.Bool("vacuum") { + log.Info("running VACUUM on the database") + _, err := db.Exec("VACUUM;") + if err != nil { + return fmt.Errorf("failed to run VACUUM on the database: %w", err) + } + } + return nil }, }