refactor database
- drop the execution index; we don't need it - it is inclusion tipset - use MessagesForTipset - hoist db sql stuffs on top for clarity - add index for tipset on messages
This commit is contained in:
parent
7219fae0a1
commit
e682036cef
@ -16,12 +16,10 @@ var ErrClosed = errors.New("index closed")
|
|||||||
type MsgInfo struct {
|
type MsgInfo struct {
|
||||||
// the message this record refers to
|
// the message this record refers to
|
||||||
Message cid.Cid
|
Message cid.Cid
|
||||||
// the tipset where this messages was executed
|
// the tipset where this messages was included
|
||||||
TipSet cid.Cid
|
TipSet cid.Cid
|
||||||
// the epoch whre this message was executed
|
// the epoch whre this message was included
|
||||||
Epoch abi.ChainEpoch
|
Epoch abi.ChainEpoch
|
||||||
// the canonical execution order of the message in the tipset
|
|
||||||
Index int
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// MsgIndex is the interface to the message index
|
// MsgIndex is the interface to the message index
|
||||||
|
@ -23,9 +23,36 @@ import (
|
|||||||
|
|
||||||
var log = logging.Logger("msgindex")
|
var log = logging.Logger("msgindex")
|
||||||
|
|
||||||
var (
|
var dbName = "msgindex.db"
|
||||||
dbName = "msgindex.db"
|
var dbDefs = []string{
|
||||||
|
`CREATE TABLE IF NOT EXISTS messages (
|
||||||
|
cid VARCHAR(80) PRIMARY KEY,
|
||||||
|
tipset_cid VARCHAR(80) NOT NULL,
|
||||||
|
epoch INTEGER NOT NULL
|
||||||
|
)`,
|
||||||
|
`CREATE INDEX IF NOT EXISTS tipset_cids ON messages (tipset_cid)
|
||||||
|
`,
|
||||||
|
`CREATE TABLE IF NOT EXISTS _meta (
|
||||||
|
version UINT64 NOT NULL UNIQUE
|
||||||
|
)`,
|
||||||
|
`INSERT OR IGNORE INTO _meta (version) VALUES (1)`,
|
||||||
|
}
|
||||||
|
var dbPragmas = []string{}
|
||||||
|
|
||||||
|
const (
|
||||||
|
// prepared stmts
|
||||||
|
dbqGetMessageInfo = "SELECT tipset_cid, epoch FROM messages WHERE cid = ?"
|
||||||
|
dbqInsertMessage = "INSERT INTO messages VALUES (?, ?, ?)"
|
||||||
|
dbqDeleteTipsetMessages = "DELETE FROM messages WHERE tipset_cid = ?"
|
||||||
|
// reconciliation
|
||||||
|
dbqCountMessages = "SELECT COUNT(*) FROM messages"
|
||||||
|
dbqMinEpoch = "SELECT MIN(epoch) FROM messages"
|
||||||
|
dbqCountTipsetMessages = "SELECT COUNT(*) FROM messages WHERE tipset_cid = ?"
|
||||||
|
dbqDeleteMessagesByEpoch = "DELETE FROM messages WHERE epoch >= ?"
|
||||||
|
)
|
||||||
|
|
||||||
|
// coalescer configuration (TODO: use observer instead)
|
||||||
|
var (
|
||||||
coalesceMinDelay = 100 * time.Millisecond
|
coalesceMinDelay = 100 * time.Millisecond
|
||||||
coalesceMaxDelay = time.Second
|
coalesceMaxDelay = time.Second
|
||||||
coalesceMergeInterval = 100 * time.Millisecond
|
coalesceMergeInterval = 100 * time.Millisecond
|
||||||
@ -35,7 +62,7 @@ var (
|
|||||||
// but this simplifies unit testing.
|
// but this simplifies unit testing.
|
||||||
type ChainStore interface {
|
type ChainStore interface {
|
||||||
SubscribeHeadChanges(f store.ReorgNotifee)
|
SubscribeHeadChanges(f store.ReorgNotifee)
|
||||||
MessagesForBlock(ctx context.Context, b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
|
MessagesForTipset(ctx context.Context, ts *types.TipSet) ([]types.ChainMsg, error)
|
||||||
GetHeaviestTipSet() *types.TipSet
|
GetHeaviestTipSet() *types.TipSet
|
||||||
GetTipSetFromKey(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
|
GetTipSetFromKey(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
|
||||||
}
|
}
|
||||||
@ -69,8 +96,8 @@ type headChange struct {
|
|||||||
|
|
||||||
func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) {
|
func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) {
|
||||||
var (
|
var (
|
||||||
mkdb bool
|
|
||||||
dbPath string
|
dbPath string
|
||||||
|
exists bool
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -82,8 +109,10 @@ func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) {
|
|||||||
dbPath = path.Join(basePath, dbName)
|
dbPath = path.Join(basePath, dbName)
|
||||||
_, err = os.Stat(dbPath)
|
_, err = os.Stat(dbPath)
|
||||||
switch {
|
switch {
|
||||||
|
case err == nil:
|
||||||
|
exists = true
|
||||||
|
|
||||||
case errors.Is(err, fs.ErrNotExist):
|
case errors.Is(err, fs.ErrNotExist):
|
||||||
mkdb = true
|
|
||||||
|
|
||||||
case err != nil:
|
case err != nil:
|
||||||
return nil, xerrors.Errorf("error stating msgindex database: %w", err)
|
return nil, xerrors.Errorf("error stating msgindex database: %w", err)
|
||||||
@ -96,16 +125,13 @@ func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) {
|
|||||||
return nil, xerrors.Errorf("error opening msgindex database: %w", err)
|
return nil, xerrors.Errorf("error opening msgindex database: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if mkdb {
|
if err := prepareDB(db); err != nil {
|
||||||
err = createTables(db)
|
return nil, xerrors.Errorf("error creating msgindex database: %w", err)
|
||||||
if err != nil {
|
}
|
||||||
return nil, xerrors.Errorf("error creating msgindex database: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO we may consider populating the index in this case.
|
// TODO we may consider populating the index when first creating the db
|
||||||
} else {
|
if exists {
|
||||||
err = reconcileIndex(db, cs)
|
if err := reconcileIndex(db, cs); err != nil {
|
||||||
if err != nil {
|
|
||||||
return nil, xerrors.Errorf("error reconciling msgindex database: %w", err)
|
return nil, xerrors.Errorf("error reconciling msgindex database: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -144,13 +170,19 @@ func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// init utilities
|
// init utilities
|
||||||
func createTables(db *sql.DB) error {
|
func prepareDB(db *sql.DB) error {
|
||||||
// Just a single table for now; ghetto, but this an index so we denormalize to avoid joins.
|
for _, stmt := range dbDefs {
|
||||||
if _, err := db.Exec("CREATE TABLE Messages (cid VARCHAR(80) PRIMARY KEY, tipset VARCHAR(80), xepoch INTEGER, xindex INTEGER)"); err != nil {
|
if _, err := db.Exec(stmt); err != nil {
|
||||||
return err
|
return xerrors.Errorf("error executing sql statement '%s': %w", stmt, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, stmt := range dbPragmas {
|
||||||
|
if _, err := db.Exec(stmt); err != nil {
|
||||||
|
return xerrors.Errorf("error executing sql statement '%s': %w", stmt, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO Should we add an index for tipset to speed up deletion on revert?
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -166,7 +198,7 @@ func reconcileIndex(db *sql.DB, cs ChainStore) error {
|
|||||||
// 5. If the walk ends in the boundary epoch, then delete everything.
|
// 5. If the walk ends in the boundary epoch, then delete everything.
|
||||||
//
|
//
|
||||||
|
|
||||||
row := db.QueryRow("SELECT COUNT(*) FROM Messages")
|
row := db.QueryRow(dbqCountMessages)
|
||||||
|
|
||||||
var result int64
|
var result int64
|
||||||
if err := row.Scan(&result); err != nil {
|
if err := row.Scan(&result); err != nil {
|
||||||
@ -177,14 +209,14 @@ func reconcileIndex(db *sql.DB, cs ChainStore) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
row = db.QueryRow("SELECT MIN(xepoch) FROM Messages")
|
row = db.QueryRow(dbqMinEpoch)
|
||||||
if err := row.Scan(&result); err != nil {
|
if err := row.Scan(&result); err != nil {
|
||||||
return xerrors.Errorf("error finding boundary epoch: %w", err)
|
return xerrors.Errorf("error finding boundary epoch: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
boundaryEpoch := abi.ChainEpoch(result)
|
boundaryEpoch := abi.ChainEpoch(result)
|
||||||
|
|
||||||
countMsgsStmt, err := db.Prepare("SELECT COUNT(*) FROM Messages WHERE tipset = ?")
|
countMsgsStmt, err := db.Prepare(dbqCountTipsetMessages)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("error preparing statement: %w", err)
|
return xerrors.Errorf("error preparing statement: %w", err)
|
||||||
}
|
}
|
||||||
@ -217,7 +249,7 @@ func reconcileIndex(db *sql.DB, cs ChainStore) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// delete everything above the minEpoch
|
// delete everything above the minEpoch
|
||||||
if _, err = db.Exec("DELETE FROM Messages WHERE xepoch >= ?", int64(boundaryEpoch)); err != nil {
|
if _, err = db.Exec(dbqDeleteMessagesByEpoch, int64(boundaryEpoch)); err != nil {
|
||||||
return xerrors.Errorf("error deleting stale reorged out message: %w", err)
|
return xerrors.Errorf("error deleting stale reorged out message: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -225,19 +257,19 @@ func reconcileIndex(db *sql.DB, cs ChainStore) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (x *msgIndex) prepareStatements() error {
|
func (x *msgIndex) prepareStatements() error {
|
||||||
stmt, err := x.db.Prepare("SELECT tipset, xepoch, xindex FROM Messages WHERE cid = ?")
|
stmt, err := x.db.Prepare(dbqGetMessageInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("prepare selectMsgStmt: %w", err)
|
return xerrors.Errorf("prepare selectMsgStmt: %w", err)
|
||||||
}
|
}
|
||||||
x.selectMsgStmt = stmt
|
x.selectMsgStmt = stmt
|
||||||
|
|
||||||
stmt, err = x.db.Prepare("INSERT INTO Messages VALUES (?, ?, ?, ?)")
|
stmt, err = x.db.Prepare(dbqInsertMessage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("prepare insertMsgStmt: %w", err)
|
return xerrors.Errorf("prepare insertMsgStmt: %w", err)
|
||||||
}
|
}
|
||||||
x.insertMsgStmt = stmt
|
x.insertMsgStmt = stmt
|
||||||
|
|
||||||
stmt, err = x.db.Prepare("DELETE FROM Messages WHERE tipset = ?")
|
stmt, err = x.db.Prepare(dbqDeleteTipsetMessages)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("prepare deleteTipSetStmt: %w", err)
|
return xerrors.Errorf("prepare deleteTipSetStmt: %w", err)
|
||||||
}
|
}
|
||||||
@ -335,42 +367,18 @@ func (x *msgIndex) doApply(ctx context.Context, tx *sql.Tx, ts *types.TipSet) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
tskey := tscid.String()
|
tskey := tscid.String()
|
||||||
xepoch := int64(ts.Height())
|
epoch := int64(ts.Height())
|
||||||
var xindex int64
|
|
||||||
|
|
||||||
seen := make(map[string]struct{})
|
msgs, err := x.cs.MessagesForTipset(ctx, ts)
|
||||||
insert := func(key string) error {
|
if err != nil {
|
||||||
if _, ok := seen[key]; ok {
|
return xerrors.Errorf("error retrieving messages for tipset %s: %w", ts, err)
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := tx.Stmt(x.insertMsgStmt).Exec(key, tskey, xepoch, xindex); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
seen[key] = struct{}{}
|
|
||||||
xindex++
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, blk := range ts.Blocks() {
|
insertStmt := tx.Stmt(x.insertMsgStmt)
|
||||||
bmsgs, smsgs, err := x.cs.MessagesForBlock(ctx, blk)
|
for _, msg := range msgs {
|
||||||
if err != nil {
|
key := msg.Cid().String()
|
||||||
return xerrors.Errorf("error retrieving messages for block %s in %s: %w", blk.Cid(), ts, err)
|
if _, err := insertStmt.Exec(key, tskey, epoch); err != nil {
|
||||||
}
|
return xerrors.Errorf("error inserting message: %w", err)
|
||||||
|
|
||||||
for _, m := range bmsgs {
|
|
||||||
key := m.Cid().String()
|
|
||||||
if err := insert(key); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, m := range smsgs {
|
|
||||||
key := m.Cid().String()
|
|
||||||
if err := insert(key); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -389,12 +397,11 @@ func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
|
|||||||
var (
|
var (
|
||||||
tipset string
|
tipset string
|
||||||
epoch int64
|
epoch int64
|
||||||
index int64
|
|
||||||
)
|
)
|
||||||
|
|
||||||
key := m.String()
|
key := m.String()
|
||||||
row := x.selectMsgStmt.QueryRow(key)
|
row := x.selectMsgStmt.QueryRow(key)
|
||||||
err := row.Scan(&tipset, &epoch, &index)
|
err := row.Scan(&tipset, &epoch)
|
||||||
switch {
|
switch {
|
||||||
case err == sql.ErrNoRows:
|
case err == sql.ErrNoRows:
|
||||||
return MsgInfo{}, ErrNotFound
|
return MsgInfo{}, ErrNotFound
|
||||||
@ -412,7 +419,6 @@ func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
|
|||||||
Message: m,
|
Message: m,
|
||||||
TipSet: tipsetCid,
|
TipSet: tipsetCid,
|
||||||
Epoch: abi.ChainEpoch(epoch),
|
Epoch: abi.ChainEpoch(epoch),
|
||||||
Index: int(index),
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user