refactor database

- drop the execution index; we don't need it
- it is inclusion tipset
- use MessagesForTipset
- hoist db sql stuffs on top for clarity
- add index for tipset on messages
This commit is contained in:
vyzo 2023-03-13 05:51:24 +02:00
parent 88d7a4e610
commit df6dfdf8a9
2 changed files with 70 additions and 66 deletions

View File

@ -16,12 +16,10 @@ var ErrClosed = errors.New("index closed")
type MsgInfo struct {
// the message this record refers to
Message cid.Cid
// the tipset where this messages was executed
// the tipset where this messages was included
TipSet cid.Cid
// the epoch whre this message was executed
// the epoch whre this message was included
Epoch abi.ChainEpoch
// the canonical execution order of the message in the tipset
Index int
}
// MsgIndex is the interface to the message index

View File

@ -23,9 +23,36 @@ import (
var log = logging.Logger("msgindex")
var (
dbName = "msgindex.db"
var dbName = "msgindex.db"
var dbDefs = []string{
`CREATE TABLE IF NOT EXISTS messages (
cid VARCHAR(80) PRIMARY KEY,
tipset_cid VARCHAR(80) NOT NULL,
epoch INTEGER NOT NULL
)`,
`CREATE INDEX IF NOT EXISTS tipset_cids ON messages (tipset_cid)
`,
`CREATE TABLE IF NOT EXISTS _meta (
version UINT64 NOT NULL UNIQUE
)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (1)`,
}
var dbPragmas = []string{}
const (
// prepared stmts
dbqGetMessageInfo = "SELECT tipset_cid, epoch FROM messages WHERE cid = ?"
dbqInsertMessage = "INSERT INTO messages VALUES (?, ?, ?)"
dbqDeleteTipsetMessages = "DELETE FROM messages WHERE tipset_cid = ?"
// reconciliation
dbqCountMessages = "SELECT COUNT(*) FROM messages"
dbqMinEpoch = "SELECT MIN(epoch) FROM messages"
dbqCountTipsetMessages = "SELECT COUNT(*) FROM messages WHERE tipset_cid = ?"
dbqDeleteMessagesByEpoch = "DELETE FROM messages WHERE epoch >= ?"
)
// coalescer configuration (TODO: use observer instead)
var (
coalesceMinDelay = 100 * time.Millisecond
coalesceMaxDelay = time.Second
coalesceMergeInterval = 100 * time.Millisecond
@ -35,7 +62,7 @@ var (
// but this simplifies unit testing.
type ChainStore interface {
SubscribeHeadChanges(f store.ReorgNotifee)
MessagesForBlock(ctx context.Context, b *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
MessagesForTipset(ctx context.Context, ts *types.TipSet) ([]types.ChainMsg, error)
GetHeaviestTipSet() *types.TipSet
GetTipSetFromKey(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
}
@ -69,8 +96,8 @@ type headChange struct {
func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) {
var (
mkdb bool
dbPath string
exists bool
err error
)
@ -82,8 +109,10 @@ func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) {
dbPath = path.Join(basePath, dbName)
_, err = os.Stat(dbPath)
switch {
case err == nil:
exists = true
case errors.Is(err, fs.ErrNotExist):
mkdb = true
case err != nil:
return nil, xerrors.Errorf("error stating msgindex database: %w", err)
@ -96,16 +125,13 @@ func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) {
return nil, xerrors.Errorf("error opening msgindex database: %w", err)
}
if mkdb {
err = createTables(db)
if err != nil {
return nil, xerrors.Errorf("error creating msgindex database: %w", err)
}
if err := prepareDB(db); err != nil {
return nil, xerrors.Errorf("error creating msgindex database: %w", err)
}
// TODO we may consider populating the index in this case.
} else {
err = reconcileIndex(db, cs)
if err != nil {
// TODO we may consider populating the index when first creating the db
if exists {
if err := reconcileIndex(db, cs); err != nil {
return nil, xerrors.Errorf("error reconciling msgindex database: %w", err)
}
}
@ -144,13 +170,19 @@ func NewMsgIndex(basePath string, cs ChainStore) (MsgIndex, error) {
}
// init utilities
func createTables(db *sql.DB) error {
// Just a single table for now; ghetto, but this an index so we denormalize to avoid joins.
if _, err := db.Exec("CREATE TABLE Messages (cid VARCHAR(80) PRIMARY KEY, tipset VARCHAR(80), xepoch INTEGER, xindex INTEGER)"); err != nil {
return err
func prepareDB(db *sql.DB) error {
for _, stmt := range dbDefs {
if _, err := db.Exec(stmt); err != nil {
return xerrors.Errorf("error executing sql statement '%s': %w", stmt, err)
}
}
for _, stmt := range dbPragmas {
if _, err := db.Exec(stmt); err != nil {
return xerrors.Errorf("error executing sql statement '%s': %w", stmt, err)
}
}
// TODO Should we add an index for tipset to speed up deletion on revert?
return nil
}
@ -166,7 +198,7 @@ func reconcileIndex(db *sql.DB, cs ChainStore) error {
// 5. If the walk ends in the boundary epoch, then delete everything.
//
row := db.QueryRow("SELECT COUNT(*) FROM Messages")
row := db.QueryRow(dbqCountMessages)
var result int64
if err := row.Scan(&result); err != nil {
@ -177,14 +209,14 @@ func reconcileIndex(db *sql.DB, cs ChainStore) error {
return nil
}
row = db.QueryRow("SELECT MIN(xepoch) FROM Messages")
row = db.QueryRow(dbqMinEpoch)
if err := row.Scan(&result); err != nil {
return xerrors.Errorf("error finding boundary epoch: %w", err)
}
boundaryEpoch := abi.ChainEpoch(result)
countMsgsStmt, err := db.Prepare("SELECT COUNT(*) FROM Messages WHERE tipset = ?")
countMsgsStmt, err := db.Prepare(dbqCountTipsetMessages)
if err != nil {
return xerrors.Errorf("error preparing statement: %w", err)
}
@ -217,7 +249,7 @@ func reconcileIndex(db *sql.DB, cs ChainStore) error {
}
// delete everything above the minEpoch
if _, err = db.Exec("DELETE FROM Messages WHERE xepoch >= ?", int64(boundaryEpoch)); err != nil {
if _, err = db.Exec(dbqDeleteMessagesByEpoch, int64(boundaryEpoch)); err != nil {
return xerrors.Errorf("error deleting stale reorged out message: %w", err)
}
@ -225,19 +257,19 @@ func reconcileIndex(db *sql.DB, cs ChainStore) error {
}
func (x *msgIndex) prepareStatements() error {
stmt, err := x.db.Prepare("SELECT tipset, xepoch, xindex FROM Messages WHERE cid = ?")
stmt, err := x.db.Prepare(dbqGetMessageInfo)
if err != nil {
return xerrors.Errorf("prepare selectMsgStmt: %w", err)
}
x.selectMsgStmt = stmt
stmt, err = x.db.Prepare("INSERT INTO Messages VALUES (?, ?, ?, ?)")
stmt, err = x.db.Prepare(dbqInsertMessage)
if err != nil {
return xerrors.Errorf("prepare insertMsgStmt: %w", err)
}
x.insertMsgStmt = stmt
stmt, err = x.db.Prepare("DELETE FROM Messages WHERE tipset = ?")
stmt, err = x.db.Prepare(dbqDeleteTipsetMessages)
if err != nil {
return xerrors.Errorf("prepare deleteTipSetStmt: %w", err)
}
@ -335,42 +367,18 @@ func (x *msgIndex) doApply(ctx context.Context, tx *sql.Tx, ts *types.TipSet) er
}
tskey := tscid.String()
xepoch := int64(ts.Height())
var xindex int64
epoch := int64(ts.Height())
seen := make(map[string]struct{})
insert := func(key string) error {
if _, ok := seen[key]; ok {
return nil
}
if _, err := tx.Stmt(x.insertMsgStmt).Exec(key, tskey, xepoch, xindex); err != nil {
return err
}
seen[key] = struct{}{}
xindex++
return nil
msgs, err := x.cs.MessagesForTipset(ctx, ts)
if err != nil {
return xerrors.Errorf("error retrieving messages for tipset %s: %w", ts, err)
}
for _, blk := range ts.Blocks() {
bmsgs, smsgs, err := x.cs.MessagesForBlock(ctx, blk)
if err != nil {
return xerrors.Errorf("error retrieving messages for block %s in %s: %w", blk.Cid(), ts, err)
}
for _, m := range bmsgs {
key := m.Cid().String()
if err := insert(key); err != nil {
return err
}
}
for _, m := range smsgs {
key := m.Cid().String()
if err := insert(key); err != nil {
return err
}
insertStmt := tx.Stmt(x.insertMsgStmt)
for _, msg := range msgs {
key := msg.Cid().String()
if _, err := insertStmt.Exec(key, tskey, epoch); err != nil {
return xerrors.Errorf("error inserting message: %w", err)
}
}
@ -389,12 +397,11 @@ func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
var (
tipset string
epoch int64
index int64
)
key := m.String()
row := x.selectMsgStmt.QueryRow(key)
err := row.Scan(&tipset, &epoch, &index)
err := row.Scan(&tipset, &epoch)
switch {
case err == sql.ErrNoRows:
return MsgInfo{}, ErrNotFound
@ -412,7 +419,6 @@ func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
Message: m,
TipSet: tipsetCid,
Epoch: abi.ChainEpoch(epoch),
Index: int(index),
}, nil
}