diff --git a/api/test/deals.go b/api/test/deals.go index 10b761a21..60368027f 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -25,7 +25,6 @@ import ( sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/filecoin-project/lotus/markets/storageadapter" "github.com/filecoin-project/lotus/node" - "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/impl" market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" ipld "github.com/ipfs/go-ipld-format" @@ -102,8 +101,8 @@ func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duratio Full: 0, Opts: node.Override( new(*storageadapter.DealPublisher), - storageadapter.NewDealPublisher(nil, &config.PublishMsgConfig{ - PublishPeriod: config.Duration(publishPeriod), + storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{ + Period: publishPeriod, MaxDealsPerMsg: maxDealsPerMsg, })), Preseal: PresealGenesis, diff --git a/markets/storageadapter/dealpublisher.go b/markets/storageadapter/dealpublisher.go index 0d1e61655..e197479d6 100644 --- a/markets/storageadapter/dealpublisher.go +++ b/markets/storageadapter/dealpublisher.go @@ -2,6 +2,7 @@ package storageadapter import ( "context" + "fmt" "strings" "sync" "time" @@ -52,9 +53,39 @@ type DealPublisher struct { publishPeriodStart time.Time } +// A deal that is queued to be published +type pendingDeal struct { + ctx context.Context + deal market2.ClientDealProposal + Result chan publishResult +} + +// The result of publishing a deal +type publishResult struct { + msgCid cid.Cid + err error +} + +func newPendingDeal(ctx context.Context, deal market2.ClientDealProposal) *pendingDeal { + return &pendingDeal{ + ctx: ctx, + deal: deal, + Result: make(chan publishResult), + } +} + +type PublishMsgConfig struct { + // The amount of time to wait for more deals to arrive before + // publishing + Period time.Duration + // The maximum number of deals to include in a single PublishStorageDeals + // message + MaxDealsPerMsg uint64 +} + func NewDealPublisher( feeConfig *config.MinerFeeConfig, - publishMsgCfg *config.PublishMsgConfig, + publishMsgCfg PublishMsgConfig, ) func(lc fx.Lifecycle, full api.FullNode) *DealPublisher { return func(lc fx.Lifecycle, full api.FullNode) *DealPublisher { maxFee := abi.NewTokenAmount(0) @@ -75,22 +106,16 @@ func NewDealPublisher( func newDealPublisher( dpapi dealPublisherAPI, - publishMsgCfg *config.PublishMsgConfig, + publishMsgCfg PublishMsgConfig, publishSpec *api.MessageSendSpec, ) *DealPublisher { - publishPeriod := time.Duration(0) - maxDealsPerMsg := uint64(1) - if publishMsgCfg != nil { - publishPeriod = time.Duration(publishMsgCfg.PublishPeriod) - maxDealsPerMsg = publishMsgCfg.MaxDealsPerMsg - } ctx, cancel := context.WithCancel(context.Background()) return &DealPublisher{ api: dpapi, ctx: ctx, Shutdown: cancel, - maxDealsPerPublishMsg: maxDealsPerMsg, - publishPeriod: publishPeriod, + maxDealsPerPublishMsg: publishMsgCfg.MaxDealsPerMsg, + publishPeriod: publishMsgCfg.Period, publishSpec: publishSpec, } } @@ -227,6 +252,16 @@ func (p *DealPublisher) publishDealProposals(deals []market2.ClientDealProposal) log.Infof("publishing %d deals in publish deals queue with piece CIDs: %s", len(deals), pieceCids(deals)) provider := deals[0].Proposal.Provider + for _, dl := range deals { + if dl.Proposal.Provider != provider { + msg := fmt.Sprintf("publishing %d deals failed: ", len(deals)) + + "not all deals are for same provider: " + + fmt.Sprintf("deal with piece CID %s is for provider %s ", deals[0].Proposal.PieceCID, deals[0].Proposal.Provider) + + fmt.Sprintf("but deal with piece CID %s is for provider %s", dl.Proposal.PieceCID, dl.Proposal.Provider) + return cid.Undef, xerrors.Errorf(msg) + } + } + mi, err := p.api.StateMinerInfo(p.ctx, provider, types.EmptyTSK) if err != nil { return cid.Undef, err @@ -273,22 +308,3 @@ func (p *DealPublisher) filterCancelledDeals() { } p.pending = p.pending[:i] } - -type publishResult struct { - msgCid cid.Cid - err error -} - -type pendingDeal struct { - ctx context.Context - deal market2.ClientDealProposal - Result chan publishResult -} - -func newPendingDeal(ctx context.Context, deal market2.ClientDealProposal) *pendingDeal { - return &pendingDeal{ - ctx: ctx, - deal: deal, - Result: make(chan publishResult), - } -} diff --git a/markets/storageadapter/dealpublisher_test.go b/markets/storageadapter/dealpublisher_test.go index 77e8e65f8..bf9fac6a6 100644 --- a/markets/storageadapter/dealpublisher_test.go +++ b/markets/storageadapter/dealpublisher_test.go @@ -22,7 +22,6 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/node/config" ) func TestDealPublisher(t *testing.T) { @@ -70,6 +69,14 @@ func TestDealPublisher(t *testing.T) { expiredWithinPublishPeriod: 2, dealCountAfterPublishPeriod: 1, expectedDealsPerMsg: []int{2, 1}, + }, { + name: "zero config", + publishPeriod: 0, + maxDealsPerMsg: 0, + dealCountWithinPublishPeriod: 2, + expiredWithinPublishPeriod: 0, + dealCountAfterPublishPeriod: 2, + expectedDealsPerMsg: []int{1, 1, 1, 1}, }} for _, tc := range testCases { @@ -82,8 +89,8 @@ func TestDealPublisher(t *testing.T) { dpapi := newDPAPI(t, worker) // Create a deal publisher - dp := newDealPublisher(dpapi, &config.PublishMsgConfig{ - PublishPeriod: config.Duration(tc.publishPeriod), + dp := newDealPublisher(dpapi, PublishMsgConfig{ + Period: tc.publishPeriod, MaxDealsPerMsg: tc.maxDealsPerMsg, }, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)}) diff --git a/node/builder.go b/node/builder.go index 440ab6df3..332550d7c 100644 --- a/node/builder.go +++ b/node/builder.go @@ -377,7 +377,7 @@ func Online() Option { Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)), Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)), Override(new(storagemarket.StorageProvider), modules.StorageProvider), - Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, nil)), + Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)), Override(HandleMigrateProviderFundsKey, modules.HandleMigrateProviderFunds), Override(HandleRetrievalKey, modules.HandleRetrieval), @@ -520,7 +520,10 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.RetrievalFilter))), ), - Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, &cfg.PublishMsg)), + Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, storageadapter.PublishMsgConfig{ + Period: time.Duration(cfg.Dealmaking.PublishMsgPeriod), + MaxDealsPerMsg: cfg.Dealmaking.MaxDealsPerPublishMsg, + })), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees)), Override(new(sectorstorage.SealerConfig), cfg.Storage), @@ -648,6 +651,6 @@ func Test() Option { Unset(RunPeerMgrKey), Unset(new(*peermgr.PeerMgr)), Override(new(beacon.Schedule), testing.RandomBeacon), - Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, nil)), + Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})), ) } diff --git a/node/config/def.go b/node/config/def.go index a8eaa3066..13587e315 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -33,7 +33,6 @@ type StorageMiner struct { Common Dealmaking DealmakingConfig - PublishMsg PublishMsgConfig Sealing SealingConfig Storage sectorstorage.SealerConfig Fees MinerFeeConfig @@ -49,20 +48,17 @@ type DealmakingConfig struct { ConsiderUnverifiedStorageDeals bool PieceCidBlocklist []cid.Cid ExpectedSealDuration Duration + // The amount of time to wait for more deals to arrive before + // publishing + PublishMsgPeriod Duration + // The maximum number of deals to include in a single PublishStorageDeals + // message + MaxDealsPerPublishMsg uint64 Filter string RetrievalFilter string } -type PublishMsgConfig struct { - // The amount of time to wait for more deals to arrive before - // publishing - PublishPeriod Duration - // The maximum number of deals to include in a single PublishStorageDeals - // message - MaxDealsPerMsg uint64 -} - type SealingConfig struct { // 0 = no limit MaxWaitDealsSectors uint64 @@ -216,12 +212,9 @@ func DefaultStorageMiner() *StorageMiner { ConsiderUnverifiedStorageDeals: true, PieceCidBlocklist: []cid.Cid{}, // TODO: It'd be nice to set this based on sector size - ExpectedSealDuration: Duration(time.Hour * 24), - }, - - PublishMsg: PublishMsgConfig{ - PublishPeriod: Duration(time.Hour), - MaxDealsPerMsg: 8, + ExpectedSealDuration: Duration(time.Hour * 24), + PublishMsgPeriod: Duration(time.Hour), + MaxDealsPerPublishMsg: 8, }, Fees: MinerFeeConfig{