package messagepool import ( "context" "time" "github.com/ipfs/go-cid" pubsub "github.com/libp2p/go-libp2p-pubsub" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" ) var ( HeadChangeCoalesceMinDelay = 2 * time.Second HeadChangeCoalesceMaxDelay = 6 * time.Second HeadChangeCoalesceMergeInterval = time.Second ) type Provider interface { SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet PutMessage(ctx context.Context, m types.ChainMsg) (cid.Cid, error) PubSubPublish(string, []byte) error GetActorAfter(address.Address, *types.TipSet) (*types.Actor, error) StateAccountKeyAtFinality(context.Context, address.Address, *types.TipSet) (address.Address, error) StateNetworkVersion(context.Context, abi.ChainEpoch) network.Version MessagesForBlock(context.Context, *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) MessagesForTipset(context.Context, *types.TipSet) ([]types.ChainMsg, error) LoadTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error) IsLite() bool } type mpoolProvider struct { sm *stmgr.StateManager ps *pubsub.PubSub lite MpoolNonceAPI } var _ Provider = (*mpoolProvider)(nil) func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider { return &mpoolProvider{sm: sm, ps: ps} } func NewProviderLite(sm *stmgr.StateManager, ps *pubsub.PubSub, noncer MpoolNonceAPI) Provider { return &mpoolProvider{sm: sm, ps: ps, lite: noncer} } func (mpp *mpoolProvider) IsLite() bool { return mpp.lite != nil } func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet { mpp.sm.ChainStore().SubscribeHeadChanges( store.WrapHeadChangeCoalescer( cb, HeadChangeCoalesceMinDelay, HeadChangeCoalesceMaxDelay, HeadChangeCoalesceMergeInterval, )) return mpp.sm.ChainStore().GetHeaviestTipSet() } func (mpp *mpoolProvider) PutMessage(ctx context.Context, m types.ChainMsg) (cid.Cid, error) { return mpp.sm.ChainStore().PutMessage(ctx, m) } func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error { return mpp.ps.Publish(k, v) //nolint } func (mpp *mpoolProvider) GetActorAfter(addr address.Address, ts *types.TipSet) (*types.Actor, error) { if mpp.IsLite() { n, err := mpp.lite.GetNonce(context.TODO(), addr, ts.Key()) if err != nil { return nil, xerrors.Errorf("getting nonce over lite: %w", err) } a, err := mpp.lite.GetActor(context.TODO(), addr, ts.Key()) if err != nil { return nil, xerrors.Errorf("getting actor over lite: %w", err) } a.Nonce = n return a, nil } stcid, _, err := mpp.sm.TipSetState(context.TODO(), ts) if err != nil { return nil, xerrors.Errorf("computing tipset state for GetActor: %w", err) } st, err := mpp.sm.StateTree(stcid) if err != nil { return nil, xerrors.Errorf("failed to load state tree: %w", err) } return st.GetActor(addr) } func (mpp *mpoolProvider) StateAccountKeyAtFinality(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) { return mpp.sm.ResolveToKeyAddressAtFinality(ctx, addr, ts) } func (mpp *mpoolProvider) StateNetworkVersion(ctx context.Context, height abi.ChainEpoch) network.Version { return mpp.sm.GetNetworkVersion(ctx, height) } func (mpp *mpoolProvider) MessagesForBlock(ctx context.Context, h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) { return mpp.sm.ChainStore().MessagesForBlock(ctx, h) } func (mpp *mpoolProvider) MessagesForTipset(ctx context.Context, ts *types.TipSet) ([]types.ChainMsg, error) { return mpp.sm.ChainStore().MessagesForTipset(ctx, ts) } func (mpp *mpoolProvider) LoadTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) { return mpp.sm.ChainStore().LoadTipSet(ctx, tsk) } func (mpp *mpoolProvider) ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error) { baseFee, err := mpp.sm.ChainStore().ComputeBaseFee(ctx, ts) if err != nil { return types.NewInt(0), xerrors.Errorf("computing base fee at %s: %w", ts, err) } return baseFee, nil }