pull messagepool into separate package

This commit is contained in:
whyrusleeping 2019-12-01 16:11:43 -07:00
parent 8da3cc875e
commit b58e7344e8
8 changed files with 23 additions and 15 deletions

View File

@ -1,4 +1,4 @@
package chain package messagepool
import ( import (
"bytes" "bytes"
@ -13,6 +13,7 @@ import (
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query" "github.com/ipfs/go-datastore/query"
logging "github.com/ipfs/go-log"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
lps "github.com/whyrusleeping/pubsub" lps "github.com/whyrusleeping/pubsub"
"go.uber.org/multierr" "go.uber.org/multierr"
@ -20,6 +21,7 @@ import (
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
@ -27,6 +29,8 @@ import (
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
) )
var log = logging.Logger("messagepool")
var ( var (
ErrMessageTooBig = errors.New("message too big") ErrMessageTooBig = errors.New("message too big")
@ -567,7 +571,7 @@ func (mp *MessagePool) Updates(ctx context.Context) (<-chan api.MpoolUpdate, err
sub := mp.changes.Sub(localUpdates) sub := mp.changes.Sub(localUpdates)
go func() { go func() {
defer mp.changes.Unsub(sub, localIncoming) defer mp.changes.Unsub(sub, chain.LocalIncoming)
for { for {
select { select {

View File

@ -1,4 +1,4 @@
package chain package messagepool
import ( import (
"testing" "testing"

View File

@ -7,6 +7,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
@ -54,7 +55,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
} }
} }
func HandleIncomingMessages(ctx context.Context, mpool *chain.MessagePool, msub *pubsub.Subscription) { func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, msub *pubsub.Subscription) {
for { for {
msg, err := msub.Next(ctx) msg, err := msub.Next(ctx)
if err != nil { if err != nil {

View File

@ -39,7 +39,7 @@ import (
var log = logging.Logger("chain") var log = logging.Logger("chain")
var localIncoming = "incoming" var LocalIncoming = "incoming"
type Syncer struct { type Syncer struct {
// The heaviest known tipset in the network. // The heaviest known tipset in the network.
@ -119,7 +119,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
} }
} }
syncer.incoming.Pub(fts.TipSet().Blocks(), localIncoming) syncer.incoming.Pub(fts.TipSet().Blocks(), LocalIncoming)
if from == syncer.self { if from == syncer.self {
// TODO: this is kindof a hack... // TODO: this is kindof a hack...
@ -152,11 +152,11 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) {
} }
func (syncer *Syncer) IncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) { func (syncer *Syncer) IncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) {
sub := syncer.incoming.Sub(localIncoming) sub := syncer.incoming.Sub(LocalIncoming)
out := make(chan *types.BlockHeader, 10) out := make(chan *types.BlockHeader, 10)
go func() { go func() {
defer syncer.incoming.Unsub(sub, localIncoming) defer syncer.incoming.Unsub(sub, LocalIncoming)
for { for {
select { select {

View File

@ -24,6 +24,7 @@ import (
"github.com/filecoin-project/lotus/chain/deals" "github.com/filecoin-project/lotus/chain/deals"
"github.com/filecoin-project/lotus/chain/gen" "github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/market" "github.com/filecoin-project/lotus/chain/market"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/metrics" "github.com/filecoin-project/lotus/chain/metrics"
"github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
@ -202,7 +203,7 @@ func Online() Option {
// Filecoin services // Filecoin services
Override(new(*chain.Syncer), modules.NewSyncer), Override(new(*chain.Syncer), modules.NewSyncer),
Override(new(*blocksync.BlockSync), blocksync.NewBlockSyncClient), Override(new(*blocksync.BlockSync), blocksync.NewBlockSyncClient),
Override(new(*chain.MessagePool), modules.MessagePool), Override(new(*messagepool.MessagePool), modules.MessagePool),
Override(new(modules.Genesis), modules.ErrorGenesis), Override(new(modules.Genesis), modules.ErrorGenesis),
Override(SetGenesisKey, modules.SetGenesis), Override(SetGenesisKey, modules.SetGenesis),

View File

@ -7,8 +7,8 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
) )
@ -17,7 +17,7 @@ type MpoolAPI struct {
WalletAPI WalletAPI
Mpool *chain.MessagePool Mpool *messagepool.MessagePool
} }
func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) { func (a *MpoolAPI) MpoolPending(ctx context.Context, ts *types.TipSet) ([]*types.SignedMessage, error) {

View File

@ -19,6 +19,7 @@ import (
"github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/blocksync" "github.com/filecoin-project/lotus/chain/blocksync"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
@ -41,9 +42,9 @@ func ChainExchange(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt
return exch return exch
} }
func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS) (*chain.MessagePool, error) { func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds dtypes.MetadataDS) (*messagepool.MessagePool, error) {
mpp := chain.NewMpoolProvider(sm, ps) mpp := messagepool.NewMpoolProvider(sm, ps)
mp, err := chain.NewMessagePool(mpp, ds) mp, err := messagepool.NewMessagePool(mpp, ds)
if err != nil { if err != nil {
return nil, xerrors.Errorf("constructing mpool: %w", err) return nil, xerrors.Errorf("constructing mpool: %w", err)
} }

View File

@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/blocksync" "github.com/filecoin-project/lotus/chain/blocksync"
"github.com/filecoin-project/lotus/chain/deals" "github.com/filecoin-project/lotus/chain/deals"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/sub" "github.com/filecoin-project/lotus/chain/sub"
"github.com/filecoin-project/lotus/node/hello" "github.com/filecoin-project/lotus/node/hello"
"github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/modules/helpers"
@ -53,7 +54,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubs
go sub.HandleIncomingBlocks(ctx, blocksub, s) go sub.HandleIncomingBlocks(ctx, blocksub, s)
} }
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubsub.PubSub, mpool *chain.MessagePool) { func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, pubsub *pubsub.PubSub, mpool *messagepool.MessagePool) {
ctx := helpers.LifecycleCtx(mctx, lc) ctx := helpers.LifecycleCtx(mctx, lc)
msgsub, err := pubsub.Subscribe("/fil/messages") msgsub, err := pubsub.Subscribe("/fil/messages")