feat: batch publish deal messages

This commit is contained in:
Dirk McCormick 2021-01-08 16:28:38 +01:00
parent 332ea8a126
commit 05bf177686
4 changed files with 81 additions and 150 deletions

View File

@ -2,7 +2,6 @@ package storageadapter
import (
"context"
"fmt"
"strings"
"sync"
"time"
@ -25,7 +24,6 @@ import (
)
type dealPublisherAPI interface {
ChainHead(context.Context) (*types.TipSet, error)
MpoolPushMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec) (*types.SignedMessage, error)
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (miner.MinerInfo, error)
}
@ -54,27 +52,6 @@ type DealPublisher struct {
publishPeriodStart time.Time
}
// A deal that is queued to be published
type pendingDeal struct {
ctx context.Context
deal market2.ClientDealProposal
Result chan publishResult
}
// The result of publishing a deal
type publishResult struct {
msgCid cid.Cid
err error
}
func newPendingDeal(ctx context.Context, deal market2.ClientDealProposal) *pendingDeal {
return &pendingDeal{
ctx: ctx,
deal: deal,
Result: make(chan publishResult),
}
}
type PublishMsgConfig struct {
// The amount of time to wait for more deals to arrive before
// publishing
@ -86,7 +63,7 @@ type PublishMsgConfig struct {
func NewDealPublisher(
feeConfig *config.MinerFeeConfig,
publishMsgCfg PublishMsgConfig,
publishMsgCfg *config.PublishMsgConfig,
) func(lc fx.Lifecycle, full api.FullNode) *DealPublisher {
return func(lc fx.Lifecycle, full api.FullNode) *DealPublisher {
maxFee := abi.NewTokenAmount(0)
@ -107,16 +84,22 @@ func NewDealPublisher(
func newDealPublisher(
dpapi dealPublisherAPI,
publishMsgCfg PublishMsgConfig,
publishMsgCfg *config.PublishMsgConfig,
publishSpec *api.MessageSendSpec,
) *DealPublisher {
publishPeriod := time.Duration(0)
maxDealsPerMsg := uint64(1)
if publishMsgCfg != nil {
publishPeriod = time.Duration(publishMsgCfg.PublishPeriod)
maxDealsPerMsg = publishMsgCfg.MaxDealsPerMsg
}
ctx, cancel := context.WithCancel(context.Background())
return &DealPublisher{
api: dpapi,
ctx: ctx,
Shutdown: cancel,
maxDealsPerPublishMsg: publishMsgCfg.MaxDealsPerMsg,
publishPeriod: publishMsgCfg.Period,
maxDealsPerPublishMsg: maxDealsPerMsg,
publishPeriod: publishPeriod,
publishSpec: publishSpec,
}
}
@ -224,33 +207,8 @@ func (p *DealPublisher) publishReady(ready []*pendingDeal) {
return
}
// onComplete is called when the publish message has been sent or there
// was an error
onComplete := func(pd *pendingDeal, msgCid cid.Cid, err error) {
// Send the publish result on the pending deal's Result channel
res := publishResult{
msgCid: msgCid,
err: err,
}
select {
case <-p.ctx.Done():
case <-pd.ctx.Done():
case pd.Result <- res:
}
}
// Validate each deal to make sure it can be published
validated := make([]*pendingDeal, 0, len(ready))
deals := make([]market2.ClientDealProposal, 0, len(ready))
for _, pd := range ready {
// Validate the deal
if err := p.validateDeal(pd.deal); err != nil {
// Validation failed, complete immediately with an error
go onComplete(pd, cid.Undef, err)
continue
}
validated = append(validated, pd)
deals = append(deals, pd.deal)
}
@ -258,45 +216,26 @@ func (p *DealPublisher) publishReady(ready []*pendingDeal) {
msgCid, err := p.publishDealProposals(deals)
// Signal that each deal has been published
for _, pd := range validated {
go onComplete(pd, msgCid, err)
for _, pd := range ready {
pd := pd
go func() {
res := publishResult{
msgCid: msgCid,
err: err,
}
}
// validateDeal checks that the deal proposal start epoch hasn't already
// elapsed
func (p *DealPublisher) validateDeal(deal market2.ClientDealProposal) error {
head, err := p.api.ChainHead(p.ctx)
if err != nil {
return err
select {
case <-p.ctx.Done():
case pd.Result <- res:
}
if head.Height() > deal.Proposal.StartEpoch {
return xerrors.Errorf(
"cannot publish deal with piece CID %s: current epoch %d has passed deal proposal start epoch %d",
deal.Proposal.PieceCID, head.Height(), deal.Proposal.StartEpoch)
}()
}
return nil
}
// Sends the publish message
func (p *DealPublisher) publishDealProposals(deals []market2.ClientDealProposal) (cid.Cid, error) {
if len(deals) == 0 {
return cid.Undef, nil
}
log.Infof("publishing %d deals in publish deals queue with piece CIDs: %s", len(deals), pieceCids(deals))
provider := deals[0].Proposal.Provider
for _, dl := range deals {
if dl.Proposal.Provider != provider {
msg := fmt.Sprintf("publishing %d deals failed: ", len(deals)) +
"not all deals are for same provider: " +
fmt.Sprintf("deal with piece CID %s is for provider %s ", deals[0].Proposal.PieceCID, deals[0].Proposal.Provider) +
fmt.Sprintf("but deal with piece CID %s is for provider %s", dl.Proposal.PieceCID, dl.Proposal.Provider)
return cid.Undef, xerrors.Errorf(msg)
}
}
mi, err := p.api.StateMinerInfo(p.ctx, provider, types.EmptyTSK)
if err != nil {
return cid.Undef, err
@ -343,3 +282,22 @@ func (p *DealPublisher) filterCancelledDeals() {
}
p.pending = p.pending[:i]
}
type publishResult struct {
msgCid cid.Cid
err error
}
type pendingDeal struct {
ctx context.Context
deal market2.ClientDealProposal
Result chan publishResult
}
func newPendingDeal(ctx context.Context, deal market2.ClientDealProposal) *pendingDeal {
return &pendingDeal{
ctx: ctx,
deal: deal,
Result: make(chan publishResult),
}
}

View File

@ -22,6 +22,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/node/config"
)
func TestDealPublisher(t *testing.T) {
@ -30,8 +31,7 @@ func TestDealPublisher(t *testing.T) {
publishPeriod time.Duration
maxDealsPerMsg uint64
dealCountWithinPublishPeriod int
ctxCancelledWithinPublishPeriod int
expiredDeals int
expiredWithinPublishPeriod int
dealCountAfterPublishPeriod int
expectedDealsPerMsg []int
}{{
@ -62,30 +62,14 @@ func TestDealPublisher(t *testing.T) {
dealCountWithinPublishPeriod: 3,
dealCountAfterPublishPeriod: 1,
expectedDealsPerMsg: []int{2, 1, 1},
}, {
name: "ignore deals with cancelled context",
publishPeriod: 10 * time.Millisecond,
maxDealsPerMsg: 5,
dealCountWithinPublishPeriod: 2,
ctxCancelledWithinPublishPeriod: 2,
dealCountAfterPublishPeriod: 1,
expectedDealsPerMsg: []int{2, 1},
}, {
name: "ignore expired deals",
publishPeriod: 10 * time.Millisecond,
maxDealsPerMsg: 5,
dealCountWithinPublishPeriod: 2,
expiredDeals: 2,
expiredWithinPublishPeriod: 2,
dealCountAfterPublishPeriod: 1,
expectedDealsPerMsg: []int{2, 1},
}, {
name: "zero config",
publishPeriod: 0,
maxDealsPerMsg: 0,
dealCountWithinPublishPeriod: 2,
ctxCancelledWithinPublishPeriod: 0,
dealCountAfterPublishPeriod: 2,
expectedDealsPerMsg: []int{1, 1, 1, 1},
}}
for _, tc := range testCases {
@ -98,44 +82,38 @@ func TestDealPublisher(t *testing.T) {
dpapi := newDPAPI(t, worker)
// Create a deal publisher
dp := newDealPublisher(dpapi, PublishMsgConfig{
Period: tc.publishPeriod,
dp := newDealPublisher(dpapi, &config.PublishMsgConfig{
PublishPeriod: config.Duration(tc.publishPeriod),
MaxDealsPerMsg: tc.maxDealsPerMsg,
}, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)})
// Keep a record of the deals that were submitted to be published
var dealsToPublish []market.ClientDealProposal
publishDeal := func(ctxCancelled bool, expired bool) {
publishDeal := func(expired bool) {
pctx := ctx
var cancel context.CancelFunc
if ctxCancelled {
if expired {
pctx, cancel = context.WithCancel(ctx)
cancel()
}
startEpoch := abi.ChainEpoch(20)
if expired {
startEpoch = abi.ChainEpoch(5)
}
deal := market.ClientDealProposal{
Proposal: market0.DealProposal{
PieceCID: generateCids(1)[0],
Client: client,
Provider: provider,
StartEpoch: startEpoch,
EndEpoch: abi.ChainEpoch(120),
},
ClientSignature: crypto.Signature{
Type: crypto.SigTypeSecp256k1,
Data: []byte("signature data"),
},
}
if !ctxCancelled && !expired {
if !expired {
dealsToPublish = append(dealsToPublish, deal)
}
go func() {
_, err := dp.Publish(pctx, deal)
if ctxCancelled || expired {
if expired {
require.Error(t, err)
} else {
require.NoError(t, err)
@ -145,13 +123,10 @@ func TestDealPublisher(t *testing.T) {
// Publish deals within publish period
for i := 0; i < tc.dealCountWithinPublishPeriod; i++ {
publishDeal(false, false)
publishDeal(false)
}
for i := 0; i < tc.ctxCancelledWithinPublishPeriod; i++ {
publishDeal(true, false)
}
for i := 0; i < tc.expiredDeals; i++ {
publishDeal(false, true)
for i := 0; i < tc.expiredWithinPublishPeriod; i++ {
publishDeal(true)
}
// Wait until publish period has elapsed
@ -159,7 +134,7 @@ func TestDealPublisher(t *testing.T) {
// Publish deals after publish period
for i := 0; i < tc.dealCountAfterPublishPeriod; i++ {
publishDeal(false, false)
publishDeal(false)
}
// For each message that was expected to be sent
@ -241,20 +216,6 @@ func newDPAPI(t *testing.T, worker address.Address) *dpAPI {
}
}
func (d *dpAPI) ChainHead(ctx context.Context) (*types.TipSet, error) {
dummyCid, err := cid.Parse("bafkqaaa")
require.NoError(d.t, err)
return types.NewTipSet([]*types.BlockHeader{{
Miner: tutils.NewActorAddr(d.t, "miner"),
Height: abi.ChainEpoch(10),
ParentStateRoot: dummyCid,
Messages: dummyCid,
ParentMessageReceipts: dummyCid,
BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS},
BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS},
}})
}
func (d *dpAPI) StateMinerInfo(ctx context.Context, address address.Address, key types.TipSetKey) (miner.MinerInfo, error) {
d.stateMinerInfoCalls <- address
return miner.MinerInfo{Worker: d.worker}, nil

View File

@ -377,7 +377,7 @@ func Online() Option {
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)),
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(nil)),
Override(new(storagemarket.StorageProvider), modules.StorageProvider),
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})),
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, nil)),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(nil)),
Override(HandleMigrateProviderFundsKey, modules.HandleMigrateProviderFunds),
Override(HandleRetrievalKey, modules.HandleRetrieval),
@ -520,10 +520,7 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(dtypes.RetrievalDealFilter), modules.RetrievalDealFilter(dealfilter.CliRetrievalDealFilter(cfg.Dealmaking.RetrievalFilter))),
),
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, storageadapter.PublishMsgConfig{
Period: time.Duration(cfg.Dealmaking.PublishMsgPeriod),
MaxDealsPerMsg: cfg.Dealmaking.MaxDealsPerPublishMsg,
})),
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(&cfg.Fees, &cfg.PublishMsg)),
Override(new(storagemarket.StorageProviderNode), storageadapter.NewProviderNodeAdapter(&cfg.Fees)),
Override(new(sectorstorage.SealerConfig), cfg.Storage),
@ -651,6 +648,6 @@ func Test() Option {
Unset(RunPeerMgrKey),
Unset(new(*peermgr.PeerMgr)),
Override(new(beacon.Schedule), testing.RandomBeacon),
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})),
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, nil)),
)
}

View File

@ -33,6 +33,7 @@ type StorageMiner struct {
Common
Dealmaking DealmakingConfig
PublishMsg PublishMsgConfig
Sealing SealingConfig
Storage sectorstorage.SealerConfig
Fees MinerFeeConfig
@ -59,6 +60,15 @@ type DealmakingConfig struct {
RetrievalFilter string
}
type PublishMsgConfig struct {
// The amount of time to wait for more deals to arrive before
// publishing
PublishPeriod Duration
// The maximum number of deals to include in a single PublishStorageDeals
// message
MaxDealsPerMsg uint64
}
type SealingConfig struct {
// 0 = no limit
MaxWaitDealsSectors uint64
@ -217,6 +227,11 @@ func DefaultStorageMiner() *StorageMiner {
MaxDealsPerPublishMsg: 8,
},
PublishMsg: PublishMsgConfig{
PublishPeriod: Duration(time.Hour),
MaxDealsPerMsg: 8,
},
Fees: MinerFeeConfig{
MaxPreCommitGasFee: types.MustParseFIL("0.025"),
MaxCommitGasFee: types.MustParseFIL("0.05"),