Merge pull request #7276 from filecoin-project/nonsense/reject-deal-too-many-staging-deals
introduce MaxStagingDealsBytes - reject new deals if our staging deals area is full
This commit is contained in:
commit
3a3a4fc413
@ -820,6 +820,11 @@ workflows:
|
|||||||
suite: itest-deals_concurrent
|
suite: itest-deals_concurrent
|
||||||
target: "./itests/deals_concurrent_test.go"
|
target: "./itests/deals_concurrent_test.go"
|
||||||
|
|
||||||
|
- test:
|
||||||
|
name: test-itest-deals_max_staging_deals
|
||||||
|
suite: itest-deals_max_staging_deals
|
||||||
|
target: "./itests/deals_max_staging_deals_test.go"
|
||||||
|
|
||||||
- test:
|
- test:
|
||||||
name: test-itest-deals_offline
|
name: test-itest-deals_offline
|
||||||
suite: itest-deals_offline
|
suite: itest-deals_offline
|
||||||
|
65
itests/deals_max_staging_deals_test.go
Normal file
65
itests/deals_max_staging_deals_test.go
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
package itests
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/itests/kit"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMaxStagingDeals(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// enable 512MiB proofs so we can conduct larger transfers.
|
||||||
|
kit.EnableLargeSectors(t)
|
||||||
|
kit.QuietMiningLogs()
|
||||||
|
|
||||||
|
client, miner, ens := kit.EnsembleMinimal(t,
|
||||||
|
kit.MockProofs(),
|
||||||
|
kit.WithMaxStagingDealsBytes(8192), // max 8KB staging deals
|
||||||
|
kit.SectorSize(512<<20), // 512MiB sectors.
|
||||||
|
)
|
||||||
|
ens.InterconnectAll().BeginMining(200 * time.Millisecond)
|
||||||
|
|
||||||
|
dh := kit.NewDealHarness(t, client, miner, miner)
|
||||||
|
|
||||||
|
client.WaitTillChain(ctx, kit.HeightAtLeast(5))
|
||||||
|
|
||||||
|
res, _ := client.CreateImportFile(ctx, 0, 8192) // 8KB file
|
||||||
|
list, err := client.ClientListImports(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, list, 1)
|
||||||
|
require.Equal(t, res.Root, *list[0].Root)
|
||||||
|
|
||||||
|
res2, _ := client.CreateImportFile(ctx, 0, 4096)
|
||||||
|
list, err = client.ClientListImports(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Len(t, list, 2)
|
||||||
|
require.Equal(t, res2.Root, *list[1].Root)
|
||||||
|
|
||||||
|
// first deal stays in staging area, and is not yet passed to the sealing subsystem
|
||||||
|
dp := dh.DefaultStartDealParams()
|
||||||
|
dp.Data.Root = res.Root
|
||||||
|
dp.FastRetrieval = true
|
||||||
|
dp.EpochPrice = abi.NewTokenAmount(62500000) // minimum asking price.
|
||||||
|
deal := dh.StartDeal(ctx, dp)
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
// expecting second deal to fail since staging area is full
|
||||||
|
dp.Data.Root = res2.Root
|
||||||
|
dp.FastRetrieval = true
|
||||||
|
dp.EpochPrice = abi.NewTokenAmount(62500000) // minimum asking price.
|
||||||
|
deal2 := dh.StartDeal(ctx, dp)
|
||||||
|
|
||||||
|
_ = deal
|
||||||
|
|
||||||
|
err = dh.ExpectDealFailure(ctx, deal2, "cannot accept deal as miner is overloaded at the moment")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
@ -2,9 +2,11 @@ package kit
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -175,6 +177,49 @@ loop:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (dh *DealHarness) ExpectDealFailure(ctx context.Context, deal *cid.Cid, errs string) error {
|
||||||
|
for {
|
||||||
|
di, err := dh.client.ClientGetDealInfo(ctx, *deal)
|
||||||
|
require.NoError(dh.t, err)
|
||||||
|
|
||||||
|
switch di.State {
|
||||||
|
case storagemarket.StorageDealAwaitingPreCommit, storagemarket.StorageDealSealing:
|
||||||
|
return fmt.Errorf("deal is sealing, and we expected an error: %s", errs)
|
||||||
|
case storagemarket.StorageDealProposalRejected:
|
||||||
|
if strings.Contains(di.Message, errs) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("unexpected error: %s ; expected: %s", di.Message, errs)
|
||||||
|
case storagemarket.StorageDealFailing:
|
||||||
|
if strings.Contains(di.Message, errs) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("unexpected error: %s ; expected: %s", di.Message, errs)
|
||||||
|
case storagemarket.StorageDealError:
|
||||||
|
if strings.Contains(di.Message, errs) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("unexpected error: %s ; expected: %s", di.Message, errs)
|
||||||
|
case storagemarket.StorageDealActive:
|
||||||
|
return errors.New("expected to get an error, but didn't get one")
|
||||||
|
}
|
||||||
|
|
||||||
|
mds, err := dh.market.MarketListIncompleteDeals(ctx)
|
||||||
|
require.NoError(dh.t, err)
|
||||||
|
|
||||||
|
var minerState storagemarket.StorageDealStatus
|
||||||
|
for _, md := range mds {
|
||||||
|
if md.DealID == di.DealID {
|
||||||
|
minerState = md.State
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
dh.t.Logf("Deal %d state: client:%s provider:%s\n", di.DealID, storagemarket.DealStates[di.State], storagemarket.DealStates[minerState])
|
||||||
|
time.Sleep(time.Second / 2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WaitDealPublished waits until the deal is published.
|
// WaitDealPublished waits until the deal is published.
|
||||||
func (dh *DealHarness) WaitDealPublished(ctx context.Context, deal *cid.Cid) {
|
func (dh *DealHarness) WaitDealPublished(ctx context.Context, deal *cid.Cid) {
|
||||||
subCtx, cancel := context.WithCancel(ctx)
|
subCtx, cancel := context.WithCancel(ctx)
|
||||||
|
@ -453,6 +453,7 @@ func (n *Ensemble) Start() *Ensemble {
|
|||||||
cfg.Subsystems.EnableMining = m.options.subsystems.Has(SMining)
|
cfg.Subsystems.EnableMining = m.options.subsystems.Has(SMining)
|
||||||
cfg.Subsystems.EnableSealing = m.options.subsystems.Has(SSealing)
|
cfg.Subsystems.EnableSealing = m.options.subsystems.Has(SSealing)
|
||||||
cfg.Subsystems.EnableSectorStorage = m.options.subsystems.Has(SSectorStorage)
|
cfg.Subsystems.EnableSectorStorage = m.options.subsystems.Has(SSectorStorage)
|
||||||
|
cfg.Dealmaking.MaxStagingDealsBytes = m.options.maxStagingDealsBytes
|
||||||
|
|
||||||
if m.options.mainMiner != nil {
|
if m.options.mainMiner != nil {
|
||||||
token, err := m.options.mainMiner.FullNode.AuthNew(ctx, api.AllPermissions)
|
token, err := m.options.mainMiner.FullNode.AuthNew(ctx, api.AllPermissions)
|
||||||
|
@ -27,11 +27,12 @@ type nodeOpts struct {
|
|||||||
ownerKey *wallet.Key
|
ownerKey *wallet.Key
|
||||||
extraNodeOpts []node.Option
|
extraNodeOpts []node.Option
|
||||||
|
|
||||||
subsystems MinerSubsystem
|
subsystems MinerSubsystem
|
||||||
mainMiner *TestMiner
|
mainMiner *TestMiner
|
||||||
disableLibp2p bool
|
disableLibp2p bool
|
||||||
optBuilders []OptBuilder
|
optBuilders []OptBuilder
|
||||||
sectorSize abi.SectorSize
|
sectorSize abi.SectorSize
|
||||||
|
maxStagingDealsBytes int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultNodeOpts are the default options that will be applied to test nodes.
|
// DefaultNodeOpts are the default options that will be applied to test nodes.
|
||||||
@ -68,6 +69,13 @@ func WithSubsystems(systems ...MinerSubsystem) NodeOpt {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithMaxStagingDealsBytes(size int64) NodeOpt {
|
||||||
|
return func(opts *nodeOpts) error {
|
||||||
|
opts.maxStagingDealsBytes = size
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func DisableLibp2p() NodeOpt {
|
func DisableLibp2p() NodeOpt {
|
||||||
return func(opts *nodeOpts) error {
|
return func(opts *nodeOpts) error {
|
||||||
opts.disableLibp2p = true
|
opts.disableLibp2p = true
|
||||||
|
@ -165,7 +165,7 @@ func ConfigStorageMiner(c interface{}) Option {
|
|||||||
// Markets (storage)
|
// Markets (storage)
|
||||||
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
|
Override(new(dtypes.ProviderDataTransfer), modules.NewProviderDAGServiceDataTransfer),
|
||||||
Override(new(*storedask.StoredAsk), modules.NewStorageAsk),
|
Override(new(*storedask.StoredAsk), modules.NewStorageAsk),
|
||||||
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(nil)),
|
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(cfg.Dealmaking, nil)),
|
||||||
Override(new(storagemarket.StorageProvider), modules.StorageProvider),
|
Override(new(storagemarket.StorageProvider), modules.StorageProvider),
|
||||||
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})),
|
Override(new(*storageadapter.DealPublisher), storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{})),
|
||||||
Override(HandleMigrateProviderFundsKey, modules.HandleMigrateProviderFunds),
|
Override(HandleMigrateProviderFundsKey, modules.HandleMigrateProviderFunds),
|
||||||
@ -192,7 +192,7 @@ func ConfigStorageMiner(c interface{}) Option {
|
|||||||
Override(new(dtypes.GetMaxDealStartDelayFunc), modules.NewGetMaxDealStartDelayFunc),
|
Override(new(dtypes.GetMaxDealStartDelayFunc), modules.NewGetMaxDealStartDelayFunc),
|
||||||
|
|
||||||
If(cfg.Dealmaking.Filter != "",
|
If(cfg.Dealmaking.Filter != "",
|
||||||
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(dealfilter.CliStorageDealFilter(cfg.Dealmaking.Filter))),
|
Override(new(dtypes.StorageDealFilter), modules.BasicDealFilter(cfg.Dealmaking, dealfilter.CliStorageDealFilter(cfg.Dealmaking.Filter))),
|
||||||
),
|
),
|
||||||
|
|
||||||
If(cfg.Dealmaking.RetrievalFilter != "",
|
If(cfg.Dealmaking.RetrievalFilter != "",
|
||||||
|
@ -251,6 +251,13 @@ message`,
|
|||||||
|
|
||||||
Comment: `The maximum collateral that the provider will put up against a deal,
|
Comment: `The maximum collateral that the provider will put up against a deal,
|
||||||
as a multiplier of the minimum collateral bound`,
|
as a multiplier of the minimum collateral bound`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "MaxStagingDealsBytes",
|
||||||
|
Type: "int64",
|
||||||
|
|
||||||
|
Comment: `The maximum allowed disk usage size in bytes of staging deals not yet
|
||||||
|
passed to the sealing node by the markets service. 0 is unlimited.`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "SimultaneousTransfers",
|
Name: "SimultaneousTransfers",
|
||||||
|
@ -126,7 +126,9 @@ type DealmakingConfig struct {
|
|||||||
// The maximum collateral that the provider will put up against a deal,
|
// The maximum collateral that the provider will put up against a deal,
|
||||||
// as a multiplier of the minimum collateral bound
|
// as a multiplier of the minimum collateral bound
|
||||||
MaxProviderCollateralMultiplier uint64
|
MaxProviderCollateralMultiplier uint64
|
||||||
|
// The maximum allowed disk usage size in bytes of staging deals not yet
|
||||||
|
// passed to the sealing node by the markets service. 0 is unlimited.
|
||||||
|
MaxStagingDealsBytes int64
|
||||||
// The maximum number of parallel online data transfers (storage+retrieval)
|
// The maximum number of parallel online data transfers (storage+retrieval)
|
||||||
SimultaneousTransfers uint64
|
SimultaneousTransfers uint64
|
||||||
|
|
||||||
|
@ -441,14 +441,16 @@ func NewStorageAsk(ctx helpers.MetricsCtx, fapi v1api.FullNode, ds dtypes.Metada
|
|||||||
storagemarket.MaxPieceSize(abi.PaddedPieceSize(mi.SectorSize)))
|
storagemarket.MaxPieceSize(abi.PaddedPieceSize(mi.SectorSize)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
|
func BasicDealFilter(cfg config.DealmakingConfig, user dtypes.StorageDealFilter) func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
|
||||||
offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc,
|
offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc,
|
||||||
verifiedOk dtypes.ConsiderVerifiedStorageDealsConfigFunc,
|
verifiedOk dtypes.ConsiderVerifiedStorageDealsConfigFunc,
|
||||||
unverifiedOk dtypes.ConsiderUnverifiedStorageDealsConfigFunc,
|
unverifiedOk dtypes.ConsiderUnverifiedStorageDealsConfigFunc,
|
||||||
blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc,
|
blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc,
|
||||||
expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc,
|
expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc,
|
||||||
startDelay dtypes.GetMaxDealStartDelayFunc,
|
startDelay dtypes.GetMaxDealStartDelayFunc,
|
||||||
spn storagemarket.StorageProviderNode) dtypes.StorageDealFilter {
|
spn storagemarket.StorageProviderNode,
|
||||||
|
r repo.LockedRepo,
|
||||||
|
) dtypes.StorageDealFilter {
|
||||||
return func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
|
return func(onlineOk dtypes.ConsiderOnlineStorageDealsConfigFunc,
|
||||||
offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc,
|
offlineOk dtypes.ConsiderOfflineStorageDealsConfigFunc,
|
||||||
verifiedOk dtypes.ConsiderVerifiedStorageDealsConfigFunc,
|
verifiedOk dtypes.ConsiderVerifiedStorageDealsConfigFunc,
|
||||||
@ -456,7 +458,9 @@ func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.Conside
|
|||||||
blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc,
|
blocklistFunc dtypes.StorageDealPieceCidBlocklistConfigFunc,
|
||||||
expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc,
|
expectedSealTimeFunc dtypes.GetExpectedSealDurationFunc,
|
||||||
startDelay dtypes.GetMaxDealStartDelayFunc,
|
startDelay dtypes.GetMaxDealStartDelayFunc,
|
||||||
spn storagemarket.StorageProviderNode) dtypes.StorageDealFilter {
|
spn storagemarket.StorageProviderNode,
|
||||||
|
r repo.LockedRepo,
|
||||||
|
) dtypes.StorageDealFilter {
|
||||||
|
|
||||||
return func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) {
|
return func(ctx context.Context, deal storagemarket.MinerDeal) (bool, string, error) {
|
||||||
b, err := onlineOk()
|
b, err := onlineOk()
|
||||||
@ -532,6 +536,16 @@ func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.Conside
|
|||||||
return false, "miner error", err
|
return false, "miner error", err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
diskUsageBytes, err := r.DiskUsage(r.Path() + "/deal-staging")
|
||||||
|
if err != nil {
|
||||||
|
return false, "miner error", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.MaxStagingDealsBytes != 0 && diskUsageBytes >= cfg.MaxStagingDealsBytes {
|
||||||
|
log.Errorw("proposed deal rejected because there are too many deals in the staging area at the moment", "MaxStagingDealsBytes", cfg.MaxStagingDealsBytes, "DiskUsageBytes", diskUsageBytes)
|
||||||
|
return false, "cannot accept deal as miner is overloaded at the moment - there are too many staging deals being processed", nil
|
||||||
|
}
|
||||||
|
|
||||||
// Reject if it's more than 7 days in the future
|
// Reject if it's more than 7 days in the future
|
||||||
// TODO: read from cfg
|
// TODO: read from cfg
|
||||||
maxStartEpoch := earliest + abi.ChainEpoch(uint64(sd.Seconds())/build.BlockDelaySecs)
|
maxStartEpoch := earliest + abi.ChainEpoch(uint64(sd.Seconds())/build.BlockDelaySecs)
|
||||||
|
Loading…
Reference in New Issue
Block a user