diff --git a/chain/index/msgindex.go b/chain/index/msgindex.go index accaf5fc9..ff5ddbdd9 100644 --- a/chain/index/msgindex.go +++ b/chain/index/msgindex.go @@ -7,6 +7,7 @@ import ( "io/fs" "os" "path" + "sync" "time" logging "github.com/ipfs/go-log/v2" @@ -19,24 +20,6 @@ import ( "github.com/ipfs/go-cid" ) -// chain store interface; we could use store.ChainStore directly, -// but this simplifies unit testing. -type ChainStore interface { - SubscribeHeadChanges(f store.ReorgNotifee) -} - -var _ ChainStore = (*store.ChainStore)(nil) - -type msgIndex struct { - cs ChainStore - - db *sql.DB - selectMsgStmt *sql.Stmt - deleteTipSetStmt *sql.Stmt -} - -var _ MsgIndex = (*msgIndex)(nil) - var log = logging.Logger("msgindex") var ( @@ -47,6 +30,37 @@ var ( coalesceMergeInterval = 100 * time.Millisecond ) +// chain store interface; we could use store.ChainStore directly, +// but this simplifies unit testing. +type ChainStore interface { + SubscribeHeadChanges(f store.ReorgNotifee) + MessagesForBlock(ctx context.Context, b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) +} + +var _ ChainStore = (*store.ChainStore)(nil) + +type msgIndex struct { + cs ChainStore + + db *sql.DB + selectMsgStmt *sql.Stmt + insertMsgStmt *sql.Stmt + deleteTipSetStmt *sql.Stmt + + sema chan struct{} + mx sync.Mutex + pend []headChange + + cancel func() +} + +var _ MsgIndex = (*msgIndex)(nil) + +type headChange struct { + rev []*types.TipSet + app []*types.TipSet +} + func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) { var ( mkdb bool @@ -96,7 +110,15 @@ opendb: } } - msgIndex := &msgIndex{db: db, cs: cs} + ctx, cancel := context.WithCancel(context.Background()) + + msgIndex := &msgIndex{ + db: db, + cs: cs, + sema: make(chan struct{}, 1), + cancel: cancel, + } + err = msgIndex.prepareStatements() if err != nil { err2 := db.Close() @@ -115,6 +137,8 @@ opendb: ) cs.SubscribeHeadChanges(rnf) + go msgIndex.background(ctx) + return msgIndex, nil } @@ -141,6 +165,12 @@ func (x *msgIndex) prepareStatements() error { } x.selectMsgStmt = stmt + stmt, err = x.db.Prepare("INSERT INTO Messages VALUES (?, ?, ?, ?)") + if err != nil { + return err + } + x.insertMsgStmt = stmt + stmt, err = x.db.Prepare("DELETE FROM Messages WHERE tipset = ?") if err != nil { return err @@ -153,8 +183,124 @@ func (x *msgIndex) prepareStatements() error { // head change notifee func (x *msgIndex) onHeadChange(rev, app []*types.TipSet) error { - // TODO - return errors.New("TODO: msgIndex.onHeadChange") + // do it in the background to avoid blocking head change processing + x.mx.Lock() + x.pend = append(x.pend, headChange{rev: rev, app: app}) + // TODO log loudly if this is building backlog (it shouldn't but better be safe on this) + x.mx.Unlock() + + select { + case x.sema <- struct{}{}: + default: + } + + return nil +} + +func (x *msgIndex) background(ctx context.Context) { + for { + select { + case <-x.sema: + err := x.processHeadChanges(ctx) + if err != nil { + // TODO should we shut down the index altogether? we just log for now. + log.Errorf("error processing head change notifications: %s", err) + } + + case <-ctx.Done(): + return + } + } +} + +func (x *msgIndex) processHeadChanges(ctx context.Context) error { + x.mx.Lock() + pend := x.pend + x.pend = nil + x.mx.Unlock() + + txn, err := x.db.Begin() + if err != nil { + return xerrors.Errorf("error creating transaction: %w", err) + } + + for _, hc := range pend { + for _, ts := range hc.rev { + if err := x.doRevert(ctx, ts); err != nil { + txn.Rollback() + return xerrors.Errorf("error reverting %s: %w", ts, err) + } + } + + for _, ts := range hc.app { + if err := x.doApply(ctx, ts); err != nil { + txn.Rollback() + return xerrors.Errorf("error applying %s: %w", ts, err) + } + } + } + + return txn.Commit() +} + +func (x *msgIndex) doRevert(ctx context.Context, ts *types.TipSet) error { + tskey, err := ts.Key().Cid() + if err != nil { + return xerrors.Errorf("error computing tipset cid: %w", err) + } + + key := tskey.String() + _, err = x.deleteTipSetStmt.Exec(key) + return err +} + +func (x *msgIndex) doApply(ctx context.Context, ts *types.TipSet) error { + tscid, err := ts.Key().Cid() + if err != nil { + return xerrors.Errorf("error computing tipset cid: %w", err) + } + + tskey := tscid.String() + xepoch := int64(ts.Height()) + var xindex int64 + + seen := make(map[string]struct{}) + insert := func(key string) error { + if _, ok := seen[key]; ok { + return nil + } + + if _, err := x.insertMsgStmt.Exec(key, tskey, xepoch, xindex); err != nil { + return err + } + seen[key] = struct{}{} + xindex++ + + return nil + } + + for _, blk := range ts.Blocks() { + bmsgs, smsgs, err := x.cs.MessagesForBlock(ctx, blk) + if err != nil { + return xerrors.Errorf("error retrieving messages for block %s in %s: %w", blk, ts, err) + } + + for _, m := range bmsgs { + key := m.Cid().String() + if err := insert(key); err != nil { + return err + } + } + + for _, m := range smsgs { + key := m.Cid().String() + if err := insert(key); err != nil { + return err + } + } + } + + return nil } // interface