more implementation

This commit is contained in:
vyzo 2023-03-11 19:11:08 +02:00
parent 7fcf228bc4
commit d97c6b2f69

View File

@ -7,6 +7,7 @@ import (
"io/fs" "io/fs"
"os" "os"
"path" "path"
"sync"
"time" "time"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
@ -19,24 +20,6 @@ import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
) )
// chain store interface; we could use store.ChainStore directly,
// but this simplifies unit testing.
type ChainStore interface {
SubscribeHeadChanges(f store.ReorgNotifee)
}
var _ ChainStore = (*store.ChainStore)(nil)
type msgIndex struct {
cs ChainStore
db *sql.DB
selectMsgStmt *sql.Stmt
deleteTipSetStmt *sql.Stmt
}
var _ MsgIndex = (*msgIndex)(nil)
var log = logging.Logger("msgindex") var log = logging.Logger("msgindex")
var ( var (
@ -47,6 +30,37 @@ var (
coalesceMergeInterval = 100 * time.Millisecond coalesceMergeInterval = 100 * time.Millisecond
) )
// chain store interface; we could use store.ChainStore directly,
// but this simplifies unit testing.
type ChainStore interface {
SubscribeHeadChanges(f store.ReorgNotifee)
MessagesForBlock(ctx context.Context, b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
}
var _ ChainStore = (*store.ChainStore)(nil)
type msgIndex struct {
cs ChainStore
db *sql.DB
selectMsgStmt *sql.Stmt
insertMsgStmt *sql.Stmt
deleteTipSetStmt *sql.Stmt
sema chan struct{}
mx sync.Mutex
pend []headChange
cancel func()
}
var _ MsgIndex = (*msgIndex)(nil)
type headChange struct {
rev []*types.TipSet
app []*types.TipSet
}
func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) { func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) {
var ( var (
mkdb bool mkdb bool
@ -96,7 +110,15 @@ opendb:
} }
} }
msgIndex := &msgIndex{db: db, cs: cs} ctx, cancel := context.WithCancel(context.Background())
msgIndex := &msgIndex{
db: db,
cs: cs,
sema: make(chan struct{}, 1),
cancel: cancel,
}
err = msgIndex.prepareStatements() err = msgIndex.prepareStatements()
if err != nil { if err != nil {
err2 := db.Close() err2 := db.Close()
@ -115,6 +137,8 @@ opendb:
) )
cs.SubscribeHeadChanges(rnf) cs.SubscribeHeadChanges(rnf)
go msgIndex.background(ctx)
return msgIndex, nil return msgIndex, nil
} }
@ -141,6 +165,12 @@ func (x *msgIndex) prepareStatements() error {
} }
x.selectMsgStmt = stmt x.selectMsgStmt = stmt
stmt, err = x.db.Prepare("INSERT INTO Messages VALUES (?, ?, ?, ?)")
if err != nil {
return err
}
x.insertMsgStmt = stmt
stmt, err = x.db.Prepare("DELETE FROM Messages WHERE tipset = ?") stmt, err = x.db.Prepare("DELETE FROM Messages WHERE tipset = ?")
if err != nil { if err != nil {
return err return err
@ -153,8 +183,124 @@ func (x *msgIndex) prepareStatements() error {
// head change notifee // head change notifee
func (x *msgIndex) onHeadChange(rev, app []*types.TipSet) error { func (x *msgIndex) onHeadChange(rev, app []*types.TipSet) error {
// TODO // do it in the background to avoid blocking head change processing
return errors.New("TODO: msgIndex.onHeadChange") x.mx.Lock()
x.pend = append(x.pend, headChange{rev: rev, app: app})
// TODO log loudly if this is building backlog (it shouldn't but better be safe on this)
x.mx.Unlock()
select {
case x.sema <- struct{}{}:
default:
}
return nil
}
func (x *msgIndex) background(ctx context.Context) {
for {
select {
case <-x.sema:
err := x.processHeadChanges(ctx)
if err != nil {
// TODO should we shut down the index altogether? we just log for now.
log.Errorf("error processing head change notifications: %s", err)
}
case <-ctx.Done():
return
}
}
}
func (x *msgIndex) processHeadChanges(ctx context.Context) error {
x.mx.Lock()
pend := x.pend
x.pend = nil
x.mx.Unlock()
txn, err := x.db.Begin()
if err != nil {
return xerrors.Errorf("error creating transaction: %w", err)
}
for _, hc := range pend {
for _, ts := range hc.rev {
if err := x.doRevert(ctx, ts); err != nil {
txn.Rollback()
return xerrors.Errorf("error reverting %s: %w", ts, err)
}
}
for _, ts := range hc.app {
if err := x.doApply(ctx, ts); err != nil {
txn.Rollback()
return xerrors.Errorf("error applying %s: %w", ts, err)
}
}
}
return txn.Commit()
}
func (x *msgIndex) doRevert(ctx context.Context, ts *types.TipSet) error {
tskey, err := ts.Key().Cid()
if err != nil {
return xerrors.Errorf("error computing tipset cid: %w", err)
}
key := tskey.String()
_, err = x.deleteTipSetStmt.Exec(key)
return err
}
func (x *msgIndex) doApply(ctx context.Context, ts *types.TipSet) error {
tscid, err := ts.Key().Cid()
if err != nil {
return xerrors.Errorf("error computing tipset cid: %w", err)
}
tskey := tscid.String()
xepoch := int64(ts.Height())
var xindex int64
seen := make(map[string]struct{})
insert := func(key string) error {
if _, ok := seen[key]; ok {
return nil
}
if _, err := x.insertMsgStmt.Exec(key, tskey, xepoch, xindex); err != nil {
return err
}
seen[key] = struct{}{}
xindex++
return nil
}
for _, blk := range ts.Blocks() {
bmsgs, smsgs, err := x.cs.MessagesForBlock(ctx, blk)
if err != nil {
return xerrors.Errorf("error retrieving messages for block %s in %s: %w", blk, ts, err)
}
for _, m := range bmsgs {
key := m.Cid().String()
if err := insert(key); err != nil {
return err
}
}
for _, m := range smsgs {
key := m.Cid().String()
if err := insert(key); err != nil {
return err
}
}
}
return nil
} }
// interface // interface