diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index ff5ddbdd9..704145b89 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -35,6 +35,8 @@ var ( type ChainStore interface { SubscribeHeadChanges(f store.ReorgNotifee) MessagesForBlock(ctx context.Context, b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) + GetHeaviestTipSet() *types.TipSet + GetTipSetFromKey(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) } var _ ChainStore = (*store.ChainStore)(nil) @@ -103,6 +105,8 @@ opendb: if err != nil { return nil, xerrors.Errorf("error creating msgindex database: %w", err) } + + // TODO we may consider populating the index in this case. } else { err = reconcileIndex(db, cs) if err != nil { @@ -154,8 +158,73 @@ func createTables(db *sql.DB) error { } func reconcileIndex(db *sql.DB, cs ChainStore) error { - // TODO - return errors.New("TODO: index.reconcileIndex") + // Invariant: after reconciliation, every tipset in the index is in the current chain; ie either + // the chain head or reachable by walking the chain. + // Algorithm: + // 1. Count mesages in index; if none, trivially reconciled. + // TODO we may consider populating the index in that case + // 2. Find the minimum tipset in the index; this will mark the end of the reconciliation walk + // 3. Walk from current tipset until we find a tipset in the index. + // 4. Delete (revert!) all tipsets above the found tipset. + // 5. If the walk ends in the boundary epoch, then delete everything. + // + + row := db.QueryRow("SELECT COUNT(*) FROM Messages") + + var result int64 + if err := row.Scan(&result); err != nil { + return xerrors.Errorf("error counting messages: %w", err) + } + + if result == 0 { + return nil + } + + row = db.QueryRow("SELECT MIN(xepoch) FROM Messages") + if err := row.Scan(&result); err != nil { + return xerrors.Errorf("error finding boundary epoch: %w", err) + } + + boundaryEpoch := abi.ChainEpoch(result) + + countMsgsStmt, err := db.Prepare("SELECT COUNT(*) FROM Messages WHERE tipset = ?") + if err != nil { + return xerrors.Errorf("error preparing statement: %w", err) + } + + curTs := cs.GetHeaviestTipSet() + for curTs != nil && curTs.Height() >= boundaryEpoch { + tsCid, err := curTs.Key().Cid() + if err != nil { + return xerrors.Errorf("error computing tipset cid: %w", err) + } + + key := tsCid.String() + row = countMsgsStmt.QueryRow(key) + if err := row.Scan(&result); err != nil { + return xerrors.Errorf("error counting messages: %w", err) + } + + if result > 0 { + // found it! + boundaryEpoch = curTs.Height() + 1 + break + } + + // walk up + parents := curTs.Parents() + curTs, err = cs.GetTipSetFromKey(context.TODO(), parents) + if err != nil { + return xerrors.Errorf("error walking chain: %w", err) + } + } + + // delete everything above the minEpoch + if _, err = db.Exec("DELETE FROM Messages WHERE xepoch >= ?", int64(boundaryEpoch)); err != nil { + return xerrors.Errorf("error deleting stale reorged out message: %w", err) + } + + return nil } func (x *msgIndex) prepareStatements() error { @@ -177,7 +246,6 @@ func (x *msgIndex) prepareStatements() error { } x.deleteTipSetStmt = stmt - // TODO reconciliation stmts return nil }