Merge pull request #7234 from filecoin-project/feat/dealpub-validation
dealpublisher: Fully validate deals before publishing
This commit is contained in:
commit
52cb19cc5c
@ -14,6 +14,8 @@ import (
|
|||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
"github.com/filecoin-project/go-state-types/big"
|
"github.com/filecoin-project/go-state-types/big"
|
||||||
|
"github.com/filecoin-project/go-state-types/exitcode"
|
||||||
|
market0 "github.com/filecoin-project/specs-actors/actors/builtin/market"
|
||||||
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
@ -35,6 +37,7 @@ type dealPublisherAPI interface {
|
|||||||
WalletHas(context.Context, address.Address) (bool, error)
|
WalletHas(context.Context, address.Address) (bool, error)
|
||||||
StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error)
|
StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error)
|
||||||
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
|
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
|
||||||
|
StateCall(context.Context, *types.Message, types.TipSetKey) (*api.InvocResult, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DealPublisher batches deal publishing so that many deals can be included in
|
// DealPublisher batches deal publishing so that many deals can be included in
|
||||||
@ -295,7 +298,7 @@ func (p *DealPublisher) publishReady(ready []*pendingDeal) {
|
|||||||
// Validate the deal
|
// Validate the deal
|
||||||
if err := p.validateDeal(pd.deal); err != nil {
|
if err := p.validateDeal(pd.deal); err != nil {
|
||||||
// Validation failed, complete immediately with an error
|
// Validation failed, complete immediately with an error
|
||||||
go onComplete(pd, cid.Undef, err)
|
go onComplete(pd, cid.Undef, xerrors.Errorf("publish validation failed: %w", err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -315,6 +318,13 @@ func (p *DealPublisher) publishReady(ready []*pendingDeal) {
|
|||||||
// validateDeal checks that the deal proposal start epoch hasn't already
|
// validateDeal checks that the deal proposal start epoch hasn't already
|
||||||
// elapsed
|
// elapsed
|
||||||
func (p *DealPublisher) validateDeal(deal market2.ClientDealProposal) error {
|
func (p *DealPublisher) validateDeal(deal market2.ClientDealProposal) error {
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
pcid, err := deal.Proposal.Cid()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("computing proposal cid: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
head, err := p.api.ChainHead(p.ctx)
|
head, err := p.api.ChainHead(p.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -324,6 +334,41 @@ func (p *DealPublisher) validateDeal(deal market2.ClientDealProposal) error {
|
|||||||
"cannot publish deal with piece CID %s: current epoch %d has passed deal proposal start epoch %d",
|
"cannot publish deal with piece CID %s: current epoch %d has passed deal proposal start epoch %d",
|
||||||
deal.Proposal.PieceCID, head.Height(), deal.Proposal.StartEpoch)
|
deal.Proposal.PieceCID, head.Height(), deal.Proposal.StartEpoch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mi, err := p.api.StateMinerInfo(p.ctx, deal.Proposal.Provider, types.EmptyTSK)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("getting provider info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
params, err := actors.SerializeParams(&market2.PublishStorageDealsParams{
|
||||||
|
Deals: []market0.ClientDealProposal{deal},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("serializing PublishStorageDeals params failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
addr, _, err := p.as.AddressFor(p.ctx, p.api, mi, api.DealPublishAddr, big.Zero(), big.Zero())
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("selecting address for publishing deals: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := p.api.StateCall(p.ctx, &types.Message{
|
||||||
|
To: market.Address,
|
||||||
|
From: addr,
|
||||||
|
Value: types.NewInt(0),
|
||||||
|
Method: market.Methods.PublishStorageDeals,
|
||||||
|
Params: params,
|
||||||
|
}, head.Key())
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("simulating deal publish message: %w", err)
|
||||||
|
}
|
||||||
|
if res.MsgRct.ExitCode != exitcode.Ok {
|
||||||
|
return xerrors.Errorf("simulating deal publish message: non-zero exitcode %s; message: %s", res.MsgRct.ExitCode, res.Error)
|
||||||
|
}
|
||||||
|
|
||||||
|
took := time.Now().Sub(start)
|
||||||
|
log.Infow("validating deal", "took", took, "proposal", pcid)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6,24 +6,25 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/crypto"
|
|
||||||
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/raulk/clock"
|
"github.com/raulk/clock"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-address"
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
"github.com/filecoin-project/go-state-types/crypto"
|
||||||
|
"github.com/filecoin-project/go-state-types/exitcode"
|
||||||
|
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
||||||
tutils "github.com/filecoin-project/specs-actors/v2/support/testing"
|
tutils "github.com/filecoin-project/specs-actors/v2/support/testing"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
market0 "github.com/filecoin-project/specs-actors/actors/builtin/market"
|
market0 "github.com/filecoin-project/specs-actors/actors/builtin/market"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
|
||||||
"github.com/filecoin-project/lotus/api"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDealPublisher(t *testing.T) {
|
func TestDealPublisher(t *testing.T) {
|
||||||
@ -41,6 +42,7 @@ func TestDealPublisher(t *testing.T) {
|
|||||||
expiredDeals int
|
expiredDeals int
|
||||||
dealCountAfterPublishPeriod int
|
dealCountAfterPublishPeriod int
|
||||||
expectedDealsPerMsg []int
|
expectedDealsPerMsg []int
|
||||||
|
failOne bool
|
||||||
}{{
|
}{{
|
||||||
name: "publish one deal within publish period",
|
name: "publish one deal within publish period",
|
||||||
publishPeriod: 10 * time.Millisecond,
|
publishPeriod: 10 * time.Millisecond,
|
||||||
@ -93,6 +95,14 @@ func TestDealPublisher(t *testing.T) {
|
|||||||
ctxCancelledWithinPublishPeriod: 0,
|
ctxCancelledWithinPublishPeriod: 0,
|
||||||
dealCountAfterPublishPeriod: 2,
|
dealCountAfterPublishPeriod: 2,
|
||||||
expectedDealsPerMsg: []int{1, 1, 1, 1},
|
expectedDealsPerMsg: []int{1, 1, 1, 1},
|
||||||
|
}, {
|
||||||
|
name: "one deal failing doesn't fail the entire batch",
|
||||||
|
publishPeriod: 10 * time.Millisecond,
|
||||||
|
maxDealsPerMsg: 5,
|
||||||
|
dealCountWithinPublishPeriod: 2,
|
||||||
|
dealCountAfterPublishPeriod: 0,
|
||||||
|
failOne: true,
|
||||||
|
expectedDealsPerMsg: []int{1},
|
||||||
}}
|
}}
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
@ -112,14 +122,18 @@ func TestDealPublisher(t *testing.T) {
|
|||||||
|
|
||||||
// Publish deals within publish period
|
// Publish deals within publish period
|
||||||
for i := 0; i < tc.dealCountWithinPublishPeriod; i++ {
|
for i := 0; i < tc.dealCountWithinPublishPeriod; i++ {
|
||||||
deal := publishDeal(t, dp, false, false)
|
if tc.failOne && i == 1 {
|
||||||
|
publishDeal(t, dp, i, false, false)
|
||||||
|
} else {
|
||||||
|
deal := publishDeal(t, dp, 0, false, false)
|
||||||
dealsToPublish = append(dealsToPublish, deal)
|
dealsToPublish = append(dealsToPublish, deal)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
for i := 0; i < tc.ctxCancelledWithinPublishPeriod; i++ {
|
for i := 0; i < tc.ctxCancelledWithinPublishPeriod; i++ {
|
||||||
publishDeal(t, dp, true, false)
|
publishDeal(t, dp, 0, true, false)
|
||||||
}
|
}
|
||||||
for i := 0; i < tc.expiredDeals; i++ {
|
for i := 0; i < tc.expiredDeals; i++ {
|
||||||
publishDeal(t, dp, false, true)
|
publishDeal(t, dp, 0, false, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait until publish period has elapsed
|
// Wait until publish period has elapsed
|
||||||
@ -151,7 +165,7 @@ func TestDealPublisher(t *testing.T) {
|
|||||||
|
|
||||||
// Publish deals after publish period
|
// Publish deals after publish period
|
||||||
for i := 0; i < tc.dealCountAfterPublishPeriod; i++ {
|
for i := 0; i < tc.dealCountAfterPublishPeriod; i++ {
|
||||||
deal := publishDeal(t, dp, false, false)
|
deal := publishDeal(t, dp, 0, false, false)
|
||||||
dealsToPublish = append(dealsToPublish, deal)
|
dealsToPublish = append(dealsToPublish, deal)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -187,12 +201,12 @@ func TestForcePublish(t *testing.T) {
|
|||||||
// Queue three deals for publishing, one with a cancelled context
|
// Queue three deals for publishing, one with a cancelled context
|
||||||
var dealsToPublish []market.ClientDealProposal
|
var dealsToPublish []market.ClientDealProposal
|
||||||
// 1. Regular deal
|
// 1. Regular deal
|
||||||
deal := publishDeal(t, dp, false, false)
|
deal := publishDeal(t, dp, 0, false, false)
|
||||||
dealsToPublish = append(dealsToPublish, deal)
|
dealsToPublish = append(dealsToPublish, deal)
|
||||||
// 2. Deal with cancelled context
|
// 2. Deal with cancelled context
|
||||||
publishDeal(t, dp, true, false)
|
publishDeal(t, dp, 0, true, false)
|
||||||
// 3. Regular deal
|
// 3. Regular deal
|
||||||
deal = publishDeal(t, dp, false, false)
|
deal = publishDeal(t, dp, 0, false, false)
|
||||||
dealsToPublish = append(dealsToPublish, deal)
|
dealsToPublish = append(dealsToPublish, deal)
|
||||||
|
|
||||||
// Allow a moment for them to be queued
|
// Allow a moment for them to be queued
|
||||||
@ -217,7 +231,7 @@ func TestForcePublish(t *testing.T) {
|
|||||||
checkPublishedDeals(t, dpapi, dealsToPublish, []int{2})
|
checkPublishedDeals(t, dpapi, dealsToPublish, []int{2})
|
||||||
}
|
}
|
||||||
|
|
||||||
func publishDeal(t *testing.T, dp *DealPublisher, ctxCancelled bool, expired bool) market.ClientDealProposal {
|
func publishDeal(t *testing.T, dp *DealPublisher, invalid int, ctxCancelled bool, expired bool) market.ClientDealProposal {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
t.Cleanup(cancel)
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
@ -238,6 +252,7 @@ func publishDeal(t *testing.T, dp *DealPublisher, ctxCancelled bool, expired boo
|
|||||||
Provider: getProviderActor(t),
|
Provider: getProviderActor(t),
|
||||||
StartEpoch: startEpoch,
|
StartEpoch: startEpoch,
|
||||||
EndEpoch: abi.ChainEpoch(120),
|
EndEpoch: abi.ChainEpoch(120),
|
||||||
|
PieceSize: abi.PaddedPieceSize(invalid), // pass invalid into StateCall below
|
||||||
},
|
},
|
||||||
ClientSignature: crypto.Signature{
|
ClientSignature: crypto.Signature{
|
||||||
Type: crypto.SigTypeSecp256k1,
|
Type: crypto.SigTypeSecp256k1,
|
||||||
@ -253,7 +268,7 @@ func publishDeal(t *testing.T, dp *DealPublisher, ctxCancelled bool, expired boo
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if ctxCancelled || expired {
|
if ctxCancelled || expired || invalid == 1 {
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
} else {
|
} else {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -381,6 +396,19 @@ func (d *dpAPI) StateLookupID(ctx context.Context, a address.Address, key types.
|
|||||||
panic("don't call me")
|
panic("don't call me")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *dpAPI) StateCall(ctx context.Context, message *types.Message, key types.TipSetKey) (*api.InvocResult, error) {
|
||||||
|
var p market2.PublishStorageDealsParams
|
||||||
|
if err := p.UnmarshalCBOR(bytes.NewReader(message.Params)); err != nil {
|
||||||
|
return nil, xerrors.Errorf("unmarshal market params: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
exit := exitcode.Ok
|
||||||
|
if p.Deals[0].Proposal.PieceSize == 1 {
|
||||||
|
exit = exitcode.ErrIllegalState
|
||||||
|
}
|
||||||
|
return &api.InvocResult{MsgRct: &types.MessageReceipt{ExitCode: exit}}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func getClientActor(t *testing.T) address.Address {
|
func getClientActor(t *testing.T) address.Address {
|
||||||
return tutils.NewActorAddr(t, "client")
|
return tutils.NewActorAddr(t, "client")
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user