implementation details

This commit is contained in:
vyzo 2023-03-11 18:21:16 +02:00 committed by Maciej Witowski
parent 46488d4356
commit 6a67cd9931
2 changed files with 59 additions and 10 deletions

View File

@ -2,19 +2,22 @@ package index
import (
"context"
"errors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/ipfs/go-cid"
)
var ErrNotFound = errors.New("message not found")
// MsgInfo is the Message metadata the index tracks.
type MsgInfo struct {
// the message this record refers to
Message cid.Cid
// the tipset where this messages was executed
Tipset cid.Cid
// the epoch whre this message was executed
Epoch abi.ChainEpoch
// the tipset where this messages executed
Tipset cid.Cid
// the canonical execution order of the message in the tipset
Index int
}

View File

@ -9,13 +9,14 @@ import (
"path"
"time"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
_ "github.com/mattn/go-sqlite3"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/ipfs/go-cid"
)
// chain store interface; we could use store.ChainStore directly,
@ -29,7 +30,9 @@ var _ ChainStore = (*store.ChainStore)(nil)
type msgIndex struct {
cs ChainStore
db *sql.DB
db *sql.DB
selectMsgStmt *sql.Stmt
deleteTipSetStmt *sql.Stmt
}
var _ MsgIndex = (*msgIndex)(nil)
@ -117,8 +120,13 @@ opendb:
// init utilities
func createTables(db *sql.DB) error {
// TODO
return errors.New("TODO: index.createTables")
// Just a single table for now; ghetto, but this an index so we denormalize to avoid joins.
if _, err := db.Exec("CREATE TABLE Messages (cid VARCHAR(80) PRIMARY KEY, tipset VARCHAR(80), xepoch INTEGER, xindex INTEGER)"); err != nil {
return err
}
// TODO Should we add an index for tipset to speed up deletion on revert?
return nil
}
func reconcileIndex(db *sql.DB, cs ChainStore) error {
@ -127,8 +135,20 @@ func reconcileIndex(db *sql.DB, cs ChainStore) error {
}
func (x *msgIndex) prepareStatements() error {
// TODO
return errors.New("TODO: msgIndex.prepareStatements")
stmt, err := x.db.Prepare("SELECT (tipset, xepoch, xindex) FROM Messages WHERE cid = ?")
if err != nil {
return err
}
x.selectMsgStmt = stmt
stmt, err = x.db.Prepare("DELETE FROM Messages WHERE tipset = ?")
if err != nil {
return err
}
x.deleteTipSetStmt = stmt
// TODO reconciliation stmts
return nil
}
// head change notifee
@ -139,8 +159,34 @@ func (x *msgIndex) onHeadChange(rev, app []*types.TipSet) error {
// interface
func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
// TODO
return MsgInfo{}, errors.New("TODO: msgIndex.GetMsgInfo")
var (
tipset string
epoch int64
index int64
)
key := m.String()
row := x.selectMsgStmt.QueryRow(key)
err := row.Scan(&tipset, &epoch, &index)
switch {
case err == sql.ErrNoRows:
return MsgInfo{}, ErrNotFound
case err != nil:
return MsgInfo{}, xerrors.Errorf("error querying msgindex database: %w", err)
}
tipsetCid, err := cid.Decode(tipset)
if err != nil {
return MsgInfo{}, xerrors.Errorf("error decoding tipset cid: %w", err)
}
return MsgInfo{
Message: m,
Tipset: tipsetCid,
Epoch: abi.ChainEpoch(epoch),
Index: int(index),
}, nil
}
func (x *msgIndex) Close() error {