more implementation
This commit is contained in:
parent
28321c1eea
commit
7a3af7bf0f
@ -7,6 +7,7 @@ import (
|
||||
"io/fs"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
@ -19,24 +20,6 @@ import (
|
||||
"github.com/ipfs/go-cid"
|
||||
)
|
||||
|
||||
// chain store interface; we could use store.ChainStore directly,
|
||||
// but this simplifies unit testing.
|
||||
type ChainStore interface {
|
||||
SubscribeHeadChanges(f store.ReorgNotifee)
|
||||
}
|
||||
|
||||
var _ ChainStore = (*store.ChainStore)(nil)
|
||||
|
||||
type msgIndex struct {
|
||||
cs ChainStore
|
||||
|
||||
db *sql.DB
|
||||
selectMsgStmt *sql.Stmt
|
||||
deleteTipSetStmt *sql.Stmt
|
||||
}
|
||||
|
||||
var _ MsgIndex = (*msgIndex)(nil)
|
||||
|
||||
var log = logging.Logger("msgindex")
|
||||
|
||||
var (
|
||||
@ -47,6 +30,37 @@ var (
|
||||
coalesceMergeInterval = 100 * time.Millisecond
|
||||
)
|
||||
|
||||
// chain store interface; we could use store.ChainStore directly,
|
||||
// but this simplifies unit testing.
|
||||
type ChainStore interface {
|
||||
SubscribeHeadChanges(f store.ReorgNotifee)
|
||||
MessagesForBlock(ctx context.Context, b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
|
||||
}
|
||||
|
||||
var _ ChainStore = (*store.ChainStore)(nil)
|
||||
|
||||
type msgIndex struct {
|
||||
cs ChainStore
|
||||
|
||||
db *sql.DB
|
||||
selectMsgStmt *sql.Stmt
|
||||
insertMsgStmt *sql.Stmt
|
||||
deleteTipSetStmt *sql.Stmt
|
||||
|
||||
sema chan struct{}
|
||||
mx sync.Mutex
|
||||
pend []headChange
|
||||
|
||||
cancel func()
|
||||
}
|
||||
|
||||
var _ MsgIndex = (*msgIndex)(nil)
|
||||
|
||||
type headChange struct {
|
||||
rev []*types.TipSet
|
||||
app []*types.TipSet
|
||||
}
|
||||
|
||||
func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) {
|
||||
var (
|
||||
mkdb bool
|
||||
@ -96,7 +110,15 @@ opendb:
|
||||
}
|
||||
}
|
||||
|
||||
msgIndex := &msgIndex{db: db, cs: cs}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
msgIndex := &msgIndex{
|
||||
db: db,
|
||||
cs: cs,
|
||||
sema: make(chan struct{}, 1),
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
err = msgIndex.prepareStatements()
|
||||
if err != nil {
|
||||
err2 := db.Close()
|
||||
@ -115,6 +137,8 @@ opendb:
|
||||
)
|
||||
cs.SubscribeHeadChanges(rnf)
|
||||
|
||||
go msgIndex.background(ctx)
|
||||
|
||||
return msgIndex, nil
|
||||
}
|
||||
|
||||
@ -141,6 +165,12 @@ func (x *msgIndex) prepareStatements() error {
|
||||
}
|
||||
x.selectMsgStmt = stmt
|
||||
|
||||
stmt, err = x.db.Prepare("INSERT INTO Messages VALUES (?, ?, ?, ?)")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
x.insertMsgStmt = stmt
|
||||
|
||||
stmt, err = x.db.Prepare("DELETE FROM Messages WHERE tipset = ?")
|
||||
if err != nil {
|
||||
return err
|
||||
@ -153,8 +183,124 @@ func (x *msgIndex) prepareStatements() error {
|
||||
|
||||
// head change notifee
|
||||
func (x *msgIndex) onHeadChange(rev, app []*types.TipSet) error {
|
||||
// TODO
|
||||
return errors.New("TODO: msgIndex.onHeadChange")
|
||||
// do it in the background to avoid blocking head change processing
|
||||
x.mx.Lock()
|
||||
x.pend = append(x.pend, headChange{rev: rev, app: app})
|
||||
// TODO log loudly if this is building backlog (it shouldn't but better be safe on this)
|
||||
x.mx.Unlock()
|
||||
|
||||
select {
|
||||
case x.sema <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *msgIndex) background(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-x.sema:
|
||||
err := x.processHeadChanges(ctx)
|
||||
if err != nil {
|
||||
// TODO should we shut down the index altogether? we just log for now.
|
||||
log.Errorf("error processing head change notifications: %s", err)
|
||||
}
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (x *msgIndex) processHeadChanges(ctx context.Context) error {
|
||||
x.mx.Lock()
|
||||
pend := x.pend
|
||||
x.pend = nil
|
||||
x.mx.Unlock()
|
||||
|
||||
txn, err := x.db.Begin()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error creating transaction: %w", err)
|
||||
}
|
||||
|
||||
for _, hc := range pend {
|
||||
for _, ts := range hc.rev {
|
||||
if err := x.doRevert(ctx, ts); err != nil {
|
||||
txn.Rollback()
|
||||
return xerrors.Errorf("error reverting %s: %w", ts, err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, ts := range hc.app {
|
||||
if err := x.doApply(ctx, ts); err != nil {
|
||||
txn.Rollback()
|
||||
return xerrors.Errorf("error applying %s: %w", ts, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return txn.Commit()
|
||||
}
|
||||
|
||||
func (x *msgIndex) doRevert(ctx context.Context, ts *types.TipSet) error {
|
||||
tskey, err := ts.Key().Cid()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error computing tipset cid: %w", err)
|
||||
}
|
||||
|
||||
key := tskey.String()
|
||||
_, err = x.deleteTipSetStmt.Exec(key)
|
||||
return err
|
||||
}
|
||||
|
||||
func (x *msgIndex) doApply(ctx context.Context, ts *types.TipSet) error {
|
||||
tscid, err := ts.Key().Cid()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error computing tipset cid: %w", err)
|
||||
}
|
||||
|
||||
tskey := tscid.String()
|
||||
xepoch := int64(ts.Height())
|
||||
var xindex int64
|
||||
|
||||
seen := make(map[string]struct{})
|
||||
insert := func(key string) error {
|
||||
if _, ok := seen[key]; ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
if _, err := x.insertMsgStmt.Exec(key, tskey, xepoch, xindex); err != nil {
|
||||
return err
|
||||
}
|
||||
seen[key] = struct{}{}
|
||||
xindex++
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, blk := range ts.Blocks() {
|
||||
bmsgs, smsgs, err := x.cs.MessagesForBlock(ctx, blk)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error retrieving messages for block %s in %s: %w", blk, ts, err)
|
||||
}
|
||||
|
||||
for _, m := range bmsgs {
|
||||
key := m.Cid().String()
|
||||
if err := insert(key); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for _, m := range smsgs {
|
||||
key := m.Cid().String()
|
||||
if err := insert(key); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// interface
|
||||
|
Loading…
Reference in New Issue
Block a user