market: APIs to manage pending deals

This commit is contained in:
Łukasz Magiera 2021-02-05 18:58:55 +01:00
parent 01e30e0665
commit b3f4e50c58
5 changed files with 52 additions and 23 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/storagemarket" "github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
@ -105,10 +106,12 @@ type StorageMiner interface {
MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error) MarketGetRetrievalAsk(ctx context.Context) (*retrievalmarket.Ask, error)
MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error) MarketListDataTransfers(ctx context.Context) ([]DataTransferChannel, error)
MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error) MarketDataTransferUpdates(ctx context.Context) (<-chan DataTransferChannel, error)
// MinerRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer // MarketRestartDataTransfer attempts to restart a data transfer with the given transfer ID and other peer
MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error MarketRestartDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error
// ClientCancelDataTransfer cancels a data transfer with the given transfer ID and other peer // MarketCancelDataTransfer cancels a data transfer with the given transfer ID and other peer
MarketCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error MarketCancelDataTransfer(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error
MarketPendingDeals(ctx context.Context) (PendingDealInfo, error)
MarketPublishPendingDeals(ctx context.Context) error
DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error
DealsList(ctx context.Context) ([]MarketDeal, error) DealsList(ctx context.Context) ([]MarketDeal, error)
@ -236,3 +239,11 @@ type AddressConfig struct {
CommitControl []address.Address CommitControl []address.Address
TerminateControl []address.Address TerminateControl []address.Address
} }
// PendingDealInfo has info about pending deals and when they are due to be
// published
type PendingDealInfo struct {
Deals []market.ClientDealProposal
PublishPeriodStart time.Time
PublishPeriod time.Duration
}

View File

@ -301,6 +301,8 @@ type StorageMinerStruct struct {
MarketDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"` MarketDataTransferUpdates func(ctx context.Context) (<-chan api.DataTransferChannel, error) `perm:"write"`
MarketRestartDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"read"` MarketRestartDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"read"`
MarketCancelDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"read"` MarketCancelDataTransfer func(ctx context.Context, transferID datatransfer.TransferID, otherPeer peer.ID, isInitiator bool) error `perm:"read"`
MarketPendingDeals func(ctx context.Context) (api.PendingDealInfo, error) `perm:"admin"`
MarketPublishPendingDeals func(ctx context.Context) error `perm:"admin"`
PledgeSector func(context.Context) error `perm:"write"` PledgeSector func(context.Context) error `perm:"write"`
@ -1506,6 +1508,14 @@ func (c *StorageMinerStruct) MarketCancelDataTransfer(ctx context.Context, trans
return c.Internal.MarketCancelDataTransfer(ctx, transferID, otherPeer, isInitiator) return c.Internal.MarketCancelDataTransfer(ctx, transferID, otherPeer, isInitiator)
} }
func (c *StorageMinerStruct) MarketPendingDeals(ctx context.Context) (api.PendingDealInfo, error) {
return c.Internal.MarketPendingDeals(ctx)
}
func (c *StorageMinerStruct) MarketPublishPendingDeals(ctx context.Context) error {
return c.Internal.MarketPublishPendingDeals(ctx)
}
func (c *StorageMinerStruct) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error { func (c *StorageMinerStruct) DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error {
return c.Internal.DealsImportData(ctx, dealPropCid, file) return c.Internal.DealsImportData(ctx, dealPropCid, file)
} }

View File

@ -121,16 +121,8 @@ func newDealPublisher(
} }
} }
// PendingInfo has info about pending deals and when they are due to be
// published
type PendingInfo struct {
deals []*pendingDeal
publishPeriodStart time.Time
publishPeriod time.Duration
}
// PendingDeals returns the list of deals that are queued up to be published // PendingDeals returns the list of deals that are queued up to be published
func (p *DealPublisher) PendingDeals() *PendingInfo { func (p *DealPublisher) PendingDeals() api.PendingDealInfo {
p.lk.Lock() p.lk.Lock()
defer p.lk.Unlock() defer p.lk.Unlock()
@ -142,10 +134,15 @@ func (p *DealPublisher) PendingDeals() *PendingInfo {
} }
} }
return &PendingInfo{ pending := make([]market2.ClientDealProposal, len(deals))
deals: deals, for i, deal := range deals {
publishPeriodStart: p.publishPeriodStart, pending[i] = deal.deal
publishPeriod: p.publishPeriod, }
return api.PendingDealInfo{
Deals: pending,
PublishPeriodStart: p.publishPeriodStart,
PublishPeriod: p.publishPeriod,
} }
} }

View File

@ -156,17 +156,17 @@ func TestForcePublish(t *testing.T) {
// Should be two deals in the pending deals list // Should be two deals in the pending deals list
// (deal with cancelled context is ignored) // (deal with cancelled context is ignored)
pendingInfo := dp.PendingDeals() pendingInfo := dp.PendingDeals()
require.Len(t, pendingInfo.deals, 2) require.Len(t, pendingInfo.Deals, 2)
require.Equal(t, publishPeriod, pendingInfo.publishPeriod) require.Equal(t, publishPeriod, pendingInfo.PublishPeriod)
require.True(t, pendingInfo.publishPeriodStart.After(start)) require.True(t, pendingInfo.PublishPeriodStart.After(start))
require.True(t, pendingInfo.publishPeriodStart.Before(time.Now())) require.True(t, pendingInfo.PublishPeriodStart.Before(time.Now()))
// Force publish all pending deals // Force publish all pending deals
dp.ForcePublishPendingDeals() dp.ForcePublishPendingDeals()
// Should be no pending deals // Should be no pending deals
pendingInfo = dp.PendingDeals() pendingInfo = dp.PendingDeals()
require.Len(t, pendingInfo.deals, 0) require.Len(t, pendingInfo.Deals, 0)
// Make sure the expected deals were published // Make sure the expected deals were published
checkPublishedDeals(t, dpapi, dealsToPublish, []int{2}) checkPublishedDeals(t, dpapi, dealsToPublish, []int{2})

View File

@ -32,6 +32,7 @@ import (
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct" "github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/markets/storageadapter"
"github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/impl/common" "github.com/filecoin-project/lotus/node/impl/common"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
@ -55,9 +56,10 @@ type StorageMinerAPI struct {
IStorageMgr sectorstorage.SectorManager IStorageMgr sectorstorage.SectorManager
*stores.Index *stores.Index
storiface.WorkerReturn storiface.WorkerReturn
DataTransfer dtypes.ProviderDataTransfer DataTransfer dtypes.ProviderDataTransfer
Host host.Host Host host.Host
AddrSel *storage.AddressSelector AddrSel *storage.AddressSelector
DealPublisher *storageadapter.DealPublisher
DS dtypes.MetadataDS DS dtypes.MetadataDS
@ -501,6 +503,15 @@ func (sm *StorageMinerAPI) MarketDataTransferUpdates(ctx context.Context) (<-cha
return channels, nil return channels, nil
} }
func (sm *StorageMinerAPI) MarketPendingDeals(ctx context.Context) (api.PendingDealInfo, error) {
return sm.DealPublisher.PendingDeals(), nil
}
func (sm *StorageMinerAPI) MarketPublishPendingDeals(ctx context.Context) error {
sm.DealPublisher.ForcePublishPendingDeals()
return nil
}
func (sm *StorageMinerAPI) DealsList(ctx context.Context) ([]api.MarketDeal, error) { func (sm *StorageMinerAPI) DealsList(ctx context.Context) ([]api.MarketDeal, error) {
return sm.listDeals(ctx) return sm.listDeals(ctx)
} }