diff --git a/markets/storageadapter/dealpublisher.go b/markets/storageadapter/dealpublisher.go index f458e7a4f..7508a5d33 100644 --- a/markets/storageadapter/dealpublisher.go +++ b/markets/storageadapter/dealpublisher.go @@ -14,6 +14,8 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/go-state-types/exitcode" + market0 "github.com/filecoin-project/specs-actors/actors/builtin/market" market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" "github.com/filecoin-project/lotus/api" @@ -35,6 +37,7 @@ type dealPublisherAPI interface { WalletHas(context.Context, address.Address) (bool, error) StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error) StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) + StateCall(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error) } // DealPublisher batches deal publishing so that many deals can be included in @@ -295,7 +298,7 @@ func (p *DealPublisher) publishReady(ready []*pendingDeal) { // Validate the deal if err := p.validateDeal(pd.deal); err != nil { // Validation failed, complete immediately with an error - go onComplete(pd, cid.Undef, err) + go onComplete(pd, cid.Undef, xerrors.Errorf("publish validation failed: %w", err)) continue } @@ -315,6 +318,13 @@ func (p *DealPublisher) publishReady(ready []*pendingDeal) { // validateDeal checks that the deal proposal start epoch hasn't already // elapsed func (p *DealPublisher) validateDeal(deal market2.ClientDealProposal) error { + start := time.Now() + + pcid, err := deal.Proposal.Cid() + if err != nil { + return xerrors.Errorf("computing proposal cid: %w", err) + } + head, err := p.api.ChainHead(p.ctx) if err != nil { return err @@ -324,6 +334,41 @@ func (p *DealPublisher) validateDeal(deal market2.ClientDealProposal) error { "cannot publish deal with piece CID %s: current epoch %d has passed deal proposal start epoch %d", deal.Proposal.PieceCID, head.Height(), deal.Proposal.StartEpoch) } + + mi, err := p.api.StateMinerInfo(p.ctx, deal.Proposal.Provider, types.EmptyTSK) + if err != nil { + return xerrors.Errorf("getting provider info: %w", err) + } + + params, err := actors.SerializeParams(&market2.PublishStorageDealsParams{ + Deals: []market0.ClientDealProposal{deal}, + }) + if err != nil { + return xerrors.Errorf("serializing PublishStorageDeals params failed: %w", err) + } + + addr, _, err := p.as.AddressFor(p.ctx, p.api, mi, api.DealPublishAddr, big.Zero(), big.Zero()) + if err != nil { + return xerrors.Errorf("selecting address for publishing deals: %w", err) + } + + res, err := p.api.StateCall(p.ctx, &types.Message{ + To: market.Address, + From: addr, + Value: types.NewInt(0), + Method: market.Methods.PublishStorageDeals, + Params: params, + }, head.Key()) + if err != nil { + return xerrors.Errorf("simulating deal publish message: %w", err) + } + if res.MsgRct.ExitCode != exitcode.Ok { + return xerrors.Errorf("simulating deal publish message: non-zero exitcode %s; message: %s", res.MsgRct.ExitCode, res.Error) + } + + took := time.Now().Sub(start) + log.Infow("validating deal", "took", took, "proposal", pcid) + return nil } diff --git a/markets/storageadapter/dealpublisher_test.go b/markets/storageadapter/dealpublisher_test.go index a4991396a..351a00171 100644 --- a/markets/storageadapter/dealpublisher_test.go +++ b/markets/storageadapter/dealpublisher_test.go @@ -6,24 +6,25 @@ import ( "testing" "time" - "github.com/filecoin-project/go-state-types/crypto" - market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" "github.com/ipfs/go-cid" "github.com/raulk/clock" + "golang.org/x/xerrors" "github.com/stretchr/testify/require" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/go-state-types/exitcode" + market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" tutils "github.com/filecoin-project/specs-actors/v2/support/testing" - "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" market0 "github.com/filecoin-project/specs-actors/actors/builtin/market" - - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/lotus/api" ) func TestDealPublisher(t *testing.T) { @@ -41,6 +42,7 @@ func TestDealPublisher(t *testing.T) { expiredDeals int dealCountAfterPublishPeriod int expectedDealsPerMsg []int + failOne bool }{{ name: "publish one deal within publish period", publishPeriod: 10 * time.Millisecond, @@ -93,6 +95,14 @@ func TestDealPublisher(t *testing.T) { ctxCancelledWithinPublishPeriod: 0, dealCountAfterPublishPeriod: 2, expectedDealsPerMsg: []int{1, 1, 1, 1}, + }, { + name: "one deal failing doesn't fail the entire batch", + publishPeriod: 10 * time.Millisecond, + maxDealsPerMsg: 5, + dealCountWithinPublishPeriod: 2, + dealCountAfterPublishPeriod: 0, + failOne: true, + expectedDealsPerMsg: []int{1}, }} for _, tc := range testCases { @@ -112,14 +122,18 @@ func TestDealPublisher(t *testing.T) { // Publish deals within publish period for i := 0; i < tc.dealCountWithinPublishPeriod; i++ { - deal := publishDeal(t, dp, false, false) - dealsToPublish = append(dealsToPublish, deal) + if tc.failOne && i == 1 { + publishDeal(t, dp, i, false, false) + } else { + deal := publishDeal(t, dp, 0, false, false) + dealsToPublish = append(dealsToPublish, deal) + } } for i := 0; i < tc.ctxCancelledWithinPublishPeriod; i++ { - publishDeal(t, dp, true, false) + publishDeal(t, dp, 0, true, false) } for i := 0; i < tc.expiredDeals; i++ { - publishDeal(t, dp, false, true) + publishDeal(t, dp, 0, false, true) } // Wait until publish period has elapsed @@ -151,7 +165,7 @@ func TestDealPublisher(t *testing.T) { // Publish deals after publish period for i := 0; i < tc.dealCountAfterPublishPeriod; i++ { - deal := publishDeal(t, dp, false, false) + deal := publishDeal(t, dp, 0, false, false) dealsToPublish = append(dealsToPublish, deal) } @@ -187,12 +201,12 @@ func TestForcePublish(t *testing.T) { // Queue three deals for publishing, one with a cancelled context var dealsToPublish []market.ClientDealProposal // 1. Regular deal - deal := publishDeal(t, dp, false, false) + deal := publishDeal(t, dp, 0, false, false) dealsToPublish = append(dealsToPublish, deal) // 2. Deal with cancelled context - publishDeal(t, dp, true, false) + publishDeal(t, dp, 0, true, false) // 3. Regular deal - deal = publishDeal(t, dp, false, false) + deal = publishDeal(t, dp, 0, false, false) dealsToPublish = append(dealsToPublish, deal) // Allow a moment for them to be queued @@ -217,7 +231,7 @@ func TestForcePublish(t *testing.T) { checkPublishedDeals(t, dpapi, dealsToPublish, []int{2}) } -func publishDeal(t *testing.T, dp *DealPublisher, ctxCancelled bool, expired bool) market.ClientDealProposal { +func publishDeal(t *testing.T, dp *DealPublisher, invalid int, ctxCancelled bool, expired bool) market.ClientDealProposal { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -238,6 +252,7 @@ func publishDeal(t *testing.T, dp *DealPublisher, ctxCancelled bool, expired boo Provider: getProviderActor(t), StartEpoch: startEpoch, EndEpoch: abi.ChainEpoch(120), + PieceSize: abi.PaddedPieceSize(invalid), // pass invalid into StateCall below }, ClientSignature: crypto.Signature{ Type: crypto.SigTypeSecp256k1, @@ -253,7 +268,7 @@ func publishDeal(t *testing.T, dp *DealPublisher, ctxCancelled bool, expired boo return } - if ctxCancelled || expired { + if ctxCancelled || expired || invalid == 1 { require.Error(t, err) } else { require.NoError(t, err) @@ -381,6 +396,19 @@ func (d *dpAPI) StateLookupID(ctx context.Context, a address.Address, key types. panic("don't call me") } +func (d *dpAPI) StateCall(ctx context.Context, message *types.Message, key types.TipSetKey) (*api.InvocResult, error) { + var p market2.PublishStorageDealsParams + if err := p.UnmarshalCBOR(bytes.NewReader(message.Params)); err != nil { + return nil, xerrors.Errorf("unmarshal market params: %w", err) + } + + exit := exitcode.Ok + if p.Deals[0].Proposal.PieceSize == 1 { + exit = exitcode.ErrIllegalState + } + return &api.InvocResult{MsgRct: &types.MessageReceipt{ExitCode: exit}}, nil +} + func getClientActor(t *testing.T) address.Address { return tutils.NewActorAddr(t, "client") }