hook the index into the rest of lotus
This commit is contained in:
parent
d032e4f119
commit
f76babceec
@ -34,6 +34,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/consensus"
|
"github.com/filecoin-project/lotus/chain/consensus"
|
||||||
"github.com/filecoin-project/lotus/chain/consensus/filcns"
|
"github.com/filecoin-project/lotus/chain/consensus/filcns"
|
||||||
genesis2 "github.com/filecoin-project/lotus/chain/gen/genesis"
|
genesis2 "github.com/filecoin-project/lotus/chain/gen/genesis"
|
||||||
|
"github.com/filecoin-project/lotus/chain/index"
|
||||||
"github.com/filecoin-project/lotus/chain/rand"
|
"github.com/filecoin-project/lotus/chain/rand"
|
||||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
@ -256,7 +257,7 @@ func NewGeneratorWithSectorsAndUpgradeSchedule(numSectors int, us stmgr.UpgradeS
|
|||||||
//return nil, xerrors.Errorf("creating drand beacon: %w", err)
|
//return nil, xerrors.Errorf("creating drand beacon: %w", err)
|
||||||
//}
|
//}
|
||||||
|
|
||||||
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), sys, us, beac, ds)
|
sm, err := stmgr.NewStateManager(cs, consensus.NewTipSetExecutor(filcns.RewardFunc), sys, us, beac, ds, index.DummyMsgIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("initing stmgr: %w", err)
|
return nil, xerrors.Errorf("initing stmgr: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -30,3 +30,15 @@ type MsgIndex interface {
|
|||||||
// Close closes the index
|
// Close closes the index
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type dummyMsgIndex struct{}
|
||||||
|
|
||||||
|
func (_ dummyMsgIndex) GetMsgInfo(ctx context.Context, m cid.Cid) (MsgInfo, error) {
|
||||||
|
return MsgInfo{}, ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
func (_ dummyMsgIndex) Close() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var DummyMsgIndex MsgIndex = dummyMsgIndex{}
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/index"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
@ -145,7 +146,20 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, head *types.TipSet
|
|||||||
return head, r, foundMsg, nil
|
return head, r, foundMsg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
fts, r, foundMsg, err := sm.searchBackForMsg(ctx, head, msg, lookbackLimit, allowReplaced)
|
fts, r, foundMsg, err := sm.searchForIndexedMsg(ctx, mcid, msg)
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case err == nil:
|
||||||
|
return fts, r, foundMsg, nil
|
||||||
|
|
||||||
|
case errors.Is(err, index.ErrNotFound):
|
||||||
|
// ok for the index to have incomplete data
|
||||||
|
|
||||||
|
default:
|
||||||
|
log.Warnf("error searching message index: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fts, r, foundMsg, err = sm.searchBackForMsg(ctx, head, msg, lookbackLimit, allowReplaced)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("failed to look back through chain for message %s", mcid)
|
log.Warnf("failed to look back through chain for message %s", mcid)
|
||||||
@ -159,6 +173,21 @@ func (sm *StateManager) SearchForMessage(ctx context.Context, head *types.TipSet
|
|||||||
return fts, r, foundMsg, nil
|
return fts, r, foundMsg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sm *StateManager) searchForIndexedMsg(ctx context.Context, mcid cid.Cid, m types.ChainMsg) (*types.TipSet, *types.MessageReceipt, cid.Cid, error) {
|
||||||
|
minfo, err := sm.msgIndex.GetMsgInfo(ctx, mcid)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, cid.Undef, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ts, err := sm.cs.GetTipSetByCid(ctx, minfo.TipSet)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, cid.Undef, err
|
||||||
|
}
|
||||||
|
|
||||||
|
r, foundMsg, err := sm.tipsetExecutedMessage(ctx, ts, mcid, m.VMMessage(), false)
|
||||||
|
return ts, r, foundMsg, err
|
||||||
|
}
|
||||||
|
|
||||||
// searchBackForMsg searches up to limit tipsets backwards from the given
|
// searchBackForMsg searches up to limit tipsets backwards from the given
|
||||||
// tipset for a message receipt.
|
// tipset for a message receipt.
|
||||||
// If limit is
|
// If limit is
|
||||||
|
@ -25,6 +25,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/paych"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/paych"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/policy"
|
"github.com/filecoin-project/lotus/chain/actors/policy"
|
||||||
"github.com/filecoin-project/lotus/chain/beacon"
|
"github.com/filecoin-project/lotus/chain/beacon"
|
||||||
|
"github.com/filecoin-project/lotus/chain/index"
|
||||||
"github.com/filecoin-project/lotus/chain/rand"
|
"github.com/filecoin-project/lotus/chain/rand"
|
||||||
"github.com/filecoin-project/lotus/chain/state"
|
"github.com/filecoin-project/lotus/chain/state"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
@ -135,6 +136,8 @@ type StateManager struct {
|
|||||||
tsExec Executor
|
tsExec Executor
|
||||||
tsExecMonitor ExecMonitor
|
tsExecMonitor ExecMonitor
|
||||||
beacon beacon.Schedule
|
beacon beacon.Schedule
|
||||||
|
|
||||||
|
msgIndex index.MsgIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
// Caches a single state tree
|
// Caches a single state tree
|
||||||
@ -143,7 +146,7 @@ type treeCache struct {
|
|||||||
tree *state.StateTree
|
tree *state.StateTree
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, beacon beacon.Schedule, metadataDs dstore.Batching) (*StateManager, error) {
|
func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, beacon beacon.Schedule, metadataDs dstore.Batching, msgIndex index.MsgIndex) (*StateManager, error) {
|
||||||
// If we have upgrades, make sure they're in-order and make sense.
|
// If we have upgrades, make sure they're in-order and make sense.
|
||||||
if err := us.Validate(); err != nil {
|
if err := us.Validate(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -198,11 +201,12 @@ func NewStateManager(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder,
|
|||||||
tree: nil,
|
tree: nil,
|
||||||
},
|
},
|
||||||
compWait: make(map[string]chan struct{}),
|
compWait: make(map[string]chan struct{}),
|
||||||
|
msgIndex: msgIndex,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStateManagerWithUpgradeScheduleAndMonitor(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, b beacon.Schedule, em ExecMonitor, metadataDs dstore.Batching) (*StateManager, error) {
|
func NewStateManagerWithUpgradeScheduleAndMonitor(cs *store.ChainStore, exec Executor, sys vm.SyscallBuilder, us UpgradeSchedule, b beacon.Schedule, em ExecMonitor, metadataDs dstore.Batching, msgIndex index.MsgIndex) (*StateManager, error) {
|
||||||
sm, err := NewStateManager(cs, exec, sys, us, b, metadataDs)
|
sm, err := NewStateManager(cs, exec, sys, us, b, metadataDs, msgIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -34,6 +34,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/consensus"
|
"github.com/filecoin-project/lotus/chain/consensus"
|
||||||
"github.com/filecoin-project/lotus/chain/consensus/filcns"
|
"github.com/filecoin-project/lotus/chain/consensus/filcns"
|
||||||
|
"github.com/filecoin-project/lotus/chain/index"
|
||||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
@ -539,7 +540,7 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: We need to supply the actual beacon after v14
|
// TODO: We need to supply the actual beacon after v14
|
||||||
stm, err := stmgr.NewStateManager(cst, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds)
|
stm, err := stmgr.NewStateManager(cst, consensus.NewTipSetExecutor(filcns.RewardFunc), vm.Syscalls(ffiwrapper.ProofVerifier), filcns.DefaultUpgradeSchedule(), nil, mds, index.DummyMsgIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/events"
|
"github.com/filecoin-project/lotus/chain/events"
|
||||||
"github.com/filecoin-project/lotus/chain/exchange"
|
"github.com/filecoin-project/lotus/chain/exchange"
|
||||||
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
|
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
|
||||||
|
"github.com/filecoin-project/lotus/chain/index"
|
||||||
"github.com/filecoin-project/lotus/chain/market"
|
"github.com/filecoin-project/lotus/chain/market"
|
||||||
"github.com/filecoin-project/lotus/chain/messagepool"
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||||
"github.com/filecoin-project/lotus/chain/messagesigner"
|
"github.com/filecoin-project/lotus/chain/messagesigner"
|
||||||
@ -79,6 +80,7 @@ var ChainNode = Options(
|
|||||||
Override(new(stmgr.Executor), consensus.NewTipSetExecutor(filcns.RewardFunc)),
|
Override(new(stmgr.Executor), consensus.NewTipSetExecutor(filcns.RewardFunc)),
|
||||||
Override(new(consensus.Consensus), filcns.NewFilecoinExpectedConsensus),
|
Override(new(consensus.Consensus), filcns.NewFilecoinExpectedConsensus),
|
||||||
Override(new(*store.ChainStore), modules.ChainStore),
|
Override(new(*store.ChainStore), modules.ChainStore),
|
||||||
|
Override(new(index.MsgIndex), modules.MsgIndex),
|
||||||
Override(new(*stmgr.StateManager), modules.StateManager),
|
Override(new(*stmgr.StateManager), modules.StateManager),
|
||||||
Override(new(dtypes.ChainBitswap), modules.ChainBitswap),
|
Override(new(dtypes.ChainBitswap), modules.ChainBitswap),
|
||||||
Override(new(dtypes.ChainBlockService), modules.ChainBlockService), // todo: unused
|
Override(new(dtypes.ChainBlockService), modules.ChainBlockService), // todo: unused
|
||||||
|
@ -21,6 +21,7 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/consensus/filcns"
|
"github.com/filecoin-project/lotus/chain/consensus/filcns"
|
||||||
"github.com/filecoin-project/lotus/chain/exchange"
|
"github.com/filecoin-project/lotus/chain/exchange"
|
||||||
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
|
"github.com/filecoin-project/lotus/chain/gen/slashfilter"
|
||||||
|
"github.com/filecoin-project/lotus/chain/index"
|
||||||
"github.com/filecoin-project/lotus/chain/messagepool"
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
@ -123,7 +124,7 @@ func NetworkName(mctx helpers.MetricsCtx,
|
|||||||
|
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
|
||||||
sm, err := stmgr.NewStateManager(cs, tsexec, syscalls, us, nil, nil)
|
sm, err := stmgr.NewStateManager(cs, tsexec, syscalls, us, nil, nil, index.DummyMsgIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
31
node/modules/msgindex.go
Normal file
31
node/modules/msgindex.go
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
package modules
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"go.uber.org/fx"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/index"
|
||||||
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
|
"github.com/filecoin-project/lotus/node/repo"
|
||||||
|
)
|
||||||
|
|
||||||
|
func MsgIndex(lc fx.Lifecycle, cs *store.ChainStore, r repo.LockedRepo) (index.MsgIndex, error) {
|
||||||
|
basePath, err := r.SqlitePath()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
msgIndex, err := index.NewMsgIndex(basePath, cs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
lc.Append(fx.Hook{
|
||||||
|
OnStop: func(_ context.Context) error {
|
||||||
|
return msgIndex.Close()
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
return msgIndex, nil
|
||||||
|
}
|
@ -4,14 +4,15 @@ import (
|
|||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/chain/beacon"
|
"github.com/filecoin-project/lotus/chain/beacon"
|
||||||
|
"github.com/filecoin-project/lotus/chain/index"
|
||||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
"github.com/filecoin-project/lotus/chain/store"
|
"github.com/filecoin-project/lotus/chain/store"
|
||||||
"github.com/filecoin-project/lotus/chain/vm"
|
"github.com/filecoin-project/lotus/chain/vm"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
)
|
)
|
||||||
|
|
||||||
func StateManager(lc fx.Lifecycle, cs *store.ChainStore, exec stmgr.Executor, sys vm.SyscallBuilder, us stmgr.UpgradeSchedule, b beacon.Schedule, metadataDs dtypes.MetadataDS) (*stmgr.StateManager, error) {
|
func StateManager(lc fx.Lifecycle, cs *store.ChainStore, exec stmgr.Executor, sys vm.SyscallBuilder, us stmgr.UpgradeSchedule, b beacon.Schedule, metadataDs dtypes.MetadataDS, msgIndex index.MsgIndex) (*stmgr.StateManager, error) {
|
||||||
sm, err := stmgr.NewStateManager(cs, exec, sys, us, b, metadataDs)
|
sm, err := stmgr.NewStateManager(cs, exec, sys, us, b, metadataDs, msgIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user