refactor: move publish msg config inside deals config

This commit is contained in:
Dirk McCormick 2021-02-01 10:23:05 +01:00
parent adac340f3f
commit 987f41011a
5 changed files with 72 additions and 54 deletions

View File

@ -25,7 +25,6 @@ import (
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/markets/storageadapter"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/impl"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
ipld "github.com/ipfs/go-ipld-format"
@ -102,8 +101,8 @@ func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duratio
Full: 0,
Opts: node.Override(
new(*storageadapter.DealPublisher),
storageadapter.NewDealPublisher(nil, &config.PublishMsgConfig{
PublishPeriod: config.Duration(publishPeriod),
storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{
Period: publishPeriod,
MaxDealsPerMsg: maxDealsPerMsg,
})),
Preseal: PresealGenesis,

View File

@ -2,6 +2,7 @@ package storageadapter
import (
"context"
"fmt"
"strings"
"sync"
"time"
@ -52,9 +53,39 @@ type DealPublisher struct {
publishPeriodStart time.Time
}
// A deal that is queued to be published
type pendingDeal struct {
ctx context.Context
deal market2.ClientDealProposal
Result chan publishResult
}
// The result of publishing a deal
type publishResult struct {
msgCid cid.Cid
err error
}
func newPendingDeal(ctx context.Context, deal market2.ClientDealProposal) *pendingDeal {
return &pendingDeal{
ctx: ctx,
deal: deal,
Result: make(chan publishResult),
}
}
type PublishMsgConfig struct {
// The amount of time to wait for more deals to arrive before
// publishing
Period time.Duration
// The maximum number of deals to include in a single PublishStorageDeals
// message
MaxDealsPerMsg uint64
}
func NewDealPublisher(
feeConfig *config.MinerFeeConfig,
publishMsgCfg *config.PublishMsgConfig,
publishMsgCfg PublishMsgConfig,
) func(lc fx.Lifecycle, full api.FullNode) *DealPublisher {
return func(lc fx.Lifecycle, full api.FullNode) *DealPublisher {
maxFee := abi.NewTokenAmount(0)
@ -75,22 +106,16 @@ func NewDealPublisher(
func newDealPublisher(
dpapi dealPublisherAPI,
publishMsgCfg *config.PublishMsgConfig,
publishMsgCfg PublishMsgConfig,
publishSpec *api.MessageSendSpec,
) *DealPublisher {
publishPeriod := time.Duration(0)
maxDealsPerMsg := uint64(1)
if publishMsgCfg != nil {
publishPeriod = time.Duration(publishMsgCfg.PublishPeriod)
maxDealsPerMsg = publishMsgCfg.MaxDealsPerMsg
}
ctx, cancel := context.WithCancel(context.Background())
return &DealPublisher{
api: dpapi,
ctx: ctx,
Shutdown: cancel,
maxDealsPerPublishMsg: maxDealsPerMsg,
publishPeriod: publishPeriod,
maxDealsPerPublishMsg: publishMsgCfg.MaxDealsPerMsg,
publishPeriod: publishMsgCfg.Period,
publishSpec: publishSpec,
}
}
@ -227,6 +252,16 @@ func (p *DealPublisher) publishDealProposals(deals []market2.ClientDealProposal)
log.Infof("publishing %d deals in publish deals queue with piece CIDs: %s", len(deals), pieceCids(deals))
provider := deals[0].Proposal.Provider
for _, dl := range deals {
if dl.Proposal.Provider != provider {
msg := fmt.Sprintf("publishing %d deals failed: ", len(deals)) +
"not all deals are for same provider: " +
fmt.Sprintf("deal with piece CID %s is for provider %s ", deals[0].Proposal.PieceCID, deals[0].Proposal.Provider) +
fmt.Sprintf("but deal with piece CID %s is for provider %s", dl.Proposal.PieceCID, dl.Proposal.Provider)
return cid.Undef, xerrors.Errorf(msg)
}
}
mi, err := p.api.StateMinerInfo(p.ctx, provider, types.EmptyTSK)
if err != nil {
return cid.Undef, err
@ -273,22 +308,3 @@ func (p *DealPublisher) filterCancelledDeals() {
}
p.pending = p.pending[:i]
}
type publishResult struct {
msgCid cid.Cid
err error
}
type pendingDeal struct {
ctx context.Context
deal market2.ClientDealProposal
Result chan publishResult
}
func newPendingDeal(ctx context.Context, deal market2.ClientDealProposal) *pendingDeal {
return &pendingDeal{
ctx: ctx,
deal: deal,
Result: make(chan publishResult),
}
}

View File

@ -22,7 +22,6 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/node/config"
)
func TestDealPublisher(t *testing.T) {
@ -70,6 +69,14 @@ func TestDealPublisher(t *testing.T) {
expiredWithinPublishPeriod: 2,
dealCountAfterPublishPeriod: 1,
expectedDealsPerMsg: []int{2, 1},
}, {
name: "zero config",
publishPeriod: 0,
maxDealsPerMsg: 0,
dealCountWithinPublishPeriod: 2,
expiredWithinPublishPeriod: 0,
dealCountAfterPublishPeriod: 2,
expectedDealsPerMsg: []int{1, 1, 1, 1},
}}
for _, tc := range testCases {
@ -82,8 +89,8 @@ func TestDealPublisher(t *testing.T) {
dpapi := newDPAPI(t, worker)
// Create a deal publisher
dp := newDealPublisher(dpapi, &config.PublishMsgConfig{
PublishPeriod: config.Duration(tc.publishPeriod),
dp := newDealPublisher(dpapi, PublishMsgConfig{
Period: tc.publishPeriod,
MaxDealsPerMsg: tc.maxDealsPerMsg,
}, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)})

View File

@ -377,7 +377,7 @@ func Online() Option {
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)),
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)),
Override(new(storagemarket.StorageProvider), modules.StorageProvider),
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, nil)),
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)),
Override(HandleMigrateProviderFundsKey, modules.HandleMigrateProviderFunds),
Override(HandleRetrievalKey, modules.HandleRetrieval),
@ -520,7 +520,10 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.RetrievalFilter))),
),
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, &cfg.PublishMsg)),
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, storageadapter.PublishMsgConfig{
Period: time.Duration(cfg.Dealmaking.PublishMsgPeriod),
MaxDealsPerMsg: cfg.Dealmaking.MaxDealsPerPublishMsg,
})),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees)),
Override(new(sectorstorage.SealerConfig), cfg.Storage),
@ -648,6 +651,6 @@ func Test() Option {
Unset(RunPeerMgrKey),
Unset(new(*peermgr.PeerMgr)),
Override(new(beacon.Schedule), testing.RandomBeacon),
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, nil)),
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})),
)
}

View File

@ -33,7 +33,6 @@ type StorageMiner struct {
Common
Dealmaking DealmakingConfig
PublishMsg PublishMsgConfig
Sealing SealingConfig
Storage sectorstorage.SealerConfig
Fees MinerFeeConfig
@ -49,20 +48,17 @@ type DealmakingConfig struct {
ConsiderUnverifiedStorageDeals bool
PieceCidBlocklist []cid.Cid
ExpectedSealDuration Duration
// The amount of time to wait for more deals to arrive before
// publishing
PublishMsgPeriod Duration
// The maximum number of deals to include in a single PublishStorageDeals
// message
MaxDealsPerPublishMsg uint64
Filter string
RetrievalFilter string
}
type PublishMsgConfig struct {
// The amount of time to wait for more deals to arrive before
// publishing
PublishPeriod Duration
// The maximum number of deals to include in a single PublishStorageDeals
// message
MaxDealsPerMsg uint64
}
type SealingConfig struct {
// 0 = no limit
MaxWaitDealsSectors uint64
@ -216,12 +212,9 @@ func DefaultStorageMiner() *StorageMiner {
ConsiderUnverifiedStorageDeals: true,
PieceCidBlocklist: []cid.Cid{},
// TODO: It'd be nice to set this based on sector size
ExpectedSealDuration: Duration(time.Hour * 24),
},
PublishMsg: PublishMsgConfig{
PublishPeriod: Duration(time.Hour),
MaxDealsPerMsg: 8,
ExpectedSealDuration: Duration(time.Hour * 24),
PublishMsgPeriod: Duration(time.Hour),
MaxDealsPerPublishMsg: 8,
},
Fees: MinerFeeConfig{