refactor messagepool provider out of the main messagepool implementation
This commit is contained in:
parent
45cac72f7a
commit
42951d05a5
@ -18,7 +18,6 @@ import (
|
|||||||
"github.com/ipfs/go-datastore/namespace"
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
"github.com/ipfs/go-datastore/query"
|
"github.com/ipfs/go-datastore/query"
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
||||||
lps "github.com/whyrusleeping/pubsub"
|
lps "github.com/whyrusleeping/pubsub"
|
||||||
"go.uber.org/multierr"
|
"go.uber.org/multierr"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
@ -27,7 +26,6 @@ import (
|
|||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/stmgr"
|
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
"github.com/filecoin-project/lotus/chain/vm"
|
"github.com/filecoin-project/lotus/chain/vm"
|
||||||
"github.com/filecoin-project/lotus/lib/sigs"
|
"github.com/filecoin-project/lotus/lib/sigs"
|
||||||
@ -147,69 +145,6 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool) (bool, error) {
|
|||||||
return !has, nil
|
return !has, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type Provider interface {
|
|
||||||
SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet
|
|
||||||
PutMessage(m types.ChainMsg) (cid.Cid, error)
|
|
||||||
PubSubPublish(string, []byte) error
|
|
||||||
StateGetActor(address.Address, *types.TipSet) (*types.Actor, error)
|
|
||||||
StateAccountKey(context.Context, address.Address, *types.TipSet) (address.Address, error)
|
|
||||||
MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
|
|
||||||
MessagesForTipset(*types.TipSet) ([]types.ChainMsg, error)
|
|
||||||
LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error)
|
|
||||||
ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type mpoolProvider struct {
|
|
||||||
sm *stmgr.StateManager
|
|
||||||
ps *pubsub.PubSub
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider {
|
|
||||||
return &mpoolProvider{sm: sm, ps: ps}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet {
|
|
||||||
mpp.sm.ChainStore().SubscribeHeadChanges(cb)
|
|
||||||
return mpp.sm.ChainStore().GetHeaviestTipSet()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) PutMessage(m types.ChainMsg) (cid.Cid, error) {
|
|
||||||
return mpp.sm.ChainStore().PutMessage(m)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error {
|
|
||||||
return mpp.ps.Publish(k, v)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
|
|
||||||
var act types.Actor
|
|
||||||
return &act, mpp.sm.WithParentState(ts, mpp.sm.WithActor(addr, stmgr.GetActor(&act)))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) StateAccountKey(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
|
|
||||||
return mpp.sm.ResolveToKeyAddress(ctx, addr, ts)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) MessagesForBlock(h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
|
|
||||||
return mpp.sm.ChainStore().MessagesForBlock(h)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) MessagesForTipset(ts *types.TipSet) ([]types.ChainMsg, error) {
|
|
||||||
return mpp.sm.ChainStore().MessagesForTipset(ts)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error) {
|
|
||||||
return mpp.sm.ChainStore().LoadTipSet(tsk)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (mpp *mpoolProvider) ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error) {
|
|
||||||
baseFee, err := mpp.sm.ChainStore().ComputeBaseFee(ctx, ts)
|
|
||||||
if err != nil {
|
|
||||||
return types.NewInt(0), xerrors.Errorf("computing base fee at %s: %w", ts, err)
|
|
||||||
}
|
|
||||||
return baseFee, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) {
|
func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) {
|
||||||
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
|
||||||
verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
|
verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
|
||||||
|
76
chain/messagepool/provider.go
Normal file
76
chain/messagepool/provider.go
Normal file
@ -0,0 +1,76 @@
|
|||||||
|
package messagepool
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/lotus/chain/stmgr"
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Provider interface {
|
||||||
|
SubscribeHeadChanges(func(rev, app []*types.TipSet) error) *types.TipSet
|
||||||
|
PutMessage(m types.ChainMsg) (cid.Cid, error)
|
||||||
|
PubSubPublish(string, []byte) error
|
||||||
|
StateGetActor(address.Address, *types.TipSet) (*types.Actor, error)
|
||||||
|
StateAccountKey(context.Context, address.Address, *types.TipSet) (address.Address, error)
|
||||||
|
MessagesForBlock(*types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error)
|
||||||
|
MessagesForTipset(*types.TipSet) ([]types.ChainMsg, error)
|
||||||
|
LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error)
|
||||||
|
ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type mpoolProvider struct {
|
||||||
|
sm *stmgr.StateManager
|
||||||
|
ps *pubsub.PubSub
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProvider(sm *stmgr.StateManager, ps *pubsub.PubSub) Provider {
|
||||||
|
return &mpoolProvider{sm: sm, ps: ps}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) SubscribeHeadChanges(cb func(rev, app []*types.TipSet) error) *types.TipSet {
|
||||||
|
mpp.sm.ChainStore().SubscribeHeadChanges(cb)
|
||||||
|
return mpp.sm.ChainStore().GetHeaviestTipSet()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) PutMessage(m types.ChainMsg) (cid.Cid, error) {
|
||||||
|
return mpp.sm.ChainStore().PutMessage(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) PubSubPublish(k string, v []byte) error {
|
||||||
|
return mpp.ps.Publish(k, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) StateGetActor(addr address.Address, ts *types.TipSet) (*types.Actor, error) {
|
||||||
|
var act types.Actor
|
||||||
|
return &act, mpp.sm.WithParentState(ts, mpp.sm.WithActor(addr, stmgr.GetActor(&act)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) StateAccountKey(ctx context.Context, addr address.Address, ts *types.TipSet) (address.Address, error) {
|
||||||
|
return mpp.sm.ResolveToKeyAddress(ctx, addr, ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) MessagesForBlock(h *types.BlockHeader) ([]*types.Message, []*types.SignedMessage, error) {
|
||||||
|
return mpp.sm.ChainStore().MessagesForBlock(h)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) MessagesForTipset(ts *types.TipSet) ([]types.ChainMsg, error) {
|
||||||
|
return mpp.sm.ChainStore().MessagesForTipset(ts)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) LoadTipSet(tsk types.TipSetKey) (*types.TipSet, error) {
|
||||||
|
return mpp.sm.ChainStore().LoadTipSet(tsk)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mpp *mpoolProvider) ChainComputeBaseFee(ctx context.Context, ts *types.TipSet) (types.BigInt, error) {
|
||||||
|
baseFee, err := mpp.sm.ChainStore().ComputeBaseFee(ctx, ts)
|
||||||
|
if err != nil {
|
||||||
|
return types.NewInt(0), xerrors.Errorf("computing base fee at %s: %w", ts, err)
|
||||||
|
}
|
||||||
|
return baseFee, nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user