lotus/storage/pipeline/input.go
Phi-rjan 6f7498b622
chore: Merge nv22 into master (#11699)
* [WIP] feat: Add nv22 skeleton

Addition of Network Version 22 skeleton

* update FFI

* feat: drand: refactor round verification

* feat: sealing: Support nv22 DDO features in the sealing pipeline (#11226)

* Initial work supporting DDO pieces in lotus-miner

* sealing: Update pipeline input to operate on UniversalPiece

* sealing: Update pipeline checks/sealing states to operate on UniversalPiece

* sealing: Make pipeline build with UniversalPiece

* move PieceDealInfo out of api

* make gen

* make sealing pipeline unit tests pass

* fix itest ensemble build

* don't panic in SectorsStatus with deals

* stop linter from complaining about checkPieces

* fix sector import tests

* mod tidy

* sealing: Add logic for (pre)committing DDO sectors

* sealing: state-types with method defs

* DDO non-snap pipeline works(?), DDO Itests

* DDO support in snapdeals pipeline

* make gen

* update actor bundles

* update the gst market fix

* fix: chain: use PreCommitSectorsBatch2 when setting up genesis

* some bug fixes

* integration working changes

* update actor bundles

* Make TestOnboardRawPieceSnap pass

* Appease the linter

* Make deadlines test pass with v12 actors

* Update go-state-types, abstract market DealState

* make gen

* mod tidy, lint fixes

* Fix some more tests

* Bump version in master

Bump version in master

* Make gen

Make gen

* fix sender

* fix: lotus-provider: Fix winning PoSt

* fix: sql Scan cannot write to an object

* Actually show miner-addrs in info-log

Actually show miner-addrs in lotus-provider info-log

* [WIP] feat: Add nv22 skeleton

Addition of Network Version 22 skeleton

* update FFI

* ddo is now nv22

* make gen

* temp actor bundle with ddo

* use working go-state-types

* gst with v13 market migration

* update bundle, builtin.MethodsMiner.ProveCommitSectors2 -> 3

* actually working v13 migration, v13 migration itest

* Address review

* sealing: Correct DDO snap pledge math

* itests: Mixed ddo itest

* pipeline: Fix sectorWeight

* sealing: convert market deals into PAMs in mixed sectors

* sealing: make market to ddo conversion work

* fix lint

* update gst

* Update actors and GST to lastest integ branch

* commit batcher: Update ProveCommitSectors3Params builder logic

* make gen

* use builtin-actors master

* ddo: address review

* itests: Add commd assertions to ddo tests

* make gen

* gst with fixed types

* config knobs for RequireActivationSuccess

* storage: Drop obsolete flaky tasts

---------

Co-authored-by: Jennifer Wang <jiayingw703@gmail.com>
Co-authored-by: Aayush <arajasek94@gmail.com>
Co-authored-by: Shrenuj Bansal <shrenuj.bansal@protocol.ai>
Co-authored-by: Phi <orjan.roren@gmail.com>
Co-authored-by: Andrew Jackson (Ajax) <snadrus@gmail.com>
Co-authored-by: TippyFlits <james.bluett@protocol.ai>

* feat: implement FIP-0063

* chore: deps: update to go-multiaddr v0.12.2 (#11602)

* feat: fvm: update the FVM/FFI to v4.1 (#11608) (#11612)

This:

1. Adds nv22 support.
2. Updates the message tracing format.

Co-authored-by: Steven Allen <steven@stebalien.com>

* AggregateProofType nil when doing batch updates

Use latest nv22 go-state-types version with matching update

* Update to v13.0.0-rc.2 bundle

* chore: Upgrade heights and codename

Update upgrade heights

Co-Authored-By: Steven Allen <steven@stebalien.com>

* Update epoch after nv22 DRAND switch

Update epoch after nv22 DRAND switch

* Update Mango codename to Phoneix

Make the codename for the Drand-change inline with Dragon style.

* Add UpgradePhoenixHeight to API params

* set UpgradePhoenixHeight to be one hour after Dragon

* Make gen

Make gen and UpgradePhoenixHeight in butterfly and local devnet to be in line with Calibration and Mainnet

* Update epoch heights (#11637)

Update epoch heights

* new: add forest bootstrap nodes (#11636)

Signed-off-by: samuelarogbonlo <sbayo971@gmail.com>

* Merge pull request #11491 from filecoin-project/fix/remove-decommissioned-pl-bootstrap-nodes

Remove PL operated bootstrap nodes from mainnet.pi

* feat: api: new verified registry methods to get all allocations and claims (#11631)

* new verireg methods

* update changelog and add itest

* update itest and cli

* update new method's support till v9

* remove gateway APIs

* fix cli internal var names

* chore:: backport #11609 to the feat/nv22 branch (#11644)

* feat: api: improve the correctness of Eth's trace_block (#11609)

* Improve the correctness of Eth's trace_block

- Improve encoding/decoding of parameters and return values:
  - Encode "native" parameters and return values with Solidity ABI.
  - Correctly decode parameters to "create" calls.
  - Use the correct (ish) output for "create" calls.
  - Handle all forms of "create".
- Make robust with respect to reverts:
  - Use the actor ID/address from the trace instead of looking it up in
    the state-tree (may not exist in the state-tree due to a revert).
  - Gracefully handle failed actor/contract creation.
- Improve performance:
  - We avoid looking anything up in the state-tree when translating the
    trace, which should significantly improve performance.
- Improve code readability:
  - Remove all "backtracking" logic.
  - Use an "environment" struct to store temporary state instead of
    attaching it to the trace.
- Fix random bugs:
  - Fix an allocation bug in the "address" logic (need to set the
    capacity before modifying the slice).
  - Improved error checking/handling.
- Use correct types for `trace_block` action/results (create, call, etc.).
  - And use the correct types for Result/Action structs instead of reusing the same "Call" action every time.
- Improve error messages.

* Make gen

Make gen

---------

Co-authored-by: Steven Allen <steven@stebalien.com>

* fix: add UpgradePhoenixHeight to StateGetNetworkParams (#11648)

* chore: deps: update to go-state-types v13.0.0-rc.1

* do NOT update the cache when running the real migration

* Merge pull request #11632 from hanabi1224/hm/drand-test

feat: drand quicknet: allow scheduling drand quicknet upgrade before nv22 on 2k devnet

* chore: deps: update to go-state-types v13.0.0-rc.2

chore: deps: update to go-state-types v13.0.0-rc.2

* feat: set migration config UpgradeEpoch for v13 actors upgrade

* Built-in actor events first draft

* itest for DDO non-market verified data w/ builtin actor events

* Tests for builtin actor events API

* Clean up DDO+Events tests, add lots of explainer comments

* Minor tweaks to events types

* Avoid duplicate messages when looking for receipts

* Rename internal events modules for clarity

* Adjust actor event API after review

* s/ActorEvents/Events/g in global config

* Manage event sending rate for SubscribeActorEvents

* Terminate SubscribeActorEvents chan when at max height

* Document future API changes

* More clarity in actor event API docs

* More post-review changes, lots of tests for SubscribeActorEvents

Use BlockDelay as the window for receiving events on the SubscribeActorEvents
channel. We expect the user to have received the initial batch of historical
events (if any) in one block's time. For real-time events we expect them to
not fall behind by roughly one block's time.

* Remove duplicate code from actor event type marshalling tests

Reduce verbosity and remove duplicate test logic from actor event types
JSON marshalling tests.

* Rename actor events test to follow go convention

Add missing `s` to `actor_events` test file to follow golang convention
used across the repo.

* Run actor events table tests in deterministic order

Refactor `map` usage for actor event table tests to ensure deterministic
test execution order, making debugging potential issues easier. If
non-determinism is a target, leverage Go's built-in parallel testing
capabilities.

* Reduce scope for filter removal failure when getting actor events

Use a fresh context to remove the temporary filter installed solely to
get the actor events. This should reduce chances of failure in a case
where the original context may be expired/cancelled.

Refactor removal into a `defer` statement for a more readable, concise
return statement.

* Use fixed RNG seed for actor event tests

Improve determinism in actor event tests by using a fixed RNG seed. This
makes up a more reproducible test suit.

* Use provided libraries to assert eventual conditions

Use the functionalities already provided by `testify` to assert eventual
conditions, and remove the use of `time.Sleep`.

Remove duplicate code in utility functions that are already defined.

Refactor assertion helper functions to use consistent terminology:
"require" implies fatal error, whereas "assert" implies error where the
test may proceed executing.

* Update changelog for actor events APIs

* Fix concerns and docs identified by review

* Update actor bundle to v13.0.0-rc3

Update actor bundle to v13.0.0-rc3

* Prep Lotus v1.26.0-rc1

- For sanity reverting the mainnet upgrade epoch to 99999999, and then only set it when cutting the final release

-Update Calibnet CIDs to v13.0.0-rc3

- Add GetActorEvents, SubscribeActorEvents, GetAllClaims and GetAllAllocations methods to the changelog

Co-Authored-By: Jiaying Wang <42981373+jennijuju@users.noreply.github.com>

* Update CHANGELOG.md

Co-authored-by: Masih H. Derkani <m@derkani.org>

* Make gen

Make gen

* fix: beacon: validate drand change at nv16 correctly

* bump to v1.26.0-rc2

* test: cleanup ddo verified itest, extract steps to functions

also add allocation-removed event case

* test: extract verified DDO test to separate file, add more checks

* test: add additional actor events checks

* Add verification for "deal-activated" actor event

* docs(drand): document the meaning of "IsChained" (#11692)

* Resolve conflicts

I encountered multiple issues when trying to run make gen. And these changes fixed a couple of them:
- go mod tidy
- Remove RaftState/RaftLeader
- Revert `if ts.Height() > claim.TermMax+claim.TermStart || !cctx.IsSet("expired")` to the what is in the release/v1.26.0: `if tsHeight > val.TermMax || !expired`

* fixup imports, make jen

* Update version

Update version in master to v1.27.0-dev

* Update node/impl/full/dummy.go

Co-authored-by: Łukasz Magiera <magik6k@users.noreply.github.com>

* Adjust ListClaimsCmd

Adjust ListClaimsCmd according to review

---------

Signed-off-by: samuelarogbonlo <sbayo971@gmail.com>
Co-authored-by: TippyFlits <james.bluett@protocol.ai>
Co-authored-by: Aayush <arajasek94@gmail.com>
Co-authored-by: Łukasz Magiera <magik6k@users.noreply.github.com>
Co-authored-by: Jennifer Wang <jiayingw703@gmail.com>
Co-authored-by: Shrenuj Bansal <shrenuj.bansal@protocol.ai>
Co-authored-by: Andrew Jackson (Ajax) <snadrus@gmail.com>
Co-authored-by: Steven Allen <steven@stebalien.com>
Co-authored-by: Rod Vagg <rod@vagg.org>
Co-authored-by: Samuel Arogbonlo <47984109+samuelarogbonlo@users.noreply.github.com>
Co-authored-by: LexLuthr <88259624+LexLuthr@users.noreply.github.com>
Co-authored-by: tom123222 <160735201+tom123222@users.noreply.github.com>
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
Co-authored-by: Masih H. Derkani <m@derkani.org>
Co-authored-by: Jiaying Wang <42981373+jennijuju@users.noreply.github.com>
2024-03-12 10:33:58 +01:00

1020 lines
30 KiB
Go

package sealing
import (
"context"
"sort"
"time"
"go.uber.org/zap"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-commp-utils/zerocomm"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/result"
"github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader"
"github.com/filecoin-project/lotus/storage/pipeline/piece"
"github.com/filecoin-project/lotus/storage/pipeline/sealiface"
"github.com/filecoin-project/lotus/storage/sealer"
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/filecoin-project/lotus/storage/sectorblocks"
)
func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error {
var used abi.UnpaddedPieceSize
var lastDealEnd abi.ChainEpoch
for _, piece := range sector.Pieces {
used += piece.Piece().Size.Unpadded()
endEpoch, err := piece.EndEpoch()
if err != nil {
return xerrors.Errorf("piece.EndEpoch: %w", err)
}
if piece.HasDealInfo() && endEpoch > lastDealEnd {
lastDealEnd = endEpoch
}
}
m.inputLk.Lock()
if m.nextDealSector != nil && *m.nextDealSector == sector.SectorNumber {
m.nextDealSector = nil
}
sid := m.minerSectorID(sector.SectorNumber)
if len(m.assignedPieces[sid]) > 0 {
m.inputLk.Unlock()
// got assigned more pieces in the AddPiece state
return ctx.Send(SectorAddPiece{})
}
started, err := m.maybeStartSealing(ctx, sector, used)
if err != nil || started {
delete(m.openSectors, m.minerSectorID(sector.SectorNumber))
m.inputLk.Unlock()
return err
}
if _, has := m.openSectors[sid]; !has {
m.openSectors[sid] = &openSector{
used: used,
maybeAccept: func(pk piece.PieceKey) error {
// todo check deal start deadline (configurable)
m.assignedPieces[sid] = append(m.assignedPieces[sid], pk)
return ctx.Send(SectorAddPiece{})
},
number: sector.SectorNumber,
ccUpdate: sector.CCUpdate,
}
} else {
// make sure we're only accounting for pieces which were correctly added
// (note that m.assignedPieces[sid] will always be empty here)
m.openSectors[sid].used = used
}
m.openSectors[sid].lastDealEnd = lastDealEnd
go func() {
defer m.inputLk.Unlock()
if err := m.updateInput(ctx.Context(), sector.SectorType); err != nil {
log.Errorf("%+v", err)
}
}()
return nil
}
func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo, used abi.UnpaddedPieceSize) (bool, error) {
log := log.WithOptions(zap.Fields(
zap.Uint64("sector", uint64(sector.SectorNumber)),
zap.Int("dataPieces", len(sector.nonPaddingPieceInfos())),
))
now := time.Now()
st := m.sectorTimers[m.minerSectorID(sector.SectorNumber)]
if st != nil {
if !st.Stop() { // timer expired, SectorStartPacking was/is being sent
// we send another SectorStartPacking in case one was sent in the handleAddPiece state
log.Infow("starting to seal deal sector", "trigger", "wait-timeout")
return true, ctx.Send(SectorStartPacking{})
}
}
ssize, err := sector.SectorType.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size")
}
maxDeals, err := getDealPerSectorLimit(ssize)
if err != nil {
return false, xerrors.Errorf("getting per-sector deal limit: %w", err)
}
if len(sector.nonPaddingPieceInfos()) >= maxDeals {
// can't accept more deals
log.Infow("starting to seal deal sector", "trigger", "maxdeals")
return true, ctx.Send(SectorStartPacking{})
}
if used.Padded() == abi.PaddedPieceSize(ssize) {
// sector full
log.Infow("starting to seal deal sector", "trigger", "filled")
return true, ctx.Send(SectorStartPacking{})
}
if sector.CreationTime != 0 {
cfg, err := m.getConfig()
if err != nil {
return false, xerrors.Errorf("getting storage config: %w", err)
}
sealTime := time.Unix(sector.CreationTime, 0).Add(cfg.WaitDealsDelay)
// check deal age, start sealing when the deal closest to starting is within slack time
ts, err := m.Api.ChainHead(ctx.Context())
blockTime := time.Second * time.Duration(build.BlockDelaySecs)
if err != nil {
return false, xerrors.Errorf("API error getting head: %w", err)
}
var dealSafeSealEpoch abi.ChainEpoch
for _, piece := range sector.Pieces {
if !piece.HasDealInfo() {
continue
}
startEpoch, err := piece.StartEpoch()
if err != nil {
log.Errorw("failed to get start epoch for deal", "piece", piece.String(), "error", err)
continue // not ideal, but skipping the check should break things less
}
dealSafeSealEpoch = startEpoch - cfg.StartEpochSealingBuffer
alloc, err := piece.GetAllocation(ctx.Context(), m.Api, types.EmptyTSK)
if err != nil {
log.Errorw("failed to get allocation for deal", "piece", piece.String(), "error", err)
continue // not ideal, but skipping the check should break things less
}
// alloc is nil if this is not a verified deal in nv17 or later
if alloc == nil {
continue
}
if alloc.Expiration-cfg.StartEpochSealingBuffer < dealSafeSealEpoch {
dealSafeSealEpoch = alloc.Expiration - cfg.StartEpochSealingBuffer
log.Debugw("checking safe seal epoch", "dealSafeSealEpoch", dealSafeSealEpoch)
}
}
dealSafeSealTime := time.Now().Add(time.Duration(dealSafeSealEpoch-ts.Height()) * blockTime)
if dealSafeSealTime.Before(sealTime) {
log.Debugw("deal safe time is before seal time", "dealSafeSealTime", dealSafeSealTime, "sealTime", sealTime)
sealTime = dealSafeSealTime
}
if now.After(sealTime) {
log.Infow("starting to seal deal sector", "trigger", "wait-timeout", "creation", sector.CreationTime)
return true, ctx.Send(SectorStartPacking{})
}
m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() {
log.Infow("starting to seal deal sector", "trigger", "wait-timer")
if err := ctx.Send(SectorStartPacking{}); err != nil {
log.Errorw("sending SectorStartPacking event failed", "error", err)
}
})
}
return false, nil
}
func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) error {
ssize, err := sector.SectorType.SectorSize()
if err != nil {
return err
}
res := SectorPieceAdded{}
m.inputLk.Lock()
pending, ok := m.assignedPieces[m.minerSectorID(sector.SectorNumber)]
if ok {
delete(m.assignedPieces, m.minerSectorID(sector.SectorNumber))
}
m.inputLk.Unlock()
if !ok {
// nothing to do here (might happen after a restart in AddPiece)
return ctx.Send(res)
}
var offset abi.UnpaddedPieceSize
pieceSizes := make([]abi.UnpaddedPieceSize, len(sector.Pieces))
for i, p := range sector.Pieces {
pieceSizes[i] = p.Piece().Size.Unpadded()
offset += p.Piece().Size.Unpadded()
}
maxDeals, err := getDealPerSectorLimit(ssize)
if err != nil {
return xerrors.Errorf("getting per-sector deal limit: %w", err)
}
for i, piece := range pending {
m.inputLk.Lock()
deal, ok := m.pendingPieces[piece]
m.inputLk.Unlock()
if !ok {
return xerrors.Errorf("piece %s assigned to sector %d not found", piece, sector.SectorNumber)
}
if len(sector.nonPaddingPieceInfos())+(i+1) > maxDeals {
// todo: this is rather unlikely to happen, but in case it does, return the deal to waiting queue instead of failing it
deal.accepted(sector.SectorNumber, offset, xerrors.Errorf("too many deals assigned to sector %d, dropping deal", sector.SectorNumber))
continue
}
pads, padLength := ffiwrapper.GetRequiredPadding(offset.Padded(), deal.size.Padded())
if offset.Padded()+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) {
// todo: this is rather unlikely to happen, but in case it does, return the deal to waiting queue instead of failing it
deal.accepted(sector.SectorNumber, offset, xerrors.Errorf("piece %s assigned to sector %d with not enough space", piece, sector.SectorNumber))
continue
}
offset += padLength.Unpadded()
for _, p := range pads {
expectCid := zerocomm.ZeroPieceCommitment(p.Unpadded())
ppi, err := m.sealer.AddPiece(sealer.WithPriority(ctx.Context(), DealSectorPriority),
m.minerSector(sector.SectorType, sector.SectorNumber),
pieceSizes,
p.Unpadded(),
nullreader.NewNullReader(p.Unpadded()))
if err != nil {
err = xerrors.Errorf("writing padding piece: %w", err)
deal.accepted(sector.SectorNumber, offset, err)
return ctx.Send(SectorAddPieceFailed{err})
}
if !ppi.PieceCID.Equals(expectCid) {
err = xerrors.Errorf("got unexpected padding piece CID: expected:%s, got:%s", expectCid, ppi.PieceCID)
deal.accepted(sector.SectorNumber, offset, err)
return ctx.Send(SectorAddPieceFailed{err})
}
pieceSizes = append(pieceSizes, p.Unpadded())
res.NewPieces = append(res.NewPieces, SafeSectorPiece{
api.SectorPiece{
Piece: ppi,
},
})
}
ppi, err := m.sealer.AddPiece(sealer.WithPriority(ctx.Context(), DealSectorPriority),
m.minerSector(sector.SectorType, sector.SectorNumber),
pieceSizes,
deal.size,
deal.data)
if err != nil {
err = xerrors.Errorf("writing piece: %w", err)
deal.accepted(sector.SectorNumber, offset, err)
return ctx.Send(SectorAddPieceFailed{err})
}
if !ppi.PieceCID.Equals(deal.deal.PieceCID()) {
err = xerrors.Errorf("got unexpected piece CID: expected:%s, got:%s", deal.deal.PieceCID(), ppi.PieceCID)
deal.accepted(sector.SectorNumber, offset, err)
return ctx.Send(SectorAddPieceFailed{err})
}
log.Infow("deal added to a sector", "pieceID", deal.deal.String(), "sector", sector.SectorNumber, "piece", ppi.PieceCID)
deal.accepted(sector.SectorNumber, offset, nil)
offset += deal.size
pieceSizes = append(pieceSizes, deal.size)
dinfo := deal.deal.Impl()
res.NewPieces = append(res.NewPieces, SafeSectorPiece{
api.SectorPiece{
Piece: ppi,
DealInfo: &dinfo,
},
})
}
return ctx.Send(res)
}
func (m *Sealing) handleAddPieceFailed(ctx statemachine.Context, sector SectorInfo) error {
return ctx.Send(SectorRetryWaitDeals{})
}
func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, data storiface.Data, pieceInfo piece.PieceDealInfo) (api.SectorOffset, error) {
return m.sectorAddPieceToAny(ctx, size, data, &pieceInfo)
}
func (m *Sealing) sectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, data storiface.Data, pieceInfo UniversalPieceInfo) (api.SectorOffset, error) {
log.Infof("Adding piece %s", pieceInfo.String())
if (padreader.PaddedSize(uint64(size))) != size {
return api.SectorOffset{}, xerrors.Errorf("cannot allocate unpadded piece")
}
sp, err := m.currentSealProof(ctx)
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("getting current seal proof type: %w", err)
}
ssize, err := sp.SectorSize()
if err != nil {
return api.SectorOffset{}, err
}
if size > abi.PaddedPieceSize(ssize).Unpadded() {
return api.SectorOffset{}, xerrors.Errorf("piece cannot fit into a sector")
}
cfg, err := m.getConfig()
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("getting config: %w", err)
}
ts, err := m.Api.ChainHead(ctx)
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("couldnt get chain head: %w", err)
}
nv, err := m.Api.StateNetworkVersion(ctx, types.EmptyTSK)
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("getting network version: %w", err)
}
if err := pieceInfo.Valid(nv); err != nil {
return api.SectorOffset{}, xerrors.Errorf("piece metadata invalid: %w", err)
}
startEpoch, err := pieceInfo.StartEpoch()
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("getting last start epoch: %w", err)
}
if ts.Height()+cfg.StartEpochSealingBuffer > startEpoch {
return api.SectorOffset{}, xerrors.Errorf(
"cannot add piece for deal with piece CID %s: current epoch %d has passed deal proposal start epoch %d",
pieceInfo.PieceCID(), ts.Height(), startEpoch)
}
claimTerms, err := m.getClaimTerms(ctx, pieceInfo, ts.Key())
if err != nil {
return api.SectorOffset{}, err
}
m.inputLk.Lock()
if pp, exist := m.pendingPieces[pieceInfo.Key()]; exist {
m.inputLk.Unlock()
// we already have a pre-existing add piece call for this deal, let's wait for it to finish and see if it's successful
res, err := waitAddPieceResp(ctx, pp)
if err != nil {
return api.SectorOffset{}, err
}
if res.err == nil {
// all good, return the response
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
}
// if there was an error waiting for a pre-existing add piece call, let's retry
m.inputLk.Lock()
}
// addPendingPiece takes over m.inputLk
pp := m.addPendingPiece(ctx, size, data, pieceInfo, claimTerms, sp)
res, err := waitAddPieceResp(ctx, pp)
if err != nil {
return api.SectorOffset{}, err
}
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
}
func (m *Sealing) getClaimTerms(ctx context.Context, deal UniversalPieceInfo, tsk types.TipSetKey) (pieceClaimBounds, error) {
all, err := deal.GetAllocation(ctx, m.Api, tsk)
if err != nil {
return pieceClaimBounds{}, err
}
if all != nil {
startEpoch, err := deal.StartEpoch()
if err != nil {
return pieceClaimBounds{}, err
}
return pieceClaimBounds{
claimTermEnd: startEpoch + all.TermMax,
}, nil
}
nv, err := m.Api.StateNetworkVersion(ctx, tsk)
if err != nil {
return pieceClaimBounds{}, err
}
endEpoch, err := deal.EndEpoch()
if err != nil {
return pieceClaimBounds{}, err
}
// no allocation for this deal, so just use a really high number for "term end"
return pieceClaimBounds{
claimTermEnd: endEpoch + policy.GetSectorMaxLifetime(abi.RegisteredSealProof_StackedDrg32GiBV1_1, nv),
}, nil
}
// called with m.inputLk; transfers the lock to another goroutine!
func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storiface.Data, deal UniversalPieceInfo, ct pieceClaimBounds, sp abi.RegisteredSealProof) *pendingPiece {
doneCh := make(chan struct{})
pp := &pendingPiece{
size: size,
deal: deal,
claimTerms: ct,
data: data,
doneCh: doneCh,
assigned: false,
}
pp.accepted = func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) {
pp.resp = &pieceAcceptResp{sn, offset, err}
close(pp.doneCh)
}
log.Debugw("new pending piece", "pieceID", deal.String(),
"dealStart", result.Wrap(deal.StartEpoch()),
"dealEnd", result.Wrap(deal.EndEpoch()),
"termEnd", ct.claimTermEnd)
m.pendingPieces[deal.Key()] = pp
go func() {
defer m.inputLk.Unlock()
if err := m.updateInput(ctx, sp); err != nil {
log.Errorf("%+v", err)
}
}()
return pp
}
func waitAddPieceResp(ctx context.Context, pp *pendingPiece) (*pieceAcceptResp, error) {
select {
case <-pp.doneCh:
res := pp.resp
return res, nil
case <-ctx.Done():
return nil, ctx.Err()
}
}
func (m *Sealing) SectorMatchPendingPiecesToOpenSectors(ctx context.Context) error {
sp, err := m.currentSealProof(ctx)
if err != nil {
return xerrors.Errorf("failed to get current seal proof: %w", err)
}
log.Debug("pieces to sector matching waiting for lock")
m.inputLk.Lock()
defer m.inputLk.Unlock()
return m.updateInput(ctx, sp)
}
type expFn func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error)
// called with m.inputLk
func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error {
memo := make(map[abi.SectorNumber]struct {
e abi.ChainEpoch
p abi.TokenAmount
})
getExpirationCached := func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error) {
if e, ok := memo[sn]; ok {
return e.e, e.p, nil
}
onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, sn, types.TipSetKey{})
if err != nil {
return 0, big.Zero(), err
}
if onChainInfo == nil {
return 0, big.Zero(), xerrors.Errorf("sector info for sector %d not found", sn)
}
memo[sn] = struct {
e abi.ChainEpoch
p abi.TokenAmount
}{e: onChainInfo.Expiration, p: onChainInfo.InitialPledge}
return onChainInfo.Expiration, onChainInfo.InitialPledge, nil
}
ssize, err := sp.SectorSize()
if err != nil {
return err
}
type match struct {
sector abi.SectorID
deal piece.PieceKey
dealEnd abi.ChainEpoch
claimTermEnd abi.ChainEpoch
size abi.UnpaddedPieceSize
padding abi.UnpaddedPieceSize
}
var matches []match
toAssign := map[piece.PieceKey]struct{}{} // used to maybe create new sectors
// todo: this is distinctly O(n^2), may need to be optimized for tiny deals and large scale miners
// (unlikely to be a problem now)
for proposalCid, piece := range m.pendingPieces {
if piece.assigned {
continue // already assigned to a sector, skip
}
toAssign[proposalCid] = struct{}{}
for id, sector := range m.openSectors {
avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used
// check that sector lifetime is long enough to fit deal using latest expiration from on chain
ok, err := sector.checkDealAssignable(piece, getExpirationCached)
if err != nil {
log.Errorf("failed to check expiration for cc Update sector %d", sector.number)
continue
}
if !ok {
continue
}
endEpoch, err := piece.deal.EndEpoch()
if err != nil {
log.Errorf("failed to get end epoch for deal %s", piece.deal)
continue
}
if piece.size <= avail { // (note: if we have enough space for the piece, we also have enough space for inter-piece padding)
matches = append(matches, match{
sector: id,
deal: proposalCid,
dealEnd: endEpoch,
claimTermEnd: piece.claimTerms.claimTermEnd,
size: piece.size,
padding: avail % piece.size,
})
}
}
}
sort.Slice(matches, func(i, j int) bool {
// todo maybe sort by expiration
if matches[i].padding != matches[j].padding { // less padding is better
return matches[i].padding < matches[j].padding
}
if matches[i].size != matches[j].size { // larger pieces are better
return matches[i].size < matches[j].size
}
return matches[i].sector.Number < matches[j].sector.Number // prefer older sectors
})
log.Debugw("updateInput matching", "matches", len(matches), "toAssign", len(toAssign), "openSectors", len(m.openSectors), "pieces", len(m.pendingPieces))
var assigned int
for _, mt := range matches {
if m.pendingPieces[mt.deal].assigned {
assigned++
continue
}
if _, found := m.openSectors[mt.sector]; !found {
continue
}
// late checks
avail := abi.PaddedPieceSize(ssize).Unpadded() - m.openSectors[mt.sector].used
if mt.size > avail {
continue
}
if m.openSectors[mt.sector].lastDealEnd > mt.claimTermEnd {
continue
}
// assign the piece!
err := m.openSectors[mt.sector].maybeAccept(mt.deal)
if err != nil {
m.pendingPieces[mt.deal].accepted(mt.sector.Number, 0, err) // non-error case in handleAddPiece
}
m.openSectors[mt.sector].used += mt.padding + mt.size
if mt.dealEnd > m.openSectors[mt.sector].lastDealEnd {
m.openSectors[mt.sector].lastDealEnd = mt.dealEnd
}
m.pendingPieces[mt.deal].assigned = true
delete(toAssign, mt.deal)
if err != nil {
log.Errorf("sector %d rejected deal %s: %+v", mt.sector, mt.deal, err)
continue
}
}
log.Debugw("updateInput matching done", "matches", len(matches), "toAssign", len(toAssign), "assigned", assigned, "openSectors", len(m.openSectors), "pieces", len(m.pendingPieces))
if len(toAssign) > 0 {
if err := m.tryGetDealSector(ctx, sp, getExpirationCached); err != nil {
log.Errorw("Failed to create a new sector for deals", "error", err)
}
}
return nil
}
// pendingPieceIndex is an index in the Sealing.pendingPieces map
type pendingPieceIndex piece.PieceKey
type pieceBound struct {
epoch abi.ChainEpoch
// boundStart marks deal /end/ epoch; only deals with boundStart lower or equal to expiration of a given sector can be
// put into that sector
boundStart []pendingPieceIndex
// boundEnd marks deal claim TermMax; only deals with boundEnd higher or equal to expiration of a given sector can be
// put into that sector
boundEnd []pendingPieceIndex
dealBytesInBound abi.UnpaddedPieceSize
}
func (m *Sealing) pendingPieceEpochBounds() []pieceBound {
boundsByEpoch := map[abi.ChainEpoch]*pieceBound{}
for ppi, piece := range m.pendingPieces {
if piece.assigned {
continue
}
endEpoch, err := piece.deal.EndEpoch()
if err != nil {
// this really should never happen, at this point we have validated
// the piece enough times
log.Errorf("failed to get end epoch for deal %s: %v", ppi, err)
continue
}
// start bound on deal end
if boundsByEpoch[endEpoch] == nil {
boundsByEpoch[endEpoch] = &pieceBound{
epoch: endEpoch,
}
}
boundsByEpoch[endEpoch].boundStart = append(boundsByEpoch[endEpoch].boundStart, pendingPieceIndex(ppi))
// end bound on term max
if boundsByEpoch[piece.claimTerms.claimTermEnd] == nil {
boundsByEpoch[piece.claimTerms.claimTermEnd] = &pieceBound{
epoch: piece.claimTerms.claimTermEnd,
}
}
boundsByEpoch[piece.claimTerms.claimTermEnd].boundEnd = append(boundsByEpoch[piece.claimTerms.claimTermEnd].boundEnd, pendingPieceIndex(ppi))
}
out := make([]pieceBound, 0, len(boundsByEpoch))
for _, bound := range boundsByEpoch {
out = append(out, *bound)
}
sort.Slice(out, func(i, j int) bool {
return out[i].epoch < out[j].epoch
})
var curBoundBytes abi.UnpaddedPieceSize
for i, bound := range out {
for _, ppi := range bound.boundStart {
curBoundBytes += m.pendingPieces[piece.PieceKey(ppi)].size
}
for _, ppi := range bound.boundEnd {
curBoundBytes -= m.pendingPieces[piece.PieceKey(ppi)].size
}
out[i].dealBytesInBound = curBoundBytes
}
return out
}
func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, cfg sealiface.Config, ef expFn) (bool, error) {
if len(m.available) == 0 {
return false, nil
}
ts, err := m.Api.ChainHead(ctx)
if err != nil {
return false, err
}
ssize, err := sp.SectorSize()
if err != nil {
return false, err
}
pieceBounds := m.pendingPieceEpochBounds()
findBound := func(sectorExp abi.ChainEpoch) *pieceBound {
if len(pieceBounds) == 0 {
return nil
}
f := sort.Search(len(pieceBounds), func(i int) bool {
return sectorExp <= pieceBounds[i].epoch
})
if f == 0 {
// all piece bounds are after sector expiration
return nil
}
return &pieceBounds[f-1]
}
minExpirationEpoch := ts.Height() + abi.ChainEpoch(cfg.MinUpgradeSectorExpiration)
var candidate abi.SectorID
var bestExpiration abi.ChainEpoch
var bestDealBytes abi.PaddedPieceSize
bestPledge := types.TotalFilecoinInt
for s := range m.available {
expirationEpoch, pledge, err := ef(s.Number)
if err != nil {
log.Errorw("checking sector expiration", "error", err)
continue
}
slowChecks := func(sid abi.SectorNumber) bool {
active, err := m.sectorActive(ctx, types.TipSetKey{}, sid)
if err != nil {
log.Errorw("checking sector active", "error", err)
return false
}
if !active {
log.Debugw("skipping available sector", "sector", sid, "reason", "not active")
return false
}
return true
}
if expirationEpoch < minExpirationEpoch {
log.Debugw("skipping available sector", "sector", s.Number, "reason", "expiration below MinUpgradeSectorExpiration")
}
pb := findBound(expirationEpoch)
if pb == nil {
log.Debugw("skipping available sector", "sector", s.Number, "reason", "expiration below deal bounds")
continue
}
if pb.dealBytesInBound.Padded() == 0 {
log.Debugw("skipping available sector", "sector", s.Number, "reason", "no deals in expiration bounds", "expiration", expirationEpoch)
continue
}
// if the sector has less than one sector worth of candidate deals, and
// the best candidate has more candidate deals, this sector isn't better
lessThanSectorOfData := pb.dealBytesInBound.Padded() < abi.PaddedPieceSize(ssize)
moreDealsThanBest := pb.dealBytesInBound.Padded() > bestDealBytes
// we want lower pledge, but only if we have more than one sector worth of deals
preferDueToDealSize := lessThanSectorOfData && moreDealsThanBest
preferDueToPledge := pledge.LessThan(bestPledge) && !lessThanSectorOfData
prefer := preferDueToDealSize || preferDueToPledge
if prefer && slowChecks(s.Number) {
bestExpiration = expirationEpoch
bestPledge = pledge
bestDealBytes = pb.dealBytesInBound.Padded()
candidate = s
}
}
if bestExpiration < minExpirationEpoch {
log.Infow("Not upgrading any sectors", "available", len(m.available), "pieces", len(m.pendingPieces), "bestExp", bestExpiration, "min", minExpirationEpoch, "candidate", candidate)
// didn't find a good sector / no sectors were available
return false, nil
}
log.Infow("Upgrading sector", "number", candidate.Number, "type", "deal", "proofType", sp, "expiration", bestExpiration, "pledge", types.FIL(bestPledge), "pieces", len(m.pendingPieces), "dealBytesAtExp", bestDealBytes)
delete(m.available, candidate)
m.nextDealSector = &candidate.Number
return true, m.sectors.Send(uint64(candidate.Number), SectorStartCCUpdate{})
}
// call with m.inputLk
func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi.RegisteredSealProof) (abi.SectorNumber, error) {
sid, err := m.NextSectorNumber(ctx)
if err != nil {
return 0, xerrors.Errorf("getting sector number: %w", err)
}
err = m.sealer.NewSector(ctx, m.minerSector(sp, sid))
if err != nil {
return 0, xerrors.Errorf("initializing sector: %w", err)
}
// update stats early, fsm planner would do that async
m.stats.updateSector(ctx, cfg, m.minerSectorID(sid), UndefinedSectorState)
return sid, err
}
func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) error {
m.startupWait.Wait()
if m.nextDealSector != nil {
return nil // new sector is being created right now
}
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting storage config: %w", err)
}
// if we're above WaitDeals limit, we don't want to add more staging sectors
if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors {
return nil
}
maxUpgrading := cfg.MaxSealingSectorsForDeals
if cfg.MaxUpgradingSectors > 0 {
maxUpgrading = cfg.MaxUpgradingSectors
}
canCreate := cfg.MakeNewSectorForDeals && !(cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals)
canUpgrade := !(maxUpgrading > 0 && m.stats.curSealing() >= maxUpgrading)
// we want to try to upgrade when:
// - we can upgrade and prefer upgrades
// - we don't prefer upgrades, but can't create a new sector
shouldUpgrade := canUpgrade && (!cfg.PreferNewSectorsForDeals || !canCreate)
log.Infow("new deal sector decision",
"sealing", m.stats.curSealing(),
"maxSeal", cfg.MaxSealingSectorsForDeals,
"maxUpgrade", maxUpgrading,
"preferNew", cfg.PreferNewSectorsForDeals,
"canCreate", canCreate,
"canUpgrade", canUpgrade,
"shouldUpgrade", shouldUpgrade)
if shouldUpgrade {
got, err := m.maybeUpgradeSector(ctx, sp, cfg, ef)
if err != nil {
return err
}
if got {
return nil
}
}
if canCreate {
sid, err := m.createSector(ctx, cfg, sp)
if err != nil {
return err
}
m.nextDealSector = &sid
log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
if err := m.sectors.Send(uint64(sid), SectorStart{
ID: sid,
SectorType: sp,
}); err != nil {
return err
}
}
return nil
}
func (m *Sealing) StartPackingSector(sid abi.SectorNumber) error {
m.startupWait.Wait()
log.Infow("starting to seal deal sector", "sector", sid, "trigger", "user")
return m.sectors.Send(uint64(sid), SectorStartPacking{})
}
func (m *Sealing) SectorAbortUpgrade(sid abi.SectorNumber) error {
m.startupWait.Wait()
m.inputLk.Lock()
// always do this early
delete(m.available, m.minerSectorID(sid))
m.inputLk.Unlock()
log.Infow("aborting upgrade of sector", "sector", sid, "trigger", "user")
return m.sectors.Send(uint64(sid), SectorAbortUpgrade{xerrors.New("triggered by user")})
}
func (m *Sealing) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) {
if showOnChainInfo {
return api.SectorInfo{}, xerrors.Errorf("on-chain info not supported")
}
info, err := m.GetSectorInfo(sid)
if err != nil {
return api.SectorInfo{}, err
}
deals := make([]abi.DealID, len(info.Pieces))
pieces := make([]api.SectorPiece, len(info.Pieces))
for i, piece := range info.Pieces {
// todo make this work with DDO deals in some reasonable way
pieces[i].Piece = piece.Piece()
if !piece.HasDealInfo() || piece.Impl().PublishCid == nil {
continue
}
pdi := piece.DealInfo().Impl() // copy
pieces[i].DealInfo = &pdi
deals[i] = piece.DealInfo().Impl().DealID
}
log := make([]api.SectorLog, len(info.Log))
for i, l := range info.Log {
log[i] = api.SectorLog{
Kind: l.Kind,
Timestamp: l.Timestamp,
Trace: l.Trace,
Message: l.Message,
}
}
sInfo := api.SectorInfo{
SectorID: sid,
State: api.SectorState(info.State),
CommD: info.CommD,
CommR: info.CommR,
Proof: info.Proof,
Deals: deals,
Pieces: pieces,
Ticket: api.SealTicket{
Value: info.TicketValue,
Epoch: info.TicketEpoch,
},
Seed: api.SealSeed{
Value: info.SeedValue,
Epoch: info.SeedEpoch,
},
PreCommitMsg: info.PreCommitMessage,
CommitMsg: info.CommitMessage,
Retries: info.InvalidProofs,
ToUpgrade: false,
ReplicaUpdateMessage: info.ReplicaUpdateMessage,
LastErr: info.LastErr,
Log: log,
// on chain info
SealProof: info.SectorType,
Activation: 0,
Expiration: 0,
DealWeight: big.Zero(),
VerifiedDealWeight: big.Zero(),
InitialPledge: big.Zero(),
OnTime: 0,
Early: 0,
}
return sInfo, nil
}
var _ sectorblocks.SectorBuilder = &Sealing{}