From 719215122bc3a64720a0d326b9e2fe8bbd5fe1f0 Mon Sep 17 00:00:00 2001 From: Fridrik Asmundsson Date: Mon, 24 Jul 2023 12:53:49 +0000 Subject: [PATCH] Address PR feedback and move each epoch inside own tx --- cmd/lotus-shed/indexes.go | 203 ++++++++++++++++++++------------------ 1 file changed, 106 insertions(+), 97 deletions(-) diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index 15eb4f2c0..be7d43e05 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -19,6 +19,7 @@ import ( "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/exitcode" + lapi "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/ethtypes" lcli "github.com/filecoin-project/lotus/cli" @@ -106,23 +107,6 @@ var backfillEventsCmd = &cli.Command{ } }() - stmtSelectEvent, err := db.Prepare("SELECT MAX(id) from event WHERE height=? AND tipset_key=? and tipset_key_cid=? and emitter_addr=? and event_index=? and message_cid=? and message_index=? and reverted=false") - if err != nil { - return err - } - stmtSelectEntry, err := db.Prepare("SELECT EXISTS(SELECT 1 from event_entry WHERE event_id=? and indexed=? and flags=? and key=? and codec=? and value=?)") - if err != nil { - return err - } - stmtEvent, err := db.Prepare("INSERT INTO event (height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)") - if err != nil { - return err - } - stmtEntry, err := db.Prepare("INSERT INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)") - if err != nil { - return err - } - addressLookups := make(map[abi.ActorID]address.Address) resolveFn := func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool) { @@ -156,34 +140,31 @@ var backfillEventsCmd = &cli.Command{ return b&(types.EventFlagIndexedKey|types.EventFlagIndexedValue) > 0 } - var eventsAffected int64 - var entriesAffected int64 - for i := 0; i < epochs; i++ { - select { - case <-ctx.Done(): - return nil - default: - } + var totalEventsAffected int64 + var totalEntriesAffected int64 - log.Infof("[%d] backfilling actor events epoch:%d, eventsAffected:%d, entriesAffected:%d", i, currTs.Height(), eventsAffected, entriesAffected) - - blockCid := prevTs.Blocks()[0].Cid() - - // get messages for the parent of the previous tipset (which will be currTs) - msgs, err := api.ChainGetParentMessages(ctx, blockCid) + processHeight := func(ctx context.Context, cnt int, msgs []lapi.Message, receipts []*types.MessageReceipt) error { + tx, err := db.BeginTx(ctx, nil) if err != nil { - return fmt.Errorf("failed to get parent messages for block %s: %w", blockCid, err) + return fmt.Errorf("failed to start transaction: %w", err) } + defer tx.Rollback() //nolint:errcheck - // get receipts for the parent of the previous tipset (which will be currTs) - receipts, err := api.ChainGetParentReceipts(ctx, blockCid) + stmtSelectEvent, err := tx.Prepare("SELECT MAX(id) from event WHERE height=? AND tipset_key=? and tipset_key_cid=? and emitter_addr=? and event_index=? and message_cid=? and message_index=? and reverted=false") if err != nil { - return fmt.Errorf("failed to get parent receipts for block %s: %w", blockCid, err) + return err + } + stmtEvent, err := tx.Prepare("INSERT INTO event (height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)") + if err != nil { + return err + } + stmtEntry, err := tx.Prepare("INSERT INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)") + if err != nil { + return err } - if len(msgs) != len(receipts) { - return fmt.Errorf("mismatched in message and receipt count: %d != %d", len(msgs), len(receipts)) - } + var eventsAffected int64 + var entriesAffected int64 // loop over each message receipt and backfill the events for idx, receipt := range receipts { @@ -234,75 +215,103 @@ var backfillEventsCmd = &cli.Command{ return fmt.Errorf("error checking if event exists: %w", err) } - if !entryID.Valid { - // event does not exist, lets backfill it - res, err := stmtEvent.Exec( - currTs.Height(), // height - currTs.Key().Bytes(), // tipset_key - tsKeyCid.Bytes(), // tipset_key_cid - addr.Bytes(), // emitter_addr - eventIdx, // event_index - msg.Cid.Bytes(), // message_cid - idx, // message_index - false, // reverted + // we already have this event + if entryID.Valid { + continue + } + + // event does not exist, lets backfill it + res, err := tx.Stmt(stmtEvent).Exec( + currTs.Height(), // height + currTs.Key().Bytes(), // tipset_key + tsKeyCid.Bytes(), // tipset_key_cid + addr.Bytes(), // emitter_addr + eventIdx, // event_index + msg.Cid.Bytes(), // message_cid + idx, // message_index + false, // reverted + ) + if err != nil { + return fmt.Errorf("error inserting event: %w", err) + } + + entryID.Int64, err = res.LastInsertId() + if err != nil { + return fmt.Errorf("could not get last insert id: %w", err) + } + + rowsAffected, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("could not get rows affected: %w", err) + } + eventsAffected += rowsAffected + + // backfill the event entries + for _, entry := range event.Entries { + _, err := tx.Stmt(stmtEntry).Exec( + entryID.Int64, // event_id + isIndexedValue(entry.Flags), // indexed + []byte{entry.Flags}, // flags + entry.Key, // key + entry.Codec, // codec + entry.Value, // value ) - if err != nil { - return fmt.Errorf("error inserting event: %w", err) - } - - entryID.Int64, err = res.LastInsertId() - if err != nil { - return fmt.Errorf("could not get last insert id: %w", err) + return fmt.Errorf("error inserting entry: %w", err) } rowsAffected, err := res.RowsAffected() if err != nil { - return fmt.Errorf("error getting rows affected: %s", err) - } - - eventsAffected += rowsAffected - } - - for _, entry := range event.Entries { - // check if entry exists - var exists bool - err = stmtSelectEntry.QueryRow( - entryID.Int64, - isIndexedValue(entry.Flags), - []byte{entry.Flags}, - entry.Key, - entry.Codec, - entry.Value, - ).Scan(&exists) - if err != nil { - return fmt.Errorf("error checking if entry exists: %w", err) - } - - if !exists { - // entry does not exist, lets backfill it - res, err := stmtEntry.Exec( - entryID.Int64, // event_id - isIndexedValue(entry.Flags), // indexed - []byte{entry.Flags}, // flags - entry.Key, // key - entry.Codec, // codec - entry.Value, // value - ) - if err != nil { - return fmt.Errorf("error inserting entry: %w", err) - } - - rowsAffected, err := res.RowsAffected() - if err != nil { - return fmt.Errorf("error getting rows affected: %s", err) - } - entriesAffected += rowsAffected + return fmt.Errorf("could not get rows affected: %w", err) } + entriesAffected += rowsAffected } } } + err = tx.Commit() + if err != nil { + return fmt.Errorf("failed to commit transaction: %w", err) + } + + log.Infof("[%d] backfilling actor events epoch:%d, eventsAffected:%d, entriesAffected:%d", cnt, currTs.Height(), eventsAffected, entriesAffected) + + totalEventsAffected += eventsAffected + totalEntriesAffected += entriesAffected + + return nil + } + + for i := 0; i < epochs; i++ { + select { + case <-ctx.Done(): + return nil + default: + } + + blockCid := prevTs.Blocks()[0].Cid() + + // get messages for the parent of the previous tipset (which will be currTs) + msgs, err := api.ChainGetParentMessages(ctx, blockCid) + if err != nil { + return fmt.Errorf("failed to get parent messages for block %s: %w", blockCid, err) + } + + // get receipts for the parent of the previous tipset (which will be currTs) + receipts, err := api.ChainGetParentReceipts(ctx, blockCid) + if err != nil { + return fmt.Errorf("failed to get parent receipts for block %s: %w", blockCid, err) + } + + if len(msgs) != len(receipts) { + return fmt.Errorf("mismatched in message and receipt count: %d != %d", len(msgs), len(receipts)) + } + + err = processHeight(ctx, i, msgs, receipts) + if err != nil { + return err + } + // advance prevTs and currTs up the chain prevTs = currTs currTs, err = api.ChainGetTipSet(ctx, currTs.Parents()) @@ -311,7 +320,7 @@ var backfillEventsCmd = &cli.Command{ } } - log.Infof("backfilling events complete, eventsAffected:%d, entriesAffected:%d", eventsAffected, entriesAffected) + log.Infof("backfilling events complete, totalEventsAffected:%d, totalEntriesAffected:%d", totalEventsAffected, totalEntriesAffected) return nil },