implement reconciliation
This commit is contained in:
parent
7a3af7bf0f
commit
c123fab1da
@ -35,6 +35,8 @@ var (
|
|||||||
type ChainStore interface {
|
type ChainStore interface {
|
||||||
SubscribeHeadChanges(f store.ReorgNotifee)
|
SubscribeHeadChanges(f store.ReorgNotifee)
|
||||||
MessagesForBlock(ctx context.Context, b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
|
MessagesForBlock(ctx context.Context, b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
|
||||||
|
GetHeaviestTipSet() *types.TipSet
|
||||||
|
GetTipSetFromKey(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ ChainStore = (*store.ChainStore)(nil)
|
var _ ChainStore = (*store.ChainStore)(nil)
|
||||||
@ -103,6 +105,8 @@ opendb:
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("error creating msgindex database: %w", err)
|
return nil, xerrors.Errorf("error creating msgindex database: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO we may consider populating the index in this case.
|
||||||
} else {
|
} else {
|
||||||
err = reconcileIndex(db, cs)
|
err = reconcileIndex(db, cs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -154,8 +158,73 @@ func createTables(db *sql.DB) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func reconcileIndex(db *sql.DB, cs ChainStore) error {
|
func reconcileIndex(db *sql.DB, cs ChainStore) error {
|
||||||
// TODO
|
// Invariant: after reconciliation, every tipset in the index is in the current chain; ie either
|
||||||
return errors.New("TODO: index.reconcileIndex")
|
// the chain head or reachable by walking the chain.
|
||||||
|
// Algorithm:
|
||||||
|
// 1. Count mesages in index; if none, trivially reconciled.
|
||||||
|
// TODO we may consider populating the index in that case
|
||||||
|
// 2. Find the minimum tipset in the index; this will mark the end of the reconciliation walk
|
||||||
|
// 3. Walk from current tipset until we find a tipset in the index.
|
||||||
|
// 4. Delete (revert!) all tipsets above the found tipset.
|
||||||
|
// 5. If the walk ends in the boundary epoch, then delete everything.
|
||||||
|
//
|
||||||
|
|
||||||
|
row := db.QueryRow("SELECT COUNT(*) FROM Messages")
|
||||||
|
|
||||||
|
var result int64
|
||||||
|
if err := row.Scan(&result); err != nil {
|
||||||
|
return xerrors.Errorf("error counting messages: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if result == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
row = db.QueryRow("SELECT MIN(xepoch) FROM Messages")
|
||||||
|
if err := row.Scan(&result); err != nil {
|
||||||
|
return xerrors.Errorf("error finding boundary epoch: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
boundaryEpoch := abi.ChainEpoch(result)
|
||||||
|
|
||||||
|
countMsgsStmt, err := db.Prepare("SELECT COUNT(*) FROM Messages WHERE tipset = ?")
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error preparing statement: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
curTs := cs.GetHeaviestTipSet()
|
||||||
|
for curTs != nil && curTs.Height() >= boundaryEpoch {
|
||||||
|
tsCid, err := curTs.Key().Cid()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error computing tipset cid: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
key := tsCid.String()
|
||||||
|
row = countMsgsStmt.QueryRow(key)
|
||||||
|
if err := row.Scan(&result); err != nil {
|
||||||
|
return xerrors.Errorf("error counting messages: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if result > 0 {
|
||||||
|
// found it!
|
||||||
|
boundaryEpoch = curTs.Height() + 1
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// walk up
|
||||||
|
parents := curTs.Parents()
|
||||||
|
curTs, err = cs.GetTipSetFromKey(context.TODO(), parents)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("error walking chain: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete everything above the minEpoch
|
||||||
|
if _, err = db.Exec("DELETE FROM Messages WHERE xepoch >= ?", int64(boundaryEpoch)); err != nil {
|
||||||
|
return xerrors.Errorf("error deleting stale reorged out message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *msgIndex) prepareStatements() error {
|
func (x *msgIndex) prepareStatements() error {
|
||||||
@ -177,7 +246,6 @@ func (x *msgIndex) prepareStatements() error {
|
|||||||
}
|
}
|
||||||
x.deleteTipSetStmt = stmt
|
x.deleteTipSetStmt = stmt
|
||||||
|
|
||||||
// TODO reconciliation stmts
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user