Merge pull request #5538 from filecoin-project/feat/deal-publisher-force

add commands to list pending deals and force publish
This commit is contained in:
Łukasz Magiera 2021-02-05 23:00:52 +01:00 committed by GitHub
commit 6de62411c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 291 additions and 88 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
@ -105,10 +106,12 @@ type StorageMiner interface {
MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error) MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error)
MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error) MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error)
MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error) MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error)
// MinerRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer // MarketRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer
MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error
// ClientCancelDataTransfer cancels a data transfer with the given transfer ID and other peer // MarketCancelDataTransfer cancels a data transfer with the given transfer ID and other peer
MarketCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error MarketCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error
MarketPendingDeals(ctx context.Context) (PendingDealInfo, error)
MarketPublishPendingDeals(ctx context.Context) error
DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error
DealsList(ctx context.Context) ([]MarketDeal, error) DealsList(ctx context.Context) ([]MarketDeal, error)
@ -236,3 +239,11 @@ type AddressConfig struct {
CommitControl []address.Address CommitControl []address.Address
TerminateControl []address.Address TerminateControl []address.Address
} }
// PendingDealInfo has info about pending deals and when they are due to be
// published
type PendingDealInfo struct {
Deals []market.ClientDealProposal
PublishPeriodStart time.Time
PublishPeriod time.Duration
}

View File

@ -299,8 +299,10 @@ type StorageMinerStruct struct {
MarketGetRetrievalAsk func(ctx context.Context) (*retrievalmarket.Ask, error) `perm:"read"` MarketGetRetrievalAsk func(ctx context.Context) (*retrievalmarket.Ask, error) `perm:"read"`
MarketListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"` MarketListDataTransfers func(ctx context.Context) ([]api.DataTransferChannel, error) `perm:"write"`
MarketDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"` MarketDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"`
MarketRestartDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"read"` MarketRestartDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"write"`
MarketCancelDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"read"` MarketCancelDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"write"`
MarketPendingDeals func(ctx context.Context) (api.PendingDealInfo, error) `perm:"write"`
MarketPublishPendingDeals func(ctx context.Context) error `perm:"admin"`
PledgeSector func(context.Context) error `perm:"write"` PledgeSector func(context.Context) error `perm:"write"`
@ -1506,6 +1508,14 @@ func (c *StorageMinerStruct) MarketCancelDataTransfer(ctx context.Context, trans
return c.Internal.MarketCancelDataTransfer(ctx, transferID, otherPeer, isInitiator) return c.Internal.MarketCancelDataTransfer(ctx, transferID, otherPeer, isInitiator)
} }
func (c *StorageMinerStruct) MarketPendingDeals(ctx context.Context) (api.PendingDealInfo, error) {
return c.Internal.MarketPendingDeals(ctx)
}
func (c *StorageMinerStruct) MarketPublishPendingDeals(ctx context.Context) error {
return c.Internal.MarketPublishPendingDeals(ctx)
}
func (c *StorageMinerStruct) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error { func (c *StorageMinerStruct) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error {
return c.Internal.DealsImportData(ctx, dealPropCid, file) return c.Internal.DealsImportData(ctx, dealPropCid, file)
} }

View File

@ -15,7 +15,6 @@ import (
tm "github.com/buger/goterm" tm "github.com/buger/goterm"
"github.com/docker/go-units" "github.com/docker/go-units"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil/cidenc" "github.com/ipfs/go-cidutil/cidenc"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
@ -23,6 +22,8 @@ import (
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
cborutil "github.com/filecoin-project/go-cbor-util"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
@ -341,6 +342,7 @@ var storageDealsCmd = &cli.Command{
getBlocklistCmd, getBlocklistCmd,
resetBlocklistCmd, resetBlocklistCmd,
setSealDurationCmd, setSealDurationCmd,
dealsPendingPublish,
}, },
} }
@ -825,3 +827,49 @@ var transfersListCmd = &cli.Command{
return nil return nil
}, },
} }
var dealsPendingPublish = &cli.Command{
Name: "pending-publish",
Usage: "list deals waiting in publish queue",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "publish-now",
Usage: "send a publish message now",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
if cctx.Bool("publish-now") {
if err := api.MarketPublishPendingDeals(ctx); err != nil {
return xerrors.Errorf("publishing deals: %w", err)
}
fmt.Println("triggered deal publishing")
return nil
}
pending, err := api.MarketPendingDeals(ctx)
if err != nil {
return xerrors.Errorf("getting pending deals: %w", err)
}
w := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0)
_, _ = fmt.Fprintf(w, "ProposalCID\tClient\tSize\n")
for _, deal := range pending.Deals {
proposalNd, err := cborutil.AsIpld(&deal) // nolint
if err != nil {
return err
}
_, _ = fmt.Fprintf(w, "%s\t%s\t%s\n", proposalNd.Cid(), deal.Proposal.Client, units.BytesSize(float64(deal.Proposal.PieceSize)))
}
return w.Flush()
},
}

View File

@ -48,6 +48,8 @@
* [MarketListDeals](#MarketListDeals) * [MarketListDeals](#MarketListDeals)
* [MarketListIncompleteDeals](#MarketListIncompleteDeals) * [MarketListIncompleteDeals](#MarketListIncompleteDeals)
* [MarketListRetrievalDeals](#MarketListRetrievalDeals) * [MarketListRetrievalDeals](#MarketListRetrievalDeals)
* [MarketPendingDeals](#MarketPendingDeals)
* [MarketPublishPendingDeals](#MarketPublishPendingDeals)
* [MarketRestartDataTransfer](#MarketRestartDataTransfer) * [MarketRestartDataTransfer](#MarketRestartDataTransfer)
* [MarketSetAsk](#MarketSetAsk) * [MarketSetAsk](#MarketSetAsk)
* [MarketSetRetrievalAsk](#MarketSetRetrievalAsk) * [MarketSetRetrievalAsk](#MarketSetRetrievalAsk)
@ -524,10 +526,10 @@ Response: `{}`
### MarketCancelDataTransfer ### MarketCancelDataTransfer
ClientCancelDataTransfer cancels a data transfer with the given transfer ID and other peer MarketCancelDataTransfer cancels a data transfer with the given transfer ID and other peer
Perms: read Perms: write
Inputs: Inputs:
```json ```json
@ -725,11 +727,36 @@ Inputs: `null`
Response: `null` Response: `null`
### MarketPendingDeals
There are not yet any comments for this method.
Perms: write
Inputs: `null`
Response:
```json
{
"Deals": null,
"PublishPeriodStart": "0001-01-01T00:00:00Z",
"PublishPeriod": 60000000000
}
```
### MarketPublishPendingDeals
There are not yet any comments for this method.
Perms: admin
Inputs: `null`
Response: `{}`
### MarketRestartDataTransfer ### MarketRestartDataTransfer
MinerRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer MarketRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer
Perms: read Perms: write
Inputs: Inputs:
```json ```json

View File

@ -121,6 +121,41 @@ func newDealPublisher(
} }
} }
// PendingDeals returns the list of deals that are queued up to be published
func (p *DealPublisher) PendingDeals() api.PendingDealInfo {
p.lk.Lock()
defer p.lk.Unlock()
// Filter out deals whose context has been cancelled
deals := make([]*pendingDeal, 0, len(p.pending))
for _, dl := range p.pending {
if dl.ctx.Err() == nil {
deals = append(deals, dl)
}
}
pending := make([]market2.ClientDealProposal, len(deals))
for i, deal := range deals {
pending[i] = deal.deal
}
return api.PendingDealInfo{
Deals: pending,
PublishPeriodStart: p.publishPeriodStart,
PublishPeriod: p.publishPeriod,
}
}
// ForcePublishPendingDeals publishes all pending deals without waiting for
// the publish period to elapse
func (p *DealPublisher) ForcePublishPendingDeals() {
p.lk.Lock()
defer p.lk.Unlock()
log.Infof("force publishing deals")
p.publishAllDeals()
}
func (p *DealPublisher) Publish(ctx context.Context, deal market2.ClientDealProposal) (cid.Cid, error) { func (p *DealPublisher) Publish(ctx context.Context, deal market2.ClientDealProposal) (cid.Cid, error) {
pdeal := newPendingDeal(ctx, deal) pdeal := newPendingDeal(ctx, deal)

View File

@ -91,11 +91,7 @@ func TestDealPublisher(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
tc := tc tc := tc
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
ctx := context.Background() dpapi := newDPAPI(t)
client := tutils.NewActorAddr(t, "client")
provider := tutils.NewActorAddr(t, "provider")
worker := tutils.NewActorAddr(t, "worker")
dpapi := newDPAPI(t, worker)
// Create a deal publisher // Create a deal publisher
dp := newDealPublisher(dpapi, PublishMsgConfig{ dp := newDealPublisher(dpapi, PublishMsgConfig{
@ -105,53 +101,17 @@ func TestDealPublisher(t *testing.T) {
// Keep a record of the deals that were submitted to be published // Keep a record of the deals that were submitted to be published
var dealsToPublish []market.ClientDealProposal var dealsToPublish []market.ClientDealProposal
publishDeal := func(ctxCancelled bool, expired bool) {
pctx := ctx
var cancel context.CancelFunc
if ctxCancelled {
pctx, cancel = context.WithCancel(ctx)
cancel()
}
startEpoch := abi.ChainEpoch(20)
if expired {
startEpoch = abi.ChainEpoch(5)
}
deal := market.ClientDealProposal{
Proposal: market0.DealProposal{
PieceCID: generateCids(1)[0],
Client: client,
Provider: provider,
StartEpoch: startEpoch,
EndEpoch: abi.ChainEpoch(120),
},
ClientSignature: crypto.Signature{
Type: crypto.SigTypeSecp256k1,
Data: []byte("signature data"),
},
}
if !ctxCancelled && !expired {
dealsToPublish = append(dealsToPublish, deal)
}
go func() {
_, err := dp.Publish(pctx, deal)
if ctxCancelled || expired {
require.Error(t, err)
} else {
require.NoError(t, err)
}
}()
}
// Publish deals within publish period // Publish deals within publish period
for i := 0; i < tc.dealCountWithinPublishPeriod; i++ { for i := 0; i < tc.dealCountWithinPublishPeriod; i++ {
publishDeal(false, false) deal := publishDeal(t, dp, false, false)
dealsToPublish = append(dealsToPublish, deal)
} }
for i := 0; i < tc.ctxCancelledWithinPublishPeriod; i++ { for i := 0; i < tc.ctxCancelledWithinPublishPeriod; i++ {
publishDeal(true, false) publishDeal(t, dp, true, false)
} }
for i := 0; i < tc.expiredDeals; i++ { for i := 0; i < tc.expiredDeals; i++ {
publishDeal(false, true) publishDeal(t, dp, false, true)
} }
// Wait until publish period has elapsed // Wait until publish period has elapsed
@ -159,41 +119,130 @@ func TestDealPublisher(t *testing.T) {
// Publish deals after publish period // Publish deals after publish period
for i := 0; i < tc.dealCountAfterPublishPeriod; i++ { for i := 0; i < tc.dealCountAfterPublishPeriod; i++ {
publishDeal(false, false) deal := publishDeal(t, dp, false, false)
dealsToPublish = append(dealsToPublish, deal)
} }
// For each message that was expected to be sent checkPublishedDeals(t, dpapi, dealsToPublish, tc.expectedDealsPerMsg)
var publishedDeals []market.ClientDealProposal
for _, expectedDealsInMsg := range tc.expectedDealsPerMsg {
// Should have called StateMinerInfo with the provider address
stateMinerInfoAddr := <-dpapi.stateMinerInfoCalls
require.Equal(t, provider, stateMinerInfoAddr)
// Check the fields of the message that was sent
msg := <-dpapi.pushedMsgs
require.Equal(t, worker, msg.From)
require.Equal(t, market.Address, msg.To)
require.Equal(t, market.Methods.PublishStorageDeals, msg.Method)
// Check that the expected number of deals was included in the message
var params market2.PublishStorageDealsParams
err := params.UnmarshalCBOR(bytes.NewReader(msg.Params))
require.NoError(t, err)
require.Len(t, params.Deals, expectedDealsInMsg)
// Keep track of the deals that were sent
for _, d := range params.Deals {
publishedDeals = append(publishedDeals, d)
}
}
// Verify that all deals that were submitted to be published were
// sent out (we do this by ensuring all the piece CIDs are present)
require.True(t, matchPieceCids(publishedDeals, dealsToPublish))
}) })
} }
} }
func TestForcePublish(t *testing.T) {
dpapi := newDPAPI(t)
// Create a deal publisher
start := time.Now()
publishPeriod := time.Hour
dp := newDealPublisher(dpapi, PublishMsgConfig{
Period: publishPeriod,
MaxDealsPerMsg: 10,
}, &api.MessageSendSpec{MaxFee: abi.NewTokenAmount(1)})
// Queue three deals for publishing, one with a cancelled context
var dealsToPublish []market.ClientDealProposal
// 1. Regular deal
deal := publishDeal(t, dp, false, false)
dealsToPublish = append(dealsToPublish, deal)
// 2. Deal with cancelled context
publishDeal(t, dp, true, false)
// 3. Regular deal
deal = publishDeal(t, dp, false, false)
dealsToPublish = append(dealsToPublish, deal)
// Allow a moment for them to be queued
time.Sleep(10 * time.Millisecond)
// Should be two deals in the pending deals list
// (deal with cancelled context is ignored)
pendingInfo := dp.PendingDeals()
require.Len(t, pendingInfo.Deals, 2)
require.Equal(t, publishPeriod, pendingInfo.PublishPeriod)
require.True(t, pendingInfo.PublishPeriodStart.After(start))
require.True(t, pendingInfo.PublishPeriodStart.Before(time.Now()))
// Force publish all pending deals
dp.ForcePublishPendingDeals()
// Should be no pending deals
pendingInfo = dp.PendingDeals()
require.Len(t, pendingInfo.Deals, 0)
// Make sure the expected deals were published
checkPublishedDeals(t, dpapi, dealsToPublish, []int{2})
}
func publishDeal(t *testing.T, dp *DealPublisher, ctxCancelled bool, expired bool) market.ClientDealProposal {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
pctx := ctx
if ctxCancelled {
pctx, cancel = context.WithCancel(ctx)
cancel()
}
startEpoch := abi.ChainEpoch(20)
if expired {
startEpoch = abi.ChainEpoch(5)
}
deal := market.ClientDealProposal{
Proposal: market0.DealProposal{
PieceCID: generateCids(1)[0],
Client: getClientActor(t),
Provider: getProviderActor(t),
StartEpoch: startEpoch,
EndEpoch: abi.ChainEpoch(120),
},
ClientSignature: crypto.Signature{
Type: crypto.SigTypeSecp256k1,
Data: []byte("signature data"),
},
}
go func() {
_, err := dp.Publish(pctx, deal)
if ctxCancelled || expired {
require.Error(t, err)
} else {
require.NoError(t, err)
}
}()
return deal
}
func checkPublishedDeals(t *testing.T, dpapi *dpAPI, dealsToPublish []market.ClientDealProposal, expectedDealsPerMsg []int) {
// For each message that was expected to be sent
var publishedDeals []market.ClientDealProposal
for _, expectedDealsInMsg := range expectedDealsPerMsg {
// Should have called StateMinerInfo with the provider address
stateMinerInfoAddr := <-dpapi.stateMinerInfoCalls
require.Equal(t, getProviderActor(t), stateMinerInfoAddr)
// Check the fields of the message that was sent
msg := <-dpapi.pushedMsgs
require.Equal(t, getWorkerActor(t), msg.From)
require.Equal(t, market.Address, msg.To)
require.Equal(t, market.Methods.PublishStorageDeals, msg.Method)
// Check that the expected number of deals was included in the message
var params market2.PublishStorageDealsParams
err := params.UnmarshalCBOR(bytes.NewReader(msg.Params))
require.NoError(t, err)
require.Len(t, params.Deals, expectedDealsInMsg)
// Keep track of the deals that were sent
for _, d := range params.Deals {
publishedDeals = append(publishedDeals, d)
}
}
// Verify that all deals that were submitted to be published were
// sent out (we do this by ensuring all the piece CIDs are present)
require.True(t, matchPieceCids(publishedDeals, dealsToPublish))
}
func matchPieceCids(sent []market.ClientDealProposal, exp []market.ClientDealProposal) bool { func matchPieceCids(sent []market.ClientDealProposal, exp []market.ClientDealProposal) bool {
cidsA := dealPieceCids(sent) cidsA := dealPieceCids(sent)
cidsB := dealPieceCids(exp) cidsB := dealPieceCids(exp)
@ -232,10 +281,10 @@ type dpAPI struct {
pushedMsgs chan *types.Message pushedMsgs chan *types.Message
} }
func newDPAPI(t *testing.T, worker address.Address) *dpAPI { func newDPAPI(t *testing.T) *dpAPI {
return &dpAPI{ return &dpAPI{
t: t, t: t,
worker: worker, worker: getWorkerActor(t),
stateMinerInfoCalls: make(chan address.Address, 128), stateMinerInfoCalls: make(chan address.Address, 128),
pushedMsgs: make(chan *types.Message, 128), pushedMsgs: make(chan *types.Message, 128),
} }
@ -264,3 +313,15 @@ func (d *dpAPI) MpoolPushMessage(ctx context.Context, msg *types.Message, spec *
d.pushedMsgs <- msg d.pushedMsgs <- msg
return &types.SignedMessage{Message: *msg}, nil return &types.SignedMessage{Message: *msg}, nil
} }
func getClientActor(t *testing.T) address.Address {
return tutils.NewActorAddr(t, "client")
}
func getWorkerActor(t *testing.T) address.Address {
return tutils.NewActorAddr(t, "worker")
}
func getProviderActor(t *testing.T) address.Address {
return tutils.NewActorAddr(t, "provider")
}

View File

@ -32,6 +32,7 @@ import (
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct" "github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/markets/storageadapter"
"github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/impl/common" "github.com/filecoin-project/lotus/node/impl/common"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
@ -55,9 +56,10 @@ type StorageMinerAPI struct {
IStorageMgr sectorstorage.SectorManager IStorageMgr sectorstorage.SectorManager
*stores.Index *stores.Index
storiface.WorkerReturn storiface.WorkerReturn
DataTransfer dtypes.ProviderDataTransfer DataTransfer dtypes.ProviderDataTransfer
Host host.Host Host host.Host
AddrSel *storage.AddressSelector AddrSel *storage.AddressSelector
DealPublisher *storageadapter.DealPublisher
DS dtypes.MetadataDS DS dtypes.MetadataDS
@ -501,6 +503,15 @@ func (sm *StorageMinerAPI) MarketDataTransferUpdates(ctx context.Context) (<-cha
return channels, nil return channels, nil
} }
func (sm *StorageMinerAPI) MarketPendingDeals(ctx context.Context) (api.PendingDealInfo, error) {
return sm.DealPublisher.PendingDeals(), nil
}
func (sm *StorageMinerAPI) MarketPublishPendingDeals(ctx context.Context) error {
sm.DealPublisher.ForcePublishPendingDeals()
return nil
}
func (sm *StorageMinerAPI) DealsList(ctx context.Context) ([]api.MarketDeal, error) { func (sm *StorageMinerAPI) DealsList(ctx context.Context) ([]api.MarketDeal, error) {
return sm.listDeals(ctx) return sm.listDeals(ctx)
} }