From 05bf177686703b706eb7fe897a39c3a293122ca6 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 8 Jan 2021 16:28:38 +0100 Subject: [PATCH 1/3] feat: batch publish deal messages --- markets/storageadapter/dealpublisher.go | 124 ++++++------------- markets/storageadapter/dealpublisher_test.go | 83 ++++--------- node/builder.go | 9 +- node/config/def.go | 15 +++ 4 files changed, 81 insertions(+), 150 deletions(-) diff --git a/markets/storageadapter/dealpublisher.go b/markets/storageadapter/dealpublisher.go index fcc88653e..5be6a494c 100644 --- a/markets/storageadapter/dealpublisher.go +++ b/markets/storageadapter/dealpublisher.go @@ -2,7 +2,6 @@ package storageadapter import ( "context" - "fmt" "strings" "sync" "time" @@ -25,7 +24,6 @@ import ( ) type dealPublisherAPI interface { - ChainHead(context.Context) (*types.TipSet, error) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error) } @@ -54,27 +52,6 @@ type DealPublisher struct { publishPeriodStart time.Time } -// A deal that is queued to be published -type pendingDeal struct { - ctx context.Context - deal market2.ClientDealProposal - Result chan publishResult -} - -// The result of publishing a deal -type publishResult struct { - msgCid cid.Cid - err error -} - -func newPendingDeal(ctx context.Context, deal market2.ClientDealProposal) *pendingDeal { - return &pendingDeal{ - ctx: ctx, - deal: deal, - Result: make(chan publishResult), - } -} - type PublishMsgConfig struct { // The amount of time to wait for more deals to arrive before // publishing @@ -86,7 +63,7 @@ type PublishMsgConfig struct { func NewDealPublisher( feeConfig *config.MinerFeeConfig, - publishMsgCfg PublishMsgConfig, + publishMsgCfg *config.PublishMsgConfig, ) func(lc fx.Lifecycle, full api.FullNode) *DealPublisher { return func(lc fx.Lifecycle, full api.FullNode) *DealPublisher { maxFee := abi.NewTokenAmount(0) @@ -107,16 +84,22 @@ func NewDealPublisher( func newDealPublisher( dpapi dealPublisherAPI, - publishMsgCfg PublishMsgConfig, + publishMsgCfg *config.PublishMsgConfig, publishSpec *api.MessageSendSpec, ) *DealPublisher { + publishPeriod := time.Duration(0) + maxDealsPerMsg := uint64(1) + if publishMsgCfg != nil { + publishPeriod = time.Duration(publishMsgCfg.PublishPeriod) + maxDealsPerMsg = publishMsgCfg.MaxDealsPerMsg + } ctx, cancel := context.WithCancel(context.Background()) return &DealPublisher{ api: dpapi, ctx: ctx, Shutdown: cancel, - maxDealsPerPublishMsg: publishMsgCfg.MaxDealsPerMsg, - publishPeriod: publishMsgCfg.Period, + maxDealsPerPublishMsg: maxDealsPerMsg, + publishPeriod: publishPeriod, publishSpec: publishSpec, } } @@ -224,33 +207,8 @@ func (p *DealPublisher) publishReady(ready []*pendingDeal) { return } - // onComplete is called when the publish message has been sent or there - // was an error - onComplete := func(pd *pendingDeal, msgCid cid.Cid, err error) { - // Send the publish result on the pending deal's Result channel - res := publishResult{ - msgCid: msgCid, - err: err, - } - select { - case <-p.ctx.Done(): - case <-pd.ctx.Done(): - case pd.Result <- res: - } - } - - // Validate each deal to make sure it can be published - validated := make([]*pendingDeal, 0, len(ready)) deals := make([]market2.ClientDealProposal, 0, len(ready)) for _, pd := range ready { - // Validate the deal - if err := p.validateDeal(pd.deal); err != nil { - // Validation failed, complete immediately with an error - go onComplete(pd, cid.Undef, err) - continue - } - - validated = append(validated, pd) deals = append(deals, pd.deal) } @@ -258,45 +216,26 @@ func (p *DealPublisher) publishReady(ready []*pendingDeal) { msgCid, err := p.publishDealProposals(deals) // Signal that each deal has been published - for _, pd := range validated { - go onComplete(pd, msgCid, err) + for _, pd := range ready { + pd := pd + go func() { + res := publishResult{ + msgCid: msgCid, + err: err, + } + select { + case <-p.ctx.Done(): + case pd.Result <- res: + } + }() } } -// validateDeal checks that the deal proposal start epoch hasn't already -// elapsed -func (p *DealPublisher) validateDeal(deal market2.ClientDealProposal) error { - head, err := p.api.ChainHead(p.ctx) - if err != nil { - return err - } - if head.Height() > deal.Proposal.StartEpoch { - return xerrors.Errorf( - "cannot publish deal with piece CID %s: current epoch %d has passed deal proposal start epoch %d", - deal.Proposal.PieceCID, head.Height(), deal.Proposal.StartEpoch) - } - return nil -} - // Sends the publish message func (p *DealPublisher) publishDealProposals(deals []market2.ClientDealProposal) (cid.Cid, error) { - if len(deals) == 0 { - return cid.Undef, nil - } - log.Infof("publishing %d deals in publish deals queue with piece CIDs: %s", len(deals), pieceCids(deals)) provider := deals[0].Proposal.Provider - for _, dl := range deals { - if dl.Proposal.Provider != provider { - msg := fmt.Sprintf("publishing %d deals failed: ", len(deals)) + - "not all deals are for same provider: " + - fmt.Sprintf("deal with piece CID %s is for provider %s ", deals[0].Proposal.PieceCID, deals[0].Proposal.Provider) + - fmt.Sprintf("but deal with piece CID %s is for provider %s", dl.Proposal.PieceCID, dl.Proposal.Provider) - return cid.Undef, xerrors.Errorf(msg) - } - } - mi, err := p.api.StateMinerInfo(p.ctx, provider, types.EmptyTSK) if err != nil { return cid.Undef, err @@ -343,3 +282,22 @@ func (p *DealPublisher) filterCancelledDeals() { } p.pending = p.pending[:i] } + +type publishResult struct { + msgCid cid.Cid + err error +} + +type pendingDeal struct { + ctx context.Context + deal market2.ClientDealProposal + Result chan publishResult +} + +func newPendingDeal(ctx context.Context, deal market2.ClientDealProposal) *pendingDeal { + return &pendingDeal{ + ctx: ctx, + deal: deal, + Result: make(chan publishResult), + } +} diff --git a/markets/storageadapter/dealpublisher_test.go b/markets/storageadapter/dealpublisher_test.go index 7aa331162..77e8e65f8 100644 --- a/markets/storageadapter/dealpublisher_test.go +++ b/markets/storageadapter/dealpublisher_test.go @@ -22,18 +22,18 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/node/config" ) func TestDealPublisher(t *testing.T) { testCases := []struct { - name string - publishPeriod time.Duration - maxDealsPerMsg uint64 - dealCountWithinPublishPeriod int - ctxCancelledWithinPublishPeriod int - expiredDeals int - dealCountAfterPublishPeriod int - expectedDealsPerMsg []int + name string + publishPeriod time.Duration + maxDealsPerMsg uint64 + dealCountWithinPublishPeriod int + expiredWithinPublishPeriod int + dealCountAfterPublishPeriod int + expectedDealsPerMsg []int }{{ name: "publish one deal within publish period", publishPeriod: 10 * time.Millisecond, @@ -62,30 +62,14 @@ func TestDealPublisher(t *testing.T) { dealCountWithinPublishPeriod: 3, dealCountAfterPublishPeriod: 1, expectedDealsPerMsg: []int{2, 1, 1}, - }, { - name: "ignore deals with cancelled context", - publishPeriod: 10 * time.Millisecond, - maxDealsPerMsg: 5, - dealCountWithinPublishPeriod: 2, - ctxCancelledWithinPublishPeriod: 2, - dealCountAfterPublishPeriod: 1, - expectedDealsPerMsg: []int{2, 1}, }, { name: "ignore expired deals", publishPeriod: 10 * time.Millisecond, maxDealsPerMsg: 5, dealCountWithinPublishPeriod: 2, - expiredDeals: 2, + expiredWithinPublishPeriod: 2, dealCountAfterPublishPeriod: 1, expectedDealsPerMsg: []int{2, 1}, - }, { - name: "zero config", - publishPeriod: 0, - maxDealsPerMsg: 0, - dealCountWithinPublishPeriod: 2, - ctxCancelledWithinPublishPeriod: 0, - dealCountAfterPublishPeriod: 2, - expectedDealsPerMsg: []int{1, 1, 1, 1}, }} for _, tc := range testCases { @@ -98,44 +82,38 @@ func TestDealPublisher(t *testing.T) { dpapi := newDPAPI(t, worker) // Create a deal publisher - dp := newDealPublisher(dpapi, PublishMsgConfig{ - Period: tc.publishPeriod, + dp := newDealPublisher(dpapi, &config.PublishMsgConfig{ + PublishPeriod: config.Duration(tc.publishPeriod), MaxDealsPerMsg: tc.maxDealsPerMsg, }, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)}) // Keep a record of the deals that were submitted to be published var dealsToPublish []market.ClientDealProposal - publishDeal := func(ctxCancelled bool, expired bool) { + publishDeal := func(expired bool) { pctx := ctx var cancel context.CancelFunc - if ctxCancelled { + if expired { pctx, cancel = context.WithCancel(ctx) cancel() } - startEpoch := abi.ChainEpoch(20) - if expired { - startEpoch = abi.ChainEpoch(5) - } deal := market.ClientDealProposal{ Proposal: market0.DealProposal{ - PieceCID: generateCids(1)[0], - Client: client, - Provider: provider, - StartEpoch: startEpoch, - EndEpoch: abi.ChainEpoch(120), + PieceCID: generateCids(1)[0], + Client: client, + Provider: provider, }, ClientSignature: crypto.Signature{ Type: crypto.SigTypeSecp256k1, Data: []byte("signature data"), }, } - if !ctxCancelled && !expired { + if !expired { dealsToPublish = append(dealsToPublish, deal) } go func() { _, err := dp.Publish(pctx, deal) - if ctxCancelled || expired { + if expired { require.Error(t, err) } else { require.NoError(t, err) @@ -145,13 +123,10 @@ func TestDealPublisher(t *testing.T) { // Publish deals within publish period for i := 0; i < tc.dealCountWithinPublishPeriod; i++ { - publishDeal(false, false) + publishDeal(false) } - for i := 0; i < tc.ctxCancelledWithinPublishPeriod; i++ { - publishDeal(true, false) - } - for i := 0; i < tc.expiredDeals; i++ { - publishDeal(false, true) + for i := 0; i < tc.expiredWithinPublishPeriod; i++ { + publishDeal(true) } // Wait until publish period has elapsed @@ -159,7 +134,7 @@ func TestDealPublisher(t *testing.T) { // Publish deals after publish period for i := 0; i < tc.dealCountAfterPublishPeriod; i++ { - publishDeal(false, false) + publishDeal(false) } // For each message that was expected to be sent @@ -241,20 +216,6 @@ func newDPAPI(t *testing.T, worker address.Address) *dpAPI { } } -func (d *dpAPI) ChainHead(ctx context.Context) (*types.TipSet, error) { - dummyCid, err := cid.Parse("bafkqaaa") - require.NoError(d.t, err) - return types.NewTipSet([]*types.BlockHeader{{ - Miner: tutils.NewActorAddr(d.t, "miner"), - Height: abi.ChainEpoch(10), - ParentStateRoot: dummyCid, - Messages: dummyCid, - ParentMessageReceipts: dummyCid, - BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, - BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, - }}) -} - func (d *dpAPI) StateMinerInfo(ctx context.Context, address address.Address, key types.TipSetKey) (miner.MinerInfo, error) { d.stateMinerInfoCalls <- address return miner.MinerInfo{Worker: d.worker}, nil diff --git a/node/builder.go b/node/builder.go index 5f61420d4..81332d07a 100644 --- a/node/builder.go +++ b/node/builder.go @@ -377,7 +377,7 @@ func Online() Option { Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)), Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)), Override(new(storagemarket.StorageProvider), modules.StorageProvider), - Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})), + Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, nil)), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)), Override(HandleMigrateProviderFundsKey, modules.HandleMigrateProviderFunds), Override(HandleRetrievalKey, modules.HandleRetrieval), @@ -520,10 +520,7 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.RetrievalFilter))), ), - Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, storageadapter.PublishMsgConfig{ - Period: time.Duration(cfg.Dealmaking.PublishMsgPeriod), - MaxDealsPerMsg: cfg.Dealmaking.MaxDealsPerPublishMsg, - })), + Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, &cfg.PublishMsg)), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees)), Override(new(sectorstorage.SealerConfig), cfg.Storage), @@ -651,6 +648,6 @@ func Test() Option { Unset(RunPeerMgrKey), Unset(new(*peermgr.PeerMgr)), Override(new(beacon.Schedule), testing.RandomBeacon), - Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})), + Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, nil)), ) } diff --git a/node/config/def.go b/node/config/def.go index 13587e315..e5500c0ca 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -33,6 +33,7 @@ type StorageMiner struct { Common Dealmaking DealmakingConfig + PublishMsg PublishMsgConfig Sealing SealingConfig Storage sectorstorage.SealerConfig Fees MinerFeeConfig @@ -59,6 +60,15 @@ type DealmakingConfig struct { RetrievalFilter string } +type PublishMsgConfig struct { + // The amount of time to wait for more deals to arrive before + // publishing + PublishPeriod Duration + // The maximum number of deals to include in a single PublishStorageDeals + // message + MaxDealsPerMsg uint64 +} + type SealingConfig struct { // 0 = no limit MaxWaitDealsSectors uint64 @@ -217,6 +227,11 @@ func DefaultStorageMiner() *StorageMiner { MaxDealsPerPublishMsg: 8, }, + PublishMsg: PublishMsgConfig{ + PublishPeriod: Duration(time.Hour), + MaxDealsPerMsg: 8, + }, + Fees: MinerFeeConfig{ MaxPreCommitGasFee: types.MustParseFIL("0.025"), MaxCommitGasFee: types.MustParseFIL("0.05"), From fc8ee481a84e9f9ea046c0121badb5276bd675b3 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Tue, 26 Jan 2021 14:20:36 +0100 Subject: [PATCH 2/3] refactor: move waiting for publish deals from markets into lotus --- go.mod | 2 +- go.sum | 4 +- markets/storageadapter/dealpublisher.go | 124 +++++++++++++------ markets/storageadapter/dealpublisher_test.go | 83 +++++++++---- markets/storageadapter/provider.go | 28 ++++- node/builder.go | 9 +- node/config/def.go | 15 --- 7 files changed, 179 insertions(+), 86 deletions(-) diff --git a/go.mod b/go.mod index f9e843948..ad9c2d5e2 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v1.2.7 github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a - github.com/filecoin-project/go-fil-markets v1.1.5 + github.com/filecoin-project/go-fil-markets v1.0.8-0.20210203085858-de843fba7213 github.com/filecoin-project/go-jsonrpc v0.1.2 github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20 diff --git a/go.sum b/go.sum index fd407a6e2..056f6a264 100644 --- a/go.sum +++ b/go.sum @@ -269,8 +269,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg= github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= -github.com/filecoin-project/go-fil-markets v1.1.5 h1:S5LIyy7VruG+zFMfsuDiJKvEqF+NpTPRMvN9GqJko3w= -github.com/filecoin-project/go-fil-markets v1.1.5/go.mod h1:6oTRaAsHnCqhi3mpZqdvnWIzH6QzHQc4dbhJrI9/BfQ= +github.com/filecoin-project/go-fil-markets v1.0.8-0.20210203085858-de843fba7213 h1:z4GzaQu2DJA9pmeTNNIZ0WsVlg6ULKi8tkQU1OadT2o= +github.com/filecoin-project/go-fil-markets v1.0.8-0.20210203085858-de843fba7213/go.mod h1:6oTRaAsHnCqhi3mpZqdvnWIzH6QzHQc4dbhJrI9/BfQ= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM= diff --git a/markets/storageadapter/dealpublisher.go b/markets/storageadapter/dealpublisher.go index 5be6a494c..fcc88653e 100644 --- a/markets/storageadapter/dealpublisher.go +++ b/markets/storageadapter/dealpublisher.go @@ -2,6 +2,7 @@ package storageadapter import ( "context" + "fmt" "strings" "sync" "time" @@ -24,6 +25,7 @@ import ( ) type dealPublisherAPI interface { + ChainHead(context.Context) (*types.TipSet, error) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error) } @@ -52,6 +54,27 @@ type DealPublisher struct { publishPeriodStart time.Time } +// A deal that is queued to be published +type pendingDeal struct { + ctx context.Context + deal market2.ClientDealProposal + Result chan publishResult +} + +// The result of publishing a deal +type publishResult struct { + msgCid cid.Cid + err error +} + +func newPendingDeal(ctx context.Context, deal market2.ClientDealProposal) *pendingDeal { + return &pendingDeal{ + ctx: ctx, + deal: deal, + Result: make(chan publishResult), + } +} + type PublishMsgConfig struct { // The amount of time to wait for more deals to arrive before // publishing @@ -63,7 +86,7 @@ type PublishMsgConfig struct { func NewDealPublisher( feeConfig *config.MinerFeeConfig, - publishMsgCfg *config.PublishMsgConfig, + publishMsgCfg PublishMsgConfig, ) func(lc fx.Lifecycle, full api.FullNode) *DealPublisher { return func(lc fx.Lifecycle, full api.FullNode) *DealPublisher { maxFee := abi.NewTokenAmount(0) @@ -84,22 +107,16 @@ func NewDealPublisher( func newDealPublisher( dpapi dealPublisherAPI, - publishMsgCfg *config.PublishMsgConfig, + publishMsgCfg PublishMsgConfig, publishSpec *api.MessageSendSpec, ) *DealPublisher { - publishPeriod := time.Duration(0) - maxDealsPerMsg := uint64(1) - if publishMsgCfg != nil { - publishPeriod = time.Duration(publishMsgCfg.PublishPeriod) - maxDealsPerMsg = publishMsgCfg.MaxDealsPerMsg - } ctx, cancel := context.WithCancel(context.Background()) return &DealPublisher{ api: dpapi, ctx: ctx, Shutdown: cancel, - maxDealsPerPublishMsg: maxDealsPerMsg, - publishPeriod: publishPeriod, + maxDealsPerPublishMsg: publishMsgCfg.MaxDealsPerMsg, + publishPeriod: publishMsgCfg.Period, publishSpec: publishSpec, } } @@ -207,8 +224,33 @@ func (p *DealPublisher) publishReady(ready []*pendingDeal) { return } + // onComplete is called when the publish message has been sent or there + // was an error + onComplete := func(pd *pendingDeal, msgCid cid.Cid, err error) { + // Send the publish result on the pending deal's Result channel + res := publishResult{ + msgCid: msgCid, + err: err, + } + select { + case <-p.ctx.Done(): + case <-pd.ctx.Done(): + case pd.Result <- res: + } + } + + // Validate each deal to make sure it can be published + validated := make([]*pendingDeal, 0, len(ready)) deals := make([]market2.ClientDealProposal, 0, len(ready)) for _, pd := range ready { + // Validate the deal + if err := p.validateDeal(pd.deal); err != nil { + // Validation failed, complete immediately with an error + go onComplete(pd, cid.Undef, err) + continue + } + + validated = append(validated, pd) deals = append(deals, pd.deal) } @@ -216,26 +258,45 @@ func (p *DealPublisher) publishReady(ready []*pendingDeal) { msgCid, err := p.publishDealProposals(deals) // Signal that each deal has been published - for _, pd := range ready { - pd := pd - go func() { - res := publishResult{ - msgCid: msgCid, - err: err, - } - select { - case <-p.ctx.Done(): - case pd.Result <- res: - } - }() + for _, pd := range validated { + go onComplete(pd, msgCid, err) } } +// validateDeal checks that the deal proposal start epoch hasn't already +// elapsed +func (p *DealPublisher) validateDeal(deal market2.ClientDealProposal) error { + head, err := p.api.ChainHead(p.ctx) + if err != nil { + return err + } + if head.Height() > deal.Proposal.StartEpoch { + return xerrors.Errorf( + "cannot publish deal with piece CID %s: current epoch %d has passed deal proposal start epoch %d", + deal.Proposal.PieceCID, head.Height(), deal.Proposal.StartEpoch) + } + return nil +} + // Sends the publish message func (p *DealPublisher) publishDealProposals(deals []market2.ClientDealProposal) (cid.Cid, error) { + if len(deals) == 0 { + return cid.Undef, nil + } + log.Infof("publishing %d deals in publish deals queue with piece CIDs: %s", len(deals), pieceCids(deals)) provider := deals[0].Proposal.Provider + for _, dl := range deals { + if dl.Proposal.Provider != provider { + msg := fmt.Sprintf("publishing %d deals failed: ", len(deals)) + + "not all deals are for same provider: " + + fmt.Sprintf("deal with piece CID %s is for provider %s ", deals[0].Proposal.PieceCID, deals[0].Proposal.Provider) + + fmt.Sprintf("but deal with piece CID %s is for provider %s", dl.Proposal.PieceCID, dl.Proposal.Provider) + return cid.Undef, xerrors.Errorf(msg) + } + } + mi, err := p.api.StateMinerInfo(p.ctx, provider, types.EmptyTSK) if err != nil { return cid.Undef, err @@ -282,22 +343,3 @@ func (p *DealPublisher) filterCancelledDeals() { } p.pending = p.pending[:i] } - -type publishResult struct { - msgCid cid.Cid - err error -} - -type pendingDeal struct { - ctx context.Context - deal market2.ClientDealProposal - Result chan publishResult -} - -func newPendingDeal(ctx context.Context, deal market2.ClientDealProposal) *pendingDeal { - return &pendingDeal{ - ctx: ctx, - deal: deal, - Result: make(chan publishResult), - } -} diff --git a/markets/storageadapter/dealpublisher_test.go b/markets/storageadapter/dealpublisher_test.go index 77e8e65f8..7aa331162 100644 --- a/markets/storageadapter/dealpublisher_test.go +++ b/markets/storageadapter/dealpublisher_test.go @@ -22,18 +22,18 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/node/config" ) func TestDealPublisher(t *testing.T) { testCases := []struct { - name string - publishPeriod time.Duration - maxDealsPerMsg uint64 - dealCountWithinPublishPeriod int - expiredWithinPublishPeriod int - dealCountAfterPublishPeriod int - expectedDealsPerMsg []int + name string + publishPeriod time.Duration + maxDealsPerMsg uint64 + dealCountWithinPublishPeriod int + ctxCancelledWithinPublishPeriod int + expiredDeals int + dealCountAfterPublishPeriod int + expectedDealsPerMsg []int }{{ name: "publish one deal within publish period", publishPeriod: 10 * time.Millisecond, @@ -62,14 +62,30 @@ func TestDealPublisher(t *testing.T) { dealCountWithinPublishPeriod: 3, dealCountAfterPublishPeriod: 1, expectedDealsPerMsg: []int{2, 1, 1}, + }, { + name: "ignore deals with cancelled context", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 5, + dealCountWithinPublishPeriod: 2, + ctxCancelledWithinPublishPeriod: 2, + dealCountAfterPublishPeriod: 1, + expectedDealsPerMsg: []int{2, 1}, }, { name: "ignore expired deals", publishPeriod: 10 * time.Millisecond, maxDealsPerMsg: 5, dealCountWithinPublishPeriod: 2, - expiredWithinPublishPeriod: 2, + expiredDeals: 2, dealCountAfterPublishPeriod: 1, expectedDealsPerMsg: []int{2, 1}, + }, { + name: "zero config", + publishPeriod: 0, + maxDealsPerMsg: 0, + dealCountWithinPublishPeriod: 2, + ctxCancelledWithinPublishPeriod: 0, + dealCountAfterPublishPeriod: 2, + expectedDealsPerMsg: []int{1, 1, 1, 1}, }} for _, tc := range testCases { @@ -82,38 +98,44 @@ func TestDealPublisher(t *testing.T) { dpapi := newDPAPI(t, worker) // Create a deal publisher - dp := newDealPublisher(dpapi, &config.PublishMsgConfig{ - PublishPeriod: config.Duration(tc.publishPeriod), + dp := newDealPublisher(dpapi, PublishMsgConfig{ + Period: tc.publishPeriod, MaxDealsPerMsg: tc.maxDealsPerMsg, }, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)}) // Keep a record of the deals that were submitted to be published var dealsToPublish []market.ClientDealProposal - publishDeal := func(expired bool) { + publishDeal := func(ctxCancelled bool, expired bool) { pctx := ctx var cancel context.CancelFunc - if expired { + if ctxCancelled { pctx, cancel = context.WithCancel(ctx) cancel() } + startEpoch := abi.ChainEpoch(20) + if expired { + startEpoch = abi.ChainEpoch(5) + } deal := market.ClientDealProposal{ Proposal: market0.DealProposal{ - PieceCID: generateCids(1)[0], - Client: client, - Provider: provider, + PieceCID: generateCids(1)[0], + Client: client, + Provider: provider, + StartEpoch: startEpoch, + EndEpoch: abi.ChainEpoch(120), }, ClientSignature: crypto.Signature{ Type: crypto.SigTypeSecp256k1, Data: []byte("signature data"), }, } - if !expired { + if !ctxCancelled && !expired { dealsToPublish = append(dealsToPublish, deal) } go func() { _, err := dp.Publish(pctx, deal) - if expired { + if ctxCancelled || expired { require.Error(t, err) } else { require.NoError(t, err) @@ -123,10 +145,13 @@ func TestDealPublisher(t *testing.T) { // Publish deals within publish period for i := 0; i < tc.dealCountWithinPublishPeriod; i++ { - publishDeal(false) + publishDeal(false, false) } - for i := 0; i < tc.expiredWithinPublishPeriod; i++ { - publishDeal(true) + for i := 0; i < tc.ctxCancelledWithinPublishPeriod; i++ { + publishDeal(true, false) + } + for i := 0; i < tc.expiredDeals; i++ { + publishDeal(false, true) } // Wait until publish period has elapsed @@ -134,7 +159,7 @@ func TestDealPublisher(t *testing.T) { // Publish deals after publish period for i := 0; i < tc.dealCountAfterPublishPeriod; i++ { - publishDeal(false) + publishDeal(false, false) } // For each message that was expected to be sent @@ -216,6 +241,20 @@ func newDPAPI(t *testing.T, worker address.Address) *dpAPI { } } +func (d *dpAPI) ChainHead(ctx context.Context) (*types.TipSet, error) { + dummyCid, err := cid.Parse("bafkqaaa") + require.NoError(d.t, err) + return types.NewTipSet([]*types.BlockHeader{{ + Miner: tutils.NewActorAddr(d.t, "miner"), + Height: abi.ChainEpoch(10), + ParentStateRoot: dummyCid, + Messages: dummyCid, + ParentMessageReceipts: dummyCid, + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + }}) +} + func (d *dpAPI) StateMinerInfo(ctx context.Context, address address.Address, key types.TipSetKey) (miner.MinerInfo, error) { d.stateMinerInfoCalls <- address return miner.MinerInfo{Worker: d.worker}, nil diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index 47925af5c..0f91b8caf 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -11,14 +11,13 @@ import ( logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" - market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" - "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/exitcode" + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" @@ -284,6 +283,31 @@ func (n *ProviderNodeAdapter) WaitForMessage(ctx context.Context, mcid cid.Cid, return cb(receipt.Receipt.ExitCode, receipt.Receipt.Return, receipt.Message, nil) } +func (n *ProviderNodeAdapter) WaitForPublishDeals(ctx context.Context, publishCid cid.Cid, proposal market2.DealProposal) (*storagemarket.PublishDealsWaitResult, error) { + // Wait for deal to be published (plus additional time for confidence) + receipt, err := n.StateWaitMsg(ctx, publishCid, 2*build.MessageConfidence) + if err != nil { + return nil, xerrors.Errorf("WaitForPublishDeals errored: %w", err) + } + if receipt.Receipt.ExitCode != exitcode.Ok { + return nil, xerrors.Errorf("WaitForPublishDeals exit code: %s", receipt.Receipt.ExitCode) + } + + // The deal ID may have changed since publish if there was a reorg, so + // get the current deal ID + head, err := n.ChainHead(ctx) + if err != nil { + return nil, xerrors.Errorf("WaitForPublishDeals failed to get chain head: %w", err) + } + + res, err := n.scMgr.dealInfo.GetCurrentDealInfo(ctx, head.Key().Bytes(), (*market.DealProposal)(&proposal), publishCid) + if err != nil { + return nil, xerrors.Errorf("WaitForPublishDeals getting deal info errored: %w", err) + } + + return &storagemarket.PublishDealsWaitResult{DealID: res.DealID, FinalCid: receipt.Message}, nil +} + func (n *ProviderNodeAdapter) GetDataCap(ctx context.Context, addr address.Address, encodedTs shared.TipSetToken) (*abi.StoragePower, error) { tsk, err := types.TipSetKeyFromBytes(encodedTs) if err != nil { diff --git a/node/builder.go b/node/builder.go index 81332d07a..5f61420d4 100644 --- a/node/builder.go +++ b/node/builder.go @@ -377,7 +377,7 @@ func Online() Option { Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)), Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)), Override(new(storagemarket.StorageProvider), modules.StorageProvider), - Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, nil)), + Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)), Override(HandleMigrateProviderFundsKey, modules.HandleMigrateProviderFunds), Override(HandleRetrievalKey, modules.HandleRetrieval), @@ -520,7 +520,10 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.RetrievalFilter))), ), - Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, &cfg.PublishMsg)), + Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, storageadapter.PublishMsgConfig{ + Period: time.Duration(cfg.Dealmaking.PublishMsgPeriod), + MaxDealsPerMsg: cfg.Dealmaking.MaxDealsPerPublishMsg, + })), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees)), Override(new(sectorstorage.SealerConfig), cfg.Storage), @@ -648,6 +651,6 @@ func Test() Option { Unset(RunPeerMgrKey), Unset(new(*peermgr.PeerMgr)), Override(new(beacon.Schedule), testing.RandomBeacon), - Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, nil)), + Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})), ) } diff --git a/node/config/def.go b/node/config/def.go index e5500c0ca..13587e315 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -33,7 +33,6 @@ type StorageMiner struct { Common Dealmaking DealmakingConfig - PublishMsg PublishMsgConfig Sealing SealingConfig Storage sectorstorage.SealerConfig Fees MinerFeeConfig @@ -60,15 +59,6 @@ type DealmakingConfig struct { RetrievalFilter string } -type PublishMsgConfig struct { - // The amount of time to wait for more deals to arrive before - // publishing - PublishPeriod Duration - // The maximum number of deals to include in a single PublishStorageDeals - // message - MaxDealsPerMsg uint64 -} - type SealingConfig struct { // 0 = no limit MaxWaitDealsSectors uint64 @@ -227,11 +217,6 @@ func DefaultStorageMiner() *StorageMiner { MaxDealsPerPublishMsg: 8, }, - PublishMsg: PublishMsgConfig{ - PublishPeriod: Duration(time.Hour), - MaxDealsPerMsg: 8, - }, - Fees: MinerFeeConfig{ MaxPreCommitGasFee: types.MustParseFIL("0.025"), MaxCommitGasFee: types.MustParseFIL("0.05"), From 6baddccff2c53a6711d1e5eeb0f7bb9836861dd0 Mon Sep 17 00:00:00 2001 From: Dirk McCormick Date: Fri, 5 Feb 2021 09:41:42 +0100 Subject: [PATCH 3/3] feat: update to go-fil-markets v1.1.6 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index ad9c2d5e2..87058cdb5 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v1.2.7 github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a - github.com/filecoin-project/go-fil-markets v1.0.8-0.20210203085858-de843fba7213 + github.com/filecoin-project/go-fil-markets v1.1.6 github.com/filecoin-project/go-jsonrpc v0.1.2 github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20 diff --git a/go.sum b/go.sum index 056f6a264..4ed46adcd 100644 --- a/go.sum +++ b/go.sum @@ -269,8 +269,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg= github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= -github.com/filecoin-project/go-fil-markets v1.0.8-0.20210203085858-de843fba7213 h1:z4GzaQu2DJA9pmeTNNIZ0WsVlg6ULKi8tkQU1OadT2o= -github.com/filecoin-project/go-fil-markets v1.0.8-0.20210203085858-de843fba7213/go.mod h1:6oTRaAsHnCqhi3mpZqdvnWIzH6QzHQc4dbhJrI9/BfQ= +github.com/filecoin-project/go-fil-markets v1.1.6 h1:D5y1SvJE95BFkZTklLDqbdCmK+5+UZr9sNmgI0iI/Xk= +github.com/filecoin-project/go-fil-markets v1.1.6/go.mod h1:6oTRaAsHnCqhi3mpZqdvnWIzH6QzHQc4dbhJrI9/BfQ= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=