Merge pull request #10452 from filecoin-project/vyzo/feat/chain/msgindex

feat:chain: Message Index
This commit is contained in:
vyzo 2023-03-22 17:02:34 +02:00 committed by GitHub
commit 8abe0ea608
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 1339 additions and 25 deletions

View File

@ -818,6 +818,12 @@ workflows:
- build
suite: itest-mpool_push_with_uuid
target: "./itests/mpool_push_with_uuid_test.go"
- test:
name: test-itest-msgindex
requires:
- build
suite: itest-msgindex
target: "./itests/msgindex_test.go"
- test:
name: test-itest-multisig
requires:

View File

@ -34,6 +34,7 @@ import (
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
genesis2 "github.com/filecoin-project/lotus/chain/gen/genesis"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/rand"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
@ -256,7 +257,7 @@ func NewGeneratorWithSectorsAndUpgradeSchedule(numSectors int, us stmgr.UpgradeS
//return nil, xerrors.Errorf("creating drand beacon: %w", err)
//}
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), sys, us, beac, ds)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), sys, us, beac, ds, index.DummyMsgIndex)
if err != nil {
return nil, xerrors.Errorf("initing stmgr: %w", err)
}

45
chain/index/interface.go Normal file
View File

@ -0,0 +1,45 @@
package index
import (
"context"
"errors"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-state-types/abi"
)
var ErrNotFound = errors.New("message not found")
var ErrClosed = errors.New("index closed")
// MsgInfo is the Message metadata the index tracks.
type MsgInfo struct {
// the message this record refers to
Message cid.Cid
// the tipset where this message was included
TipSet cid.Cid
// the epoch where this message was included
Epoch abi.ChainEpoch
}
// MsgIndex is the interface to the message index
type MsgIndex interface {
// GetMsgInfo retrieves the message metadata through the index.
// The lookup is done using the onchain message Cid; that is the signed message Cid
// for SECP messages and unsigned message Cid for BLS messages.
GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error)
// Close closes the index
Close() error
}
type dummyMsgIndex struct{}
func (dummyMsgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
return MsgInfo{}, ErrNotFound
}
func (dummyMsgIndex) Close() error {
return nil
}
var DummyMsgIndex MsgIndex = dummyMsgIndex{}

466
chain/index/msgindex.go Normal file
View File

@ -0,0 +1,466 @@
package index
import (
"context"
"database/sql"
"errors"
"io/fs"
"os"
"path"
"sync"
"time"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
_ "github.com/mattn/go-sqlite3"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
var log = logging.Logger("msgindex")
var dbName = "msgindex.db"
var dbDefs = []string{
`CREATE TABLE IF NOT EXISTS messages (
cid VARCHAR(80) PRIMARY KEY ON CONFLICT REPLACE,
tipset_cid VARCHAR(80) NOT NULL,
epoch INTEGER NOT NULL
)`,
`CREATE INDEX IF NOT EXISTS tipset_cids ON messages (tipset_cid)
`,
`CREATE TABLE IF NOT EXISTS _meta (
version UINT64 NOT NULL UNIQUE
)`,
`INSERT OR IGNORE INTO _meta (version) VALUES (1)`,
}
var dbPragmas = []string{}
const (
// prepared stmts
dbqGetMessageInfo = "SELECT tipset_cid, epoch FROM messages WHERE cid = ?"
dbqInsertMessage = "INSERT INTO messages VALUES (?, ?, ?)"
dbqDeleteTipsetMessages = "DELETE FROM messages WHERE tipset_cid = ?"
// reconciliation
dbqCountMessages = "SELECT COUNT(*) FROM messages"
dbqMinEpoch = "SELECT MIN(epoch) FROM messages"
dbqCountTipsetMessages = "SELECT COUNT(*) FROM messages WHERE tipset_cid = ?"
dbqDeleteMessagesByEpoch = "DELETE FROM messages WHERE epoch >= ?"
)
// coalescer configuration (TODO: use observer instead)
// these are exposed to make tests snappy
var (
CoalesceMinDelay = time.Second
CoalesceMaxDelay = 15 * time.Second
CoalesceMergeInterval = time.Second
)
// chain store interface; we could use store.ChainStore directly,
// but this simplifies unit testing.
type ChainStore interface {
SubscribeHeadChanges(f store.ReorgNotifee)
MessagesForTipset(ctx context.Context, ts *types.TipSet) ([]types.ChainMsg, error)
GetHeaviestTipSet() *types.TipSet
GetTipSetFromKey(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error)
}
var _ ChainStore = (*store.ChainStore)(nil)
type msgIndex struct {
cs ChainStore
db *sql.DB
selectMsgStmt *sql.Stmt
insertMsgStmt *sql.Stmt
deleteTipSetStmt *sql.Stmt
sema chan struct{}
mx sync.Mutex
pend []headChange
cancel func()
workers sync.WaitGroup
closeLk sync.RWMutex
closed bool
}
var _ MsgIndex = (*msgIndex)(nil)
type headChange struct {
rev []*types.TipSet
app []*types.TipSet
}
func NewMsgIndex(lctx context.Context, basePath string, cs ChainStore) (MsgIndex, error) {
var (
dbPath string
exists bool
err error
)
err = os.MkdirAll(basePath, 0755)
if err != nil {
return nil, xerrors.Errorf("error creating msgindex base directory: %w", err)
}
dbPath = path.Join(basePath, dbName)
_, err = os.Stat(dbPath)
switch {
case err == nil:
exists = true
case errors.Is(err, fs.ErrNotExist):
case err != nil:
return nil, xerrors.Errorf("error stating msgindex database: %w", err)
}
db, err := sql.Open("sqlite3", dbPath)
if err != nil {
// TODO [nice to have]: automaticaly delete corrupt databases
// but for now we can just error and let the operator delete.
return nil, xerrors.Errorf("error opening msgindex database: %w", err)
}
if err := prepareDB(db); err != nil {
return nil, xerrors.Errorf("error creating msgindex database: %w", err)
}
// TODO we may consider populating the index when first creating the db
if exists {
if err := reconcileIndex(db, cs); err != nil {
return nil, xerrors.Errorf("error reconciling msgindex database: %w", err)
}
}
ctx, cancel := context.WithCancel(lctx)
msgIndex := &msgIndex{
db: db,
cs: cs,
sema: make(chan struct{}, 1),
cancel: cancel,
}
err = msgIndex.prepareStatements()
if err != nil {
if err := db.Close(); err != nil {
log.Errorf("error closing msgindex database: %s", err)
}
return nil, xerrors.Errorf("error preparing msgindex database statements: %w", err)
}
rnf := store.WrapHeadChangeCoalescer(
msgIndex.onHeadChange,
CoalesceMinDelay,
CoalesceMaxDelay,
CoalesceMergeInterval,
)
cs.SubscribeHeadChanges(rnf)
msgIndex.workers.Add(1)
go msgIndex.background(ctx)
return msgIndex, nil
}
// init utilities
func prepareDB(db *sql.DB) error {
for _, stmt := range dbDefs {
if _, err := db.Exec(stmt); err != nil {
return xerrors.Errorf("error executing sql statement '%s': %w", stmt, err)
}
}
for _, stmt := range dbPragmas {
if _, err := db.Exec(stmt); err != nil {
return xerrors.Errorf("error executing sql statement '%s': %w", stmt, err)
}
}
return nil
}
func reconcileIndex(db *sql.DB, cs ChainStore) error {
// Invariant: after reconciliation, every tipset in the index is in the current chain; ie either
// the chain head or reachable by walking the chain.
// Algorithm:
// 1. Count messages in index; if none, trivially reconciled.
// TODO we may consider populating the index in that case
// 2. Find the minimum tipset in the index; this will mark the end of the reconciliation walk
// 3. Walk from current tipset until we find a tipset in the index.
// 4. Delete (revert!) all tipsets above the found tipset.
// 5. If the walk ends in the boundary epoch, then delete everything.
//
row := db.QueryRow(dbqCountMessages)
var result int64
if err := row.Scan(&result); err != nil {
return xerrors.Errorf("error counting messages: %w", err)
}
if result == 0 {
return nil
}
row = db.QueryRow(dbqMinEpoch)
if err := row.Scan(&result); err != nil {
return xerrors.Errorf("error finding boundary epoch: %w", err)
}
boundaryEpoch := abi.ChainEpoch(result)
countMsgsStmt, err := db.Prepare(dbqCountTipsetMessages)
if err != nil {
return xerrors.Errorf("error preparing statement: %w", err)
}
curTs := cs.GetHeaviestTipSet()
for curTs != nil && curTs.Height() >= boundaryEpoch {
tsCid, err := curTs.Key().Cid()
if err != nil {
return xerrors.Errorf("error computing tipset cid: %w", err)
}
key := tsCid.String()
row = countMsgsStmt.QueryRow(key)
if err := row.Scan(&result); err != nil {
return xerrors.Errorf("error counting messages: %w", err)
}
if result > 0 {
// found it!
boundaryEpoch = curTs.Height() + 1
break
}
// walk up
parents := curTs.Parents()
curTs, err = cs.GetTipSetFromKey(context.TODO(), parents)
if err != nil {
return xerrors.Errorf("error walking chain: %w", err)
}
}
// delete everything above the minEpoch
if _, err = db.Exec(dbqDeleteMessagesByEpoch, int64(boundaryEpoch)); err != nil {
return xerrors.Errorf("error deleting stale reorged out message: %w", err)
}
return nil
}
func (x *msgIndex) prepareStatements() error {
stmt, err := x.db.Prepare(dbqGetMessageInfo)
if err != nil {
return xerrors.Errorf("prepare selectMsgStmt: %w", err)
}
x.selectMsgStmt = stmt
stmt, err = x.db.Prepare(dbqInsertMessage)
if err != nil {
return xerrors.Errorf("prepare insertMsgStmt: %w", err)
}
x.insertMsgStmt = stmt
stmt, err = x.db.Prepare(dbqDeleteTipsetMessages)
if err != nil {
return xerrors.Errorf("prepare deleteTipSetStmt: %w", err)
}
x.deleteTipSetStmt = stmt
return nil
}
// head change notifee
func (x *msgIndex) onHeadChange(rev, app []*types.TipSet) error {
x.closeLk.RLock()
defer x.closeLk.RUnlock()
if x.closed {
return nil
}
// do it in the background to avoid blocking head change processing
x.mx.Lock()
x.pend = append(x.pend, headChange{rev: rev, app: app})
pendLen := len(x.pend)
x.mx.Unlock()
// complain loudly if this is building backlog
if pendLen > 10 {
log.Warnf("message index head change processing is building backlog: %d pending head changes", pendLen)
}
select {
case x.sema <- struct{}{}:
default:
}
return nil
}
func (x *msgIndex) background(ctx context.Context) {
defer x.workers.Done()
for {
select {
case <-x.sema:
err := x.processHeadChanges(ctx)
if err != nil {
// we can't rely on an inconsistent index, so shut it down.
log.Errorf("error processing head change notifications: %s; shutting down message index", err)
if err2 := x.Close(); err2 != nil {
log.Errorf("error shutting down index: %s", err2)
}
}
case <-ctx.Done():
return
}
}
}
func (x *msgIndex) processHeadChanges(ctx context.Context) error {
x.mx.Lock()
pend := x.pend
x.pend = nil
x.mx.Unlock()
tx, err := x.db.Begin()
if err != nil {
return xerrors.Errorf("error creating transaction: %w", err)
}
for _, hc := range pend {
for _, ts := range hc.rev {
if err := x.doRevert(ctx, tx, ts); err != nil {
if err2 := tx.Rollback(); err2 != nil {
log.Errorf("error rolling back transaction: %s", err2)
}
return xerrors.Errorf("error reverting %s: %w", ts, err)
}
}
for _, ts := range hc.app {
if err := x.doApply(ctx, tx, ts); err != nil {
if err2 := tx.Rollback(); err2 != nil {
log.Errorf("error rolling back transaction: %s", err2)
}
return xerrors.Errorf("error applying %s: %w", ts, err)
}
}
}
return tx.Commit()
}
func (x *msgIndex) doRevert(ctx context.Context, tx *sql.Tx, ts *types.TipSet) error {
tskey, err := ts.Key().Cid()
if err != nil {
return xerrors.Errorf("error computing tipset cid: %w", err)
}
key := tskey.String()
_, err = tx.Stmt(x.deleteTipSetStmt).Exec(key)
return err
}
func (x *msgIndex) doApply(ctx context.Context, tx *sql.Tx, ts *types.TipSet) error {
tscid, err := ts.Key().Cid()
if err != nil {
return xerrors.Errorf("error computing tipset cid: %w", err)
}
tskey := tscid.String()
epoch := int64(ts.Height())
msgs, err := x.cs.MessagesForTipset(ctx, ts)
if err != nil {
return xerrors.Errorf("error retrieving messages for tipset %s: %w", ts, err)
}
insertStmt := tx.Stmt(x.insertMsgStmt)
for _, msg := range msgs {
key := msg.Cid().String()
if _, err := insertStmt.Exec(key, tskey, epoch); err != nil {
return xerrors.Errorf("error inserting message: %w", err)
}
}
return nil
}
// interface
func (x *msgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
x.closeLk.RLock()
defer x.closeLk.RUnlock()
if x.closed {
return MsgInfo{}, ErrClosed
}
var (
tipset string
epoch int64
)
key := m.String()
row := x.selectMsgStmt.QueryRow(key)
err := row.Scan(&tipset, &epoch)
switch {
case err == sql.ErrNoRows:
return MsgInfo{}, ErrNotFound
case err != nil:
return MsgInfo{}, xerrors.Errorf("error querying msgindex database: %w", err)
}
tipsetCid, err := cid.Decode(tipset)
if err != nil {
return MsgInfo{}, xerrors.Errorf("error decoding tipset cid: %w", err)
}
return MsgInfo{
Message: m,
TipSet: tipsetCid,
Epoch: abi.ChainEpoch(epoch),
}, nil
}
func (x *msgIndex) Close() error {
x.closeLk.Lock()
defer x.closeLk.Unlock()
if x.closed {
return nil
}
x.closed = true
x.cancel()
x.workers.Wait()
return x.db.Close()
}
// informal apis for itests; not exposed in the main interface
func (x *msgIndex) CountMessages() (int64, error) {
x.closeLk.RLock()
defer x.closeLk.RUnlock()
if x.closed {
return 0, ErrClosed
}
var result int64
row := x.db.QueryRow(dbqCountMessages)
err := row.Scan(&result)
return result, err
}

View File

@ -0,0 +1,298 @@
package index
import (
"context"
"errors"
"math/rand"
"os"
"testing"
"time"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-libipfs/blocks"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/mock"
)
func TestBasicMsgIndex(t *testing.T) {
// the most basic of tests:
// 1. Create an index with mock chain store
// 2. Advance the chain for a few tipsets
// 3. Verify that the index contains all messages with the correct tipset/epoch
cs := newMockChainStore()
cs.genesis()
tmp := t.TempDir()
t.Cleanup(func() { _ = os.RemoveAll(tmp) })
msgIndex, err := NewMsgIndex(context.Background(), tmp, cs)
require.NoError(t, err)
defer msgIndex.Close() //nolint
for i := 0; i < 10; i++ {
t.Logf("advance to epoch %d", i+1)
err := cs.advance()
require.NoError(t, err)
// wait for the coalescer to notify
time.Sleep(CoalesceMinDelay + 10*time.Millisecond)
}
t.Log("verifying index")
verifyIndex(t, cs, msgIndex)
}
func TestReorgMsgIndex(t *testing.T) {
// slightly more nuanced test that includes reorgs
// 1. Create an index with mock chain store
// 2. Advance/Reorg the chain for a few tipsets
// 3. Verify that the index contains all messages with the correct tipst/epoch
cs := newMockChainStore()
cs.genesis()
tmp := t.TempDir()
t.Cleanup(func() { _ = os.RemoveAll(tmp) })
msgIndex, err := NewMsgIndex(context.Background(), tmp, cs)
require.NoError(t, err)
defer msgIndex.Close() //nolint
for i := 0; i < 10; i++ {
t.Logf("advance to epoch %d", i+1)
err := cs.advance()
require.NoError(t, err)
// wait for the coalescer to notify
time.Sleep(CoalesceMinDelay + 10*time.Millisecond)
}
// a simple reorg
t.Log("doing reorg")
reorgme := cs.curTs
reorgmeParent, err := cs.GetTipSetFromKey(context.Background(), reorgme.Parents())
require.NoError(t, err)
cs.setHead(reorgmeParent)
reorgmeChild := cs.makeBlk()
err = cs.reorg([]*types.TipSet{reorgme}, []*types.TipSet{reorgmeChild})
require.NoError(t, err)
time.Sleep(CoalesceMinDelay + 10*time.Millisecond)
t.Log("verifying index")
verifyIndex(t, cs, msgIndex)
t.Log("verifying that reorged messages are not present")
verifyMissing(t, cs, msgIndex, reorgme)
}
func TestReconcileMsgIndex(t *testing.T) {
// test that exercises the reconciliation code paths
// 1. Create and populate a basic msgindex, similar to TestBasicMsgIndex.
// 2. Close it
// 3. Reorg the mock chain store
// 4. Reopen the index to trigger reconciliation
// 5. Enxure that only the stable messages remain.
cs := newMockChainStore()
cs.genesis()
tmp := t.TempDir()
t.Cleanup(func() { _ = os.RemoveAll(tmp) })
msgIndex, err := NewMsgIndex(context.Background(), tmp, cs)
require.NoError(t, err)
for i := 0; i < 10; i++ {
t.Logf("advance to epoch %d", i+1)
err := cs.advance()
require.NoError(t, err)
// wait for the coalescer to notify
time.Sleep(CoalesceMinDelay + 10*time.Millisecond)
}
// Close it and reorg
err = msgIndex.Close()
require.NoError(t, err)
cs.notify = nil
// a simple reorg
t.Log("doing reorg")
reorgme := cs.curTs
reorgmeParent, err := cs.GetTipSetFromKey(context.Background(), reorgme.Parents())
require.NoError(t, err)
cs.setHead(reorgmeParent)
reorgmeChild := cs.makeBlk()
err = cs.reorg([]*types.TipSet{reorgme}, []*types.TipSet{reorgmeChild})
require.NoError(t, err)
// reopen to reconcile
msgIndex, err = NewMsgIndex(context.Background(), tmp, cs)
require.NoError(t, err)
defer msgIndex.Close() //nolint
t.Log("verifying index")
// need to step one up because the last tipset is not known by the index
cs.setHead(reorgmeParent)
verifyIndex(t, cs, msgIndex)
t.Log("verifying that reorged and unknown messages are not present")
verifyMissing(t, cs, msgIndex, reorgme, reorgmeChild)
}
func verifyIndex(t *testing.T, cs *mockChainStore, msgIndex MsgIndex) {
for ts := cs.curTs; ts.Height() > 0; {
t.Logf("verify at height %d", ts.Height())
blks := ts.Blocks()
if len(blks) == 0 {
break
}
tsCid, err := ts.Key().Cid()
require.NoError(t, err)
msgs, err := cs.MessagesForTipset(context.Background(), ts)
require.NoError(t, err)
for _, m := range msgs {
minfo, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
require.NoError(t, err)
require.Equal(t, tsCid, minfo.TipSet)
require.Equal(t, ts.Height(), minfo.Epoch)
}
parents := ts.Parents()
ts, err = cs.GetTipSetFromKey(context.Background(), parents)
require.NoError(t, err)
}
}
func verifyMissing(t *testing.T, cs *mockChainStore, msgIndex MsgIndex, missing ...*types.TipSet) {
for _, ts := range missing {
msgs, err := cs.MessagesForTipset(context.Background(), ts)
require.NoError(t, err)
for _, m := range msgs {
_, err := msgIndex.GetMsgInfo(context.Background(), m.Cid())
require.Equal(t, ErrNotFound, err)
}
}
}
type mockChainStore struct {
notify store.ReorgNotifee
curTs *types.TipSet
tipsets map[types.TipSetKey]*types.TipSet
msgs map[types.TipSetKey][]types.ChainMsg
nonce uint64
}
var _ ChainStore = (*mockChainStore)(nil)
var systemAddr address.Address
var rng *rand.Rand
func init() {
systemAddr, _ = address.NewIDAddress(0)
rng = rand.New(rand.NewSource(314159))
// adjust those to make tests snappy
CoalesceMinDelay = 100 * time.Millisecond
CoalesceMaxDelay = time.Second
CoalesceMergeInterval = 100 * time.Millisecond
}
func newMockChainStore() *mockChainStore {
return &mockChainStore{
tipsets: make(map[types.TipSetKey]*types.TipSet),
msgs: make(map[types.TipSetKey][]types.ChainMsg),
}
}
func (cs *mockChainStore) genesis() {
genBlock := mock.MkBlock(nil, 0, 0)
genTs := mock.TipSet(genBlock)
cs.msgs[genTs.Key()] = nil
cs.setHead(genTs)
}
func (cs *mockChainStore) setHead(ts *types.TipSet) {
cs.curTs = ts
cs.tipsets[ts.Key()] = ts
}
func (cs *mockChainStore) advance() error {
ts := cs.makeBlk()
return cs.reorg(nil, []*types.TipSet{ts})
}
func (cs *mockChainStore) reorg(rev, app []*types.TipSet) error {
for _, ts := range rev {
parents := ts.Parents()
cs.curTs = cs.tipsets[parents]
}
for _, ts := range app {
cs.tipsets[ts.Key()] = ts
cs.curTs = ts
}
if cs.notify != nil {
return cs.notify(rev, app)
}
return nil
}
func (cs *mockChainStore) makeBlk() *types.TipSet {
height := cs.curTs.Height() + 1
blk := mock.MkBlock(cs.curTs, uint64(height), uint64(height))
blk.Messages = cs.makeGarbageCid()
ts := mock.TipSet(blk)
msg1 := cs.makeMsg()
msg2 := cs.makeMsg()
cs.msgs[ts.Key()] = []types.ChainMsg{msg1, msg2}
return ts
}
func (cs *mockChainStore) makeMsg() *types.Message {
nonce := cs.nonce
cs.nonce++
return &types.Message{To: systemAddr, From: systemAddr, Nonce: nonce}
}
func (cs *mockChainStore) makeGarbageCid() cid.Cid {
garbage := blocks.NewBlock([]byte{byte(rng.Intn(256)), byte(rng.Intn(256)), byte(rng.Intn(256))})
return garbage.Cid()
}
func (cs *mockChainStore) SubscribeHeadChanges(f store.ReorgNotifee) {
cs.notify = f
}
func (cs *mockChainStore) MessagesForTipset(ctx context.Context, ts *types.TipSet) ([]types.ChainMsg, error) {
msgs, ok := cs.msgs[ts.Key()]
if !ok {
return nil, errors.New("unknown tipset")
}
return msgs, nil
}
func (cs *mockChainStore) GetHeaviestTipSet() *types.TipSet {
return cs.curTs
}
func (cs *mockChainStore) GetTipSetFromKey(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
ts, ok := cs.tipsets[tsk]
if !ok {
return nil, errors.New("unknown tipset")
}
return ts, nil
}

View File

@ -36,6 +36,7 @@ import (
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/stmgr"
. "github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
@ -168,7 +169,7 @@ func TestForkHeightTriggers(t *testing.T) {
}
return st.Flush(ctx)
}}}, cg.BeaconSchedule(), datastore.NewMapDatastore())
}}}, cg.BeaconSchedule(), datastore.NewMapDatastore(), index.DummyMsgIndex)
if err != nil {
t.Fatal(err)
}
@ -286,7 +287,7 @@ func testForkRefuseCall(t *testing.T, nullsBefore, nullsAfter int) {
root cid.Cid, height abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
migrationCount++
return root, nil
}}}, cg.BeaconSchedule(), datastore.NewMapDatastore())
}}}, cg.BeaconSchedule(), datastore.NewMapDatastore(), index.DummyMsgIndex)
if err != nil {
t.Fatal(err)
}
@ -504,7 +505,7 @@ func TestForkPreMigration(t *testing.T) {
return nil
},
}}},
}, cg.BeaconSchedule(), datastore.NewMapDatastore())
}, cg.BeaconSchedule(), datastore.NewMapDatastore(), index.DummyMsgIndex)
if err != nil {
t.Fatal(err)
}
@ -579,6 +580,7 @@ func TestDisablePreMigration(t *testing.T) {
},
cg.BeaconSchedule(),
datastore.NewMapDatastore(),
index.DummyMsgIndex,
)
require.NoError(t, err)
require.NoError(t, sm.Start(context.Background()))
@ -633,6 +635,7 @@ func TestMigrtionCache(t *testing.T) {
},
cg.BeaconSchedule(),
metadataDs,
index.DummyMsgIndex,
)
require.NoError(t, err)
require.NoError(t, sm.Start(context.Background()))
@ -685,6 +688,7 @@ func TestMigrtionCache(t *testing.T) {
},
cg.BeaconSchedule(),
metadataDs,
index.DummyMsgIndex,
)
require.NoError(t, err)
sm.SetVMConstructor(func(ctx context.Context, vmopt *vm.VMOpts) (vm.Interface, error) {

View File

@ -10,6 +10,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
@ -18,6 +19,7 @@ import (
// happened, with an optional limit to how many epochs it will search. It guarantees that the message has been on
// chain for at least confidence epochs without being reverted before returning.
func (sm *StateManager) WaitForMessage(ctx context.Context, mcid cid.Cid, confidence uint64, lookbackLimit abi.ChainEpoch, allowReplaced bool) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
// TODO use the index to speed this up.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -145,7 +147,30 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, head *types.TipSet
return head, r, foundMsg, nil
}
fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head, msg, lookbackLimit, allowReplaced)
fts, r, foundMsg, err := sm.searchForIndexedMsg(ctx, mcid, msg)
switch {
case err == nil:
if r != nil && foundMsg.Defined() {
return fts, r, foundMsg, nil
}
// debug log this, it's noteworthy
if r == nil {
log.Debugf("missing receipt for message in index for %s", mcid)
}
if !foundMsg.Defined() {
log.Debugf("message %s not found", mcid)
}
case errors.Is(err, index.ErrNotFound):
// ok for the index to have incomplete data
default:
log.Warnf("error searching message index: %s", err)
}
fts, r, foundMsg, err = sm.searchBackForMsg(ctx, head, msg, lookbackLimit, allowReplaced)
if err != nil {
log.Warnf("failed to look back through chain for message %s", mcid)
@ -159,6 +184,41 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, head *types.TipSet
return fts, r, foundMsg, nil
}
func (sm *StateManager) searchForIndexedMsg(ctx context.Context, mcid cid.Cid, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
minfo, err := sm.msgIndex.GetMsgInfo(ctx, mcid)
if err != nil {
return nil, nil, cid.Undef, xerrors.Errorf("error looking up message in index: %w", err)
}
// check the height against the current tipset; minimum execution confidence requires that the
// inclusion tipset height is lower than the current head + 1
curTs := sm.cs.GetHeaviestTipSet()
if curTs.Height() <= minfo.Epoch+1 {
return nil, nil, cid.Undef, xerrors.Errorf("indexed message does not appear before the current tipset; index epoch: %d, current epoch: %d", minfo.Epoch, curTs.Height())
}
// now get the execution tipset
// TODO optimization: the index should have it implicitly so we can return it in the msginfo.
xts, err := sm.cs.GetTipsetByHeight(ctx, minfo.Epoch+1, curTs, false)
if err != nil {
return nil, nil, cid.Undef, xerrors.Errorf("error looking up execution tipset: %w", err)
}
// check that the parent of the execution index is indeed the inclusion tipset
parent := xts.Parents()
parentCid, err := parent.Cid()
if err != nil {
return nil, nil, cid.Undef, xerrors.Errorf("error computing tipset cid: %w", err)
}
if !parentCid.Equals(minfo.TipSet) {
return nil, nil, cid.Undef, xerrors.Errorf("inclusion tipset mismatch: have %s, expected %s", parentCid, minfo.TipSet)
}
r, foundMsg, err := sm.tipsetExecutedMessage(ctx, xts, mcid, m.VMMessage(), false)
return xts, r, foundMsg, xerrors.Errorf("error in tipstExecutedMessage: %w", err)
}
// searchBackForMsg searches up to limit tipsets backwards from the given
// tipset for a message receipt.
// If limit is

View File

@ -25,6 +25,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin/paych"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/rand"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/store"
@ -136,6 +137,8 @@ type StateManager struct {
tsExec Executor
tsExecMonitor ExecMonitor
beacon beacon.Schedule
msgIndex index.MsgIndex
}
// Caches a single state tree
@ -144,7 +147,7 @@ type treeCache struct {
tree *state.StateTree
}
func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, beacon beacon.Schedule, metadataDs dstore.Batching) (*StateManager, error) {
func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, beacon beacon.Schedule, metadataDs dstore.Batching, msgIndex index.MsgIndex) (*StateManager, error) {
// If we have upgrades, make sure they're in-order and make sense.
if err := us.Validate(); err != nil {
return nil, err
@ -199,11 +202,12 @@ func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder,
tree: nil,
},
compWait: make(map[string]chan struct{}),
msgIndex: msgIndex,
}, nil
}
func NewStateManagerWithUpgradeScheduleAndMonitor(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, b beacon.Schedule, em ExecMonitor, metadataDs dstore.Batching) (*StateManager, error) {
sm, err := NewStateManager(cs, exec, sys, us, b, metadataDs)
func NewStateManagerWithUpgradeScheduleAndMonitor(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, b beacon.Schedule, em ExecMonitor, metadataDs dstore.Batching, msgIndex index.MsgIndex) (*StateManager, error) {
sm, err := NewStateManager(cs, exec, sys, us, b, metadataDs, msgIndex)
if err != nil {
return nil, err
}

View File

@ -18,6 +18,7 @@ import (
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
@ -214,7 +215,7 @@ func TestChainExportImportFull(t *testing.T) {
t.Fatal("imported chain differed from exported chain")
}
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), nil, filcns.DefaultUpgradeSchedule(), cg.BeaconSchedule(), ds)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), nil, filcns.DefaultUpgradeSchedule(), cg.BeaconSchedule(), ds, index.DummyMsgIndex)
if err != nil {
t.Fatal(err)
}

View File

@ -36,6 +36,7 @@ import (
badgerbs "github.com/filecoin-project/lotus/blockstore/badger"
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
@ -229,7 +230,7 @@ var importBenchCmd = &cli.Command{
defer cs.Close() //nolint:errcheck
// TODO: We need to supply the actual beacon after v14
stm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(verifier), filcns.DefaultUpgradeSchedule(), nil, metadataDs)
stm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(verifier), filcns.DefaultUpgradeSchedule(), nil, metadataDs, index.DummyMsgIndex)
if err != nil {
return err
}

View File

@ -35,6 +35,7 @@ import (
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/gen/genesis"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
@ -513,7 +514,7 @@ var chainBalanceStateCmd = &cli.Command{
cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, index.DummyMsgIndex)
if err != nil {
return err
}
@ -737,7 +738,7 @@ var chainPledgeCmd = &cli.Command{
cst := cbor.NewCborStore(bs)
store := adt.WrapStore(ctx, cst)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, index.DummyMsgIndex)
if err != nil {
return err
}

View File

@ -20,6 +20,7 @@ import (
"github.com/filecoin-project/lotus/chain/beacon/drand"
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
@ -111,7 +112,7 @@ var gasTraceCmd = &cli.Command{
cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), shd, mds)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), shd, mds, index.DummyMsgIndex)
if err != nil {
return err
}
@ -212,7 +213,7 @@ var replayOfflineCmd = &cli.Command{
cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), shd, mds)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), shd, mds, index.DummyMsgIndex)
if err != nil {
return err
}

View File

@ -21,6 +21,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
@ -90,7 +91,7 @@ var invariantsCmd = &cli.Command{
cs := store.NewChainStore(bs, bs, mds, filcns.Weight, nil)
defer cs.Close() //nolint:errcheck
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, index.DummyMsgIndex)
if err != nil {
return err
}

View File

@ -83,6 +83,7 @@ func main() {
invariantsCmd,
gasTraceCmd,
replayOfflineCmd,
msgindexCmd,
}
app := &cli.App{

View File

@ -41,6 +41,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin/verifreg"
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
@ -123,7 +124,7 @@ var migrationsCmd = &cli.Command{
defer cs.Close() //nolint:errcheck
// Note: we use a map datastore for the metadata to avoid writing / using cached migration results in the metadata store
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, datastore.NewMapDatastore())
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, datastore.NewMapDatastore(), index.DummyMsgIndex)
if err != nil {
return err
}

221
cmd/lotus-shed/msgindex.go Normal file
View File

@ -0,0 +1,221 @@
package main
import (
"database/sql"
"fmt"
"path"
"github.com/ipfs/go-cid"
_ "github.com/mattn/go-sqlite3"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
lcli "github.com/filecoin-project/lotus/cli"
)
var msgindexCmd = &cli.Command{
Name: "msgindex",
Usage: "Tools for managing the message index",
Subcommands: []*cli.Command{
msgindexBackfillCmd,
msgindexPruneCmd,
},
}
var msgindexBackfillCmd = &cli.Command{
Name: "backfill",
Usage: "Backfill the message index for a number of epochs starting from a specified height",
Flags: []cli.Flag{
&cli.IntFlag{
Name: "from",
Value: 0,
Usage: "height to start the backfill; uses the current head if omitted",
},
&cli.IntFlag{
Name: "epochs",
Value: 1800,
Usage: "number of epochs to backfill; defaults to 1800 (2 finalities)",
},
&cli.StringFlag{
Name: "repo",
Value: "~/.lotus",
Usage: "path to the repo",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
curTs, err := api.ChainHead(ctx)
if err != nil {
return err
}
startHeight := int64(cctx.Int("from"))
if startHeight == 0 {
startHeight = int64(curTs.Height()) - 1
}
epochs := cctx.Int("epochs")
basePath, err := homedir.Expand(cctx.String("repo"))
if err != nil {
return err
}
dbPath := path.Join(basePath, "sqlite", "msgindex.db")
db, err := sql.Open("sqlite3", dbPath)
if err != nil {
return err
}
defer func() {
err := db.Close()
if err != nil {
fmt.Printf("ERROR: closing db: %s", err)
}
}()
tx, err := db.Begin()
if err != nil {
return err
}
insertStmt, err := tx.Prepare("INSERT INTO messages VALUES (?, ?, ?)")
if err != nil {
return err
}
insertMsg := func(cid, tsCid cid.Cid, epoch abi.ChainEpoch) error {
key := cid.String()
tskey := tsCid.String()
if _, err := insertStmt.Exec(key, tskey, int64(epoch)); err != nil {
return err
}
return nil
}
rollback := func() {
if err := tx.Rollback(); err != nil {
fmt.Printf("ERROR: rollback: %s", err)
}
}
for i := 0; i < epochs; i++ {
epoch := abi.ChainEpoch(startHeight - int64(i))
ts, err := api.ChainGetTipSetByHeight(ctx, epoch, curTs.Key())
if err != nil {
rollback()
return err
}
tsCid, err := ts.Key().Cid()
if err != nil {
rollback()
return err
}
msgs, err := api.ChainGetMessagesInTipset(ctx, ts.Key())
if err != nil {
rollback()
return err
}
for _, msg := range msgs {
if err := insertMsg(msg.Cid, tsCid, epoch); err != nil {
rollback()
return err
}
}
}
if err := tx.Commit(); err != nil {
return err
}
return nil
},
}
var msgindexPruneCmd = &cli.Command{
Name: "prune",
Usage: "Prune the message index for messages included before a given epoch",
Flags: []cli.Flag{
&cli.IntFlag{
Name: "from",
Usage: "height to start the prune; if negative it indicates epochs from current head",
},
&cli.StringFlag{
Name: "repo",
Value: "~/.lotus",
Usage: "path to the repo",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
startHeight := int64(cctx.Int("from"))
if startHeight < 0 {
curTs, err := api.ChainHead(ctx)
if err != nil {
return err
}
startHeight += int64(curTs.Height())
if startHeight < 0 {
return xerrors.Errorf("bogus start height %d", startHeight)
}
}
basePath, err := homedir.Expand(cctx.String("repo"))
if err != nil {
return err
}
dbPath := path.Join(basePath, "sqlite", "msgindex.db")
db, err := sql.Open("sqlite3", dbPath)
if err != nil {
return err
}
defer func() {
err := db.Close()
if err != nil {
fmt.Printf("ERROR: closing db: %s", err)
}
}()
tx, err := db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec("DELETE FROM messages WHERE epoch < ?", startHeight); err != nil {
if err := tx.Rollback(); err != nil {
fmt.Printf("ERROR: rollback: %s", err)
}
return err
}
if err := tx.Commit(); err != nil {
return err
}
return nil
},
}

View File

@ -26,6 +26,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
@ -308,7 +309,7 @@ to reduce the number of decode operations performed by caching the decoded objec
}
tsExec := consensus.NewTipSetExecutor(filcns.RewardFunc)
sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
sm, err := stmgr.NewStateManager(cs, tsExec, vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, index.DummyMsgIndex)
if err != nil {
return err
}

View File

@ -12,6 +12,7 @@ import (
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
@ -106,7 +107,7 @@ func (nd *Node) LoadSim(ctx context.Context, name string) (*Simulation, error) {
if err != nil {
return nil, xerrors.Errorf("failed to create upgrade schedule for simulation %s: %w", name, err)
}
sim.StateManager, err = stmgr.NewStateManager(nd.Chainstore, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(mock.Verifier), us, nil, nd.MetadataDS)
sim.StateManager, err = stmgr.NewStateManager(nd.Chainstore, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(mock.Verifier), us, nil, nd.MetadataDS, index.DummyMsgIndex)
if err != nil {
return nil, xerrors.Errorf("failed to create state manager for simulation %s: %w", name, err)
}
@ -125,7 +126,7 @@ func (nd *Node) CreateSim(ctx context.Context, name string, head *types.TipSet)
if err != nil {
return nil, err
}
sm, err := stmgr.NewStateManager(nd.Chainstore, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(mock.Verifier), filcns.DefaultUpgradeSchedule(), nil, nd.MetadataDS)
sm, err := stmgr.NewStateManager(nd.Chainstore, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(mock.Verifier), filcns.DefaultUpgradeSchedule(), nil, nd.MetadataDS, index.DummyMsgIndex)
if err != nil {
return nil, xerrors.Errorf("creating state manager: %w", err)
}

View File

@ -17,6 +17,7 @@ import (
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/vm"
@ -201,7 +202,7 @@ func (sim *Simulation) SetUpgradeHeight(nv network.Version, epoch abi.ChainEpoch
if err != nil {
return err
}
sm, err := stmgr.NewStateManager(sim.Node.Chainstore, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(mock.Verifier), newUpgradeSchedule, nil, sim.Node.MetadataDS)
sm, err := stmgr.NewStateManager(sim.Node.Chainstore, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(mock.Verifier), newUpgradeSchedule, nil, sim.Node.MetadataDS, index.DummyMsgIndex)
if err != nil {
return err
}

View File

@ -35,6 +35,7 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
@ -540,7 +541,7 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
}
// TODO: We need to supply the actual beacon after v14
stm, err := stmgr.NewStateManager(cst, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
stm, err := stmgr.NewStateManager(cst, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, index.DummyMsgIndex)
if err != nil {
return err
}

View File

@ -22,6 +22,7 @@ import (
"github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
@ -108,7 +109,7 @@ func (d *Driver) ExecuteTipset(bs blockstore.Blockstore, ds ds.Batching, params
cs = store.NewChainStore(bs, bs, ds, filcns.Weight, nil)
tse = consensus.NewTipSetExecutor(filcns.RewardFunc)
sm, err = stmgr.NewStateManager(cs, tse, syscalls, filcns.DefaultUpgradeSchedule(), nil, ds)
sm, err = stmgr.NewStateManager(cs, tse, syscalls, filcns.DefaultUpgradeSchedule(), nil, ds, index.DummyMsgIndex)
)
if err != nil {
return nil, err

View File

@ -390,3 +390,11 @@
#DatabasePath = ""
[Index]
# EnableMsgIndex enables indexing of messages on chain.
#
# type: bool
# env var: LOTUS_INDEX_ENABLEMSGINDEX
#EnableMsgIndex = false

124
itests/msgindex_test.go Normal file
View File

@ -0,0 +1,124 @@
package itests
import (
"context"
"os"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/exitcode"
lapi "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/filecoin-project/lotus/node"
)
func init() {
// adjust those to make tests snappy
index.CoalesceMinDelay = time.Millisecond
index.CoalesceMaxDelay = 10 * time.Millisecond
index.CoalesceMergeInterval = time.Millisecond
}
func testMsgIndex(
t *testing.T,
name string,
run func(t *testing.T, makeMsgIndex func(cs *store.ChainStore) (index.MsgIndex, error)),
check func(t *testing.T, i int, msgIndex index.MsgIndex),
) {
// create the message indices in the test context
var mx sync.Mutex
var tmpDirs []string
var msgIndices []index.MsgIndex
t.Cleanup(func() {
for _, msgIndex := range msgIndices {
_ = msgIndex.Close()
}
for _, tmp := range tmpDirs {
_ = os.RemoveAll(tmp)
}
})
makeMsgIndex := func(cs *store.ChainStore) (index.MsgIndex, error) {
var err error
tmp := t.TempDir()
msgIndex, err := index.NewMsgIndex(context.Background(), tmp, cs)
if err == nil {
mx.Lock()
tmpDirs = append(tmpDirs, tmp)
msgIndices = append(msgIndices, msgIndex)
mx.Unlock()
}
return msgIndex, err
}
t.Run(name, func(t *testing.T) {
run(t, makeMsgIndex)
})
if len(msgIndices) == 0 {
t.Fatal("no message indices")
}
for i, msgIndex := range msgIndices {
check(t, i, msgIndex)
}
}
func checkNonEmptyMsgIndex(t *testing.T, _ int, msgIndex index.MsgIndex) {
mi, ok := msgIndex.(interface{ CountMessages() (int64, error) })
if !ok {
t.Fatal("index does not allow counting")
}
count, err := mi.CountMessages()
require.NoError(t, err)
require.NotEqual(t, count, 0)
}
func TestMsgIndex(t *testing.T) {
testMsgIndex(t, "testSearchMsg", testSearchMsgWithIndex, checkNonEmptyMsgIndex)
}
func testSearchMsgWithIndex(t *testing.T, makeMsgIndex func(cs *store.ChainStore) (index.MsgIndex, error)) {
// copy of apiSuite.testSearchMsgWith; needs to be copied or else CI is angry, tests are built individually there
ctx := context.Background()
full, _, ens := kit.EnsembleMinimal(t, kit.ConstructorOpts(node.Override(new(index.MsgIndex), makeMsgIndex)))
senderAddr, err := full.WalletDefaultAddress(ctx)
require.NoError(t, err)
msg := &types.Message{
From: senderAddr,
To: senderAddr,
Value: big.Zero(),
}
ens.BeginMining(100 * time.Millisecond)
sm, err := full.MpoolPushMessage(ctx, msg, nil)
require.NoError(t, err)
//stm: @CHAIN_STATE_WAIT_MSG_001
res, err := full.StateWaitMsg(ctx, sm.Cid(), 1, lapi.LookbackNoLimit, true)
require.NoError(t, err)
require.Equal(t, exitcode.Ok, res.Receipt.ExitCode, "message not successful")
//stm: @CHAIN_STATE_SEARCH_MSG_001
searchRes, err := full.StateSearchMsg(ctx, types.EmptyTSK, sm.Cid(), lapi.LookbackNoLimit, true)
require.NoError(t, err)
require.NotNil(t, searchRes)
require.Equalf(t, res.TipSet, searchRes.TipSet, "search ts: %s, different from wait ts: %s", searchRes.TipSet, res.TipSet)
}

View File

@ -24,6 +24,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/journal/alerting"
@ -390,6 +391,7 @@ func Test() Option {
Unset(new(*peermgr.PeerMgr)),
Override(new(beacon.Schedule), testing.RandomBeacon),
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})),
Override(new(index.MsgIndex), modules.DummyMsgIndex),
)
}

View File

@ -20,6 +20,7 @@ import (
"github.com/filecoin-project/lotus/chain/events"
"github.com/filecoin-project/lotus/chain/exchange"
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/market"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/messagesigner"
@ -275,6 +276,10 @@ func ConfigFullNode(c interface{}) Option {
Override(new(full.EthEventAPI), &full.EthModuleDummy{}),
),
),
// enable message index for full node when configured by the user, otherwise use dummy.
If(cfg.Index.EnableMsgIndex, Override(new(index.MsgIndex), modules.MsgIndex)),
If(!cfg.Index.EnableMsgIndex, Override(new(index.MsgIndex), modules.DummyMsgIndex)),
)
}

View File

@ -461,6 +461,20 @@ Set to 0 to keep all mappings`,
Comment: ``,
},
{
Name: "Index",
Type: "IndexConfig",
Comment: ``,
},
},
"IndexConfig": []DocField{
{
Name: "EnableMsgIndex",
Type: "bool",
Comment: `EnableMsgIndex enables indexing of messages on chain.`,
},
},
"IndexProviderConfig": []DocField{
{

View File

@ -28,6 +28,7 @@ type FullNode struct {
Chainstore Chainstore
Cluster UserRaftConfig
Fevm FevmConfig
Index IndexConfig
}
// // Common
@ -726,3 +727,8 @@ type Events struct {
// Set a timeout for subscription clients
// Set upper bound on index size
}
type IndexConfig struct {
// EnableMsgIndex enables indexing of messages on chain.
EnableMsgIndex bool
}

View File

@ -21,6 +21,7 @@ import (
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/exchange"
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
@ -123,7 +124,7 @@ func NetworkName(mctx helpers.MetricsCtx,
ctx := helpers.LifecycleCtx(mctx, lc)
sm, err := stmgr.NewStateManager(cs, tsexec, syscalls, us, nil, nil)
sm, err := stmgr.NewStateManager(cs, tsexec, syscalls, us, nil, nil, index.DummyMsgIndex)
if err != nil {
return "", err
}

36
node/modules/msgindex.go Normal file
View File

@ -0,0 +1,36 @@
package modules
import (
"context"
"go.uber.org/fx"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo"
)
func MsgIndex(lc fx.Lifecycle, mctx helpers.MetricsCtx, cs *store.ChainStore, r repo.LockedRepo) (index.MsgIndex, error) {
basePath, err := r.SqlitePath()
if err != nil {
return nil, err
}
msgIndex, err := index.NewMsgIndex(helpers.LifecycleCtx(mctx, lc), basePath, cs)
if err != nil {
return nil, err
}
lc.Append(fx.Hook{
OnStop: func(_ context.Context) error {
return msgIndex.Close()
},
})
return msgIndex, nil
}
func DummyMsgIndex() index.MsgIndex {
return index.DummyMsgIndex
}

View File

@ -4,14 +4,15 @@ import (
"go.uber.org/fx"
"github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)
func StateManager(lc fx.Lifecycle, cs *store.ChainStore, exec stmgr.Executor, sys vm.SyscallBuilder, us stmgr.UpgradeSchedule, b beacon.Schedule, metadataDs dtypes.MetadataDS) (*stmgr.StateManager, error) {
sm, err := stmgr.NewStateManager(cs, exec, sys, us, b, metadataDs)
func StateManager(lc fx.Lifecycle, cs *store.ChainStore, exec stmgr.Executor, sys vm.SyscallBuilder, us stmgr.UpgradeSchedule, b beacon.Schedule, metadataDs dtypes.MetadataDS, msgIndex index.MsgIndex) (*stmgr.StateManager, error) {
sm, err := stmgr.NewStateManager(cs, exec, sys, us, b, metadataDs, msgIndex)
if err != nil {
return nil, err
}