Compare commits
2 Commits
master
...
ian/epoch_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0c47a8eec0 | ||
|
|
61ca075d1a |
@ -24,18 +24,27 @@ type MsgInfo struct {
|
||||
|
||||
// MsgIndex is the interface to the message index
|
||||
type MsgIndex interface {
|
||||
// GetMsgInfo retrieves the message metadata through the index.
|
||||
// GetMsgInfo looks up the message in index and retrieves its metadata and execution
|
||||
// tipset cid.
|
||||
// The lookup is done using the onchain message Cid; that is the signed message Cid
|
||||
// for SECP messages and unsigned message Cid for BLS messages.
|
||||
GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error)
|
||||
GetMsgInfo(ctx context.Context, mCid cid.Cid) (MsgInfo, cid.Cid, error)
|
||||
// Close closes the index
|
||||
Close() error
|
||||
}
|
||||
|
||||
// TipsetIndex is the interface to the tipset index
|
||||
type TipsetIndex interface {
|
||||
// GetTipsetCID returns the tipset cid for the given epoch
|
||||
GetTipsetCID(ctx context.Context, epoch abi.ChainEpoch) (*cid.Cid, error)
|
||||
// Close closes the index
|
||||
Close() error
|
||||
}
|
||||
|
||||
type dummyMsgIndex struct{}
|
||||
|
||||
func (dummyMsgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
|
||||
return MsgInfo{}, ErrNotFound
|
||||
func (dummyMsgIndex) GetMsgInfo(ctx context.Context, mCid cid.Cid) (MsgInfo, cid.Cid, error) {
|
||||
return MsgInfo{}, cid.Undef, ErrNotFound
|
||||
}
|
||||
|
||||
func (dummyMsgIndex) Close() error {
|
||||
@ -43,3 +52,15 @@ func (dummyMsgIndex) Close() error {
|
||||
}
|
||||
|
||||
var DummyMsgIndex MsgIndex = dummyMsgIndex{}
|
||||
|
||||
type dummyTipsetIndex struct{}
|
||||
|
||||
func (dummyTipsetIndex) GetTipsetCID(ctx context.Context, epoch abi.ChainEpoch) (*cid.Cid, error) {
|
||||
return nil, ErrNotFound
|
||||
}
|
||||
|
||||
func (dummyTipsetIndex) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
var DummyTipsetIndex TipsetIndex = dummyTipsetIndex{}
|
||||
|
||||
@ -30,8 +30,8 @@ var dbDefs = []string{
|
||||
tipset_cid VARCHAR(80) NOT NULL,
|
||||
epoch INTEGER NOT NULL
|
||||
)`,
|
||||
`CREATE INDEX IF NOT EXISTS tipset_cids ON messages (tipset_cid)
|
||||
`,
|
||||
`CREATE INDEX IF NOT EXISTS tipset_cids ON messages (tipset_cid)`,
|
||||
`CREATE INDEX IF NOT EXISTS tipset_epochs ON messages (epoch)`,
|
||||
`CREATE TABLE IF NOT EXISTS _meta (
|
||||
version UINT64 NOT NULL UNIQUE
|
||||
)`,
|
||||
@ -44,6 +44,7 @@ const (
|
||||
dbqGetMessageInfo = "SELECT tipset_cid, epoch FROM messages WHERE cid = ?"
|
||||
dbqInsertMessage = "INSERT INTO messages VALUES (?, ?, ?)"
|
||||
dbqDeleteTipsetMessages = "DELETE FROM messages WHERE tipset_cid = ?"
|
||||
dbqGetTipsetByEpoch = "SELECT tipset_cid FROM messages WHERE epoch = ? LIMIT 1"
|
||||
// reconciliation
|
||||
dbqCountMessages = "SELECT COUNT(*) FROM messages"
|
||||
dbqMinEpoch = "SELECT MIN(epoch) FROM messages"
|
||||
@ -75,6 +76,7 @@ type msgIndex struct {
|
||||
|
||||
db *sql.DB
|
||||
selectMsgStmt *sql.Stmt
|
||||
selectTipsetStmt *sql.Stmt
|
||||
insertMsgStmt *sql.Stmt
|
||||
deleteTipSetStmt *sql.Stmt
|
||||
|
||||
@ -350,6 +352,12 @@ func (x *msgIndex) prepareStatements() error {
|
||||
}
|
||||
x.selectMsgStmt = stmt
|
||||
|
||||
stmt, err = x.db.Prepare(dbqGetTipsetByEpoch)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("prepare selectTipsetStmt: %w", err)
|
||||
}
|
||||
x.selectTipsetStmt = stmt
|
||||
|
||||
stmt, err = x.db.Prepare(dbqInsertMessage)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("prepare insertMsgStmt: %w", err)
|
||||
@ -485,40 +493,83 @@ func (x *msgIndex) doApply(ctx context.Context, tx *sql.Tx, ts *types.TipSet) er
|
||||
}
|
||||
|
||||
// interface
|
||||
func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
|
||||
func (x *msgIndex) GetMsgInfo(ctx context.Context, mCid cid.Cid) (MsgInfo, cid.Cid, error) {
|
||||
x.closeLk.RLock()
|
||||
defer x.closeLk.RUnlock()
|
||||
|
||||
if x.closed {
|
||||
return MsgInfo{}, ErrClosed
|
||||
return MsgInfo{}, cid.Undef, ErrClosed
|
||||
}
|
||||
|
||||
var (
|
||||
tipset string
|
||||
epoch int64
|
||||
)
|
||||
// fetch message from index (if it exists)
|
||||
//
|
||||
var tipset string
|
||||
var epoch int64
|
||||
err := x.selectMsgStmt.QueryRow(mCid.String()).Scan(&tipset, &epoch)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
// mCid not in index, its fine
|
||||
return MsgInfo{}, cid.Undef, ErrNotFound
|
||||
}
|
||||
|
||||
key := m.String()
|
||||
row := x.selectMsgStmt.QueryRow(key)
|
||||
err := row.Scan(&tipset, &epoch)
|
||||
return MsgInfo{}, cid.Undef, xerrors.Errorf("error querying msgindex database: %w", err)
|
||||
}
|
||||
tsCid, err := cid.Decode(tipset)
|
||||
if err != nil {
|
||||
return MsgInfo{}, cid.Undef, xerrors.Errorf("error decoding tipset cid: %w", err)
|
||||
}
|
||||
|
||||
msgInfo := MsgInfo{
|
||||
Message: mCid,
|
||||
TipSet: tsCid,
|
||||
Epoch: abi.ChainEpoch(epoch),
|
||||
}
|
||||
|
||||
// fetch execution tipset of message (if it exists)
|
||||
//
|
||||
var xTipset string
|
||||
err = x.selectTipsetStmt.QueryRow(epoch + 1).Scan(&xTipset)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
// execution tipset not in index, its fine
|
||||
return msgInfo, cid.Undef, nil
|
||||
}
|
||||
|
||||
return MsgInfo{}, cid.Undef, xerrors.Errorf("error querying for execution tipset: %w", err)
|
||||
}
|
||||
xtsCid, err := cid.Decode(xTipset)
|
||||
if err != nil {
|
||||
return MsgInfo{}, cid.Undef, xerrors.Errorf("error decoding execution tipset cid: %w", err)
|
||||
}
|
||||
|
||||
return msgInfo, xtsCid, nil
|
||||
}
|
||||
|
||||
func (x *msgIndex) GetTipsetCID(ctx context.Context, epoch abi.ChainEpoch) (*cid.Cid, error) {
|
||||
x.closeLk.RLock()
|
||||
defer x.closeLk.RUnlock()
|
||||
|
||||
if x.closed {
|
||||
return nil, ErrClosed
|
||||
}
|
||||
|
||||
var tipset string
|
||||
|
||||
row := x.selectTipsetStmt.QueryRow(epoch)
|
||||
err := row.Scan(&tipset)
|
||||
switch {
|
||||
case err == sql.ErrNoRows:
|
||||
return MsgInfo{}, ErrNotFound
|
||||
return nil, ErrNotFound
|
||||
|
||||
case err != nil:
|
||||
return MsgInfo{}, xerrors.Errorf("error querying msgindex database: %w", err)
|
||||
return nil, xerrors.Errorf("error querying msgindex database: %w", err)
|
||||
}
|
||||
|
||||
tipsetCid, err := cid.Decode(tipset)
|
||||
if err != nil {
|
||||
return MsgInfo{}, xerrors.Errorf("error decoding tipset cid: %w", err)
|
||||
return nil, xerrors.Errorf("error decoding tipset cid: %w", err)
|
||||
}
|
||||
|
||||
return MsgInfo{
|
||||
Message: m,
|
||||
TipSet: tipsetCid,
|
||||
Epoch: abi.ChainEpoch(epoch),
|
||||
}, nil
|
||||
return &tipsetCid, nil
|
||||
}
|
||||
|
||||
func (x *msgIndex) Close() error {
|
||||
|
||||
@ -157,7 +157,7 @@ func verifyIndex(t *testing.T, cs *mockChainStore, msgIndex MsgIndex) {
|
||||
msgs, err := cs.MessagesForTipset(context.Background(), ts)
|
||||
require.NoError(t, err)
|
||||
for _, m := range msgs {
|
||||
minfo, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
|
||||
minfo, _, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tsCid, minfo.TipSet)
|
||||
require.Equal(t, ts.Height(), minfo.Epoch)
|
||||
@ -174,7 +174,7 @@ func verifyMissing(t *testing.T, cs *mockChainStore, msgIndex MsgIndex, missing
|
||||
msgs, err := cs.MessagesForTipset(context.Background(), ts)
|
||||
require.NoError(t, err)
|
||||
for _, m := range msgs {
|
||||
_, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
|
||||
_, _, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
|
||||
require.Equal(t, ErrNotFound, err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -190,7 +190,7 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, head *types.TipSet
|
||||
}
|
||||
|
||||
func (sm *StateManager) searchForIndexedMsg(ctx context.Context, mcid cid.Cid, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
|
||||
minfo, err := sm.msgIndex.GetMsgInfo(ctx, mcid)
|
||||
minfo, xtsCid, err := sm.msgIndex.GetMsgInfo(ctx, mcid)
|
||||
if err != nil {
|
||||
return nil, nil, cid.Undef, xerrors.Errorf("error looking up message in index: %w", err)
|
||||
}
|
||||
@ -203,10 +203,18 @@ func (sm *StateManager) searchForIndexedMsg(ctx context.Context, mcid cid.Cid, m
|
||||
}
|
||||
|
||||
// now get the execution tipset
|
||||
// TODO optimization: the index should have it implicitly so we can return it in the msginfo.
|
||||
xts, err := sm.cs.GetTipsetByHeight(ctx, minfo.Epoch+1, curTs, false)
|
||||
if err != nil {
|
||||
return nil, nil, cid.Undef, xerrors.Errorf("error looking up execution tipset: %w", err)
|
||||
var xts *types.TipSet = nil
|
||||
if xtsCid != cid.Undef {
|
||||
// lookup by cid which is faster
|
||||
xts, err = sm.cs.GetTipSetByCid(ctx, xtsCid)
|
||||
if err != nil {
|
||||
return nil, nil, cid.Undef, xerrors.Errorf("error calling GetTipSetByCid: %w", err)
|
||||
}
|
||||
} else {
|
||||
xts, err = sm.cs.GetTipsetByHeight(ctx, minfo.Epoch+1, curTs, false)
|
||||
if err != nil {
|
||||
return nil, nil, cid.Undef, xerrors.Errorf("error calling GetTipsetByHeight: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// check that the parent of the execution index is indeed the inclusion tipset
|
||||
|
||||
@ -6,10 +6,11 @@ import (
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/chain/index"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
|
||||
@ -30,16 +31,20 @@ type ChainIndex struct {
|
||||
indexCacheLk sync.Mutex
|
||||
indexCache map[types.TipSetKey]*lbEntry
|
||||
|
||||
loadTipSet loadTipSetFunc
|
||||
loadTipSet loadTipSetFunc
|
||||
lookupTipSet lookupTipSetCIDFunc
|
||||
|
||||
skipLength abi.ChainEpoch
|
||||
}
|
||||
type loadTipSetFunc func(context.Context, types.TipSetKey) (*types.TipSet, error)
|
||||
|
||||
func NewChainIndex(lts loadTipSetFunc) *ChainIndex {
|
||||
type lookupTipSetCIDFunc func(ctx context.Context, epoch abi.ChainEpoch) (*cid.Cid, error)
|
||||
|
||||
func NewChainIndex(lts loadTipSetFunc, luts lookupTipSetCIDFunc) *ChainIndex {
|
||||
return &ChainIndex{
|
||||
indexCache: make(map[types.TipSetKey]*lbEntry, DefaultChainIndexCacheSize),
|
||||
loadTipSet: lts,
|
||||
lookupTipSet: luts,
|
||||
skipLength: 20,
|
||||
}
|
||||
}
|
||||
@ -50,6 +55,25 @@ type lbEntry struct {
|
||||
}
|
||||
|
||||
func (ci *ChainIndex) GetTipsetByHeight(ctx context.Context, from *types.TipSet, to abi.ChainEpoch) (*types.TipSet, error) {
|
||||
if ci.lookupTipSet != nil {
|
||||
tsc, err := ci.lookupTipSet(ctx, to)
|
||||
switch {
|
||||
case err == index.ErrNotFound:
|
||||
// fall through
|
||||
case err != nil:
|
||||
return nil, xerrors.Errorf("failed to load tipset cid: %w", err)
|
||||
default:
|
||||
ts, err := ci.loadTipSet(ctx, types.NewTipSetKey(*tsc))
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to load tipset: %w", err)
|
||||
}
|
||||
// make sure that the tipset is correct!
|
||||
if ts.Height() == to {
|
||||
return ts, nil
|
||||
}
|
||||
// otherwise, fall through
|
||||
}
|
||||
}
|
||||
if from.Height()-to <= ci.skipLength {
|
||||
return ci.walkBack(ctx, from, to)
|
||||
}
|
||||
|
||||
@ -161,7 +161,8 @@ func NewChainStore(chainBs bstore.Blockstore, stateBs bstore.Blockstore, ds dsto
|
||||
evtTypeHeadChange: j.RegisterEventType("sync", "head_change"),
|
||||
}
|
||||
|
||||
ci := NewChainIndex(cs.LoadTipSet)
|
||||
// TODO: what's the best way to get the message (tipset) index loaded into here?
|
||||
ci := NewChainIndex(cs.LoadTipSet, nil)
|
||||
|
||||
cs.cindex = ci
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user