implementation details
This commit is contained in:
parent
9144f9cea7
commit
3649ae373b
@ -2,19 +2,22 @@ package index
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var ErrNotFound = errors.New("message not found")
|
||||||
|
|
||||||
// MsgInfo is the Message metadata the index tracks.
|
// MsgInfo is the Message metadata the index tracks.
|
||||||
type MsgInfo struct {
|
type MsgInfo struct {
|
||||||
// the message this record refers to
|
// the message this record refers to
|
||||||
Message cid.Cid
|
Message cid.Cid
|
||||||
|
// the tipset where this messages was executed
|
||||||
|
Tipset cid.Cid
|
||||||
// the epoch whre this message was executed
|
// the epoch whre this message was executed
|
||||||
Epoch abi.ChainEpoch
|
Epoch abi.ChainEpoch
|
||||||
// the tipset where this messages executed
|
|
||||||
Tipset cid.Cid
|
|
||||||
// the canonical execution order of the message in the tipset
|
// the canonical execution order of the message in the tipset
|
||||||
Index int
|
Index int
|
||||||
}
|
}
|
||||||
|
@ -9,13 +9,14 @@ import (
|
|||||||
"path"
|
"path"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
)
|
)
|
||||||
|
|
||||||
// chain store interface; we could use store.ChainStore directly,
|
// chain store interface; we could use store.ChainStore directly,
|
||||||
@ -29,7 +30,9 @@ var _ ChainStore = (*store.ChainStore)(nil)
|
|||||||
type msgIndex struct {
|
type msgIndex struct {
|
||||||
cs ChainStore
|
cs ChainStore
|
||||||
|
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
|
selectMsgStmt *sql.Stmt
|
||||||
|
deleteTipSetStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ MsgIndex = (*msgIndex)(nil)
|
var _ MsgIndex = (*msgIndex)(nil)
|
||||||
@ -117,8 +120,13 @@ opendb:
|
|||||||
|
|
||||||
// init utilities
|
// init utilities
|
||||||
func createTables(db *sql.DB) error {
|
func createTables(db *sql.DB) error {
|
||||||
// TODO
|
// Just a single table for now; ghetto, but this an index so we denormalize to avoid joins.
|
||||||
return errors.New("TODO: index.createTables")
|
if _, err := db.Exec("CREATE TABLE Messages (cid VARCHAR(80) PRIMARY KEY, tipset VARCHAR(80), xepoch INTEGER, xindex INTEGER)"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO Should we add an index for tipset to speed up deletion on revert?
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func reconcileIndex(db *sql.DB, cs ChainStore) error {
|
func reconcileIndex(db *sql.DB, cs ChainStore) error {
|
||||||
@ -127,8 +135,20 @@ func reconcileIndex(db *sql.DB, cs ChainStore) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (x *msgIndex) prepareStatements() error {
|
func (x *msgIndex) prepareStatements() error {
|
||||||
// TODO
|
stmt, err := x.db.Prepare("SELECT (tipset, xepoch, xindex) FROM Messages WHERE cid = ?")
|
||||||
return errors.New("TODO: msgIndex.prepareStatements")
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
x.selectMsgStmt = stmt
|
||||||
|
|
||||||
|
stmt, err = x.db.Prepare("DELETE FROM Messages WHERE tipset = ?")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
x.deleteTipSetStmt = stmt
|
||||||
|
|
||||||
|
// TODO reconciliation stmts
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// head change notifee
|
// head change notifee
|
||||||
@ -139,8 +159,34 @@ func (x *msgIndex) onHeadChange(rev, app []*types.TipSet) error {
|
|||||||
|
|
||||||
// interface
|
// interface
|
||||||
func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
|
func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
|
||||||
// TODO
|
var (
|
||||||
return MsgInfo{}, errors.New("TODO: msgIndex.GetMsgInfo")
|
tipset string
|
||||||
|
epoch int64
|
||||||
|
index int64
|
||||||
|
)
|
||||||
|
|
||||||
|
key := m.String()
|
||||||
|
row := x.selectMsgStmt.QueryRow(key)
|
||||||
|
err := row.Scan(&tipset, &epoch, &index)
|
||||||
|
switch {
|
||||||
|
case err == sql.ErrNoRows:
|
||||||
|
return MsgInfo{}, ErrNotFound
|
||||||
|
|
||||||
|
case err != nil:
|
||||||
|
return MsgInfo{}, xerrors.Errorf("error querying msgindex database: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tipsetCid, err := cid.Decode(tipset)
|
||||||
|
if err != nil {
|
||||||
|
return MsgInfo{}, xerrors.Errorf("error decoding tipset cid: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return MsgInfo{
|
||||||
|
Message: m,
|
||||||
|
Tipset: tipsetCid,
|
||||||
|
Epoch: abi.ChainEpoch(epoch),
|
||||||
|
Index: int(index),
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *msgIndex) Close() error {
|
func (x *msgIndex) Close() error {
|
||||||
|
Loading…
Reference in New Issue
Block a user