diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 315832b38..656357c1b 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -444,10 +444,10 @@ type GatewayStruct struct { StateMinerProvingDeadline func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*dline.Info, error) StateMinerPower func(context.Context, address.Address, types.TipSetKey) (*api.MinerPower, error) StateMarketBalance func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (api.MarketBalance, error) - StateSearchMsg func(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error) StateMarketStorageDeal func(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (*api.MarketDeal, error) StateReadState func(context.Context, address.Address, types.TipSetKey) (*api.ActorState, error) StateNetworkVersion func(ctx context.Context, tsk types.TipSetKey) (stnetwork.Version, error) + StateSearchMsg func(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error) StateSectorGetInfo func(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error) StateVerifiedClientStatus func(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*abi.StoragePower, error) StateWaitMsg func(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) diff --git a/api/test/deals.go b/api/test/deals.go index f81be139a..f3d22d26c 100644 --- a/api/test/deals.go +++ b/api/test/deals.go @@ -20,9 +20,13 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/types" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/markets/storageadapter" + "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/impl" + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" ipld "github.com/ipfs/go-ipld-format" dag "github.com/ipfs/go-merkledag" dstest "github.com/ipfs/go-merkledag/test" @@ -88,6 +92,97 @@ func CreateClientFile(ctx context.Context, client api.FullNode, rseed int) (*api return res, data, nil } +func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { + publishPeriod := 10 * time.Second + maxDealsPerMsg := uint64(2) + + // Set max deals per publish deals message to 2 + minerDef := []StorageMiner{{ + Full: 0, + Opts: node.Override( + new(*storageadapter.DealPublisher), + storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{ + Period: publishPeriod, + MaxDealsPerMsg: maxDealsPerMsg, + })), + Preseal: PresealGenesis, + }} + + // Create a connect client and miner node + n, sn := b(t, OneFull, minerDef) + client := n[0].FullNode.(*impl.FullNodeAPI) + miner := sn[0] + s := connectAndStartMining(t, b, blocktime, client, miner) + defer s.blockMiner.Stop() + + // Starts a deal and waits until it's published + runDealTillPublish := func(rseed int) { + res, _, err := CreateClientFile(s.ctx, s.client, rseed) + require.NoError(t, err) + + upds, err := client.ClientGetDealUpdates(s.ctx) + require.NoError(t, err) + + startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch) + + // TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this + time.Sleep(time.Second) + + done := make(chan struct{}) + go func() { + for upd := range upds { + if upd.DataRef.Root == res.Root && upd.State == storagemarket.StorageDealAwaitingPreCommit { + done <- struct{}{} + } + } + }() + <-done + } + + // Run three deals in parallel + done := make(chan struct{}, maxDealsPerMsg+1) + for rseed := 1; rseed <= 3; rseed++ { + rseed := rseed + go func() { + runDealTillPublish(rseed) + done <- struct{}{} + }() + } + + // Wait for two of the deals to be published + for i := 0; i < int(maxDealsPerMsg); i++ { + <-done + } + + // Expect a single PublishStorageDeals message that includes the first two deals + msgCids, err := s.client.StateListMessages(s.ctx, &api.MessageMatch{To: market.Address}, types.EmptyTSK, 1) + require.NoError(t, err) + count := 0 + for _, msgCid := range msgCids { + msg, err := s.client.ChainGetMessage(s.ctx, msgCid) + require.NoError(t, err) + + if msg.Method == market.Methods.PublishStorageDeals { + count++ + var pubDealsParams market2.PublishStorageDealsParams + err = pubDealsParams.UnmarshalCBOR(bytes.NewReader(msg.Params)) + require.NoError(t, err) + require.Len(t, pubDealsParams.Deals, int(maxDealsPerMsg)) + } + } + require.Equal(t, 1, count) + + // The third deal should be published once the publish period expires. + // Allow a little padding as it takes a moment for the state change to + // be noticed by the client. + padding := 10 * time.Second + select { + case <-time.After(publishPeriod + padding): + require.Fail(t, "Expected 3rd deal to be published once publish period elapsed") + case <-done: // Success + } +} + func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) { s := setupOneClientOneMiner(t, b, blocktime) defer s.blockMiner.Stop() diff --git a/api/test/test.go b/api/test/test.go index 7d804e8ae..eaf092a21 100644 --- a/api/test/test.go +++ b/api/test/test.go @@ -59,6 +59,7 @@ const GenesisPreseals = 2 // Options for setting up a mock storage miner type StorageMiner struct { Full int + Opts node.Option Preseal int } diff --git a/chain/events/events.go b/chain/events/events.go index 1dcf63423..acb65d2c1 100644 --- a/chain/events/events.go +++ b/chain/events/events.go @@ -99,11 +99,13 @@ func (e *Events) listenHeadChanges(ctx context.Context) { } else { log.Warn("listenHeadChanges quit") } - if ctx.Err() != nil { + select { + case <-build.Clock.After(time.Second): + case <-ctx.Done(): log.Warnf("not restarting listenHeadChanges: context error: %s", ctx.Err()) return } - build.Clock.Sleep(time.Second) + log.Info("restarting listenHeadChanges") } } diff --git a/markets/storageadapter/client.go b/markets/storageadapter/client.go index f3491da47..c0d78a506 100644 --- a/markets/storageadapter/client.go +++ b/markets/storageadapter/client.go @@ -6,11 +6,12 @@ import ( "bytes" "context" - "github.com/filecoin-project/go-address" - cborutil "github.com/filecoin-project/go-cbor-util" "github.com/ipfs/go-cid" + "go.uber.org/fx" "golang.org/x/xerrors" + "github.com/filecoin-project/go-address" + cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-state-types/abi" @@ -31,15 +32,16 @@ import ( "github.com/filecoin-project/lotus/lib/sigs" "github.com/filecoin-project/lotus/markets/utils" "github.com/filecoin-project/lotus/node/impl/full" + "github.com/filecoin-project/lotus/node/modules/helpers" ) type ClientNodeAdapter struct { *clientApi - *apiWrapper fundmgr *market.FundManager ev *events.Events dsMatcher *dealStateMatcher + scMgr *SectorCommittedManager } type clientApi struct { @@ -48,16 +50,20 @@ type clientApi struct { full.MpoolAPI } -func NewClientNodeAdapter(stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) storagemarket.StorageClientNode { +func NewClientNodeAdapter(mctx helpers.MetricsCtx, lc fx.Lifecycle, stateapi full.StateAPI, chain full.ChainAPI, mpool full.MpoolAPI, fundmgr *market.FundManager) storagemarket.StorageClientNode { capi := &clientApi{chain, stateapi, mpool} - return &ClientNodeAdapter{ - clientApi: capi, - apiWrapper: &apiWrapper{api: capi}, + ctx := helpers.LifecycleCtx(mctx, lc) + + ev := events.NewEvents(ctx, capi) + a := &ClientNodeAdapter{ + clientApi: capi, fundmgr: fundmgr, - ev: events.NewEvents(context.TODO(), capi), + ev: ev, dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(capi))), } + a.scMgr = NewSectorCommittedManager(ev, a, &apiWrapper{api: capi}) + return a } func (c *ClientNodeAdapter) ListStorageProviders(ctx context.Context, encodedTs shared.TipSetToken) ([]*storagemarket.StorageProviderInfo, error) { @@ -135,6 +141,7 @@ func (c *ClientNodeAdapter) GetBalance(ctx context.Context, addr address.Address // ValidatePublishedDeal validates that the provided deal has appeared on chain and references the same ClientDeal // returns the Deal id if there is no error +// TODO: Don't return deal ID func (c *ClientNodeAdapter) ValidatePublishedDeal(ctx context.Context, deal storagemarket.ClientDeal) (abi.DealID, error) { log.Infow("DEAL ACCEPTED!") @@ -216,14 +223,17 @@ func (c *ClientNodeAdapter) DealProviderCollateralBounds(ctx context.Context, si return big.Mul(bounds.Min, big.NewInt(clientOverestimation)), bounds.Max, nil } +// TODO: Remove dealID parameter, change publishCid to be cid.Cid (instead of pointer) func (c *ClientNodeAdapter) OnDealSectorPreCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error { - return OnDealSectorPreCommitted(ctx, c, c.ev, provider, dealID, marketactor.DealProposal(proposal), publishCid, cb) + return c.scMgr.OnDealSectorPreCommitted(ctx, provider, marketactor.DealProposal(proposal), *publishCid, cb) } +// TODO: Remove dealID parameter, change publishCid to be cid.Cid (instead of pointer) func (c *ClientNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error { - return OnDealSectorCommitted(ctx, c, c.ev, provider, dealID, sectorNumber, marketactor.DealProposal(proposal), publishCid, cb) + return c.scMgr.OnDealSectorCommitted(ctx, provider, sectorNumber, marketactor.DealProposal(proposal), *publishCid, cb) } +// TODO: Replace dealID parameter with DealProposal func (c *ClientNodeAdapter) OnDealExpiredOrSlashed(ctx context.Context, dealID abi.DealID, onDealExpired storagemarket.DealExpiredCallback, onDealSlashed storagemarket.DealSlashedCallback) error { head, err := c.ChainHead(ctx) if err != nil { diff --git a/markets/storageadapter/dealpublisher.go b/markets/storageadapter/dealpublisher.go new file mode 100644 index 000000000..fcc88653e --- /dev/null +++ b/markets/storageadapter/dealpublisher.go @@ -0,0 +1,345 @@ +package storageadapter + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "go.uber.org/fx" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/node/config" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" + + "github.com/filecoin-project/lotus/chain/actors" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/types" + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" +) + +type dealPublisherAPI interface { + ChainHead(context.Context) (*types.TipSet, error) + MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) + StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error) +} + +// DealPublisher batches deal publishing so that many deals can be included in +// a single publish message. This saves gas for miners that publish deals +// frequently. +// When a deal is submitted, the DealPublisher waits a configurable amount of +// time for other deals to be submitted before sending the publish message. +// There is a configurable maximum number of deals that can be included in one +// message. When the limit is reached the DealPublisher immediately submits a +// publish message with all deals in the queue. +type DealPublisher struct { + api dealPublisherAPI + + ctx context.Context + Shutdown context.CancelFunc + + maxDealsPerPublishMsg uint64 + publishPeriod time.Duration + publishSpec *api.MessageSendSpec + + lk sync.Mutex + pending []*pendingDeal + cancelWaitForMoreDeals context.CancelFunc + publishPeriodStart time.Time +} + +// A deal that is queued to be published +type pendingDeal struct { + ctx context.Context + deal market2.ClientDealProposal + Result chan publishResult +} + +// The result of publishing a deal +type publishResult struct { + msgCid cid.Cid + err error +} + +func newPendingDeal(ctx context.Context, deal market2.ClientDealProposal) *pendingDeal { + return &pendingDeal{ + ctx: ctx, + deal: deal, + Result: make(chan publishResult), + } +} + +type PublishMsgConfig struct { + // The amount of time to wait for more deals to arrive before + // publishing + Period time.Duration + // The maximum number of deals to include in a single PublishStorageDeals + // message + MaxDealsPerMsg uint64 +} + +func NewDealPublisher( + feeConfig *config.MinerFeeConfig, + publishMsgCfg PublishMsgConfig, +) func(lc fx.Lifecycle, full api.FullNode) *DealPublisher { + return func(lc fx.Lifecycle, full api.FullNode) *DealPublisher { + maxFee := abi.NewTokenAmount(0) + if feeConfig != nil { + maxFee = abi.TokenAmount(feeConfig.MaxPublishDealsFee) + } + publishSpec := &api.MessageSendSpec{MaxFee: maxFee} + dp := newDealPublisher(full, publishMsgCfg, publishSpec) + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + dp.Shutdown() + return nil + }, + }) + return dp + } +} + +func newDealPublisher( + dpapi dealPublisherAPI, + publishMsgCfg PublishMsgConfig, + publishSpec *api.MessageSendSpec, +) *DealPublisher { + ctx, cancel := context.WithCancel(context.Background()) + return &DealPublisher{ + api: dpapi, + ctx: ctx, + Shutdown: cancel, + maxDealsPerPublishMsg: publishMsgCfg.MaxDealsPerMsg, + publishPeriod: publishMsgCfg.Period, + publishSpec: publishSpec, + } +} + +func (p *DealPublisher) Publish(ctx context.Context, deal market2.ClientDealProposal) (cid.Cid, error) { + pdeal := newPendingDeal(ctx, deal) + + // Add the deal to the queue + p.processNewDeal(pdeal) + + // Wait for the deal to be submitted + select { + case <-ctx.Done(): + return cid.Undef, ctx.Err() + case res := <-pdeal.Result: + return res.msgCid, res.err + } +} + +func (p *DealPublisher) processNewDeal(pdeal *pendingDeal) { + p.lk.Lock() + defer p.lk.Unlock() + + // Filter out any cancelled deals + p.filterCancelledDeals() + + // If all deals have been cancelled, clear the wait-for-deals timer + if len(p.pending) == 0 && p.cancelWaitForMoreDeals != nil { + p.cancelWaitForMoreDeals() + p.cancelWaitForMoreDeals = nil + } + + // Make sure the new deal hasn't been cancelled + if pdeal.ctx.Err() != nil { + return + } + + // Add the new deal to the queue + p.pending = append(p.pending, pdeal) + log.Infof("add deal with piece CID %s to publish deals queue - %d deals in queue (max queue size %d)", + pdeal.deal.Proposal.PieceCID, len(p.pending), p.maxDealsPerPublishMsg) + + // If the maximum number of deals per message has been reached, + // send a publish message + if uint64(len(p.pending)) >= p.maxDealsPerPublishMsg { + log.Infof("publish deals queue has reached max size of %d, publishing deals", p.maxDealsPerPublishMsg) + p.publishAllDeals() + return + } + + // Otherwise wait for more deals to arrive or the timeout to be reached + p.waitForMoreDeals() +} + +func (p *DealPublisher) waitForMoreDeals() { + // Check if we're already waiting for deals + if !p.publishPeriodStart.IsZero() { + elapsed := time.Since(p.publishPeriodStart) + log.Infof("%s elapsed of / %s until publish deals queue is published", + elapsed, p.publishPeriod) + return + } + + // Set a timeout to wait for more deals to arrive + log.Infof("waiting publish deals queue period of %s before publishing", p.publishPeriod) + ctx, cancel := context.WithCancel(p.ctx) + p.publishPeriodStart = time.Now() + p.cancelWaitForMoreDeals = cancel + + go func() { + timer := time.NewTimer(p.publishPeriod) + select { + case <-ctx.Done(): + timer.Stop() + case <-timer.C: + p.lk.Lock() + defer p.lk.Unlock() + + // The timeout has expired so publish all pending deals + log.Infof("publish deals queue period of %s has expired, publishing deals", p.publishPeriod) + p.publishAllDeals() + } + }() +} + +func (p *DealPublisher) publishAllDeals() { + // If the timeout hasn't yet been cancelled, cancel it + if p.cancelWaitForMoreDeals != nil { + p.cancelWaitForMoreDeals() + p.cancelWaitForMoreDeals = nil + p.publishPeriodStart = time.Time{} + } + + // Filter out any deals that have been cancelled + p.filterCancelledDeals() + deals := p.pending[:] + p.pending = nil + + // Send the publish message + go p.publishReady(deals) +} + +func (p *DealPublisher) publishReady(ready []*pendingDeal) { + if len(ready) == 0 { + return + } + + // onComplete is called when the publish message has been sent or there + // was an error + onComplete := func(pd *pendingDeal, msgCid cid.Cid, err error) { + // Send the publish result on the pending deal's Result channel + res := publishResult{ + msgCid: msgCid, + err: err, + } + select { + case <-p.ctx.Done(): + case <-pd.ctx.Done(): + case pd.Result <- res: + } + } + + // Validate each deal to make sure it can be published + validated := make([]*pendingDeal, 0, len(ready)) + deals := make([]market2.ClientDealProposal, 0, len(ready)) + for _, pd := range ready { + // Validate the deal + if err := p.validateDeal(pd.deal); err != nil { + // Validation failed, complete immediately with an error + go onComplete(pd, cid.Undef, err) + continue + } + + validated = append(validated, pd) + deals = append(deals, pd.deal) + } + + // Send the publish message + msgCid, err := p.publishDealProposals(deals) + + // Signal that each deal has been published + for _, pd := range validated { + go onComplete(pd, msgCid, err) + } +} + +// validateDeal checks that the deal proposal start epoch hasn't already +// elapsed +func (p *DealPublisher) validateDeal(deal market2.ClientDealProposal) error { + head, err := p.api.ChainHead(p.ctx) + if err != nil { + return err + } + if head.Height() > deal.Proposal.StartEpoch { + return xerrors.Errorf( + "cannot publish deal with piece CID %s: current epoch %d has passed deal proposal start epoch %d", + deal.Proposal.PieceCID, head.Height(), deal.Proposal.StartEpoch) + } + return nil +} + +// Sends the publish message +func (p *DealPublisher) publishDealProposals(deals []market2.ClientDealProposal) (cid.Cid, error) { + if len(deals) == 0 { + return cid.Undef, nil + } + + log.Infof("publishing %d deals in publish deals queue with piece CIDs: %s", len(deals), pieceCids(deals)) + + provider := deals[0].Proposal.Provider + for _, dl := range deals { + if dl.Proposal.Provider != provider { + msg := fmt.Sprintf("publishing %d deals failed: ", len(deals)) + + "not all deals are for same provider: " + + fmt.Sprintf("deal with piece CID %s is for provider %s ", deals[0].Proposal.PieceCID, deals[0].Proposal.Provider) + + fmt.Sprintf("but deal with piece CID %s is for provider %s", dl.Proposal.PieceCID, dl.Proposal.Provider) + return cid.Undef, xerrors.Errorf(msg) + } + } + + mi, err := p.api.StateMinerInfo(p.ctx, provider, types.EmptyTSK) + if err != nil { + return cid.Undef, err + } + + params, err := actors.SerializeParams(&market2.PublishStorageDealsParams{ + Deals: deals, + }) + + if err != nil { + return cid.Undef, xerrors.Errorf("serializing PublishStorageDeals params failed: %w", err) + } + + smsg, err := p.api.MpoolPushMessage(p.ctx, &types.Message{ + To: market.Address, + From: mi.Worker, + Value: types.NewInt(0), + Method: market.Methods.PublishStorageDeals, + Params: params, + }, p.publishSpec) + + if err != nil { + return cid.Undef, err + } + return smsg.Cid(), nil +} + +func pieceCids(deals []market2.ClientDealProposal) string { + cids := make([]string, 0, len(deals)) + for _, dl := range deals { + cids = append(cids, dl.Proposal.PieceCID.String()) + } + return strings.Join(cids, ", ") +} + +// filter out deals that have been cancelled +func (p *DealPublisher) filterCancelledDeals() { + i := 0 + for _, pd := range p.pending { + if pd.ctx.Err() == nil { + p.pending[i] = pd + i++ + } + } + p.pending = p.pending[:i] +} diff --git a/markets/storageadapter/dealpublisher_test.go b/markets/storageadapter/dealpublisher_test.go new file mode 100644 index 000000000..7aa331162 --- /dev/null +++ b/markets/storageadapter/dealpublisher_test.go @@ -0,0 +1,266 @@ +package storageadapter + +import ( + "bytes" + "context" + "testing" + "time" + + "github.com/filecoin-project/go-state-types/crypto" + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" + "github.com/ipfs/go-cid" + + "github.com/stretchr/testify/require" + + tutils "github.com/filecoin-project/specs-actors/v2/support/testing" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/actors/builtin/market" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/types" + market0 "github.com/filecoin-project/specs-actors/actors/builtin/market" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" +) + +func TestDealPublisher(t *testing.T) { + testCases := []struct { + name string + publishPeriod time.Duration + maxDealsPerMsg uint64 + dealCountWithinPublishPeriod int + ctxCancelledWithinPublishPeriod int + expiredDeals int + dealCountAfterPublishPeriod int + expectedDealsPerMsg []int + }{{ + name: "publish one deal within publish period", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 5, + dealCountWithinPublishPeriod: 1, + dealCountAfterPublishPeriod: 0, + expectedDealsPerMsg: []int{1}, + }, { + name: "publish two deals within publish period", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 5, + dealCountWithinPublishPeriod: 2, + dealCountAfterPublishPeriod: 0, + expectedDealsPerMsg: []int{2}, + }, { + name: "publish one deal within publish period, and one after", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 5, + dealCountWithinPublishPeriod: 1, + dealCountAfterPublishPeriod: 1, + expectedDealsPerMsg: []int{1, 1}, + }, { + name: "publish deals that exceed max deals per message within publish period, and one after", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 2, + dealCountWithinPublishPeriod: 3, + dealCountAfterPublishPeriod: 1, + expectedDealsPerMsg: []int{2, 1, 1}, + }, { + name: "ignore deals with cancelled context", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 5, + dealCountWithinPublishPeriod: 2, + ctxCancelledWithinPublishPeriod: 2, + dealCountAfterPublishPeriod: 1, + expectedDealsPerMsg: []int{2, 1}, + }, { + name: "ignore expired deals", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 5, + dealCountWithinPublishPeriod: 2, + expiredDeals: 2, + dealCountAfterPublishPeriod: 1, + expectedDealsPerMsg: []int{2, 1}, + }, { + name: "zero config", + publishPeriod: 0, + maxDealsPerMsg: 0, + dealCountWithinPublishPeriod: 2, + ctxCancelledWithinPublishPeriod: 0, + dealCountAfterPublishPeriod: 2, + expectedDealsPerMsg: []int{1, 1, 1, 1}, + }} + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + client := tutils.NewActorAddr(t, "client") + provider := tutils.NewActorAddr(t, "provider") + worker := tutils.NewActorAddr(t, "worker") + dpapi := newDPAPI(t, worker) + + // Create a deal publisher + dp := newDealPublisher(dpapi, PublishMsgConfig{ + Period: tc.publishPeriod, + MaxDealsPerMsg: tc.maxDealsPerMsg, + }, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)}) + + // Keep a record of the deals that were submitted to be published + var dealsToPublish []market.ClientDealProposal + publishDeal := func(ctxCancelled bool, expired bool) { + pctx := ctx + var cancel context.CancelFunc + if ctxCancelled { + pctx, cancel = context.WithCancel(ctx) + cancel() + } + + startEpoch := abi.ChainEpoch(20) + if expired { + startEpoch = abi.ChainEpoch(5) + } + deal := market.ClientDealProposal{ + Proposal: market0.DealProposal{ + PieceCID: generateCids(1)[0], + Client: client, + Provider: provider, + StartEpoch: startEpoch, + EndEpoch: abi.ChainEpoch(120), + }, + ClientSignature: crypto.Signature{ + Type: crypto.SigTypeSecp256k1, + Data: []byte("signature data"), + }, + } + if !ctxCancelled && !expired { + dealsToPublish = append(dealsToPublish, deal) + } + go func() { + _, err := dp.Publish(pctx, deal) + if ctxCancelled || expired { + require.Error(t, err) + } else { + require.NoError(t, err) + } + }() + } + + // Publish deals within publish period + for i := 0; i < tc.dealCountWithinPublishPeriod; i++ { + publishDeal(false, false) + } + for i := 0; i < tc.ctxCancelledWithinPublishPeriod; i++ { + publishDeal(true, false) + } + for i := 0; i < tc.expiredDeals; i++ { + publishDeal(false, true) + } + + // Wait until publish period has elapsed + time.Sleep(2 * tc.publishPeriod) + + // Publish deals after publish period + for i := 0; i < tc.dealCountAfterPublishPeriod; i++ { + publishDeal(false, false) + } + + // For each message that was expected to be sent + var publishedDeals []market.ClientDealProposal + for _, expectedDealsInMsg := range tc.expectedDealsPerMsg { + // Should have called StateMinerInfo with the provider address + stateMinerInfoAddr := <-dpapi.stateMinerInfoCalls + require.Equal(t, provider, stateMinerInfoAddr) + + // Check the fields of the message that was sent + msg := <-dpapi.pushedMsgs + require.Equal(t, worker, msg.From) + require.Equal(t, market.Address, msg.To) + require.Equal(t, market.Methods.PublishStorageDeals, msg.Method) + + // Check that the expected number of deals was included in the message + var params market2.PublishStorageDealsParams + err := params.UnmarshalCBOR(bytes.NewReader(msg.Params)) + require.NoError(t, err) + require.Len(t, params.Deals, expectedDealsInMsg) + + // Keep track of the deals that were sent + for _, d := range params.Deals { + publishedDeals = append(publishedDeals, d) + } + } + + // Verify that all deals that were submitted to be published were + // sent out (we do this by ensuring all the piece CIDs are present) + require.True(t, matchPieceCids(publishedDeals, dealsToPublish)) + }) + } +} + +func matchPieceCids(sent []market.ClientDealProposal, exp []market.ClientDealProposal) bool { + cidsA := dealPieceCids(sent) + cidsB := dealPieceCids(exp) + + if len(cidsA) != len(cidsB) { + return false + } + + s1 := cid.NewSet() + for _, c := range cidsA { + s1.Add(c) + } + + for _, c := range cidsB { + if !s1.Has(c) { + return false + } + } + + return true +} + +func dealPieceCids(deals []market2.ClientDealProposal) []cid.Cid { + cids := make([]cid.Cid, 0, len(deals)) + for _, dl := range deals { + cids = append(cids, dl.Proposal.PieceCID) + } + return cids +} + +type dpAPI struct { + t *testing.T + worker address.Address + + stateMinerInfoCalls chan address.Address + pushedMsgs chan *types.Message +} + +func newDPAPI(t *testing.T, worker address.Address) *dpAPI { + return &dpAPI{ + t: t, + worker: worker, + stateMinerInfoCalls: make(chan address.Address, 128), + pushedMsgs: make(chan *types.Message, 128), + } +} + +func (d *dpAPI) ChainHead(ctx context.Context) (*types.TipSet, error) { + dummyCid, err := cid.Parse("bafkqaaa") + require.NoError(d.t, err) + return types.NewTipSet([]*types.BlockHeader{{ + Miner: tutils.NewActorAddr(d.t, "miner"), + Height: abi.ChainEpoch(10), + ParentStateRoot: dummyCid, + Messages: dummyCid, + ParentMessageReceipts: dummyCid, + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + }}) +} + +func (d *dpAPI) StateMinerInfo(ctx context.Context, address address.Address, key types.TipSetKey) (miner.MinerInfo, error) { + d.stateMinerInfoCalls <- address + return miner.MinerInfo{Worker: d.worker}, nil +} + +func (d *dpAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error) { + d.pushedMsgs <- msg + return &types.SignedMessage{Message: *msg}, nil +} diff --git a/markets/storageadapter/getcurrentdealinfo.go b/markets/storageadapter/getcurrentdealinfo.go deleted file mode 100644 index 97311a0b2..000000000 --- a/markets/storageadapter/getcurrentdealinfo.go +++ /dev/null @@ -1,102 +0,0 @@ -package storageadapter - -import ( - "bytes" - "context" - - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-state-types/exitcode" - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/actors/builtin/market" - "github.com/filecoin-project/lotus/chain/actors/builtin/miner" - "github.com/filecoin-project/lotus/chain/types" - "github.com/ipfs/go-cid" - "golang.org/x/xerrors" -) - -type getCurrentDealInfoAPI interface { - StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) - StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*api.MarketDeal, error) - StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error) - - diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) -} - -// GetCurrentDealInfo gets current information on a deal, and corrects the deal ID as needed -func GetCurrentDealInfo(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, *api.MarketDeal, types.TipSetKey, error) { - marketDeal, dealErr := api.StateMarketStorageDeal(ctx, dealID, ts.Key()) - if dealErr == nil { - equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal) - if err != nil { - return dealID, nil, types.EmptyTSK, err - } - if equal { - return dealID, marketDeal, types.EmptyTSK, nil - } - dealErr = xerrors.Errorf("Deal proposals did not match") - } - if publishCid == nil { - return dealID, nil, types.EmptyTSK, dealErr - } - // attempt deal id correction - lookup, err := api.StateSearchMsg(ctx, *publishCid) - if err != nil { - return dealID, nil, types.EmptyTSK, err - } - - if lookup.Receipt.ExitCode != exitcode.Ok { - return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", *publishCid, lookup.Receipt.ExitCode) - } - - var retval market.PublishStorageDealsReturn - if err := retval.UnmarshalCBOR(bytes.NewReader(lookup.Receipt.Return)); err != nil { - return dealID, nil, types.EmptyTSK, xerrors.Errorf("looking for publish deal message: unmarshaling message return: %w", err) - } - - if len(retval.IDs) != 1 { - // market currently only ever sends messages with 1 deal - return dealID, nil, types.EmptyTSK, xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal") - } - - if retval.IDs[0] == dealID { - // DealID did not change, so we are stuck with the original lookup error - return dealID, nil, lookup.TipSet, dealErr - } - - dealID = retval.IDs[0] - marketDeal, err = api.StateMarketStorageDeal(ctx, dealID, ts.Key()) - - if err == nil { - equal, err := checkDealEquality(ctx, ts, api, proposal, marketDeal.Proposal) - if err != nil { - return dealID, nil, types.EmptyTSK, err - } - if !equal { - return dealID, nil, types.EmptyTSK, xerrors.Errorf("Deal proposals did not match") - } - } - return dealID, marketDeal, lookup.TipSet, err -} - -func checkDealEquality(ctx context.Context, ts *types.TipSet, api getCurrentDealInfoAPI, p1, p2 market.DealProposal) (bool, error) { - p1ClientID, err := api.StateLookupID(ctx, p1.Client, ts.Key()) - if err != nil { - return false, err - } - p2ClientID, err := api.StateLookupID(ctx, p2.Client, ts.Key()) - if err != nil { - return false, err - } - return p1.PieceCID.Equals(p2.PieceCID) && - p1.PieceSize == p2.PieceSize && - p1.VerifiedDeal == p2.VerifiedDeal && - p1.Label == p2.Label && - p1.StartEpoch == p2.StartEpoch && - p1.EndEpoch == p2.EndEpoch && - p1.StoragePricePerEpoch.Equals(p2.StoragePricePerEpoch) && - p1.ProviderCollateral.Equals(p2.ProviderCollateral) && - p1.ClientCollateral.Equals(p2.ClientCollateral) && - p1.Provider == p2.Provider && - p1ClientID == p2ClientID, nil -} diff --git a/markets/storageadapter/getcurrentdealinfo_test.go b/markets/storageadapter/getcurrentdealinfo_test.go deleted file mode 100644 index 5e3c10495..000000000 --- a/markets/storageadapter/getcurrentdealinfo_test.go +++ /dev/null @@ -1,268 +0,0 @@ -package storageadapter - -import ( - "bytes" - "errors" - "math/rand" - "testing" - "time" - - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-state-types/exitcode" - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/actors/builtin/market" - "github.com/filecoin-project/lotus/chain/actors/builtin/miner" - test "github.com/filecoin-project/lotus/chain/events/state/mock" - "github.com/filecoin-project/lotus/chain/types" - "github.com/ipfs/go-cid" - "github.com/stretchr/testify/require" - "golang.org/x/net/context" - "golang.org/x/xerrors" -) - -var errNotFound = errors.New("Could not find") - -func TestGetCurrentDealInfo(t *testing.T) { - ctx := context.Background() - dummyCid, _ := cid.Parse("bafkqaaa") - startDealID := abi.DealID(rand.Uint64()) - newDealID := abi.DealID(rand.Uint64()) - twoValuesReturn := makePublishDealsReturnBytes(t, []abi.DealID{abi.DealID(rand.Uint64()), abi.DealID(rand.Uint64())}) - sameValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{startDealID}) - newValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{newDealID}) - proposal := market.DealProposal{ - PieceCID: dummyCid, - PieceSize: abi.PaddedPieceSize(rand.Uint64()), - Label: "success", - } - otherProposal := market.DealProposal{ - PieceCID: dummyCid, - PieceSize: abi.PaddedPieceSize(rand.Uint64()), - Label: "other", - } - successDeal := &api.MarketDeal{ - Proposal: proposal, - State: market.DealState{ - SectorStartEpoch: 1, - LastUpdatedEpoch: 2, - }, - } - otherDeal := &api.MarketDeal{ - Proposal: otherProposal, - State: market.DealState{ - SectorStartEpoch: 1, - LastUpdatedEpoch: 2, - }, - } - testCases := map[string]struct { - searchMessageLookup *api.MsgLookup - searchMessageErr error - marketDeals map[abi.DealID]*api.MarketDeal - publishCid *cid.Cid - expectedDealID abi.DealID - expectedMarketDeal *api.MarketDeal - expectedError error - }{ - "deal lookup succeeds": { - marketDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: successDeal, - }, - expectedDealID: startDealID, - expectedMarketDeal: successDeal, - }, - "publish CID = nil": { - expectedDealID: startDealID, - expectedError: errNotFound, - }, - "publish CID = nil, other deal on lookup": { - marketDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: otherDeal, - }, - expectedDealID: startDealID, - expectedError: xerrors.Errorf("Deal proposals did not match"), - }, - "search message fails": { - publishCid: &dummyCid, - searchMessageErr: errors.New("something went wrong"), - expectedDealID: startDealID, - expectedError: errors.New("something went wrong"), - }, - "return code not ok": { - publishCid: &dummyCid, - searchMessageLookup: &api.MsgLookup{ - Receipt: types.MessageReceipt{ - ExitCode: exitcode.ErrIllegalState, - }, - }, - expectedDealID: startDealID, - expectedError: xerrors.Errorf("looking for publish deal message %s: non-ok exit code: %s", dummyCid, exitcode.ErrIllegalState), - }, - "unable to unmarshal params": { - publishCid: &dummyCid, - searchMessageLookup: &api.MsgLookup{ - Receipt: types.MessageReceipt{ - ExitCode: exitcode.Ok, - Return: []byte("applesauce"), - }, - }, - expectedDealID: startDealID, - expectedError: xerrors.Errorf("looking for publish deal message: unmarshaling message return: cbor input should be of type array"), - }, - "more than one returned id": { - publishCid: &dummyCid, - searchMessageLookup: &api.MsgLookup{ - Receipt: types.MessageReceipt{ - ExitCode: exitcode.Ok, - Return: twoValuesReturn, - }, - }, - expectedDealID: startDealID, - expectedError: xerrors.Errorf("can't recover dealIDs from publish deal message with more than 1 deal"), - }, - "deal ids still match": { - publishCid: &dummyCid, - searchMessageLookup: &api.MsgLookup{ - Receipt: types.MessageReceipt{ - ExitCode: exitcode.Ok, - Return: sameValueReturn, - }, - }, - expectedDealID: startDealID, - expectedError: errNotFound, - }, - "new deal id success": { - publishCid: &dummyCid, - searchMessageLookup: &api.MsgLookup{ - Receipt: types.MessageReceipt{ - ExitCode: exitcode.Ok, - Return: newValueReturn, - }, - }, - marketDeals: map[abi.DealID]*api.MarketDeal{ - newDealID: successDeal, - }, - expectedDealID: newDealID, - expectedMarketDeal: successDeal, - }, - "new deal id after other deal found": { - publishCid: &dummyCid, - searchMessageLookup: &api.MsgLookup{ - Receipt: types.MessageReceipt{ - ExitCode: exitcode.Ok, - Return: newValueReturn, - }, - }, - marketDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: otherDeal, - newDealID: successDeal, - }, - expectedDealID: newDealID, - expectedMarketDeal: successDeal, - }, - "new deal id failure": { - publishCid: &dummyCid, - searchMessageLookup: &api.MsgLookup{ - Receipt: types.MessageReceipt{ - ExitCode: exitcode.Ok, - Return: newValueReturn, - }, - }, - expectedDealID: newDealID, - expectedError: errNotFound, - }, - "new deal id, failure due to other deal present": { - publishCid: &dummyCid, - searchMessageLookup: &api.MsgLookup{ - Receipt: types.MessageReceipt{ - ExitCode: exitcode.Ok, - Return: newValueReturn, - }, - }, - marketDeals: map[abi.DealID]*api.MarketDeal{ - newDealID: otherDeal, - }, - expectedDealID: newDealID, - expectedError: xerrors.Errorf("Deal proposals did not match"), - }, - } - runTestCase := func(testCase string, data struct { - searchMessageLookup *api.MsgLookup - searchMessageErr error - marketDeals map[abi.DealID]*api.MarketDeal - publishCid *cid.Cid - expectedDealID abi.DealID - expectedMarketDeal *api.MarketDeal - expectedError error - }) { - t.Run(testCase, func(t *testing.T) { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - ts, err := test.MockTipset(address.TestAddress, rand.Uint64()) - require.NoError(t, err) - marketDeals := make(map[marketDealKey]*api.MarketDeal) - for dealID, deal := range data.marketDeals { - marketDeals[marketDealKey{dealID, ts.Key()}] = deal - } - api := &mockGetCurrentDealInfoAPI{ - SearchMessageLookup: data.searchMessageLookup, - SearchMessageErr: data.searchMessageErr, - MarketDeals: marketDeals, - } - - dealID, marketDeal, _, err := GetCurrentDealInfo(ctx, ts, api, startDealID, proposal, data.publishCid) - require.Equal(t, data.expectedDealID, dealID) - require.Equal(t, data.expectedMarketDeal, marketDeal) - if data.expectedError == nil { - require.NoError(t, err) - } else { - require.EqualError(t, err, data.expectedError.Error()) - } - }) - } - for testCase, data := range testCases { - runTestCase(testCase, data) - } -} - -type marketDealKey struct { - abi.DealID - types.TipSetKey -} - -type mockGetCurrentDealInfoAPI struct { - SearchMessageLookup *api.MsgLookup - SearchMessageErr error - - MarketDeals map[marketDealKey]*api.MarketDeal -} - -func (mapi *mockGetCurrentDealInfoAPI) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) { - return &miner.PreCommitChanges{}, nil -} - -func (mapi *mockGetCurrentDealInfoAPI) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, ts types.TipSetKey) (*api.MarketDeal, error) { - deal, ok := mapi.MarketDeals[marketDealKey{dealID, ts}] - if !ok { - return nil, errNotFound - } - return deal, nil -} - -func (mapi *mockGetCurrentDealInfoAPI) StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error) { - return mapi.SearchMessageLookup, mapi.SearchMessageErr -} - -func (mapi *mockGetCurrentDealInfoAPI) StateLookupID(ctx context.Context, addr address.Address, ts types.TipSetKey) (address.Address, error) { - return addr, nil -} - -func makePublishDealsReturnBytes(t *testing.T, dealIDs []abi.DealID) []byte { - buf := new(bytes.Buffer) - dealsReturn := market.PublishStorageDealsReturn{ - IDs: dealIDs, - } - err := dealsReturn.MarshalCBOR(buf) - require.NoError(t, err) - return buf.Bytes() -} diff --git a/markets/storageadapter/ondealsectorcommitted.go b/markets/storageadapter/ondealsectorcommitted.go index 5466c81ef..b5f9c7510 100644 --- a/markets/storageadapter/ondealsectorcommitted.go +++ b/markets/storageadapter/ondealsectorcommitted.go @@ -5,6 +5,7 @@ import ( "context" "sync" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -19,11 +20,40 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -type sectorCommittedEventsAPI interface { +type eventsCalledAPI interface { Called(check events.CheckFunc, msgHnd events.MsgHandler, rev events.RevertHandler, confidence int, timeout abi.ChainEpoch, mf events.MsgMatchFunc) error } -func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorPreCommittedCallback) error { +type dealInfoAPI interface { + GetCurrentDealInfo(ctx context.Context, tok sealing.TipSetToken, proposal *market.DealProposal, publishCid cid.Cid) (sealing.CurrentDealInfo, error) +} + +type diffPreCommitsAPI interface { + diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) +} + +type SectorCommittedManager struct { + ev eventsCalledAPI + dealInfo dealInfoAPI + dpc diffPreCommitsAPI +} + +func NewSectorCommittedManager(ev eventsCalledAPI, tskAPI sealing.CurrentDealInfoTskAPI, dpcAPI diffPreCommitsAPI) *SectorCommittedManager { + dim := &sealing.CurrentDealInfoManager{ + CDAPI: &sealing.CurrentDealInfoAPIAdapter{CurrentDealInfoTskAPI: tskAPI}, + } + return newSectorCommittedManager(ev, dim, dpcAPI) +} + +func newSectorCommittedManager(ev eventsCalledAPI, dealInfo dealInfoAPI, dpcAPI diffPreCommitsAPI) *SectorCommittedManager { + return &SectorCommittedManager{ + ev: ev, + dealInfo: dealInfo, + dpc: dpcAPI, + } +} + +func (mgr *SectorCommittedManager) OnDealSectorPreCommitted(ctx context.Context, provider address.Address, proposal market.DealProposal, publishCid cid.Cid, callback storagemarket.DealSectorPreCommittedCallback) error { // Ensure callback is only called once var once sync.Once cb := func(sectorNumber abi.SectorNumber, isActive bool, err error) { @@ -34,7 +64,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev // First check if the deal is already active, and if so, bail out checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - di, isActive, publishTs, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) + dealInfo, isActive, err := mgr.checkIfDealAlreadyActive(ctx, ts, &proposal, publishCid) if err != nil { // Note: the error returned from here will end up being returned // from OnDealSectorPreCommitted so no need to call the callback @@ -54,24 +84,19 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev // when the client node was down after the deal was published, and when // the precommit containing it landed on chain) - if publishTs == types.EmptyTSK { - lookup, err := api.StateSearchMsg(ctx, *publishCid) - if err != nil { - return false, false, err - } - if lookup != nil { // can be nil in tests - publishTs = lookup.TipSet - } + publishTs, err := types.TipSetKeyFromBytes(dealInfo.PublishMsgTipSet) + if err != nil { + return false, false, err } - diff, err := api.diffPreCommits(ctx, provider, publishTs, ts.Key()) + diff, err := mgr.dpc.diffPreCommits(ctx, provider, publishTs, ts.Key()) if err != nil { return false, false, err } for _, info := range diff.Added { for _, d := range info.Info.DealIDs { - if d == di { + if d == dealInfo.DealID { cb(info.Info.SectorNumber, false, nil) return true, false, nil } @@ -103,7 +128,7 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev // If the deal hasn't been activated by the proposed start epoch, the // deal will timeout (when msg == nil it means the timeout epoch was reached) if msg == nil { - err = xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch) + err = xerrors.Errorf("deal with piece CID %s was not activated by proposed deal start epoch %d", proposal.PieceCID, proposal.StartEpoch) return false, err } @@ -118,16 +143,16 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return false, xerrors.Errorf("unmarshal pre commit: %w", err) } - // When the deal is published, the deal ID may change, so get the + // When there is a reorg, the deal ID may change, so get the // current deal ID from the publish message CID - dealID, _, _, err = GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) + res, err := mgr.dealInfo.GetCurrentDealInfo(ctx, ts.Key().Bytes(), &proposal, publishCid) if err != nil { return false, err } // Check through the deal IDs associated with this message for _, did := range params.DealIDs { - if did == dealID { + if did == res.DealID { // Found the deal ID in this message. Callback with the sector ID. cb(params.SectorNumber, false, nil) return false, nil @@ -144,14 +169,14 @@ func OnDealSectorPreCommitted(ctx context.Context, api getCurrentDealInfoAPI, ev return nil } - if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil { + if err := mgr.ev.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil { return xerrors.Errorf("failed to set up called handler: %w", err) } return nil } -func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, eventsApi sectorCommittedEventsAPI, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid *cid.Cid, callback storagemarket.DealSectorCommittedCallback) error { +func (mgr *SectorCommittedManager) OnDealSectorCommitted(ctx context.Context, provider address.Address, sectorNumber abi.SectorNumber, proposal market.DealProposal, publishCid cid.Cid, callback storagemarket.DealSectorCommittedCallback) error { // Ensure callback is only called once var once sync.Once cb := func(err error) { @@ -162,7 +187,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event // First check if the deal is already active, and if so, bail out checkFunc := func(ts *types.TipSet) (done bool, more bool, err error) { - _, isActive, _, err := checkIfDealAlreadyActive(ctx, api, ts, dealID, proposal, publishCid) + _, isActive, err := mgr.checkIfDealAlreadyActive(ctx, ts, &proposal, publishCid) if err != nil { // Note: the error returned from here will end up being returned // from OnDealSectorCommitted so no need to call the callback @@ -208,7 +233,7 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event // If the deal hasn't been activated by the proposed start epoch, the // deal will timeout (when msg == nil it means the timeout epoch was reached) if msg == nil { - err := xerrors.Errorf("deal %d was not activated by proposed deal start epoch %d", dealID, proposal.StartEpoch) + err := xerrors.Errorf("deal with piece CID %s was not activated by proposed deal start epoch %d", proposal.PieceCID, proposal.StartEpoch) return false, err } @@ -218,17 +243,17 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event } // Get the deal info - _, sd, _, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) + res, err := mgr.dealInfo.GetCurrentDealInfo(ctx, ts.Key().Bytes(), &proposal, publishCid) if err != nil { return false, xerrors.Errorf("failed to look up deal on chain: %w", err) } // Make sure the deal is active - if sd.State.SectorStartEpoch < 1 { - return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", dealID, ts.ParentState(), ts.Height()) + if res.MarketDeal.State.SectorStartEpoch < 1 { + return false, xerrors.Errorf("deal wasn't active: deal=%d, parentState=%s, h=%d", res.DealID, ts.ParentState(), ts.Height()) } - log.Infof("Storage deal %d activated at epoch %d", dealID, sd.State.SectorStartEpoch) + log.Infof("Storage deal %d activated at epoch %d", res.DealID, res.MarketDeal.State.SectorStartEpoch) cb(nil) @@ -241,29 +266,29 @@ func OnDealSectorCommitted(ctx context.Context, api getCurrentDealInfoAPI, event return nil } - if err := eventsApi.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil { + if err := mgr.ev.Called(checkFunc, called, revert, int(build.MessageConfidence+1), timeoutEpoch, matchEvent); err != nil { return xerrors.Errorf("failed to set up called handler: %w", err) } return nil } -func checkIfDealAlreadyActive(ctx context.Context, api getCurrentDealInfoAPI, ts *types.TipSet, dealID abi.DealID, proposal market.DealProposal, publishCid *cid.Cid) (abi.DealID, bool, types.TipSetKey, error) { - di, sd, publishTs, err := GetCurrentDealInfo(ctx, ts, api, dealID, proposal, publishCid) +func (mgr *SectorCommittedManager) checkIfDealAlreadyActive(ctx context.Context, ts *types.TipSet, proposal *market.DealProposal, publishCid cid.Cid) (sealing.CurrentDealInfo, bool, error) { + res, err := mgr.dealInfo.GetCurrentDealInfo(ctx, ts.Key().Bytes(), proposal, publishCid) if err != nil { // TODO: This may be fine for some errors - return 0, false, types.EmptyTSK, xerrors.Errorf("failed to look up deal on chain: %w", err) - } - - // Sector with deal is already active - if sd.State.SectorStartEpoch > 0 { - return 0, true, publishTs, nil + return res, false, xerrors.Errorf("failed to look up deal on chain: %w", err) } // Sector was slashed - if sd.State.SlashEpoch > 0 { - return 0, false, types.EmptyTSK, xerrors.Errorf("deal %d was slashed at epoch %d", dealID, sd.State.SlashEpoch) + if res.MarketDeal.State.SlashEpoch > 0 { + return res, false, xerrors.Errorf("deal %d was slashed at epoch %d", res.DealID, res.MarketDeal.State.SlashEpoch) } - return di, false, publishTs, nil + // Sector with deal is already active + if res.MarketDeal.State.SectorStartEpoch > 0 { + return res, true, nil + } + + return res, false, nil } diff --git a/markets/storageadapter/ondealsectorcommitted_test.go b/markets/storageadapter/ondealsectorcommitted_test.go index dea1f89d2..db56ee651 100644 --- a/markets/storageadapter/ondealsectorcommitted_test.go +++ b/markets/storageadapter/ondealsectorcommitted_test.go @@ -7,6 +7,9 @@ import ( "fmt" "math/rand" "testing" + "time" + + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "golang.org/x/xerrors" @@ -15,13 +18,13 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/cbor" - "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/events" test "github.com/filecoin-project/lotus/chain/events/state/mock" "github.com/filecoin-project/lotus/chain/types" + tutils "github.com/filecoin-project/specs-actors/v2/support/testing" "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" ) @@ -32,14 +35,17 @@ func TestOnDealSectorPreCommitted(t *testing.T) { publishCid := generateCids(1)[0] sealedCid := generateCids(1)[0] pieceCid := generateCids(1)[0] - startDealID := abi.DealID(rand.Uint64()) - newDealID := abi.DealID(rand.Uint64()) - newValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{newDealID}) + dealID := abi.DealID(rand.Uint64()) sectorNumber := abi.SectorNumber(rand.Uint64()) proposal := market.DealProposal{ - PieceCID: pieceCid, - PieceSize: abi.PaddedPieceSize(rand.Uint64()), - Label: "success", + PieceCID: pieceCid, + PieceSize: abi.PaddedPieceSize(rand.Uint64()), + Client: tutils.NewActorAddr(t, "client"), + Provider: tutils.NewActorAddr(t, "provider"), + StoragePricePerEpoch: abi.NewTokenAmount(1), + ProviderCollateral: abi.NewTokenAmount(1), + ClientCollateral: abi.NewTokenAmount(1), + Label: "success", } unfinishedDeal := &api.MarketDeal{ Proposal: proposal, @@ -48,17 +54,26 @@ func TestOnDealSectorPreCommitted(t *testing.T) { LastUpdatedEpoch: 2, }, } - successDeal := &api.MarketDeal{ + activeDeal := &api.MarketDeal{ Proposal: proposal, State: market.DealState{ SectorStartEpoch: 1, LastUpdatedEpoch: 2, }, } + slashedDeal := &api.MarketDeal{ + Proposal: proposal, + State: market.DealState{ + SectorStartEpoch: 1, + LastUpdatedEpoch: 2, + SlashEpoch: 2, + }, + } type testCase struct { - searchMessageLookup *api.MsgLookup - searchMessageErr error - checkTsDeals map[abi.DealID]*api.MarketDeal + currentDealInfo sealing.CurrentDealInfo + currentDealInfoErr error + currentDealInfoErr2 error + preCommitDiff *miner.PreCommitChanges matchStates []matchState dealStartEpochTimeout bool expectedCBCallCount uint64 @@ -69,45 +84,17 @@ func TestOnDealSectorPreCommitted(t *testing.T) { } testCases := map[string]testCase{ "normal sequence": { - checkTsDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: unfinishedDeal, + currentDealInfo: sealing.CurrentDealInfo{ + DealID: dealID, + MarketDeal: unfinishedDeal, }, matchStates: []matchState{ { msg: makeMessage(t, provider, miner.Methods.PreCommitSector, &miner.SectorPreCommitInfo{ SectorNumber: sectorNumber, SealedCID: sealedCid, - DealIDs: []abi.DealID{startDealID}, + DealIDs: []abi.DealID{dealID}, }), - deals: map[abi.DealID]*api.MarketDeal{ - startDealID: unfinishedDeal, - }, - }, - }, - expectedCBCallCount: 1, - expectedCBIsActive: false, - expectedCBSectorNumber: sectorNumber, - }, - "deal id changes in called": { - searchMessageLookup: &api.MsgLookup{ - Receipt: types.MessageReceipt{ - ExitCode: exitcode.Ok, - Return: newValueReturn, - }, - }, - checkTsDeals: map[abi.DealID]*api.MarketDeal{ - newDealID: unfinishedDeal, - }, - matchStates: []matchState{ - { - msg: makeMessage(t, provider, miner.Methods.PreCommitSector, &miner.SectorPreCommitInfo{ - SectorNumber: sectorNumber, - SealedCID: sealedCid, - DealIDs: []abi.DealID{newDealID}, - }), - deals: map[abi.DealID]*api.MarketDeal{ - newDealID: unfinishedDeal, - }, }, }, expectedCBCallCount: 1, @@ -115,85 +102,98 @@ func TestOnDealSectorPreCommitted(t *testing.T) { expectedCBSectorNumber: sectorNumber, }, "ignores unsuccessful pre-commit message": { - checkTsDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: unfinishedDeal, + currentDealInfo: sealing.CurrentDealInfo{ + DealID: dealID, + MarketDeal: unfinishedDeal, }, matchStates: []matchState{ { msg: makeMessage(t, provider, miner.Methods.PreCommitSector, &miner.SectorPreCommitInfo{ SectorNumber: sectorNumber, SealedCID: sealedCid, - DealIDs: []abi.DealID{startDealID}, + DealIDs: []abi.DealID{dealID}, }), - deals: map[abi.DealID]*api.MarketDeal{ - startDealID: unfinishedDeal, - }, + // non-zero exit code indicates unsuccessful pre-commit message receipt: &types.MessageReceipt{ExitCode: 1}, }, }, expectedCBCallCount: 0, }, - "error on deal in check": { - checkTsDeals: map[abi.DealID]*api.MarketDeal{}, - searchMessageErr: errors.New("something went wrong"), - expectedCBCallCount: 0, - expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"), + "deal already pre-committed": { + currentDealInfo: sealing.CurrentDealInfo{ + DealID: dealID, + MarketDeal: unfinishedDeal, + }, + preCommitDiff: &miner.PreCommitChanges{ + Added: []miner.SectorPreCommitOnChainInfo{{ + Info: miner.SectorPreCommitInfo{ + SectorNumber: sectorNumber, + DealIDs: []abi.DealID{dealID}, + }, + }}, + }, + expectedCBCallCount: 1, + expectedCBIsActive: false, + expectedCBSectorNumber: sectorNumber, }, - "sector start epoch > 0 in check": { - checkTsDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: successDeal, + "error getting current deal info in check func": { + currentDealInfoErr: errors.New("something went wrong"), + expectedCBCallCount: 0, + expectedError: xerrors.Errorf("failed to set up called handler: failed to look up deal on chain: something went wrong"), + }, + "sector already active": { + currentDealInfo: sealing.CurrentDealInfo{ + DealID: dealID, + MarketDeal: activeDeal, }, expectedCBCallCount: 1, expectedCBIsActive: true, }, - "error on deal in pre-commit": { - searchMessageErr: errors.New("something went wrong"), - checkTsDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: unfinishedDeal, + "sector was slashed": { + currentDealInfo: sealing.CurrentDealInfo{ + DealID: dealID, + MarketDeal: slashedDeal, + PublishMsgTipSet: nil, }, + expectedCBCallCount: 0, + expectedError: xerrors.Errorf("failed to set up called handler: deal %d was slashed at epoch %d", dealID, slashedDeal.State.SlashEpoch), + }, + "error getting current deal info in called func": { + currentDealInfo: sealing.CurrentDealInfo{ + DealID: dealID, + MarketDeal: unfinishedDeal, + }, + currentDealInfoErr2: errors.New("something went wrong"), matchStates: []matchState{ { msg: makeMessage(t, provider, miner.Methods.PreCommitSector, &miner.SectorPreCommitInfo{ SectorNumber: sectorNumber, SealedCID: sealedCid, - DealIDs: []abi.DealID{startDealID}, + DealIDs: []abi.DealID{dealID}, }), - deals: map[abi.DealID]*api.MarketDeal{}, }, }, - expectedCBCallCount: 0, - expectedError: errors.New("failed to set up called handler: something went wrong"), + expectedCBCallCount: 1, + expectedCBError: errors.New("handling applied event: something went wrong"), }, "proposed deal epoch timeout": { - checkTsDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: unfinishedDeal, + currentDealInfo: sealing.CurrentDealInfo{ + DealID: dealID, + MarketDeal: activeDeal, }, dealStartEpochTimeout: true, expectedCBCallCount: 1, - expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID), + expectedCBError: xerrors.Errorf("handling applied event: deal with piece CID %s was not activated by proposed deal start epoch 0", unfinishedDeal.Proposal.PieceCID), }, } runTestCase := func(testCase string, data testCase) { t.Run(testCase, func(t *testing.T) { - // ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - // defer cancel() - api := &mockGetCurrentDealInfoAPI{ - SearchMessageLookup: data.searchMessageLookup, - SearchMessageErr: data.searchMessageErr, - MarketDeals: make(map[marketDealKey]*api.MarketDeal), - } checkTs, err := test.MockTipset(provider, rand.Uint64()) require.NoError(t, err) - for dealID, deal := range data.checkTsDeals { - api.MarketDeals[marketDealKey{dealID, checkTs.Key()}] = deal - } matchMessages := make([]matchMessage, len(data.matchStates)) for i, ms := range data.matchStates { matchTs, err := test.MockTipset(provider, rand.Uint64()) require.NoError(t, err) - for dealID, deal := range ms.deals { - api.MarketDeals[marketDealKey{dealID, matchTs.Key()}] = deal - } matchMessages[i] = matchMessage{ curH: 5, msg: ms.msg, @@ -217,7 +217,18 @@ func TestOnDealSectorPreCommitted(t *testing.T) { cbIsActive = isActive cbError = err } - err = OnDealSectorPreCommitted(ctx, api, eventsAPI, provider, startDealID, proposal, &publishCid, cb) + + mockPCAPI := &mockPreCommitsAPI{ + PCChanges: data.preCommitDiff, + } + mockDIAPI := &mockDealInfoAPI{ + CurrentDealInfo: data.currentDealInfo, + CurrentDealInfo2: data.currentDealInfo, + Err: data.currentDealInfoErr, + Err2: data.currentDealInfoErr2, + } + scm := newSectorCommittedManager(eventsAPI, mockDIAPI, mockPCAPI) + err = scm.OnDealSectorPreCommitted(ctx, provider, proposal, publishCid, cb) if data.expectedError == nil { require.NoError(t, err) } else { @@ -240,17 +251,19 @@ func TestOnDealSectorPreCommitted(t *testing.T) { func TestOnDealSectorCommitted(t *testing.T) { provider := address.TestAddress - ctx := context.Background() publishCid := generateCids(1)[0] pieceCid := generateCids(1)[0] - startDealID := abi.DealID(rand.Uint64()) - newDealID := abi.DealID(rand.Uint64()) - newValueReturn := makePublishDealsReturnBytes(t, []abi.DealID{newDealID}) + dealID := abi.DealID(rand.Uint64()) sectorNumber := abi.SectorNumber(rand.Uint64()) proposal := market.DealProposal{ - PieceCID: pieceCid, - PieceSize: abi.PaddedPieceSize(rand.Uint64()), - Label: "success", + PieceCID: pieceCid, + PieceSize: abi.PaddedPieceSize(rand.Uint64()), + Client: tutils.NewActorAddr(t, "client"), + Provider: tutils.NewActorAddr(t, "provider"), + StoragePricePerEpoch: abi.NewTokenAmount(1), + ProviderCollateral: abi.NewTokenAmount(1), + ClientCollateral: abi.NewTokenAmount(1), + Label: "success", } unfinishedDeal := &api.MarketDeal{ Proposal: proposal, @@ -259,17 +272,26 @@ func TestOnDealSectorCommitted(t *testing.T) { LastUpdatedEpoch: 2, }, } - successDeal := &api.MarketDeal{ + activeDeal := &api.MarketDeal{ Proposal: proposal, State: market.DealState{ SectorStartEpoch: 1, LastUpdatedEpoch: 2, }, } + slashedDeal := &api.MarketDeal{ + Proposal: proposal, + State: market.DealState{ + SectorStartEpoch: 1, + LastUpdatedEpoch: 2, + SlashEpoch: 2, + }, + } type testCase struct { - searchMessageLookup *api.MsgLookup - searchMessageErr error - checkTsDeals map[abi.DealID]*api.MarketDeal + currentDealInfo sealing.CurrentDealInfo + currentDealInfoErr error + currentDealInfo2 sealing.CurrentDealInfo + currentDealInfoErr2 error matchStates []matchState dealStartEpochTimeout bool expectedCBCallCount uint64 @@ -278,121 +300,118 @@ func TestOnDealSectorCommitted(t *testing.T) { } testCases := map[string]testCase{ "normal sequence": { - checkTsDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: unfinishedDeal, + currentDealInfo: sealing.CurrentDealInfo{ + DealID: dealID, + MarketDeal: unfinishedDeal, + }, + currentDealInfo2: sealing.CurrentDealInfo{ + DealID: dealID, + MarketDeal: activeDeal, }, matchStates: []matchState{ { msg: makeMessage(t, provider, miner.Methods.ProveCommitSector, &miner.ProveCommitSectorParams{ SectorNumber: sectorNumber, }), - deals: map[abi.DealID]*api.MarketDeal{ - startDealID: successDeal, - }, - }, - }, - expectedCBCallCount: 1, - }, - "deal id changes in called": { - searchMessageLookup: &api.MsgLookup{ - Receipt: types.MessageReceipt{ - ExitCode: exitcode.Ok, - Return: newValueReturn, - }, - }, - checkTsDeals: map[abi.DealID]*api.MarketDeal{ - newDealID: unfinishedDeal, - }, - matchStates: []matchState{ - { - msg: makeMessage(t, provider, miner.Methods.ProveCommitSector, &miner.ProveCommitSectorParams{ - SectorNumber: sectorNumber, - }), - deals: map[abi.DealID]*api.MarketDeal{ - newDealID: successDeal, - }, }, }, expectedCBCallCount: 1, }, "ignores unsuccessful prove-commit message": { - checkTsDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: unfinishedDeal, + currentDealInfo: sealing.CurrentDealInfo{ + DealID: dealID, + MarketDeal: unfinishedDeal, + }, + currentDealInfo2: sealing.CurrentDealInfo{ + DealID: dealID, + MarketDeal: activeDeal, }, matchStates: []matchState{ { msg: makeMessage(t, provider, miner.Methods.ProveCommitSector, &miner.ProveCommitSectorParams{ SectorNumber: sectorNumber, }), - deals: map[abi.DealID]*api.MarketDeal{ - startDealID: successDeal, - }, + // Exit-code 1 means the prove-commit was unsuccessful receipt: &types.MessageReceipt{ExitCode: 1}, }, }, expectedCBCallCount: 0, }, - "error on deal in check": { - checkTsDeals: map[abi.DealID]*api.MarketDeal{}, - searchMessageErr: errors.New("something went wrong"), + "error getting current deal info in check func": { + currentDealInfoErr: errors.New("something went wrong"), expectedCBCallCount: 0, - expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"), + expectedError: xerrors.Errorf("failed to set up called handler: failed to look up deal on chain: something went wrong"), }, - "sector start epoch > 0 in check": { - checkTsDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: successDeal, + "sector already active": { + currentDealInfo: sealing.CurrentDealInfo{ + DealID: dealID, + MarketDeal: activeDeal, }, expectedCBCallCount: 1, }, - "error on deal in called": { - searchMessageErr: errors.New("something went wrong"), - checkTsDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: unfinishedDeal, + "sector was slashed": { + currentDealInfo: sealing.CurrentDealInfo{ + DealID: dealID, + MarketDeal: slashedDeal, + }, + expectedCBCallCount: 0, + expectedError: xerrors.Errorf("failed to set up called handler: deal %d was slashed at epoch %d", dealID, slashedDeal.State.SlashEpoch), + }, + "error getting current deal info in called func": { + currentDealInfo: sealing.CurrentDealInfo{ + DealID: dealID, + MarketDeal: unfinishedDeal, + }, + currentDealInfoErr2: errors.New("something went wrong"), + matchStates: []matchState{ + { + msg: makeMessage(t, provider, miner.Methods.ProveCommitSector, &miner.ProveCommitSectorParams{ + SectorNumber: sectorNumber, + }), + }, + }, + expectedCBCallCount: 1, + expectedCBError: xerrors.Errorf("handling applied event: failed to look up deal on chain: something went wrong"), + }, + "proposed deal epoch timeout": { + currentDealInfo: sealing.CurrentDealInfo{ + DealID: dealID, + MarketDeal: unfinishedDeal, + }, + dealStartEpochTimeout: true, + expectedCBCallCount: 1, + expectedCBError: xerrors.Errorf("handling applied event: deal with piece CID %s was not activated by proposed deal start epoch 0", unfinishedDeal.Proposal.PieceCID), + }, + "got prove-commit but deal not active": { + currentDealInfo: sealing.CurrentDealInfo{ + DealID: dealID, + MarketDeal: unfinishedDeal, + }, + currentDealInfo2: sealing.CurrentDealInfo{ + DealID: dealID, + MarketDeal: unfinishedDeal, }, matchStates: []matchState{ { msg: makeMessage(t, provider, miner.Methods.ProveCommitSector, &miner.ProveCommitSectorParams{ SectorNumber: sectorNumber, }), - deals: map[abi.DealID]*api.MarketDeal{ - newDealID: successDeal, - }, }, }, expectedCBCallCount: 1, - expectedCBError: errors.New("handling applied event: failed to look up deal on chain: something went wrong"), - expectedError: errors.New("failed to set up called handler: failed to look up deal on chain: something went wrong"), - }, - "proposed deal epoch timeout": { - checkTsDeals: map[abi.DealID]*api.MarketDeal{ - startDealID: unfinishedDeal, - }, - dealStartEpochTimeout: true, - expectedCBCallCount: 1, - expectedCBError: xerrors.Errorf("handling applied event: deal %d was not activated by proposed deal start epoch 0", startDealID), + expectedCBError: xerrors.Errorf("handling applied event: deal wasn't active: deal=%d, parentState=bafkqaaa, h=5", dealID), }, } runTestCase := func(testCase string, data testCase) { t.Run(testCase, func(t *testing.T) { - // ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - // defer cancel() - api := &mockGetCurrentDealInfoAPI{ - SearchMessageLookup: data.searchMessageLookup, - SearchMessageErr: data.searchMessageErr, - MarketDeals: make(map[marketDealKey]*api.MarketDeal), - } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() checkTs, err := test.MockTipset(provider, rand.Uint64()) require.NoError(t, err) - for dealID, deal := range data.checkTsDeals { - api.MarketDeals[marketDealKey{dealID, checkTs.Key()}] = deal - } matchMessages := make([]matchMessage, len(data.matchStates)) for i, ms := range data.matchStates { matchTs, err := test.MockTipset(provider, rand.Uint64()) require.NoError(t, err) - for dealID, deal := range ms.deals { - api.MarketDeals[marketDealKey{dealID, matchTs.Key()}] = deal - } matchMessages[i] = matchMessage{ curH: 5, msg: ms.msg, @@ -412,7 +431,15 @@ func TestOnDealSectorCommitted(t *testing.T) { cbCallCount++ cbError = err } - err = OnDealSectorCommitted(ctx, api, eventsAPI, provider, startDealID, sectorNumber, proposal, &publishCid, cb) + mockPCAPI := &mockPreCommitsAPI{} + mockDIAPI := &mockDealInfoAPI{ + CurrentDealInfo: data.currentDealInfo, + CurrentDealInfo2: data.currentDealInfo2, + Err: data.currentDealInfoErr, + Err2: data.currentDealInfoErr2, + } + scm := newSectorCommittedManager(eventsAPI, mockDIAPI, mockPCAPI) + err = scm.OnDealSectorCommitted(ctx, provider, sectorNumber, proposal, publishCid, cb) if data.expectedError == nil { require.NoError(t, err) } else { @@ -434,7 +461,6 @@ func TestOnDealSectorCommitted(t *testing.T) { type matchState struct { msg *types.Message receipt *types.MessageReceipt - deals map[abi.DealID]*api.MarketDeal } type matchMessage struct { @@ -476,7 +502,8 @@ func (fe *fakeEvents) Called(check events.CheckFunc, msgHnd events.MsgHandler, r } more, err := msgHnd(matchMessage.msg, receipt, matchMessage.ts, matchMessage.curH) if err != nil { - return err + // error is handled through a callback rather than being returned + return nil } if matchMessage.doesRevert { err := rev(fe.Ctx, matchMessage.ts) @@ -514,3 +541,32 @@ func generateCids(n int) []cid.Cid { } return cids } + +type mockPreCommitsAPI struct { + PCChanges *miner.PreCommitChanges + Err error +} + +func (m *mockPreCommitsAPI) diffPreCommits(ctx context.Context, actor address.Address, pre, cur types.TipSetKey) (*miner.PreCommitChanges, error) { + pcc := &miner.PreCommitChanges{} + if m.PCChanges != nil { + pcc = m.PCChanges + } + return pcc, m.Err +} + +type mockDealInfoAPI struct { + count int + CurrentDealInfo sealing.CurrentDealInfo + Err error + CurrentDealInfo2 sealing.CurrentDealInfo + Err2 error +} + +func (m *mockDealInfoAPI) GetCurrentDealInfo(ctx context.Context, tok sealing.TipSetToken, proposal *market.DealProposal, publishCid cid.Cid) (sealing.CurrentDealInfo, error) { + m.count++ + if m.count == 2 { + return m.CurrentDealInfo2, m.Err2 + } + return m.CurrentDealInfo, m.Err +} diff --git a/markets/storageadapter/provider.go b/markets/storageadapter/provider.go index c254a30cd..f81fadc32 100644 --- a/markets/storageadapter/provider.go +++ b/markets/storageadapter/provider.go @@ -9,6 +9,7 @@ import ( "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" + "go.uber.org/fx" "golang.org/x/xerrors" market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" @@ -22,7 +23,6 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" - "github.com/filecoin-project/lotus/chain/actors" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/events" @@ -33,6 +33,7 @@ import ( "github.com/filecoin-project/lotus/markets/utils" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/storage/sectorblocks" ) @@ -42,7 +43,6 @@ var log = logging.Logger("storageadapter") type ProviderNodeAdapter struct { api.FullNode - *apiWrapper // this goes away with the data transfer module dag dtypes.StagingDAG @@ -50,57 +50,38 @@ type ProviderNodeAdapter struct { secb *sectorblocks.SectorBlocks ev *events.Events - publishSpec, addBalanceSpec *api.MessageSendSpec - dsMatcher *dealStateMatcher + dealPublisher *DealPublisher + + addBalanceSpec *api.MessageSendSpec + dsMatcher *dealStateMatcher + scMgr *SectorCommittedManager } -func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { - return func(dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode) storagemarket.StorageProviderNode { - na := &ProviderNodeAdapter{ - FullNode: full, - apiWrapper: &apiWrapper{api: full}, +func NewProviderNodeAdapter(fc *config.MinerFeeConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, dag dtypes.StagingDAG, secb *sectorblocks.SectorBlocks, full api.FullNode, dealPublisher *DealPublisher) storagemarket.StorageProviderNode { + ctx := helpers.LifecycleCtx(mctx, lc) - dag: dag, - secb: secb, - ev: events.NewEvents(context.TODO(), full), - dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(full))), + ev := events.NewEvents(ctx, full) + na := &ProviderNodeAdapter{ + FullNode: full, + + dag: dag, + secb: secb, + ev: ev, + dealPublisher: dealPublisher, + dsMatcher: newDealStateMatcher(state.NewStatePredicates(state.WrapFastAPI(full))), } if fc != nil { - na.publishSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxPublishDealsFee)} na.addBalanceSpec = &api.MessageSendSpec{MaxFee: abi.TokenAmount(fc.MaxMarketBalanceAddFee)} } + na.scMgr = NewSectorCommittedManager(ev, na, &apiWrapper{api: full}) + return na } } func (n *ProviderNodeAdapter) PublishDeals(ctx context.Context, deal storagemarket.MinerDeal) (cid.Cid, error) { - log.Info("publishing deal") - - mi, err := n.StateMinerInfo(ctx, deal.Proposal.Provider, types.EmptyTSK) - if err != nil { - return cid.Undef, err - } - - params, err := actors.SerializeParams(&market2.PublishStorageDealsParams{ - Deals: []market2.ClientDealProposal{deal.ClientDealProposal}, - }) - - if err != nil { - return cid.Undef, xerrors.Errorf("serializing PublishStorageDeals params failed: %w", err) - } - - // TODO: We may want this to happen after fetching data - smsg, err := n.MpoolPushMessage(ctx, &types.Message{ - To: market.Address, - From: mi.Worker, - Value: types.NewInt(0), - Method: market.Methods.PublishStorageDeals, - Params: params, - }, n.publishSpec) - if err != nil { - return cid.Undef, err - } - return smsg.Cid(), nil + return n.dealPublisher.Publish(ctx, deal.ClientDealProposal) } func (n *ProviderNodeAdapter) OnDealComplete(ctx context.Context, deal storagemarket.MinerDeal, pieceSize abi.UnpaddedPieceSize, pieceData io.Reader) (*storagemarket.PackingResult, error) { @@ -280,12 +261,14 @@ func (n *ProviderNodeAdapter) DealProviderCollateralBounds(ctx context.Context, return bounds.Min, bounds.Max, nil } +// TODO: Remove dealID parameter, change publishCid to be cid.Cid (instead of pointer) func (n *ProviderNodeAdapter) OnDealSectorPreCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorPreCommittedCallback) error { - return OnDealSectorPreCommitted(ctx, n, n.ev, provider, dealID, market.DealProposal(proposal), publishCid, cb) + return n.scMgr.OnDealSectorPreCommitted(ctx, provider, market.DealProposal(proposal), *publishCid, cb) } +// TODO: Remove dealID parameter, change publishCid to be cid.Cid (instead of pointer) func (n *ProviderNodeAdapter) OnDealSectorCommitted(ctx context.Context, provider address.Address, dealID abi.DealID, sectorNumber abi.SectorNumber, proposal market2.DealProposal, publishCid *cid.Cid, cb storagemarket.DealSectorCommittedCallback) error { - return OnDealSectorCommitted(ctx, n, n.ev, provider, dealID, sectorNumber, market.DealProposal(proposal), publishCid, cb) + return n.scMgr.OnDealSectorCommitted(ctx, provider, sectorNumber, market.DealProposal(proposal), *publishCid, cb) } func (n *ProviderNodeAdapter) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) { diff --git a/node/builder.go b/node/builder.go index 1dd60ee1b..5f61420d4 100644 --- a/node/builder.go +++ b/node/builder.go @@ -377,6 +377,7 @@ func Online() Option { Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)), Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)), Override(new(storagemarket.StorageProvider), modules.StorageProvider), + Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)), Override(HandleMigrateProviderFundsKey, modules.HandleMigrateProviderFunds), Override(HandleRetrievalKey, modules.HandleRetrieval), @@ -519,6 +520,10 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.RetrievalFilter))), ), + Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, storageadapter.PublishMsgConfig{ + Period: time.Duration(cfg.Dealmaking.PublishMsgPeriod), + MaxDealsPerMsg: cfg.Dealmaking.MaxDealsPerPublishMsg, + })), Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees)), Override(new(sectorstorage.SealerConfig), cfg.Storage), @@ -646,5 +651,6 @@ func Test() Option { Unset(RunPeerMgrKey), Unset(new(*peermgr.PeerMgr)), Override(new(beacon.Schedule), testing.RandomBeacon), + Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})), ) } diff --git a/node/config/def.go b/node/config/def.go index 5e72df985..3be773367 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -48,6 +48,12 @@ type DealmakingConfig struct { ConsiderUnverifiedStorageDeals bool PieceCidBlocklist []cid.Cid ExpectedSealDuration Duration + // The amount of time to wait for more deals to arrive before + // publishing + PublishMsgPeriod Duration + // The maximum number of deals to include in a single PublishStorageDeals + // message + MaxDealsPerPublishMsg uint64 Filter string RetrievalFilter string @@ -208,7 +214,9 @@ func DefaultStorageMiner() *StorageMiner { ConsiderUnverifiedStorageDeals: true, PieceCidBlocklist: []cid.Cid{}, // TODO: It'd be nice to set this based on sector size - ExpectedSealDuration: Duration(time.Hour * 24), + ExpectedSealDuration: Duration(time.Hour * 24), + PublishMsgPeriod: Duration(time.Hour), + MaxDealsPerPublishMsg: 8, }, Fees: MinerFeeConfig{ diff --git a/node/impl/full/state.go b/node/impl/full/state.go index 91a1b74db..bfe1ed0e8 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -54,8 +54,8 @@ type StateModuleAPI interface { StateMinerProvingDeadline(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*dline.Info, error) StateMinerPower(context.Context, address.Address, types.TipSetKey) (*api.MinerPower, error) StateNetworkVersion(ctx context.Context, key types.TipSetKey) (network.Version, error) - StateSectorGetInfo(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error) StateSearchMsg(ctx context.Context, msg cid.Cid) (*api.MsgLookup, error) + StateSectorGetInfo(ctx context.Context, maddr address.Address, n abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error) StateVerifiedClientStatus(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*abi.StoragePower, error) StateWaitMsg(ctx context.Context, msg cid.Cid, confidence uint64) (*api.MsgLookup, error) } diff --git a/node/node_test.go b/node/node_test.go index e8afd8789..69d6147bc 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -5,14 +5,12 @@ import ( "testing" "time" - builder "github.com/filecoin-project/lotus/node/test" - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/lib/lotuslog" - logging "github.com/ipfs/go-log/v2" - "github.com/filecoin-project/lotus/api/test" "github.com/filecoin-project/lotus/chain/actors/policy" + "github.com/filecoin-project/lotus/lib/lotuslog" + builder "github.com/filecoin-project/lotus/node/test" + logging "github.com/ipfs/go-log/v2" ) func init() { @@ -57,8 +55,8 @@ func TestAPIDealFlow(t *testing.T) { t.Run("TestFastRetrievalDealFlow", func(t *testing.T) { test.TestFastRetrievalDealFlow(t, builder.MockSbBuilder, blockTime, dealStartEpoch) }) - t.Run("TestZeroPricePerByteRetrievalDealFlow", func(t *testing.T) { - test.TestZeroPricePerByteRetrievalDealFlow(t, builder.MockSbBuilder, blockTime, dealStartEpoch) + t.Run("TestPublishDealsBatching", func(t *testing.T) { + test.TestPublishDealsBatching(t, builder.MockSbBuilder, blockTime, dealStartEpoch) }) } diff --git a/node/test/builder.go b/node/test/builder.go index dc14e9f0a..72a55ab49 100644 --- a/node/test/builder.go +++ b/node/test/builder.go @@ -288,7 +288,11 @@ func mockBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []test. genMiner := maddrs[i] wa := genms[i].Worker - storers[i] = CreateTestStorageNode(ctx, t, wa, genMiner, pk, f, mn, node.Options()) + opts := def.Opts + if opts == nil { + opts = node.Options() + } + storers[i] = CreateTestStorageNode(ctx, t, wa, genMiner, pk, f, mn, opts) if err := storers[i].StorageAddLocal(ctx, presealDirs[i]); err != nil { t.Fatalf("%+v", err) } @@ -455,12 +459,17 @@ func mockSbBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []tes } } + opts := def.Opts + if opts == nil { + opts = node.Options() + } storers[i] = CreateTestStorageNode(ctx, t, genms[i].Worker, maddrs[i], pidKeys[i], f, mn, node.Options( node.Override(new(sectorstorage.SectorManager), func() (sectorstorage.SectorManager, error) { return mock.NewMockSectorMgr(sectors), nil }), node.Override(new(ffiwrapper.Verifier), mock.MockVerifier), node.Unset(new(*sectorstorage.Manager)), + opts, )) if rpc { diff --git a/paychmgr/settler/settler.go b/paychmgr/settler/settler.go index 131cd25a7..3abd136fc 100644 --- a/paychmgr/settler/settler.go +++ b/paychmgr/settler/settler.go @@ -21,6 +21,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/impl/full" payapi "github.com/filecoin-project/lotus/node/impl/paych" + "github.com/filecoin-project/lotus/node/modules/helpers" ) var log = logging.Logger("payment-channel-settler") @@ -50,9 +51,10 @@ type paymentChannelSettler struct { // SettlePaymentChannels checks the chain for events related to payment channels settling and // submits any vouchers for inbound channels tracked for this node -func SettlePaymentChannels(lc fx.Lifecycle, api API) error { +func SettlePaymentChannels(mctx helpers.MetricsCtx, lc fx.Lifecycle, api API) error { + ctx := helpers.LifecycleCtx(mctx, lc) lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { + OnStart: func(context.Context) error { pcs := newPaymentChannelSettler(ctx, &api) ev := events.NewEvents(ctx, &api) return ev.Called(pcs.check, pcs.messageHandler, pcs.revertHandler, int(build.MessageConfidence+1), events.NoTimeout, pcs.matcher)