Merge branch 'release/v1.15.1' into feat/fvm

Aayush Rajasekaran 2022-03-17 11:31:56 -04:00
commit 2ed2ee1da7
37 changed files with 833 additions and 403 deletions


@@ -1,5 +1,134 @@
# Lotus changelog
# 1.15.0 / 2022-03-09
This is an optional release with retrieval improvements (client side), improved SP UX for unsealing, snap deals and regular deal-making, and many other new features, improvements, and bug fixes.
## Highlights
- feat:sealing: StartEpochSealingBuffer triggers packing on time([filecoin-project/lotus#7905](https://github.com/filecoin-project/lotus/pull/7905))
- use the `StartEpochSealingBuffer` configuration variable to enforce that sectors are packed for sealing/updating, no matter how many deals they have, once the nearest deal start date is close enough to the present (see the config sketch after this list).
- feat: #6017 market: retrieval ask CLI command ([filecoin-project/lotus#7814](https://github.com/filecoin-project/lotus/pull/7814))
- feat(graphsync): allow setting of per-peer incoming requests for miners ([filecoin-project/lotus#7578](https://github.com/filecoin-project/lotus/pull/7578))
- set `SimultaneousTransfersForStoragePerClient` in the deal-making configuration.
- Make retrieval even faster ([filecoin-project/lotus#7746](https://github.com/filecoin-project/lotus/pull/7746))
- feat: #7747 sealing: Adding conf variable for capping number of concurrent unsealing jobs (#7884) ([filecoin-project/lotus#7884](https://github.com/filecoin-project/lotus/pull/7884))
- set `MaxConcurrentUnseals` in `DAGStoreConfig`; a combined config sketch for these Highlight settings follows this list.
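A minimal sketch of where these Highlight settings live, assuming the standard lotus-miner `config.toml` sections (the same file excerpted later in this diff); the values are illustrative, not recommendations:

```toml
[Dealmaking]
  # Pack a sector for sealing/updating once the nearest deal start epoch is this close
  StartEpochSealingBuffer = 480
  # Cap parallel online storage-deal transfers per client
  SimultaneousTransfersForStoragePerClient = 20

[DAGStore]
  # Cap concurrent unsealing jobs
  MaxConcurrentUnseals = 5
```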
## New Features
- feat: mpool: Cache state nonces ([filecoin-project/lotus#8005](https://github.com/filecoin-project/lotus/pull/8005))
- chore: build: make the OhSnap epoch configurable by an envvar for devnets ([filecoin-project/lotus#7995](https://github.com/filecoin-project/lotus/pull/7995))
- Shed: Add a util to send a batch of messages ([filecoin-project/lotus#7667](https://github.com/filecoin-project/lotus/pull/7667))
- Add api for transfer diagnostics ([filecoin-project/lotus#7759](https://github.com/filecoin-project/lotus/pull/7759))
- Shed: Add a util to list terminated deals ([filecoin-project/lotus#7774](https://github.com/filecoin-project/lotus/pull/7774))
- Expose EnableGasTracing as an env_var ([filecoin-project/lotus#7750](https://github.com/filecoin-project/lotus/pull/7750))
- Command to list active sector locks ([filecoin-project/lotus#7735](https://github.com/filecoin-project/lotus/pull/7735))
- Initial switch to OpenTelemetry ([filecoin-project/lotus#7725](https://github.com/filecoin-project/lotus/pull/7725))
## Improvements
- splitstore sortless compaction ([filecoin-project/lotus#8008](https://github.com/filecoin-project/lotus/pull/8008))
- perf: chain: Make drand logs in daemon less noisy (#7955) ([filecoin-project/lotus#7955](https://github.com/filecoin-project/lotus/pull/7955))
- chore: shed: storage stats 2.0 ([filecoin-project/lotus#7941](https://github.com/filecoin-project/lotus/pull/7941))
- misc: api: Annotate lotus tests according to listed behaviors ([filecoin-project/lotus#7835](https://github.com/filecoin-project/lotus/pull/7835))
- some basic splitstore refactors ([filecoin-project/lotus#7999](https://github.com/filecoin-project/lotus/pull/7999))
- chore: sealer: quieten a log ([filecoin-project/lotus#7998](https://github.com/filecoin-project/lotus/pull/7998))
- tvx: supply network version when extracting messages. ([filecoin-project/lotus#7996](https://github.com/filecoin-project/lotus/pull/7996))
- chore: remove inaccurate comment in sealtasks ([filecoin-project/lotus#7977](https://github.com/filecoin-project/lotus/pull/7977))
- Refactor: VM: Remove the NetworkVersionGetter ([filecoin-project/lotus#7818](https://github.com/filecoin-project/lotus/pull/7818))
- refactor: state: Move randomness versioning out of the VM ([filecoin-project/lotus#7816](https://github.com/filecoin-project/lotus/pull/7816))
- updating to new datastore/blockstore code with contexts ([filecoin-project/lotus#7646](https://github.com/filecoin-project/lotus/pull/7646))
- Mempool msg selection should respect block message limits ([filecoin-project/lotus#7321](https://github.com/filecoin-project/lotus/pull/7321))
- Minor improvement for OpenTelemetry ([filecoin-project/lotus#7760](https://github.com/filecoin-project/lotus/pull/7760))
- Sort lotus-miner retrieval-deals by dealId ([filecoin-project/lotus#7749](https://github.com/filecoin-project/lotus/pull/7749))
- dagstore pieceReader: Always read full in ReadAt ([filecoin-project/lotus#7737](https://github.com/filecoin-project/lotus/pull/7737))
## Bug Fixes
- fix: sealing: Stop recovery attempts after fault ([filecoin-project/lotus#8014](https://github.com/filecoin-project/lotus/pull/8014))
- fix:snap: pay for the collateral difference needed if the miner available balance is insufficient ([filecoin-project/lotus#8234](https://github.com/filecoin-project/lotus/pull/8234))
- sealer: fix error message ([filecoin-project/lotus#8136](https://github.com/filecoin-project/lotus/pull/8136))
- typo in variable name ([filecoin-project/lotus#8134](https://github.com/filecoin-project/lotus/pull/8134))
- fix: sealer: allow enable/disabling ReplicaUpdate tasks ([filecoin-project/lotus#8093](https://github.com/filecoin-project/lotus/pull/8093))
- chore: chain: fix log ([filecoin-project/lotus#7993](https://github.com/filecoin-project/lotus/pull/7993))
- Fix: chain: create a new VM for each epoch ([filecoin-project/lotus#7966](https://github.com/filecoin-project/lotus/pull/7966))
- fix: doc generation struct slice example value ([filecoin-project/lotus#7851](https://github.com/filecoin-project/lotus/pull/7851))
- fix: returned error not be accept correctly ([filecoin-project/lotus#7852](https://github.com/filecoin-project/lotus/pull/7852))
- fix: #7577 markets: When retrying Add Piece, first seek to start of reader ([filecoin-project/lotus#7812](https://github.com/filecoin-project/lotus/pull/7812))
- misc: n/a sealing: Fix grammatical error in a log warning message ([filecoin-project/lotus#7831](https://github.com/filecoin-project/lotus/pull/7831))
- sectors update-state checks if sector exists before changing its state ([filecoin-project/lotus#7762](https://github.com/filecoin-project/lotus/pull/7762))
- SplitStore: supress compaction near upgrades ([filecoin-project/lotus#7734](https://github.com/filecoin-project/lotus/pull/7734))
## Dependency Updates
- github.com/filecoin-project/go-commp-utils (v0.1.2 -> v0.1.3):
- github.com/filecoin-project/dagstore (v0.4.3 -> v0.4.4):
- github.com/filecoin-project/go-fil-markets (v1.13.4 -> v1.19.2):
- github.com/filecoin-project/go-statestore (v0.1.1 -> v0.2.0):
- github.com/filecoin-project/go-storedcounter (v0.0.0-20200421200003-1c99c62e8a5b -> v0.1.0):
- github.com/filecoin-project/specs-actors/v2 (v2.3.5 -> v2.3.6):
- feat(deps): update markets stack ([filecoin-project/lotus#7959](https://github.com/filecoin-project/lotus/pull/7959))
- Use go-libp2p-connmgr v0.3.1 ([filecoin-project/lotus#7957](https://github.com/filecoin-project/lotus/pull/7957))
- dep/fix 7701 Dependency: update to ipld-legacy to v0.1.1 ([filecoin-project/lotus#7751](https://github.com/filecoin-project/lotus/pull/7751))
## Others
- chore: backport: release ([filecoin-project/lotus#8245](https://github.com/filecoin-project/lotus/pull/8245))
- Lotus release v1.15.0-rc3 ([filecoin-project/lotus#8236](https://github.com/filecoin-project/lotus/pull/8236))
- Lotus release v1.15.0-rc2 ([filecoin-project/lotus#8211](https://github.com/filecoin-project/lotus/pull/8211))
- Merge branch 'releases' into release/v1.15.0
- chore: build: backport releases ([filecoin-project/lotus#8193](https://github.com/filecoin-project/lotus/pull/8193))
- Merge branch 'releases' into release/v1.15.0
- bump the version to v1.15.0-rc1
- chore: build: v1.14.0 -> master ([filecoin-project/lotus#8053](https://github.com/filecoin-project/lotus/pull/8053))
- chore: merge release/v1.14.0 PRs into master ([filecoin-project/lotus#7979](https://github.com/filecoin-project/lotus/pull/7979))
- chore: update PR template ([filecoin-project/lotus#7918](https://github.com/filecoin-project/lotus/pull/7918))
- build: release: bump master version to v1.15.0-dev ([filecoin-project/lotus#7922](https://github.com/filecoin-project/lotus/pull/7922))
- misc: docs: remove issue number from the pr title ([filecoin-project/lotus#7902](https://github.com/filecoin-project/lotus/pull/7902))
- Snapcraft grade no develgrade ([filecoin-project/lotus#7802](https://github.com/filecoin-project/lotus/pull/7802))
- chore: create pull_request_template.md ([filecoin-project/lotus#7726](https://github.com/filecoin-project/lotus/pull/7726))
- Disable appimage ([filecoin-project/lotus#7707](https://github.com/filecoin-project/lotus/pull/7707))
## Contributors
| Contributor | Commits | Lines ± | Files Changed |
|-------------|---------|---------|---------------|
| @arajasek | 73 | +7232/-2778 | 386 |
| @zenground0 | 27 | +5604/-1049 | 219 |
| @vyzo | 118 | +4356/-1470 | 253 |
| @zl | 1 | +3725/-309 | 8 |
| @dirkmc | 7 | +1392/-1110 | 61 |
| arajasek | 37 | +221/-1329 | 90 |
| @magik6k | 33 | +1138/-336 | 101 |
| @whyrusleeping | 2 | +483/-585 | 28 |
| Darko Brdareski | 14 | +725/-276 | 154 |
| @rvagg | 2 | +43/-947 | 10 |
| @hannahhoward | 5 | +436/-335 | 31 |
| @hannahhoward | 12 | +507/-133 | 37 |
| @jennijuju | 27 | +333/-178 | 54 |
| @TheMenko | 8 | +237/-179 | 17 |
| c r | 2 | +227/-45 | 12 |
| @dirkmck | 12 | +188/-40 | 27 |
| @ribasushi | 3 | +128/-62 | 3 |
| @raulk | 6 | +128/-49 | 9 |
| @Whyrusleeping | 1 | +76/-70 | 8 |
| @Stebalien | 1 | +55/-37 | 1 |
| @jennijuju | 11 | +29/-16 | 11 |
| @aarshkshah1992 | 1 | +23/-19 | 5 |
| @travisperson | 1 | +0/-18 | 2 |
| @gstuart | 3 | +12/-1 | 3 |
| @coryschwartz | 4 | +5/-6 | 4 |
| @pefish | 1 | +4/-3 | 1 |
| @Kubuxu | 1 | +5/-2 | 2 |
| Colin Kennedy | 1 | +4/-2 | 1 |
| Rob Quist | 1 | +2/-2 | 1 |
| @shotcollin | 1 | +1/-1 | 1 |
# 1.14.4 / 2022-03-03
This is a *highly recommended* optional release for storage providers that are doing snap deals. It fixes the bug
that caused some snap deal sectors to get stuck in `FinalizeReplicaUpdate`. In addition, SPs should now be able to force
sector state updates without getting blocked by `normal shutdown of state machine`.
# v1.14.3 / 2022-02-28
This is an **optional** release, that includes a fix to properly register the `--really-do-it` flag for abort-upgrade.
# 1.14.2 / 2022-02-24
This is an **optional** release of lotus that includes a couple more improvements to the Snap experience for storage providers, in preparation for the [upcoming OhSnap upgrade](https://github.com/filecoin-project/community/discussions/74?sort=new#discussioncomment-1922550).
@@ -878,7 +1007,7 @@ This is a **highly recommended** but optional Lotus v1.11.1 release that introd
| dependabot[bot] | 1 | +3/-3 | 1 |
| zhoutian527 | 1 | +2/-2 | 1 |
| xloem | 1 | +4/-0 | 1 |
- | @travisperson| 2 | +2/-2 | 3 |
+ | | 2 | +2/-2 | 3 |
| Liviu Damian | 2 | +2/-2 | 2 |
| @jimpick | 2 | +2/-2 | 2 |
| Frank | 1 | +3/-0 | 1 |
@@ -890,6 +1019,7 @@ This is a **highly recommended** but optional Lotus v1.11.1 release that introd
This is a **highly recommended** release of Lotus that have many bug fixes, improvements and new features.
## Highlights
+ - Miner SimultaneousTransfers config ([filecoin-project/lotus#6612](https://github.com/filecoin-project/lotus/pull/6612))
- Miner SimultaneousTransfers config ([filecoin-project/lotus#6612](https://github.com/filecoin-project/lotus/pull/6612))
  - Set `SimultaneousTransfers` in lotus miner config to configure the maximum number of parallel online data transfers, including both storage and retrieval deals.
- Dynamic Retrieval pricing ([filecoin-project/lotus#6175](https://github.com/filecoin-project/lotus/pull/6175))
@@ -1044,7 +1174,7 @@ This is a **highly recommended** release of Lotus that have many bug fixes, impr
| @Stebalien | 106 | +7653/-2718 | 273 |
| dirkmc | 11 | +2580/-1371 | 77 |
| @dirkmc | 39 | +1865/-1194 | 79 |
- | @Kubuxu | 19 | +1973/-485 | 81 |
+ | | 19 | +1973/-485 | 81 |
| @vyzo | 4 | +1748/-330 | 50 |
| @aarshkshah1992 | 5 | +1462/-213 | 27 |
| @coryschwartz | 35 | +568/-206 | 59 |


@@ -5,6 +5,8 @@ import (
"fmt"
"time"
+ "github.com/libp2p/go-libp2p-core/network"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-state-types/abi"
@@ -12,7 +14,6 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync"
- "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
ma "github.com/multiformats/go-multiaddr"
@@ -124,12 +125,6 @@ func NewDataTransferChannel(hostID peer.ID, channelState datatransfer.ChannelSta
return channel
}
- type NetBlockList struct {
- Peers []peer.ID
- IPAddrs []string
- IPSubnets []string
- }
type NetStat struct {
System *network.ScopeStat `json:",omitempty"`
Transient *network.ScopeStat `json:",omitempty"`
@@ -152,6 +147,12 @@ type NetLimit struct {
FD int
}
+ type NetBlockList struct {
+ Peers []peer.ID
+ IPAddrs []string
+ IPSubnets []string
+ }
type ExtendedPeerInfo struct {
ID peer.ID
Agent string

View File

@@ -57,8 +57,8 @@ var (
FullAPIVersion0 = newVer(1, 5, 0)
FullAPIVersion1 = newVer(2, 2, 0)
- MinerAPIVersion0 = newVer(1, 4, 0)
- WorkerAPIVersion0 = newVer(1, 5, 0)
+ MinerAPIVersion0 = newVer(1, 5, 0)
+ WorkerAPIVersion0 = newVer(1, 6, 0)
)
//nolint:varcheck,deadcode


@@ -466,6 +466,7 @@ var stateOrder = map[sealing.SectorState]stateMeta{}
var stateList = []stateMeta{
{col: 39, state: "Total"},
{col: color.FgGreen, state: sealing.Proving},
+ {col: color.FgGreen, state: sealing.Available},
{col: color.FgGreen, state: sealing.UpdateActivating},
{col: color.FgBlue, state: sealing.Empty},


@@ -351,7 +351,7 @@ var sectorsListCmd = &cli.Command{
if cctx.Bool("unproven") {
for state := range sealing.ExistSectorStateList {
- if state == sealing.Proving {
+ if state == sealing.Proving || state == sealing.Available {
continue
}
states = append(states, api.SectorState(state))


@@ -598,7 +598,7 @@ var storageListSectorsCmd = &cli.Command{
ft storiface.SectorFileType
urls string
- primary, seal, store bool
+ primary, copy, main, seal, store bool
state api.SectorState
}
@@ -626,6 +626,9 @@ var storageListSectorsCmd = &cli.Command{
urls: strings.Join(info.URLs, ";"),
primary: info.Primary,
+ copy: !info.Primary && len(si) > 1,
+ main: !info.Primary && len(si) == 1, // only copy, but not primary
seal: info.CanSeal,
store: info.CanStore,
@@ -680,7 +683,7 @@ var storageListSectorsCmd = &cli.Command{
"Sector": e.id,
"Type": e.ft.String(),
"State": color.New(stateOrder[sealing.SectorState(e.state)].col).Sprint(e.state),
- "Primary": maybeStr(e.seal, color.FgGreen, "primary"),
+ "Primary": maybeStr(e.primary, color.FgGreen, "primary") + maybeStr(e.copy, color.FgBlue, "copy") + maybeStr(e.main, color.FgRed, "main"),
"Path use": maybeStr(e.seal, color.FgMagenta, "seal ") + maybeStr(e.store, color.FgCyan, "store"),
"URLs": e.urls,
}


@@ -277,7 +277,7 @@
#
# type: bool
# env var: LOTUS_INDEXPROVIDER_ENABLE
- #Enable = false
+ #Enable = true

# EntriesCacheCapacity sets the maximum capacity to use for caching the indexing advertisement
# entries. Defaults to 1024 if not specified. The cache is evicted using LRU policy. The
@@ -365,6 +365,12 @@
# env var: LOTUS_SEALING_FINALIZEEARLY
#FinalizeEarly = false
+ # After sealing CC sectors, make them available for upgrading with deals
+ #
+ # type: bool
+ # env var: LOTUS_SEALING_MAKECCSECTORSAVAILABLE
+ #MakeCCSectorsAvailable = false
# Whether to use available miner balance for sector collateral instead of sending it with each message
#
# type: bool


@@ -12,6 +12,7 @@ import (
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/mitchellh/go-homedir"
+ "go.uber.org/multierr"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
@@ -589,7 +590,7 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
return xerrors.Errorf("acquiring sector lock: %w", err)
}
- fts := storiface.FTUnsealed
+ moveUnsealed := storiface.FTUnsealed
{
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
if err != nil {
@@ -597,7 +598,7 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
}
if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine
- fts = storiface.FTNone
+ moveUnsealed = storiface.FTNone
}
}
@@ -616,10 +617,10 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
}
}
- selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache, false)
+ selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTUpdateCache, false)
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector,
- m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|fts, pathType, storiface.AcquireMove),
+ m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache|moveUnsealed, pathType, storiface.AcquireMove),
func(ctx context.Context, w Worker) error {
_, err := m.waitSimpleCall(ctx)(w.FinalizeReplicaUpdate(ctx, sector, keepUnsealed))
return err
@@ -628,8 +629,8 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
return err
}
- fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathStorage)
- moveUnsealed := fts
+ move := func(types storiface.SectorFileType) error {
+ fetchSel := newAllocSelector(m.index, types, storiface.PathStorage)
{
if len(keepUnsealed) == 0 {
moveUnsealed = storiface.FTNone
@@ -637,14 +638,22 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
}
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
- m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
+ m.schedFetch(sector, types, storiface.PathStorage, storiface.AcquireMove),
func(ctx context.Context, w Worker) error {
- _, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|moveUnsealed))
+ _, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, types))
return err
})
if err != nil {
return xerrors.Errorf("moving sector to storage: %w", err)
}
+ return nil
+ }
+ err = multierr.Append(move(storiface.FTUpdate|storiface.FTUpdateCache), move(storiface.FTCache))
+ err = multierr.Append(err, move(storiface.FTSealed)) // Sealed separate from cache just in case ReleaseSectorKey was already called
+ if moveUnsealed != storiface.FTNone {
+ err = multierr.Append(err, move(moveUnsealed))
+ }
return nil
}
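The `move(...)` closure above is invoked once per file-type group, and failures are aggregated with `go.uber.org/multierr` rather than aborting on the first error. A self-contained sketch of that pattern (the `moveGroup` function is a stand-in, not lotus code):

```go
// Attempt every move, collect all failures with go.uber.org/multierr
// instead of stopping at the first one.
package main

import (
	"fmt"

	"go.uber.org/multierr"
)

// moveGroup stands in for the per-file-type move closure in the diff.
func moveGroup(name string) error {
	if name == "sealed" {
		return fmt.Errorf("moving %s files: disk full", name)
	}
	return nil
}

func main() {
	var err error
	// Each group is attempted even if an earlier one failed.
	err = multierr.Append(err, moveGroup("update"))
	err = multierr.Append(err, moveGroup("cache"))
	err = multierr.Append(err, moveGroup("sealed"))
	fmt.Println(multierr.Errors(err)) // prints all collected failures
}
```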


@@ -172,7 +172,7 @@ func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.R
return
}
- if err := handler.Local.Remove(r.Context(), id, ft, false, []ID{ID(r.FormValue("keep"))}); err != nil {
+ if err := handler.Local.Remove(r.Context(), id, ft, false, ParseIDList(r.FormValue("keep"))); err != nil {
log.Errorf("%+v", err)
w.WriteHeader(500)
return


@@ -7,6 +7,7 @@ import (
"net/url"
gopath "path"
"sort"
+ "strings"
"sync"
"time"
@@ -29,6 +30,27 @@ var SkippedHeartbeatThresh = HeartbeatInterval * 5
// filesystem, local or networked / shared by multiple machines
type ID string
+ const IDSep = "."
+
+ type IDList []ID
+
+ func (il IDList) String() string {
+ l := make([]string, len(il))
+ for i, id := range il {
+ l[i] = string(id)
+ }
+ return strings.Join(l, IDSep)
+ }
+
+ func ParseIDList(s string) IDList {
+ strs := strings.Split(s, IDSep)
+ out := make([]ID, len(strs))
+ for i, str := range strs {
+ out[i] = ID(str)
+ }
+ return out
+ }
type Group = string
type StorageInfo struct {
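The new `IDList` helpers encode multiple storage IDs into the single `keep` query parameter that `remoteDeleteSector` parses above. A self-contained round-trip sketch (`ID`, `IDSep`, and the helpers are copied from the diff; `main` is only for illustration):

```go
// Round-trip the "keep" query parameter through IDList.String / ParseIDList.
package main

import (
	"fmt"
	"strings"
)

type ID string

const IDSep = "."

type IDList []ID

func (il IDList) String() string {
	l := make([]string, len(il))
	for i, id := range il {
		l[i] = string(id)
	}
	return strings.Join(l, IDSep)
}

func ParseIDList(s string) IDList {
	strs := strings.Split(s, IDSep)
	out := make([]ID, len(strs))
	for i, str := range strs {
		out[i] = ID(str)
	}
	return out
}

func main() {
	keep := IDList{"storage-a", "storage-b"}
	q := "?keep=" + keep.String() // "?keep=storage-a.storage-b"
	fmt.Println(ParseIDList(strings.TrimPrefix(q, "?keep="))) // [storage-a storage-b]
}
```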


@@ -44,12 +44,36 @@ type Remote struct {
pfHandler PartialFileHandler
}
- func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types storiface.SectorFileType) error {
- // TODO: do this on remotes too
- // (not that we really need to do that since it's always called by the
- // worker which pulled the copy)
- return r.local.RemoveCopies(ctx, s, types)
+ func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, typ storiface.SectorFileType) error {
+ if bits.OnesCount(uint(typ)) != 1 {
+ return xerrors.New("RemoveCopies expects one file type")
+ }
+ if err := r.local.RemoveCopies(ctx, s, typ); err != nil {
+ return xerrors.Errorf("removing local copies: %w", err)
+ }
+ si, err := r.index.StorageFindSector(ctx, s, typ, 0, false)
+ if err != nil {
+ return xerrors.Errorf("finding existing sector %d(t:%d) failed: %w", s, typ, err)
+ }
+ var hasPrimary bool
+ var keep []ID
+ for _, info := range si {
+ if info.Primary {
+ hasPrimary = true
+ keep = append(keep, info.ID)
+ break
+ }
+ }
+ if !hasPrimary {
+ log.Warnf("remote RemoveCopies: no primary copies of sector %v (%s), not removing anything", s, typ)
+ return nil
+ }
+ return r.Remove(ctx, s, typ, true, keep)
}
func NewRemote(local Store, index SectorIndex, auth http.Header, fetchLimit int, pfHandler PartialFileHandler) *Remote {
@@ -156,7 +180,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin
if op == storiface.AcquireMove {
id := ID(storageID)
- if err := r.deleteFromRemote(ctx, url, &id); err != nil {
+ if err := r.deleteFromRemote(ctx, url, []ID{id}); err != nil {
log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err)
}
}
@@ -355,7 +379,7 @@ storeLoop:
}
}
for _, url := range info.URLs {
- if err := r.deleteFromRemote(ctx, url, nil); err != nil {
+ if err := r.deleteFromRemote(ctx, url, keepIn); err != nil {
log.Warnf("remove %s: %+v", url, err)
continue
}
@@ -366,9 +390,9 @@ storeLoop:
return nil
}
- func (r *Remote) deleteFromRemote(ctx context.Context, url string, keepIn *ID) error {
+ func (r *Remote) deleteFromRemote(ctx context.Context, url string, keepIn IDList) error {
if keepIn != nil {
- url = url + "?keep=" + string(*keepIn)
+ url = url + "?keep=" + keepIn.String()
}
log.Infof("Delete %s", url)


@@ -516,7 +516,20 @@ func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error {
func (l *LocalWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) {
return l.asyncCall(ctx, sector, MoveStorage, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
- return nil, l.storage.MoveStorage(ctx, sector, types)
+ if err := l.storage.MoveStorage(ctx, sector, types); err != nil {
+ return nil, xerrors.Errorf("move to storage: %w", err)
+ }
+ for _, fileType := range storiface.PathTypes {
+ if fileType&types == 0 {
+ continue
+ }
+ if err := l.storage.RemoveCopies(ctx, sector.ID, fileType); err != nil {
+ return nil, xerrors.Errorf("rm copies (t:%s, s:%v): %w", fileType, sector, err)
+ }
+ }
+ return nil, nil
})
}
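`MoveStorage` now cleans up leftover copies by iterating the file-type bit set. A self-contained sketch of that bitmask iteration (names mirror `storiface`, but the types here are stand-ins):

```go
// SectorFileType is a bit set; PathTypes lists the individual bits so each
// requested file type can be handled on its own.
package main

import "fmt"

type SectorFileType int

const (
	FTUnsealed SectorFileType = 1 << iota
	FTSealed
	FTCache
)

var PathTypes = []SectorFileType{FTUnsealed, FTSealed, FTCache}

func main() {
	types := FTSealed | FTCache // the mask passed to MoveStorage
	for _, ft := range PathTypes {
		if ft&types == 0 {
			continue // this file type was not requested
		}
		fmt.Printf("would remove extra copies of file type %03b\n", ft)
	}
}
```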


@@ -111,6 +111,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
Committing: planCommitting,
CommitFinalize: planOne(
on(SectorFinalized{}, SubmitCommit),
+ on(SectorFinalizedAvailable{}, SubmitCommit),
on(SectorFinalizeFailed{}, CommitFinalizeFailed),
),
SubmitCommit: planOne(
@@ -136,6 +137,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
FinalizeSector: planOne(
on(SectorFinalized{}, Proving),
+ on(SectorFinalizedAvailable{}, Available),
on(SectorFinalizeFailed{}, FinalizeFailed),
),
@@ -283,7 +285,11 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
Proving: planOne(
on(SectorFaultReported{}, FaultReported),
on(SectorFaulty{}, Faulty),
+ on(SectorMarkForUpdate{}, Available),
+ ),
+ Available: planOne(
on(SectorStartCCUpdate{}, SnapDealsWaitDeals),
+ on(SectorAbortUpgrade{}, Proving),
),
Terminating: planOne(
on(SectorTerminating{}, TerminateWait),
@@ -558,6 +564,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
// Post-seal
case Proving:
return m.handleProvingSector, processed, nil
+ case Available:
+ return m.handleAvailableSector, processed, nil
case Terminating:
return m.handleTerminating, processed, nil
case TerminateWait:
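The planner changes insert an `Available` state between `Proving` and the snap-deals pipeline: `SectorMarkForUpdate` moves a sector into `Available`, from which `SectorStartCCUpdate` enters `SnapDealsWaitDeals` and `SectorAbortUpgrade` returns to `Proving`. A minimal table-driven sketch of just these transitions (simplified string types; the real planners are go-statemachine functions):

```go
// Table-driven view of the new Available-state transitions.
package main

import "fmt"

type State string
type Event string

var transitions = map[State]map[Event]State{
	"Proving":   {"SectorMarkForUpdate": "Available"},
	"Available": {"SectorStartCCUpdate": "SnapDealsWaitDeals", "SectorAbortUpgrade": "Proving"},
}

func step(s State, e Event) State {
	if next, ok := transitions[s][e]; ok {
		return next
	}
	return s // unhandled events leave the state unchanged
}

func main() {
	s := step("Proving", "SectorMarkForUpdate")
	fmt.Println(s)                             // Available
	fmt.Println(step(s, "SectorAbortUpgrade")) // back to Proving
}
```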


@@ -286,6 +286,10 @@ type SectorFinalized struct{}
func (evt SectorFinalized) apply(*SectorInfo) {}
+ type SectorFinalizedAvailable struct{}
+
+ func (evt SectorFinalizedAvailable) apply(*SectorInfo) {}
type SectorRetryFinalize struct{}
func (evt SectorRetryFinalize) apply(*SectorInfo) {}
@@ -297,6 +301,10 @@ func (evt SectorFinalizeFailed) apply(*SectorInfo) {}
// Snap deals // CC update path
+ type SectorMarkForUpdate struct{}
+
+ func (evt SectorMarkForUpdate) apply(state *SectorInfo) {}
type SectorStartCCUpdate struct{}
func (evt SectorStartCCUpdate) apply(state *SectorInfo) {


@@ -12,11 +12,14 @@ import (
"github.com/filecoin-project/go-commp-utils/zerocomm"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
+ "github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
+ "github.com/filecoin-project/lotus/chain/actors/policy"
+ "github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
@@ -30,8 +33,8 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
m.inputLk.Lock()
- if m.creating != nil && *m.creating == sector.SectorNumber {
- m.creating = nil
+ if m.nextDealSector != nil && *m.nextDealSector == sector.SectorNumber {
+ m.nextDealSector = nil
}
sid := m.minerSectorID(sector.SectorNumber)
@@ -384,8 +387,29 @@ func (m *Sealing) MatchPendingPiecesToOpenSectors(ctx context.Context) error {
return m.updateInput(ctx, sp)
}
+ type expFn func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error)
// called with m.inputLk
func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error {
+ memo := make(map[abi.SectorNumber]struct {
+ e abi.ChainEpoch
+ p abi.TokenAmount
+ })
+ expF := func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error) {
+ if e, ok := memo[sn]; ok {
+ return e.e, e.p, nil
+ }
+ onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, sn, TipSetToken{})
+ if err != nil {
+ return 0, big.Zero(), err
+ }
+ memo[sn] = struct {
+ e abi.ChainEpoch
+ p abi.TokenAmount
+ }{e: onChainInfo.Expiration, p: onChainInfo.InitialPledge}
+ return onChainInfo.Expiration, onChainInfo.InitialPledge, nil
+ }
ssize, err := sp.SectorSize()
if err != nil {
return err
@@ -411,19 +435,6 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
toAssign[proposalCid] = struct{}{}
- memo := make(map[abi.SectorNumber]abi.ChainEpoch)
- expF := func(sn abi.SectorNumber) (abi.ChainEpoch, error) {
- if exp, ok := memo[sn]; ok {
- return exp, nil
- }
- onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, sn, TipSetToken{})
- if err != nil {
- return 0, err
- }
- memo[sn] = onChainInfo.Expiration
- return onChainInfo.Expiration, nil
- }
for id, sector := range m.openSectors {
avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used
// check that sector lifetime is long enough to fit deal using latest expiration from on chain
@@ -434,7 +445,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
continue
}
if !ok {
- exp, _ := expF(sector.number)
+ exp, _, _ := expF(sector.number)
log.Infof("CC update sector %d cannot fit deal, expiration %d before deal end epoch %d", id, exp, piece.deal.DealProposal.EndEpoch)
continue
}
@@ -497,7 +508,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
if len(toAssign) > 0 {
log.Errorf("we are trying to create a new sector with open sectors %v", m.openSectors)
- if err := m.tryCreateDealSector(ctx, sp); err != nil {
+ if err := m.tryGetDealSector(ctx, sp, expF); err != nil {
log.Errorw("Failed to create a new sector for deals", "error", err)
}
}
@@ -505,10 +516,113 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
return nil
}
- func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error {
+ func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize) (minTarget, target abi.ChainEpoch, err error) {
+ var candidates []*pendingPiece
+ for _, piece := range m.pendingPieces {
+ if piece.assigned {
+ continue // already assigned to a sector, skip
+ }
+ candidates = append(candidates, piece)
+ }
+ // earliest expiration first
+ sort.Slice(candidates, func(i, j int) bool {
+ return candidates[i].deal.DealProposal.EndEpoch < candidates[j].deal.DealProposal.EndEpoch
+ })
+ var totalBytes uint64
+ for _, candidate := range candidates {
+ totalBytes += uint64(candidate.size)
+ if totalBytes >= uint64(abi.PaddedPieceSize(ssize).Unpadded()) {
+ return candidates[0].deal.DealProposal.EndEpoch, candidate.deal.DealProposal.EndEpoch, nil
+ }
+ }
+ _, curEpoch, err := m.Api.ChainHead(ctx)
+ if err != nil {
+ return 0, 0, xerrors.Errorf("getting current epoch: %w", err)
+ }
+ minDur, maxDur := policy.DealDurationBounds(0)
+ return curEpoch + minDur, curEpoch + maxDur, nil
+ }
+
+ func (m *Sealing) tryGetUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) {
+ if len(m.available) == 0 {
+ return false, nil
+ }
+ ssize, err := sp.SectorSize()
+ if err != nil {
+ return false, xerrors.Errorf("getting sector size: %w", err)
+ }
+ minExpiration, targetExpiration, err := m.calcTargetExpiration(ctx, ssize)
+ if err != nil {
+ return false, xerrors.Errorf("calculating min target expiration: %w", err)
+ }
+ var candidate abi.SectorID
+ var bestExpiration abi.ChainEpoch
+ bestPledge := types.TotalFilecoinInt
+ for s := range m.available {
+ expiration, pledge, err := ef(s.Number)
+ if err != nil {
+ log.Errorw("checking sector expiration", "error", err)
+ continue
+ }
+ slowChecks := func(sid abi.SectorNumber) bool {
+ active, err := m.sectorActive(ctx, TipSetToken{}, sid)
+ if err != nil {
+ log.Errorw("checking sector active", "error", err)
+ return false
+ }
+ if !active {
+ log.Debugw("skipping available sector", "reason", "not active")
+ return false
+ }
+ return true
+ }
+ // if best is below target, we want larger expirations
+ // if best is above target, we want lower pledge, but only if still above target
+ if bestExpiration < targetExpiration {
+ if expiration > bestExpiration && slowChecks(s.Number) {
+ bestExpiration = expiration
+ bestPledge = pledge
+ candidate = s
+ }
+ continue
+ }
+ if expiration >= targetExpiration && pledge.LessThan(bestPledge) && slowChecks(s.Number) {
+ bestExpiration = expiration
+ bestPledge = pledge
+ candidate = s
+ }
+ }
+ if bestExpiration < minExpiration {
+ log.Infow("Not upgrading any sectors", "available", len(m.available), "pieces", len(m.pendingPieces), "bestExp", bestExpiration, "target", targetExpiration, "min", minExpiration, "candidate", candidate)
+ // didn't find a good sector / no sectors were available
+ return false, nil
+ }
+ log.Infow("Upgrading sector", "number", candidate.Number, "type", "deal", "proofType", sp, "expiration", bestExpiration, "pledge", types.FIL(bestPledge))
+ delete(m.available, candidate)
+ m.nextDealSector = &candidate.Number
+ return true, m.sectors.Send(uint64(candidate.Number), SectorStartCCUpdate{})
+ }
+
+ func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) error {
m.startupWait.Wait()
- if m.creating != nil {
+ if m.nextDealSector != nil {
return nil // new sector is being created right now
}
@@ -517,10 +631,6 @@
return xerrors.Errorf("getting storage config: %w", err)
}
- if !cfg.MakeNewSectorForDeals {
- return nil
- }
if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals {
return nil
}
@@ -529,12 +639,24 @@
return nil
}
+ got, err := m.tryGetUpgradeSector(ctx, sp, ef)
+ if err != nil {
+ return err
+ }
+ if got {
+ return nil
+ }
+
+ if !cfg.MakeNewSectorForDeals {
+ return nil
+ }
sid, err := m.createSector(ctx, cfg, sp)
if err != nil {
return err
}
- m.creating = &sid
+ m.nextDealSector = &sid
log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
return m.sectors.Send(uint64(sid), SectorStart{
@@ -573,6 +695,11 @@ func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
func (m *Sealing) AbortUpgrade(sid abi.SectorNumber) error {
m.startupWait.Wait()
+ m.inputLk.Lock()
+ // always do this early
+ delete(m.available, m.minerSectorID(sid))
+ m.inputLk.Unlock()
log.Infow("aborting upgrade of sector", "sector", sid, "trigger", "user")
return m.sectors.Send(uint64(sid), SectorAbortUpgrade{xerrors.New("triggered by user")})
}
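`tryGetUpgradeSector` scans `m.available` in two phases: while the best expiration seen is still below the target, it greedily takes longer expirations; once at or above the target, it prefers the lowest pledge that still meets the target. A simplified, self-contained sketch of that selection rule (toy types, no big.Int pledges, no on-chain checks):

```go
// Two-phase candidate selection: maximize expiration up to the target,
// then minimize pledge among candidates that meet the target.
package main

import "fmt"

type sector struct {
	num        int
	expiration int64
	pledge     int64
}

func pick(avail []sector, target int64) (best sector, ok bool) {
	var bestExp int64
	bestPledge := int64(1) << 62 // stand-in for "unbounded" pledge
	for _, s := range avail {
		if bestExp < target {
			if s.expiration > bestExp {
				best, bestExp, bestPledge, ok = s, s.expiration, s.pledge, true
			}
			continue
		}
		if s.expiration >= target && s.pledge < bestPledge {
			best, bestExp, bestPledge, ok = s, s.expiration, s.pledge, true
		}
	}
	return best, ok
}

func main() {
	avail := []sector{{1, 900, 10}, {2, 1200, 30}, {3, 1500, 20}}
	best, _ := pick(avail, 1000)
	fmt.Println(best.num) // 3: meets the target with a lower pledge than 2
}
```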


@@ -9,6 +9,7 @@ import (
reflect "reflect"
address "github.com/filecoin-project/go-address"
+ bitfield "github.com/filecoin-project/go-bitfield"
abi "github.com/filecoin-project/go-state-types/abi"
big "github.com/filecoin-project/go-state-types/big"
crypto "github.com/filecoin-project/go-state-types/crypto"
@@ -214,10 +215,10 @@ func (mr *MockSealingAPIMockRecorder) StateMarketStorageDealProposal(arg0, arg1,
}
// StateMinerActiveSectors mocks base method.
- func (m *MockSealingAPI) StateMinerActiveSectors(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) ([]*miner.SectorOnChainInfo, error) {
+ func (m *MockSealingAPI) StateMinerActiveSectors(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) (bitfield.BitField, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateMinerActiveSectors", arg0, arg1, arg2)
- ret0, _ := ret[0].([]*miner.SectorOnChainInfo)
+ ret0, _ := ret[0].(bitfield.BitField)
ret1, _ := ret[1].(error)
return ret0, ret1
}


@@ -20,6 +20,8 @@ type Config struct {
MakeNewSectorForDeals bool
+ MakeCCSectorsAvailable bool
WaitDealsDelay time.Duration
CommittedCapacitySectorLifetime time.Duration


@@ -13,6 +13,7 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
+ "github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/crypto"
@@ -63,7 +64,7 @@ type SealingAPI interface {
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
StateMinerAvailableBalance(context.Context, address.Address, TipSetToken) (big.Int, error)
StateMinerSectorAllocated(context.Context, address.Address, abi.SectorNumber, TipSetToken) (bool, error)
- StateMinerActiveSectors(context.Context, address.Address, TipSetToken) ([]*miner.SectorOnChainInfo, error)
+ StateMinerActiveSectors(context.Context, address.Address, TipSetToken) (bitfield.BitField, error)
StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (*api.MarketDeal, error)
StateMarketStorageDealProposal(context.Context, abi.DealID, TipSetToken) (market.DealProposal, error)
StateNetworkVersion(ctx context.Context, tok TipSetToken) (network.Version, error)
@@ -104,10 +105,11 @@ type Sealing struct {
sectorTimers map[abi.SectorID]*time.Timer
pendingPieces map[cid.Cid]*pendingPiece
assignedPieces map[abi.SectorID][]cid.Cid
- creating *abi.SectorNumber // used to prevent a race where we could create a new sector more than once
+ nextDealSector *abi.SectorNumber // used to prevent a race where we could create a new sector more than once
upgradeLk sync.Mutex
toUpgrade map[abi.SectorNumber]struct{}
+ available map[abi.SectorID]struct{}
notifee SectorStateNotifee
addrSel AddrSel
@@ -129,11 +131,11 @@ type openSector struct {
maybeAccept func(cid.Cid) error // called with inputLk
}
- func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF func(sn abi.SectorNumber) (abi.ChainEpoch, error)) (bool, error) {
+ func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF expFn) (bool, error) {
if !o.ccUpdate {
return true, nil
}
- expiration, err := expF(o.number)
+ expiration, _, err := expF(o.number)
if err != nil {
return false, err
}
@@ -179,6 +181,8 @@ func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events
assignedPieces: map[abi.SectorID][]cid.Cid{},
toUpgrade: map[abi.SectorNumber]struct{}{},
+ available: map[abi.SectorID]struct{}{},
notifee: notifee,
addrSel: as,


@@ -25,6 +25,7 @@ var ExistSectorStateList = map[SectorState]struct{}{
CommitAggregateWait: {},
FinalizeSector: {},
Proving: {},
+ Available: {},
FailedUnrecoverable: {},
SealPreCommit1Failed: {},
SealPreCommit2Failed: {},
@@ -98,6 +99,7 @@ const (
FinalizeSector SectorState = "FinalizeSector"
Proving SectorState = "Proving"
+ Available SectorState = "Available" // proving CC available for SnapDeals
// snap deals / cc update
SnapDealsWaitDeals SectorState = "SnapDealsWaitDeals"
@@ -161,7 +163,7 @@ func toStatState(st SectorState, finEarly bool) statSectorState {
return sstProving
}
return sstSealing
- case Proving, UpdateActivating, ReleaseSectorKey, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
+ case Proving, Available, UpdateActivating, ReleaseSectorKey, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
return sstProving
}


@@ -238,7 +238,7 @@ func (m *Sealing) handleSubmitReplicaUpdateFailed(ctx statemachine.Context, sect
}
// Abort upgrade for sectors that went faulty since being marked for upgrade
- active, err := sectorActive(ctx.Context(), m.Api, m.maddr, tok, sector.SectorNumber)
+ active, err := m.sectorActive(ctx.Context(), tok, sector.SectorNumber)
if err != nil {
log.Errorf("sector active check: api error, not proceeding: %+v", err)
return nil


@@ -130,6 +130,11 @@ func (m *Sealing) handleRemoving(ctx statemachine.Context, sector SectorInfo) er
func (m *Sealing) handleProvingSector(ctx statemachine.Context, sector SectorInfo) error {
// TODO: track sector health / expiration
+ m.inputLk.Lock()
+ // in case we revert into Proving without going into Available
+ delete(m.available, m.minerSectorID(sector.SectorNumber))
+ m.inputLk.Unlock()
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting sealing config: %w", err)
@@ -144,3 +149,13 @@ func (m *Sealing) handleProvingSector(ctx statemachine.Context, sector SectorInf
return nil
}
+
+ func (m *Sealing) handleAvailableSector(ctx statemachine.Context, sector SectorInfo) error {
+ m.inputLk.Lock()
+ m.available[m.minerSectorID(sector.SectorNumber)] = struct{}{}
+ m.inputLk.Unlock()
+
+ // TODO: Watch termination
+ // TODO: Auto-extend if set
+ return nil
+ }


@@ -41,7 +41,7 @@ func (m *Sealing) handleProveReplicaUpdate(ctx statemachine.Context, sector Sect
log.Errorf("handleProveReplicaUpdate: api error, not proceeding: %+v", err)
return nil
}
- active, err := sectorActive(ctx.Context(), m.Api, m.maddr, tok, sector.SectorNumber)
+ active, err := m.sectorActive(ctx.Context(), tok, sector.SectorNumber)
if err != nil {
log.Errorf("sector active check: api error, not proceeding: %+v", err)
return nil


@@ -782,5 +782,8 @@ func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorIn
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
}
+ if cfg.MakeCCSectorsAvailable && !sector.hasDeals() {
+ return ctx.Send(SectorFinalizedAvailable{})
+ }
return ctx.Send(SectorFinalized{})
}


@@ -3,14 +3,13 @@ package sealing
import (
"context"
- "github.com/filecoin-project/go-address"
- "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
- market7 "github.com/filecoin-project/specs-actors/v7/actors/builtin/market"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
+
+ "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
+ market7 "github.com/filecoin-project/specs-actors/v7/actors/builtin/market"
)
func (m *Sealing) IsMarkedForUpgrade(id abi.SectorNumber) bool {
@@ -50,17 +49,6 @@ func (m *Sealing) MarkForUpgrade(ctx context.Context, id abi.SectorNumber) error
}
func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) error {
- cfg, err := m.getConfig()
- if err != nil {
- return xerrors.Errorf("getting storage config: %w", err)
- }
-
- curStaging := m.stats.curStaging()
- if cfg.MaxWaitDealsSectors > 0 && curStaging >= cfg.MaxWaitDealsSectors {
- return xerrors.Errorf("already waiting for deals in %d >= %d (cfg.MaxWaitDealsSectors) sectors, no free resources to wait for deals in another",
- curStaging, cfg.MaxWaitDealsSectors)
- }
si, err := m.GetSectorInfo(id)
if err != nil {
return xerrors.Errorf("getting sector info: %w", err)
@@ -70,11 +58,7 @@ func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) e
return xerrors.Errorf("can't mark sectors not in the 'Proving' state for upgrade")
}
- if len(si.Pieces) != 1 {
- return xerrors.Errorf("not a committed-capacity sector, expected 1 piece")
- }
-
- if si.Pieces[0].DealInfo != nil {
+ if si.hasDeals() {
return xerrors.Errorf("not a committed-capacity sector, has deals")
}
@@ -87,7 +71,7 @@ func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) e
return xerrors.Errorf("failed to read sector on chain info: %w", err)
}
- active, err := sectorActive(ctx, m.Api, m.maddr, tok, id)
+ active, err := m.sectorActive(ctx, tok, id)
if err != nil {
return xerrors.Errorf("failed to check if sector is active")
}
@@ -100,24 +84,16 @@ func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) e
"Upgrade expiration before marking for upgrade", id, onChainInfo.Expiration)
}
- return m.sectors.Send(uint64(id), SectorStartCCUpdate{})
+ return m.sectors.Send(uint64(id), SectorMarkForUpdate{})
}
- func sectorActive(ctx context.Context, api SealingAPI, maddr address.Address, tok TipSetToken, sector abi.SectorNumber) (bool, error) {
- active, err := api.StateMinerActiveSectors(ctx, maddr, tok)
+ func (m *Sealing) sectorActive(ctx context.Context, tok TipSetToken, sector abi.SectorNumber) (bool, error) {
+ active, err := m.Api.StateMinerActiveSectors(ctx, m.maddr, tok)
if err != nil {
return false, xerrors.Errorf("failed to check active sectors: %w", err)
}
- // Ensure the upgraded sector is active
- var found bool
- for _, si := range active {
- if si.SectorNumber == sector {
- found = true
- break
- }
- }
- return found, nil
+ return active.IsSet(uint64(sector))
}
func (m *Sealing) tryUpgradeSector(ctx context.Context, params *miner.SectorPreCommitInfo) big.Int {
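`sectorActive` now asks the bitfield directly instead of scanning a slice of on-chain sector infos. A small sketch of the `go-bitfield` calls involved (the import path is the one added in the mock above; building this needs that module):

```go
// Membership is answered by the bitfield itself, with no slice walk.
package main

import (
	"fmt"

	"github.com/filecoin-project/go-bitfield"
)

func main() {
	// Pretend these sector numbers came back from StateMinerActiveSectors.
	active := bitfield.NewFromSet([]uint64{2, 5, 9})

	isSet, err := active.IsSet(5) // the check sectorActive now performs
	if err != nil {
		panic(err)
	}
	fmt.Println(isSet) // true
}
```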

go.mod

@@ -113,7 +113,7 @@ require (
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-eventbus v0.2.1
github.com/libp2p/go-libp2p v0.18.0-rc6
- github.com/libp2p/go-libp2p-connmgr v0.3.1 // indirect
+ github.com/libp2p/go-libp2p-connmgr v0.3.1
github.com/libp2p/go-libp2p-core v0.14.0
github.com/libp2p/go-libp2p-discovery v0.6.0
github.com/libp2p/go-libp2p-kad-dht v0.15.0


@@ -10,12 +10,14 @@ import (
 	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/go-state-types/abi"
 	"github.com/filecoin-project/go-state-types/network"
-	"github.com/filecoin-project/lotus/api"
-	"github.com/filecoin-project/lotus/chain/types"
-	"github.com/filecoin-project/lotus/itests/kit"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+
+	"github.com/filecoin-project/lotus/api"
+	"github.com/filecoin-project/lotus/chain/types"
+	sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
+	"github.com/filecoin-project/lotus/itests/kit"
 )

 func TestCCUpgrade(t *testing.T) {
@@ -48,9 +50,6 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode {
 	CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1)
 	fmt.Printf("CCUpgrade: %d\n", CCUpgrade)

-	// wait for deadline 0 to pass so that committing starts after post on preseals
-	// this gives max time for post to complete minimizing chances of timeout
-	// waitForDeadline(ctx, t, 1, client, maddr)
 	miner.PledgeSectors(ctx, 1, 0, nil)
 	sl, err := miner.SectorsList(ctx)
 	require.NoError(t, err)
@@ -89,18 +88,6 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode {
 	return client
 }

-func waitForDeadline(ctx context.Context, t *testing.T, waitIdx uint64, node *kit.TestFullNode, maddr address.Address) {
-	for {
-		ts, err := node.ChainHead(ctx)
-		require.NoError(t, err)
-		dl, err := node.StateMinerProvingDeadline(ctx, maddr, ts.Key())
-		require.NoError(t, err)
-		if dl.Index == waitIdx {
-			return
-		}
-	}
-}
-
 func waitForSectorActive(ctx context.Context, t *testing.T, sn abi.SectorNumber, node *kit.TestFullNode, maddr address.Address) {
 	for {
 		active, err := node.StateMinerActiveSectors(ctx, maddr, types.EmptyTSK)
@@ -116,18 +103,6 @@ func waitForSectorActive(ctx context.Context, t *testing.T, sn abi.SectorNumber, node *kit.TestFullNode, maddr address.Address) {
 	}
 }

-func waitForSectorStartUpgrade(ctx context.Context, t *testing.T, sn abi.SectorNumber, miner *kit.TestMiner) {
-	for {
-		si, err := miner.StorageMiner.SectorsStatus(ctx, sn, false)
-		require.NoError(t, err)
-		if si.State != api.SectorState("Proving") {
-			t.Logf("Done proving sector in state: %s", si.State)
-			return
-		}
-	}
-}
-
 func TestCCUpgradeAndPoSt(t *testing.T) {
 	kit.QuietMiningLogs()
 	t.Run("upgrade and then post", func(t *testing.T) {
@@ -148,13 +123,13 @@ func TestCCUpgradeAndPoSt(t *testing.T) {
 	})
 }

-func TestTooManyMarkedForUpgrade(t *testing.T) {
+func TestAbortUpgradeAvailable(t *testing.T) {
 	kit.QuietMiningLogs()

 	ctx := context.Background()
 	blockTime := 1 * time.Millisecond

-	client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15))
+	client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC())
 	ens.InterconnectAll().BeginMiningMustPost(blockTime)

 	maddr, err := miner.ActorAddress(ctx)
@@ -163,32 +138,53 @@ func TestTooManyMarkedForUpgrade(t *testing.T) {
 	}

 	CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1)
-	waitForDeadline(ctx, t, 1, client, maddr)
-	miner.PledgeSectors(ctx, 3, 0, nil)
+	fmt.Printf("CCUpgrade: %d\n", CCUpgrade)
+	miner.PledgeSectors(ctx, 1, 0, nil)
 	sl, err := miner.SectorsList(ctx)
 	require.NoError(t, err)
-	require.Len(t, sl, 3, "expected 3 sectors")
+	require.Len(t, sl, 1, "expected 1 sector")
+	require.Equal(t, CCUpgrade, sl[0], "unexpected sector number")

 	{
 		si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK)
 		require.NoError(t, err)
 		require.Less(t, 50000, int(si.Expiration))
 	}

 	waitForSectorActive(ctx, t, CCUpgrade, client, maddr)
-	waitForSectorActive(ctx, t, CCUpgrade+1, client, maddr)
-	waitForSectorActive(ctx, t, CCUpgrade+2, client, maddr)

-	err = miner.SectorMarkForUpgrade(ctx, CCUpgrade, true)
-	require.NoError(t, err)
-	err = miner.SectorMarkForUpgrade(ctx, CCUpgrade+1, true)
+	err = miner.SectorMarkForUpgrade(ctx, sl[0], true)
 	require.NoError(t, err)

-	waitForSectorStartUpgrade(ctx, t, CCUpgrade, miner)
-	waitForSectorStartUpgrade(ctx, t, CCUpgrade+1, miner)
+	sl, err = miner.SectorsList(ctx)
+	require.NoError(t, err)
+	require.Len(t, sl, 1, "expected 1 sector")

-	err = miner.SectorMarkForUpgrade(ctx, CCUpgrade+2, true)
-	require.Error(t, err)
-	assert.Contains(t, err.Error(), "no free resources to wait for deals")
+	ss, err := miner.SectorsStatus(ctx, sl[0], false)
+	require.NoError(t, err)
+	for i := 0; i < 100; i++ {
+		ss, err = miner.SectorsStatus(ctx, sl[0], false)
+		require.NoError(t, err)
+		if ss.State == api.SectorState(sealing.Proving) {
+			time.Sleep(50 * time.Millisecond)
+			continue
+		}
+		require.Equal(t, api.SectorState(sealing.Available), ss.State)
+		break
+	}
+
+	require.NoError(t, miner.SectorAbortUpgrade(ctx, sl[0]))
+	for i := 0; i < 100; i++ {
+		ss, err = miner.SectorsStatus(ctx, sl[0], false)
+		require.NoError(t, err)
+		if ss.State == api.SectorState(sealing.Available) {
+			time.Sleep(50 * time.Millisecond)
+			continue
+		}
+		require.Equal(t, api.SectorState(sealing.Proving), ss.State)
+		break
+	}
 }
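The two polling loops in TestAbortUpgradeAvailable share a wait-until-the-state-flips shape. A hypothetical helper that would live in the same test file (reusing its imports); unlike the inlined loops, this sketch also fails loudly if the transition never happens:

```go
// waitForSectorState polls SectorsStatus until the sector leaves `from`,
// then asserts it landed in `to`. Hypothetical helper mirroring the calls
// the test makes inline.
func waitForSectorState(ctx context.Context, t *testing.T, miner *kit.TestMiner,
	sn abi.SectorNumber, from, to api.SectorState) {
	for i := 0; i < 100; i++ {
		ss, err := miner.SectorsStatus(ctx, sn, false)
		require.NoError(t, err)
		if ss.State == from {
			time.Sleep(50 * time.Millisecond)
			continue
		}
		require.Equal(t, to, ss.State)
		return
	}
	t.Fatalf("sector %d never left state %s", sn, from)
}
```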

View File

@@ -15,5 +15,6 @@ func QuietMiningLogs() {
 	_ = logging.SetLogLevel("storageminer", "ERROR")
 	_ = logging.SetLogLevel("pubsub", "ERROR")
 	_ = logging.SetLogLevel("gen", "ERROR")
+	_ = logging.SetLogLevel("rpc", "ERROR")
 	_ = logging.SetLogLevel("dht/RtRefreshManager", "ERROR")
 }

View File

@@ -40,6 +40,7 @@ func DataTransferLogger(event datatransfer.Event, state datatransfer.ChannelState) {
 		"sent", state.Sent(),
 		"received", state.Received(),
 		"queued", state.Queued(),
+		"received count", state.ReceivedCidsTotal(),
 		"total size", state.TotalSize(),
 		"remote peer", state.OtherPeer(),
 		"event message", event.Message,

View File

@@ -222,7 +222,7 @@ var LibP2P = Options(
 	Override(ConnGaterKey, lp2p.ConnGaterOption),

 	// Services (resource management)
-	Override(new(network.ResourceManager), lp2p.ResourceManager),
+	Override(new(network.ResourceManager), lp2p.ResourceManager(200)),
 	Override(ResourceManagerKey, lp2p.ResourceManagerOption),
 )
@@ -282,6 +282,7 @@ func ConfigCommon(cfg *config.Common, enableLibp2pNode bool) Option {
 			cfg.Libp2p.ConnMgrHigh,
 			time.Duration(cfg.Libp2p.ConnMgrGrace),
 			cfg.Libp2p.ProtectedPeers)),
+		Override(new(network.ResourceManager), lp2p.ResourceManager(cfg.Libp2p.ConnMgrHigh)),
 		Override(new(*pubsub.PubSub), lp2p.GossipSub),
 		Override(new(*config.Pubsub), &cfg.Pubsub),
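Both overrides go through the same factory: lp2p.ResourceManager is no longer the fx constructor itself but returns one, so the bare LibP2P option list can bake in a conservative default of 200 connections while ConfigCommon re-overrides it with the node's ConnMgrHigh. A minimal sketch of that closure-returning-constructor shape, with hypothetical names and no fx wiring:

```go
package main

import "fmt"

// newManager mirrors the shape of lp2p.ResourceManager after this change:
// the outer call fixes a parameter, and the returned function is what the
// dependency-injection framework invokes later.
func newManager(connMgrHi uint) func() (string, error) {
	return func() (string, error) {
		return fmt.Sprintf("resource manager sized for %d conns", connMgrHi), nil
	}
}

func main() {
	standalone := newManager(200) // the LibP2P option list's fixed default
	fromConfig := newManager(900) // 900 is an illustrative ConnMgrHigh value

	s1, _ := standalone()
	s2, _ := fromConfig()
	fmt.Println(s1)
	fmt.Println(s2)
}
```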

View File

@@ -189,7 +189,7 @@ func DefaultStorageMiner() *StorageMiner {
 		},

 		IndexProvider: IndexProviderConfig{
-			Enable:               false,
+			Enable:               true,
 			EntriesCacheCapacity: 1024,
 			EntriesChunkSize:     16384,
 			TopicName:            "/indexer/ingest/mainnet",

View File

@@ -73,6 +73,6 @@ func TestDefaultMinerRoundtrip(t *testing.T) {
 func TestDefaultStorageMiner_SetsIndexIngestTopic(t *testing.T) {
 	subject := DefaultStorageMiner()
-	require.False(t, subject.IndexProvider.Enable)
+	require.True(t, subject.IndexProvider.Enable)
 	require.Equal(t, "/indexer/ingest/mainnet", subject.IndexProvider.TopicName)
 }

View File

@@ -750,6 +750,12 @@ avoid the relatively high cost of unsealing the data later, at the cost of more
 		Comment: `Run sector finalization before submitting sector proof to the chain`,
 	},
+	{
+		Name: "MakeCCSectorsAvailable",
+		Type: "bool",
+
+		Comment: `After sealing CC sectors, make them available for upgrading with deals`,
+	},
 	{
 		Name: "CollateralFromMinerBalance",
 		Type: "bool",

View File

@@ -250,6 +250,9 @@ type SealingConfig struct {
 	// Run sector finalization before submitting sector proof to the chain
 	FinalizeEarly bool

+	// After sealing CC sectors, make them available for upgrading with deals
+	MakeCCSectorsAvailable bool
+
 	// Whether to use available miner balance for sector collateral instead of sending it with each message
 	CollateralFromMinerBalance bool
 	// Minimum available balance to keep in the miner actor before sending it with messages

View File

@@ -10,10 +10,10 @@ import (
 	logging "github.com/ipfs/go-log/v2"
 	"github.com/libp2p/go-libp2p"
-	connmgr "github.com/libp2p/go-libp2p-connmgr"
 	"github.com/libp2p/go-libp2p-core/crypto"
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-core/peerstore"
+	"github.com/libp2p/go-libp2p/p2p/net/connmgr"
 	"go.uber.org/fx"
 )
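The connection manager import moves from the archived go-libp2p-connmgr module to the copy maintained in-tree in go-libp2p. A minimal sketch of the constructor as of go-libp2p v0.18, where the grace period became a functional option and the call gained an error return (the water marks here are illustrative):

```go
package main

import (
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/p2p/net/connmgr"
)

func main() {
	// In-tree connmgr: grace period is a functional option, and the
	// constructor returns an error, unlike the old positional-argument API.
	cm, err := connmgr.NewConnManager(150, 180, connmgr.WithGracePeriod(20*time.Second))
	if err != nil {
		panic(err)
	}

	h, err := libp2p.New(libp2p.ConnectionManager(cm))
	if err != nil {
		panic(err)
	}
	defer h.Close() //nolint:errcheck

	fmt.Println("host up:", h.ID())
}
```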

View File

@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math/bits"
 	"os"
 	"path/filepath"
@@ -15,6 +16,8 @@ import (
 	"github.com/libp2p/go-libp2p-core/protocol"
 	rcmgr "github.com/libp2p/go-libp2p-resource-manager"

+	logging "github.com/ipfs/go-log/v2"
+
 	"github.com/filecoin-project/lotus/metrics"
 	"github.com/filecoin-project/lotus/node/repo"
@@ -22,7 +25,50 @@ import (
 	"go.opencensus.io/tag"
 )

-func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
+func ResourceManager(connMgrHi uint) func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
+	return func(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
+		envvar := os.Getenv("LOTUS_RCMGR")
+		if envvar == "" || envvar == "0" {
+			// TODO opt-in for now -- flip this to enabled by default once we are comfortable with testing
+			log.Info("libp2p resource manager is disabled")
+			return network.NullResourceManager, nil
+		}
+
+		log.Info("libp2p resource manager is enabled")
+		// enable debug logs for rcmgr
+		logging.SetLogLevel("rcmgr", "debug")
+
+		// Adjust default limits
+		// - give it more memory, up to 4G, min of 1G
+		// - if maxconns are too high, adjust Conn/FD/Stream limits
+		defaultLimits := rcmgr.DefaultLimits.WithSystemMemory(.125, 1<<30, 4<<30)
+		maxconns := int(connMgrHi)
+		if 2*maxconns > defaultLimits.SystemBaseLimit.ConnsInbound {
+			// adjust conns to 2x to allow for two conns per peer (TCP+QUIC)
+			defaultLimits.SystemBaseLimit.ConnsInbound = logScale(2 * maxconns)
+			defaultLimits.SystemBaseLimit.ConnsOutbound = logScale(2 * maxconns)
+			defaultLimits.SystemBaseLimit.Conns = logScale(4 * maxconns)
+
+			defaultLimits.SystemBaseLimit.StreamsInbound = logScale(16 * maxconns)
+			defaultLimits.SystemBaseLimit.StreamsOutbound = logScale(64 * maxconns)
+			defaultLimits.SystemBaseLimit.Streams = logScale(64 * maxconns)
+
+			if 2*maxconns > defaultLimits.SystemBaseLimit.FD {
+				defaultLimits.SystemBaseLimit.FD = logScale(2 * maxconns)
+			}
+
+			defaultLimits.ServiceBaseLimit.StreamsInbound = logScale(8 * maxconns)
+			defaultLimits.ServiceBaseLimit.StreamsOutbound = logScale(32 * maxconns)
+			defaultLimits.ServiceBaseLimit.Streams = logScale(32 * maxconns)
+
+			defaultLimits.ProtocolBaseLimit.StreamsInbound = logScale(8 * maxconns)
+			defaultLimits.ProtocolBaseLimit.StreamsOutbound = logScale(32 * maxconns)
+			defaultLimits.ProtocolBaseLimit.Streams = logScale(32 * maxconns)
+
+			log.Info("adjusted default resource manager limits")
+		}
+
+		// initialize
 	var limiter *rcmgr.BasicLimiter
 	var opts []rcmgr.Option
@@ -34,13 +80,13 @@ func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
 	switch {
 	case err == nil:
 		defer limitsIn.Close() //nolint:errcheck
-		limiter, err = rcmgr.NewDefaultLimiterFromJSON(limitsIn)
+		limiter, err = rcmgr.NewLimiterFromJSON(limitsIn, defaultLimits)
 		if err != nil {
 			return nil, fmt.Errorf("error parsing limit file: %w", err)
 		}

 	case errors.Is(err, os.ErrNotExist):
-		limiter = rcmgr.NewDefaultLimiter()
+		limiter = rcmgr.NewStaticLimiter(defaultLimits)

 	default:
 		return nil, err
@@ -72,6 +118,12 @@ func ResourceManager(lc fx.Lifecycle, repo repo.LockedRepo) (network.ResourceManager, error) {
 		return mgr, nil
 	}
 }
+}
+
+func logScale(val int) int {
+	bitlen := bits.Len(uint(val))
+	return 1 << bitlen
+}

 func ResourceManagerOption(mgr network.ResourceManager) Libp2pOpts {
 	return Libp2pOpts{
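The adjusted-limits path is opt-in: it only runs when the LOTUS_RCMGR environment variable is set to a non-empty, non-zero value, and it scales the rcmgr defaults from the connection manager's high-water mark. logScale rounds its argument up to the next power of two (exact powers are doubled, since bits.Len counts the leading bit), so with the 200-connection fallback used in the LibP2P option list, 2*maxconns = 400 becomes 512 and 64*maxconns = 12800 becomes 16384. A standalone check of that arithmetic:

```go
package main

import (
	"fmt"
	"math/bits"
)

// logScale as defined above: round up to the next power of two.
func logScale(val int) int {
	bitlen := bits.Len(uint(val))
	return 1 << bitlen
}

func main() {
	fmt.Println(logScale(400))   // 512   (2*maxconns with connMgrHi = 200)
	fmt.Println(logScale(12800)) // 16384 (64*maxconns with connMgrHi = 200)
	fmt.Println(logScale(1024))  // 2048: exact powers of two are doubled
}
```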

View File

@@ -897,6 +897,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error) {
 			MaxSealingSectorsForDeals:       cfg.MaxSealingSectorsForDeals,
 			CommittedCapacitySectorLifetime: config.Duration(cfg.CommittedCapacitySectorLifetime),
 			WaitDealsDelay:                  config.Duration(cfg.WaitDealsDelay),
+			MakeCCSectorsAvailable:          cfg.MakeCCSectorsAvailable,
 			AlwaysKeepUnsealedCopy:          cfg.AlwaysKeepUnsealedCopy,
 			FinalizeEarly:                   cfg.FinalizeEarly,
@@ -935,6 +936,7 @@ func ToSealingConfig(cfg *config.StorageMiner) sealiface.Config {
 		MakeNewSectorForDeals:           cfg.Dealmaking.MakeNewSectorForDeals,
 		CommittedCapacitySectorLifetime: time.Duration(cfg.Sealing.CommittedCapacitySectorLifetime),
 		WaitDealsDelay:                  time.Duration(cfg.Sealing.WaitDealsDelay),
+		MakeCCSectorsAvailable:          cfg.Sealing.MakeCCSectorsAvailable,
 		AlwaysKeepUnsealedCopy:          cfg.Sealing.AlwaysKeepUnsealedCopy,
 		FinalizeEarly:                   cfg.Sealing.FinalizeEarly,

View File

@@ -9,6 +9,7 @@ import (
 	"golang.org/x/xerrors"

 	"github.com/filecoin-project/go-address"
+	"github.com/filecoin-project/go-bitfield"
 	"github.com/filecoin-project/go-state-types/abi"
 	"github.com/filecoin-project/go-state-types/big"
 	"github.com/filecoin-project/go-state-types/crypto"
@@ -112,13 +113,25 @@ func (s SealingAPIAdapter) StateMinerSectorAllocated(ctx context.Context, maddr address.Address, sid abi.SectorNumber, tok sealing.TipSetToken) (bool, error) {
 	return s.delegate.StateMinerSectorAllocated(ctx, maddr, sid, tsk)
 }

-func (s SealingAPIAdapter) StateMinerActiveSectors(ctx context.Context, maddr address.Address, tok sealing.TipSetToken) ([]*miner.SectorOnChainInfo, error) {
+func (s SealingAPIAdapter) StateMinerActiveSectors(ctx context.Context, maddr address.Address, tok sealing.TipSetToken) (bitfield.BitField, error) {
 	tsk, err := types.TipSetKeyFromBytes(tok)
 	if err != nil {
-		return nil, xerrors.Errorf("faile dto unmarshal TipSetToken to TipSetKey: %w", err)
+		return bitfield.BitField{}, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err)
 	}

-	return s.delegate.StateMinerActiveSectors(ctx, maddr, tsk)
+	act, err := s.delegate.StateGetActor(ctx, maddr, tsk)
+	if err != nil {
+		return bitfield.BitField{}, xerrors.Errorf("getting miner actor: temp error: %+v", err)
+	}
+
+	stor := store.ActorStore(ctx, blockstore.NewAPIBlockstore(s.delegate))
+
+	state, err := miner.Load(stor, act)
+	if err != nil {
+		return bitfield.BitField{}, xerrors.Errorf("loading miner state: %+v", err)
+	}
+
+	return miner.AllPartSectors(state, miner.Partition.ActiveSectors)
 }

 func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, mcid cid.Cid) (sealing.MsgLookup, error) {
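Instead of materializing []*miner.SectorOnChainInfo, the adapter now loads miner state itself and asks miner.AllPartSectors for the union of every partition's ActiveSectors bitfield. A simplified standalone sketch of that union step over plain sector-number slices (the real helper walks deadlines and partitions in miner.State):

```go
package main

import (
	"fmt"

	"github.com/filecoin-project/go-bitfield"
)

// unionPartitions stands in for miner.AllPartSectors conceptually: OR
// together one selected bitfield per partition, here built from uint64
// slices of sector numbers.
func unionPartitions(parts [][]uint64) (bitfield.BitField, error) {
	fields := make([]bitfield.BitField, 0, len(parts))
	for _, p := range parts {
		fields = append(fields, bitfield.NewFromSet(p))
	}
	return bitfield.MultiMerge(fields...)
}

func main() {
	merged, err := unionPartitions([][]uint64{{1, 2}, {7}, {2, 9}})
	if err != nil {
		panic(err)
	}
	count, _ := merged.Count()
	fmt.Println(count) // 4 distinct active sectors: 1, 2, 7, 9
}
```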