diff --git a/.circleci/config.yml b/.circleci/config.yml index 2c972f0ff..4be6b8fce 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -917,6 +917,11 @@ workflows: suite: itest-sector_finalize_early target: "./itests/sector_finalize_early_test.go" + - test: + name: test-itest-sector_make_cc_avail + suite: itest-sector_make_cc_avail + target: "./itests/sector_make_cc_avail_test.go" + - test: name: test-itest-sector_miner_collateral suite: itest-sector_miner_collateral @@ -927,6 +932,16 @@ workflows: suite: itest-sector_pledge target: "./itests/sector_pledge_test.go" + - test: + name: test-itest-sector_prefer_no_upgrade + suite: itest-sector_prefer_no_upgrade + target: "./itests/sector_prefer_no_upgrade_test.go" + + - test: + name: test-itest-sector_revert_available + suite: itest-sector_revert_available + target: "./itests/sector_revert_available_test.go" + - test: name: test-itest-sector_terminate suite: itest-sector_terminate diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index de45be748..634da3b9d 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -325,18 +325,33 @@ # env var: LOTUS_SEALING_MAXWAITDEALSSECTORS #MaxWaitDealsSectors = 2 - # Upper bound on how many sectors can be sealing at the same time when creating new CC sectors (0 = unlimited) + # Upper bound on how many sectors can be sealing+upgrading at the same time when creating new CC sectors (0 = unlimited) # # type: uint64 # env var: LOTUS_SEALING_MAXSEALINGSECTORS #MaxSealingSectors = 0 - # Upper bound on how many sectors can be sealing at the same time when creating new sectors with deals (0 = unlimited) + # Upper bound on how many sectors can be sealing+upgrading at the same time when creating new sectors with deals (0 = unlimited) # # type: uint64 # env var: LOTUS_SEALING_MAXSEALINGSECTORSFORDEALS #MaxSealingSectorsForDeals = 0 + # Prefer creating new sectors even if there are sectors Available for upgrading. + # This setting combined with MaxUpgradingSectors set to a value higher than MaxSealingSectorsForDeals makes it + # possible to use fast sector upgrades to handle high volumes of storage deals, while still using the simple sealing + # flow when the volume of storage deals is lower. + # + # type: bool + # env var: LOTUS_SEALING_PREFERNEWSECTORSFORDEALS + #PreferNewSectorsForDeals = false + + # Upper bound on how many sectors can be sealing+upgrading at the same time when upgrading CC sectors with deals (0 = MaxSealingSectorsForDeals) + # + # type: uint64 + # env var: LOTUS_SEALING_MAXUPGRADINGSECTORS + #MaxUpgradingSectors = 0 + # CommittedCapacitySectorLifetime is the duration a Committed Capacity (CC) sector will # live before it must be extended or converted into sector containing deals before it is # terminated. Value must be between 180-540 days inclusive diff --git a/extern/sector-storage/piece_provider.go b/extern/sector-storage/piece_provider.go index 72e09df06..32f0c9028 100644 --- a/extern/sector-storage/piece_provider.go +++ b/extern/sector-storage/piece_provider.go @@ -166,8 +166,6 @@ func (p *pieceProvider) ReadPiece(ctx context.Context, sector storage.SectorRef, r, err := p.tryReadUnsealedPiece(ctx, unsealed, sector, pieceOffset, size) - log.Debugf("result of first tryReadUnsealedPiece: r=%s, err=%s", r, err) - if xerrors.Is(err, storiface.ErrSectorNotFound) { log.Debugf("no unsealed sector file with unsealed piece, sector=%+v, pieceOffset=%d, size=%d", sector, pieceOffset, size) err = nil diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index b70bd809b..de3ae9f2a 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -554,7 +554,7 @@ func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize return curEpoch + minDur, curEpoch + maxDur, nil } -func (m *Sealing) tryGetUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) { +func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) { if len(m.available) == 0 { return false, nil } @@ -623,56 +623,8 @@ func (m *Sealing) tryGetUpgradeSector(ctx context.Context, sp abi.RegisteredSeal return true, m.sectors.Send(uint64(candidate.Number), SectorStartCCUpdate{}) } -func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) error { - m.startupWait.Wait() - - if m.nextDealSector != nil { - return nil // new sector is being created right now - } - - cfg, err := m.getConfig() - if err != nil { - return xerrors.Errorf("getting storage config: %w", err) - } - - if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals { - return nil - } - - if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors { - return nil - } - - got, err := m.tryGetUpgradeSector(ctx, sp, ef) - if err != nil { - return err - } - if got { - return nil - } - - if !cfg.MakeNewSectorForDeals { - return nil - } - - sid, err := m.createSector(ctx, cfg, sp) - if err != nil { - return err - } - - m.nextDealSector = &sid - - log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp) - return m.sectors.Send(uint64(sid), SectorStart{ - ID: sid, - SectorType: sp, - }) -} - // call with m.inputLk func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi.RegisteredSealProof) (abi.SectorNumber, error) { - // Now actually create a new sector - sid, err := m.sc.Next() if err != nil { return 0, xerrors.Errorf("getting sector number: %w", err) @@ -686,7 +638,74 @@ func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi // update stats early, fsm planner would do that async m.stats.updateSector(ctx, cfg, m.minerSectorID(sid), UndefinedSectorState) - return sid, nil + return sid, err +} + +func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) error { + m.startupWait.Wait() + + if m.nextDealSector != nil { + return nil // new sector is being created right now + } + + cfg, err := m.getConfig() + if err != nil { + return xerrors.Errorf("getting storage config: %w", err) + } + + // if we're above WaitDeals limit, we don't want to add more staging sectors + if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors { + return nil + } + + maxUpgrading := cfg.MaxSealingSectorsForDeals + if cfg.MaxUpgradingSectors > 0 { + maxUpgrading = cfg.MaxUpgradingSectors + } + + canCreate := cfg.MakeNewSectorForDeals && !(cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals) + canUpgrade := !(maxUpgrading > 0 && m.stats.curSealing() >= maxUpgrading) + + // we want to try to upgrade when: + // - we can upgrade and prefer upgrades + // - we don't prefer upgrades, but can't create a new sector + shouldUpgrade := canUpgrade && (!cfg.PreferNewSectorsForDeals || !canCreate) + + log.Infow("new deal sector decision", + "sealing", m.stats.curSealing(), + "maxSeal", cfg.MaxSealingSectorsForDeals, + "maxUpgrade", maxUpgrading, + "preferNew", cfg.PreferNewSectorsForDeals, + "canCreate", canCreate, + "canUpgrade", canUpgrade, + "shouldUpgrade", shouldUpgrade) + + if shouldUpgrade { + got, err := m.maybeUpgradeSector(ctx, sp, ef) + if err != nil { + return err + } + if got { + return nil + } + } + + if canCreate { + sid, err := m.createSector(ctx, cfg, sp) + if err != nil { + return err + } + m.nextDealSector = &sid + + log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp) + if err := m.sectors.Send(uint64(sid), SectorStart{ + ID: sid, + SectorType: sp, + }); err != nil { + return err + } + } + return nil } func (m *Sealing) StartPacking(sid abi.SectorNumber) error { diff --git a/extern/storage-sealing/sealiface/config.go b/extern/storage-sealing/sealiface/config.go index 20bd2b564..0470db38e 100644 --- a/extern/storage-sealing/sealiface/config.go +++ b/extern/storage-sealing/sealiface/config.go @@ -18,6 +18,10 @@ type Config struct { // includes failed, 0 = no limit MaxSealingSectorsForDeals uint64 + PreferNewSectorsForDeals bool + + MaxUpgradingSectors uint64 + MakeNewSectorForDeals bool MakeCCSectorsAvailable bool diff --git a/itests/ccupgrade_test.go b/itests/ccupgrade_test.go index ac89088c2..cd876a3fd 100644 --- a/itests/ccupgrade_test.go +++ b/itests/ccupgrade_test.go @@ -7,16 +7,13 @@ import ( "testing" "time" - "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/network" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" - sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/filecoin-project/lotus/itests/kit" ) @@ -32,7 +29,23 @@ func TestCCUpgrade(t *testing.T) { //stm: @MINER_SECTOR_LIST_001 kit.QuietMiningLogs() - runTestCCUpgrade(t) + n := runTestCCUpgrade(t) + + t.Run("post", func(t *testing.T) { + ctx := context.Background() + ts, err := n.ChainHead(ctx) + require.NoError(t, err) + start := ts.Height() + // wait for a full proving period + t.Log("waiting for chain") + + n.WaitTillChain(ctx, func(ts *types.TipSet) bool { + if ts.Height() > start+abi.ChainEpoch(2880) { + return true + } + return false + }) + }) } func runTestCCUpgrade(t *testing.T) *kit.TestFullNode { @@ -60,7 +73,7 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode { require.NoError(t, err) require.Less(t, 50000, int(si.Expiration)) } - waitForSectorActive(ctx, t, CCUpgrade, client, maddr) + client.WaitForSectorActive(ctx, t, CCUpgrade, maddr) //stm: @SECTOR_CC_UPGRADE_001 err = miner.SectorMarkForUpgrade(ctx, sl[0], true) @@ -88,104 +101,3 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode { return client } - -func waitForSectorActive(ctx context.Context, t *testing.T, sn abi.SectorNumber, node *kit.TestFullNode, maddr address.Address) { - for { - active, err := node.StateMinerActiveSectors(ctx, maddr, types.EmptyTSK) - require.NoError(t, err) - for _, si := range active { - if si.SectorNumber == sn { - fmt.Printf("ACTIVE\n") - return - } - } - - time.Sleep(time.Second) - } -} - -func TestCCUpgradeAndPoSt(t *testing.T) { - kit.QuietMiningLogs() - t.Run("upgrade and then post", func(t *testing.T) { - ctx := context.Background() - n := runTestCCUpgrade(t) - ts, err := n.ChainHead(ctx) - require.NoError(t, err) - start := ts.Height() - // wait for a full proving period - t.Log("waiting for chain") - - n.WaitTillChain(ctx, func(ts *types.TipSet) bool { - if ts.Height() > start+abi.ChainEpoch(2880) { - return true - } - return false - }) - }) -} - -func TestAbortUpgradeAvailable(t *testing.T) { - kit.QuietMiningLogs() - - ctx := context.Background() - blockTime := 1 * time.Millisecond - - client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC()) - ens.InterconnectAll().BeginMiningMustPost(blockTime) - - maddr, err := miner.ActorAddress(ctx) - if err != nil { - t.Fatal(err) - } - - CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1) - fmt.Printf("CCUpgrade: %d\n", CCUpgrade) - - miner.PledgeSectors(ctx, 1, 0, nil) - sl, err := miner.SectorsList(ctx) - require.NoError(t, err) - require.Len(t, sl, 1, "expected 1 sector") - require.Equal(t, CCUpgrade, sl[0], "unexpected sector number") - { - si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK) - require.NoError(t, err) - require.Less(t, 50000, int(si.Expiration)) - } - waitForSectorActive(ctx, t, CCUpgrade, client, maddr) - - err = miner.SectorMarkForUpgrade(ctx, sl[0], true) - require.NoError(t, err) - - sl, err = miner.SectorsList(ctx) - require.NoError(t, err) - require.Len(t, sl, 1, "expected 1 sector") - - ss, err := miner.SectorsStatus(ctx, sl[0], false) - require.NoError(t, err) - - for i := 0; i < 100; i++ { - ss, err = miner.SectorsStatus(ctx, sl[0], false) - require.NoError(t, err) - if ss.State == api.SectorState(sealing.Proving) { - time.Sleep(50 * time.Millisecond) - continue - } - - require.Equal(t, api.SectorState(sealing.Available), ss.State) - break - } - - require.NoError(t, miner.SectorAbortUpgrade(ctx, sl[0])) - - for i := 0; i < 100; i++ { - ss, err = miner.SectorsStatus(ctx, sl[0], false) - require.NoError(t, err) - if ss.State == api.SectorState(sealing.Available) { - time.Sleep(50 * time.Millisecond) - continue - } - - require.Equal(t, api.SectorState(sealing.Proving), ss.State) - break - } -} diff --git a/itests/kit/node_full.go b/itests/kit/node_full.go index b606db8f4..1714e01e0 100644 --- a/itests/kit/node_full.go +++ b/itests/kit/node_full.go @@ -2,7 +2,9 @@ package kit import ( "context" + "fmt" "testing" + "time" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" @@ -67,6 +69,21 @@ func (f *TestFullNode) WaitTillChain(ctx context.Context, pred ChainPredicate) * return nil } +func (f *TestFullNode) WaitForSectorActive(ctx context.Context, t *testing.T, sn abi.SectorNumber, maddr address.Address) { + for { + active, err := f.StateMinerActiveSectors(ctx, maddr, types.EmptyTSK) + require.NoError(t, err) + for _, si := range active { + if si.SectorNumber == sn { + fmt.Printf("ACTIVE\n") + return + } + } + + time.Sleep(time.Second) + } +} + // ChainPredicate encapsulates a chain condition. type ChainPredicate func(set *types.TipSet) bool diff --git a/itests/kit/node_miner.go b/itests/kit/node_miner.go index 866c1124b..02fe90807 100644 --- a/itests/kit/node_miner.go +++ b/itests/kit/node_miner.go @@ -99,7 +99,7 @@ func (tm *TestMiner) WaitSectorsProving(ctx context.Context, toCheck map[abi.Sec st, err := tm.StorageMiner.SectorsStatus(ctx, n, false) require.NoError(tm.t, err) states[st.State]++ - if st.State == api.SectorState(sealing.Proving) { + if st.State == api.SectorState(sealing.Proving) || st.State == api.SectorState(sealing.Available) { delete(toCheck, n) } if strings.Contains(string(st.State), "Fail") { diff --git a/itests/kit/node_opts.go b/itests/kit/node_opts.go index 1fd449e5f..8cb49c9b2 100644 --- a/itests/kit/node_opts.go +++ b/itests/kit/node_opts.go @@ -3,6 +3,11 @@ package kit import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" @@ -144,6 +149,17 @@ func ConstructorOpts(extra ...node.Option) NodeOpt { } } +func MutateSealingConfig(mut func(sc *config.SealingConfig)) NodeOpt { + return ConstructorOpts( + node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) { + return func() (sealiface.Config, error) { + cf := config.DefaultStorageMiner() + mut(&cf.Sealing) + return modules.ToSealingConfig(cf.Dealmaking, cf.Sealing), nil + }, nil + }))) +} + // SectorSize sets the sector size for this miner. Start() will populate the // corresponding proof type depending on the network version (genesis network // version if the Ensemble is unstarted, or the current network version diff --git a/itests/sector_finalize_early_test.go b/itests/sector_finalize_early_test.go index 0f0fcdec6..7974870b6 100644 --- a/itests/sector_finalize_early_test.go +++ b/itests/sector_finalize_early_test.go @@ -9,13 +9,8 @@ import ( "github.com/stretchr/testify/require" - "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" "github.com/filecoin-project/lotus/itests/kit" - "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/config" - "github.com/filecoin-project/lotus/node/modules" - "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/node/repo" ) func TestDealsWithFinalizeEarly(t *testing.T) { @@ -34,14 +29,7 @@ func TestDealsWithFinalizeEarly(t *testing.T) { var blockTime = 50 * time.Millisecond - client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.ConstructorOpts( - node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) { - return func() (sealiface.Config, error) { - cf := config.DefaultStorageMiner() - cf.Sealing.FinalizeEarly = true - return modules.ToSealingConfig(cf.Dealmaking, cf.Sealing), nil - }, nil - })))) // no mock proofs. + client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.MutateSealingConfig(func(sc *config.SealingConfig) { sc.FinalizeEarly = true })) // no mock proofs. ens.InterconnectAll().BeginMining(blockTime) dh := kit.NewDealHarness(t, client, miner, miner) diff --git a/itests/sector_make_cc_avail_test.go b/itests/sector_make_cc_avail_test.go new file mode 100644 index 000000000..094367e96 --- /dev/null +++ b/itests/sector_make_cc_avail_test.go @@ -0,0 +1,77 @@ +package itests + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/network" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/filecoin-project/lotus/node/config" +) + +func TestMakeAvailable(t *testing.T) { + kit.QuietMiningLogs() + + ctx := context.Background() + blockTime := 1 * time.Millisecond + + client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC(), kit.MutateSealingConfig(func(sc *config.SealingConfig) { + sc.MakeCCSectorsAvailable = true + })) + ens.InterconnectAll().BeginMiningMustPost(blockTime) + + maddr, err := miner.ActorAddress(ctx) + if err != nil { + t.Fatal(err) + } + + CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1) + + miner.PledgeSectors(ctx, 1, 0, nil) + sl, err := miner.SectorsList(ctx) + require.NoError(t, err) + require.Len(t, sl, 1, "expected 1 sector") + require.Equal(t, CCUpgrade, sl[0], "unexpected sector number") + { + si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK) + require.NoError(t, err) + require.Less(t, 50000, int(si.Expiration)) + } + client.WaitForSectorActive(ctx, t, CCUpgrade, maddr) + + sl, err = miner.SectorsList(ctx) + require.NoError(t, err) + require.Len(t, sl, 1, "expected 1 sector") + + status, err := miner.SectorsStatus(ctx, CCUpgrade, true) + require.NoError(t, err) + assert.Equal(t, api.SectorState(sealing.Available), status.State) + + dh := kit.NewDealHarness(t, client, miner, miner) + deal, res, inPath := dh.MakeOnlineDeal(ctx, kit.MakeFullDealParams{ + Rseed: 6, + SuspendUntilCryptoeconStable: true, + }) + outPath := dh.PerformRetrieval(context.Background(), deal, res.Root, false) + kit.AssertFilesEqual(t, inPath, outPath) + + sl, err = miner.SectorsList(ctx) + require.NoError(t, err) + require.Len(t, sl, 1, "expected 1 sector") + + status, err = miner.SectorsStatus(ctx, CCUpgrade, true) + require.NoError(t, err) + assert.Equal(t, 1, len(status.Deals)) + miner.WaitSectorsProving(ctx, map[abi.SectorNumber]struct{}{ + CCUpgrade: {}, + }) +} diff --git a/itests/sector_prefer_no_upgrade_test.go b/itests/sector_prefer_no_upgrade_test.go new file mode 100644 index 000000000..11fd2c1de --- /dev/null +++ b/itests/sector_prefer_no_upgrade_test.go @@ -0,0 +1,89 @@ +package itests + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/network" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/itests/kit" + "github.com/filecoin-project/lotus/node/config" +) + +func TestPreferNoUpgrade(t *testing.T) { + kit.QuietMiningLogs() + + ctx := context.Background() + blockTime := 1 * time.Millisecond + + client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC(), kit.MutateSealingConfig(func(sc *config.SealingConfig) { + sc.PreferNewSectorsForDeals = true + })) + ens.InterconnectAll().BeginMiningMustPost(blockTime) + + maddr, err := miner.ActorAddress(ctx) + if err != nil { + t.Fatal(err) + } + + CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1) + Sealed := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 2) + + { + miner.PledgeSectors(ctx, 1, 0, nil) + sl, err := miner.SectorsList(ctx) + require.NoError(t, err) + require.Len(t, sl, 1, "expected 1 sector") + require.Equal(t, CCUpgrade, sl[0], "unexpected sector number") + { + si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK) + require.NoError(t, err) + require.Less(t, 50000, int(si.Expiration)) + } + client.WaitForSectorActive(ctx, t, CCUpgrade, maddr) + + err = miner.SectorMarkForUpgrade(ctx, sl[0], true) + require.NoError(t, err) + + sl, err = miner.SectorsList(ctx) + require.NoError(t, err) + require.Len(t, sl, 1, "expected 1 sector") + } + + { + dh := kit.NewDealHarness(t, client, miner, miner) + deal, res, inPath := dh.MakeOnlineDeal(ctx, kit.MakeFullDealParams{ + Rseed: 6, + SuspendUntilCryptoeconStable: true, + }) + outPath := dh.PerformRetrieval(context.Background(), deal, res.Root, false) + kit.AssertFilesEqual(t, inPath, outPath) + } + + sl, err := miner.SectorsList(ctx) + require.NoError(t, err) + require.Len(t, sl, 2, "expected 2 sectors") + + { + status, err := miner.SectorsStatus(ctx, CCUpgrade, true) + require.NoError(t, err) + assert.Equal(t, api.SectorState(sealing.Available), status.State) + } + + { + status, err := miner.SectorsStatus(ctx, Sealed, true) + require.NoError(t, err) + assert.Equal(t, 1, len(status.Deals)) + miner.WaitSectorsProving(ctx, map[abi.SectorNumber]struct{}{ + Sealed: {}, + }) + } +} diff --git a/itests/sector_revert_available_test.go b/itests/sector_revert_available_test.go new file mode 100644 index 000000000..6827a85fa --- /dev/null +++ b/itests/sector_revert_available_test.go @@ -0,0 +1,84 @@ +package itests + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/network" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/itests/kit" +) + +func TestAbortUpgradeAvailable(t *testing.T) { + kit.QuietMiningLogs() + + ctx := context.Background() + blockTime := 1 * time.Millisecond + + client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC()) + ens.InterconnectAll().BeginMiningMustPost(blockTime) + + maddr, err := miner.ActorAddress(ctx) + if err != nil { + t.Fatal(err) + } + + CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1) + fmt.Printf("CCUpgrade: %d\n", CCUpgrade) + + miner.PledgeSectors(ctx, 1, 0, nil) + sl, err := miner.SectorsList(ctx) + require.NoError(t, err) + require.Len(t, sl, 1, "expected 1 sector") + require.Equal(t, CCUpgrade, sl[0], "unexpected sector number") + { + si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK) + require.NoError(t, err) + require.Less(t, 50000, int(si.Expiration)) + } + client.WaitForSectorActive(ctx, t, CCUpgrade, maddr) + + err = miner.SectorMarkForUpgrade(ctx, sl[0], true) + require.NoError(t, err) + + sl, err = miner.SectorsList(ctx) + require.NoError(t, err) + require.Len(t, sl, 1, "expected 1 sector") + + ss, err := miner.SectorsStatus(ctx, sl[0], false) + require.NoError(t, err) + + for i := 0; i < 100; i++ { + ss, err = miner.SectorsStatus(ctx, sl[0], false) + require.NoError(t, err) + if ss.State == api.SectorState(sealing.Proving) { + time.Sleep(50 * time.Millisecond) + continue + } + + require.Equal(t, api.SectorState(sealing.Available), ss.State) + break + } + + require.NoError(t, miner.SectorAbortUpgrade(ctx, sl[0])) + + for i := 0; i < 100; i++ { + ss, err = miner.SectorsStatus(ctx, sl[0], false) + require.NoError(t, err) + if ss.State == api.SectorState(sealing.Available) { + time.Sleep(50 * time.Millisecond) + continue + } + + require.Equal(t, api.SectorState(sealing.Proving), ss.State) + break + } +} diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index 972c196f7..ba5ffcc03 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -714,13 +714,28 @@ Note that setting this number too high in relation to deal ingestion rate may re Name: "MaxSealingSectors", Type: "uint64", - Comment: `Upper bound on how many sectors can be sealing at the same time when creating new CC sectors (0 = unlimited)`, + Comment: `Upper bound on how many sectors can be sealing+upgrading at the same time when creating new CC sectors (0 = unlimited)`, }, { Name: "MaxSealingSectorsForDeals", Type: "uint64", - Comment: `Upper bound on how many sectors can be sealing at the same time when creating new sectors with deals (0 = unlimited)`, + Comment: `Upper bound on how many sectors can be sealing+upgrading at the same time when creating new sectors with deals (0 = unlimited)`, + }, + { + Name: "PreferNewSectorsForDeals", + Type: "bool", + + Comment: `Prefer creating new sectors even if there are sectors Available for upgrading. +This setting combined with MaxUpgradingSectors set to a value higher than MaxSealingSectorsForDeals makes it +possible to use fast sector upgrades to handle high volumes of storage deals, while still using the simple sealing +flow when the volume of storage deals is lower.`, + }, + { + Name: "MaxUpgradingSectors", + Type: "uint64", + + Comment: `Upper bound on how many sectors can be sealing+upgrading at the same time when upgrading CC sectors with deals (0 = MaxSealingSectorsForDeals)`, }, { Name: "CommittedCapacitySectorLifetime", diff --git a/node/config/types.go b/node/config/types.go index 2e9357993..b3ba36c7f 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -228,12 +228,21 @@ type SealingConfig struct { // 0 = no limit MaxWaitDealsSectors uint64 - // Upper bound on how many sectors can be sealing at the same time when creating new CC sectors (0 = unlimited) + // Upper bound on how many sectors can be sealing+upgrading at the same time when creating new CC sectors (0 = unlimited) MaxSealingSectors uint64 - // Upper bound on how many sectors can be sealing at the same time when creating new sectors with deals (0 = unlimited) + // Upper bound on how many sectors can be sealing+upgrading at the same time when creating new sectors with deals (0 = unlimited) MaxSealingSectorsForDeals uint64 + // Prefer creating new sectors even if there are sectors Available for upgrading. + // This setting combined with MaxUpgradingSectors set to a value higher than MaxSealingSectorsForDeals makes it + // possible to use fast sector upgrades to handle high volumes of storage deals, while still using the simple sealing + // flow when the volume of storage deals is lower. + PreferNewSectorsForDeals bool + + // Upper bound on how many sectors can be sealing+upgrading at the same time when upgrading CC sectors with deals (0 = MaxSealingSectorsForDeals) + MaxUpgradingSectors uint64 + // CommittedCapacitySectorLifetime is the duration a Committed Capacity (CC) sector will // live before it must be extended or converted into sector containing deals before it is // terminated. Value must be between 180-540 days inclusive diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 179363e29..4dc062349 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -916,6 +916,8 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error MaxWaitDealsSectors: cfg.MaxWaitDealsSectors, MaxSealingSectors: cfg.MaxSealingSectors, MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals, + PreferNewSectorsForDeals: cfg.PreferNewSectorsForDeals, + MaxUpgradingSectors: cfg.MaxUpgradingSectors, CommittedCapacitySectorLifetime: config.Duration(cfg.CommittedCapacitySectorLifetime), WaitDealsDelay: config.Duration(cfg.WaitDealsDelay), MakeCCSectorsAvailable: cfg.MakeCCSectorsAvailable, @@ -954,6 +956,8 @@ func ToSealingConfig(dealmakingCfg config.DealmakingConfig, sealingCfg config.Se MaxWaitDealsSectors: sealingCfg.MaxWaitDealsSectors, MaxSealingSectors: sealingCfg.MaxSealingSectors, MaxSealingSectorsForDeals: sealingCfg.MaxSealingSectorsForDeals, + PreferNewSectorsForDeals: sealingCfg.PreferNewSectorsForDeals, + MaxUpgradingSectors: sealingCfg.MaxUpgradingSectors, StartEpochSealingBuffer: abi.ChainEpoch(dealmakingCfg.StartEpochSealingBuffer), MakeNewSectorForDeals: dealmakingCfg.MakeNewSectorForDeals, CommittedCapacitySectorLifetime: time.Duration(sealingCfg.CommittedCapacitySectorLifetime),