1a21b42fd7
* enable storing events (#11712) * fix: commit batch: Always go through commit batcher (#11704) * fix: commit batch: Always go through commit batcher * fix sealing fsm tests * sealing pipeline: Fix panic on padding pieces in WaitDeals (#11708) * sealing pipeline: Fix panic on padding pieces in WaitDeals * sealing pipeline: Catch panics * sealing pipeline: Output DDO pieces in SectorStatus (#11709) * sealing pipeline: Fix failing ProveCommit3 aggregate (#11710) * itests: Repro failing ProveCommit3 aggregate * commit batch: Correctly sort sectors in processBatchV2 * fix imports * ci: Bigger instance for sector_pledge test * itests: Use Must-Post mining in TestPledgeBatching --------- Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com> Co-authored-by: Łukasz Magiera <magik6k@users.noreply.github.com>
953 lines
32 KiB
Go
953 lines
32 KiB
Go
package sealing
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"io"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/ipfs/go-cid"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-address"
|
|
cborutil "github.com/filecoin-project/go-cbor-util"
|
|
"github.com/filecoin-project/go-commp-utils/zerocomm"
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
actorstypes "github.com/filecoin-project/go-state-types/actors"
|
|
"github.com/filecoin-project/go-state-types/big"
|
|
miner2 "github.com/filecoin-project/go-state-types/builtin/v13/miner"
|
|
verifreg13 "github.com/filecoin-project/go-state-types/builtin/v13/verifreg"
|
|
"github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
|
|
"github.com/filecoin-project/go-state-types/crypto"
|
|
"github.com/filecoin-project/go-state-types/exitcode"
|
|
"github.com/filecoin-project/go-state-types/network"
|
|
"github.com/filecoin-project/go-state-types/proof"
|
|
"github.com/filecoin-project/go-statemachine"
|
|
|
|
"github.com/filecoin-project/lotus/api"
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
|
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
|
"github.com/filecoin-project/lotus/chain/actors/policy"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
"github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader"
|
|
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
|
)
|
|
|
|
const MinDDONetworkVersion = network.Version22
|
|
|
|
var DealSectorPriority = 1024
|
|
var MaxTicketAge = policy.MaxPreCommitRandomnessLookback
|
|
|
|
func (m *Sealing) cleanupAssignedDeals(sector SectorInfo) {
|
|
m.inputLk.Lock()
|
|
// make sure we are not accepting deals into this sector
|
|
for _, c := range m.assignedPieces[m.minerSectorID(sector.SectorNumber)] {
|
|
pp := m.pendingPieces[c]
|
|
delete(m.pendingPieces, c)
|
|
if pp == nil {
|
|
log.Errorf("nil assigned pending piece %s", c)
|
|
continue
|
|
}
|
|
|
|
// todo: return to the sealing queue (this is extremely unlikely to happen)
|
|
pp.accepted(sector.SectorNumber, 0, xerrors.Errorf("sector %d entered packing state early", sector.SectorNumber))
|
|
}
|
|
|
|
delete(m.openSectors, m.minerSectorID(sector.SectorNumber))
|
|
delete(m.assignedPieces, m.minerSectorID(sector.SectorNumber))
|
|
m.inputLk.Unlock()
|
|
}
|
|
|
|
func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error {
|
|
m.cleanupAssignedDeals(sector)
|
|
|
|
// if this is a snapdeals sector, but it ended up not having any deals, abort the upgrade
|
|
if sector.State == SnapDealsPacking && !sector.hasData() {
|
|
return ctx.Send(SectorAbortUpgrade{xerrors.New("sector had no deals")})
|
|
}
|
|
|
|
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorNumber)
|
|
|
|
var allocated abi.UnpaddedPieceSize
|
|
for _, piece := range sector.Pieces {
|
|
allocated += piece.Piece().Size.Unpadded()
|
|
}
|
|
|
|
ssize, err := sector.SectorType.SectorSize()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ubytes := abi.PaddedPieceSize(ssize).Unpadded()
|
|
|
|
if allocated > ubytes {
|
|
return xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes)
|
|
}
|
|
|
|
fillerSizes, err := fillersFromRem(ubytes - allocated)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(fillerSizes) > 0 {
|
|
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorNumber)
|
|
}
|
|
|
|
fillerPieces, err := m.padSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.existingPieceSizes(), fillerSizes...)
|
|
if err != nil {
|
|
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
|
|
}
|
|
|
|
return ctx.Send(SectorPacked{FillerPieces: fillerPieces})
|
|
}
|
|
|
|
func (m *Sealing) padSector(ctx context.Context, sectorID storiface.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) {
|
|
if len(sizes) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
log.Infof("Pledge %d, contains %+v", sectorID, existingPieceSizes)
|
|
|
|
out := make([]abi.PieceInfo, len(sizes))
|
|
for i, size := range sizes {
|
|
expectCid := zerocomm.ZeroPieceCommitment(size)
|
|
|
|
ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, nullreader.NewNullReader(size))
|
|
if err != nil {
|
|
return nil, xerrors.Errorf("add piece: %w", err)
|
|
}
|
|
if !expectCid.Equals(ppi.PieceCID) {
|
|
return nil, xerrors.Errorf("got unexpected padding piece CID: expected:%s, got:%s", expectCid, ppi.PieceCID)
|
|
}
|
|
|
|
existingPieceSizes = append(existingPieceSizes, size)
|
|
|
|
out[i] = ppi
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
func checkTicketExpired(ticket, head abi.ChainEpoch) bool {
|
|
return head-ticket > MaxTicketAge // TODO: allow configuring expected seal durations
|
|
}
|
|
|
|
func checkProveCommitExpired(preCommitEpoch, msd abi.ChainEpoch, currEpoch abi.ChainEpoch) bool {
|
|
return currEpoch > preCommitEpoch+msd
|
|
}
|
|
|
|
func (m *Sealing) getTicket(ctx statemachine.Context, sector SectorInfo) (abi.SealRandomness, abi.ChainEpoch, bool, error) {
|
|
ts, err := m.Api.ChainHead(ctx.Context())
|
|
if err != nil {
|
|
log.Errorf("getTicket: api error, not proceeding: %+v", err)
|
|
return nil, 0, false, nil
|
|
}
|
|
|
|
// the reason why the StateMinerSectorAllocated function is placed here, if it is outside,
|
|
// if the MarshalCBOR function and StateSectorPreCommitInfo function return err, it will be executed
|
|
allocated, aerr := m.Api.StateMinerSectorAllocated(ctx.Context(), m.maddr, sector.SectorNumber, types.EmptyTSK)
|
|
if aerr != nil {
|
|
log.Errorf("getTicket: api error, checking if sector is allocated: %+v", aerr)
|
|
return nil, 0, false, nil
|
|
}
|
|
|
|
ticketEpoch := ts.Height() - policy.SealRandomnessLookback
|
|
buf := new(bytes.Buffer)
|
|
if err := m.maddr.MarshalCBOR(buf); err != nil {
|
|
return nil, 0, allocated, err
|
|
}
|
|
|
|
pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key())
|
|
if err != nil {
|
|
return nil, 0, allocated, xerrors.Errorf("getting precommit info: %w", err)
|
|
}
|
|
|
|
if pci != nil {
|
|
ticketEpoch = pci.Info.SealRandEpoch
|
|
|
|
nv, err := m.Api.StateNetworkVersion(ctx.Context(), ts.Key())
|
|
if err != nil {
|
|
return nil, 0, allocated, xerrors.Errorf("getTicket: StateNetworkVersion: api error, not proceeding: %+v", err)
|
|
}
|
|
|
|
av, err := actorstypes.VersionForNetwork(nv)
|
|
if err != nil {
|
|
return nil, 0, allocated, xerrors.Errorf("getTicket: actor version for network error, not proceeding: %w", err)
|
|
}
|
|
msd, err := policy.GetMaxProveCommitDuration(av, sector.SectorType)
|
|
if err != nil {
|
|
return nil, 0, allocated, xerrors.Errorf("getTicket: max prove commit duration policy error, not proceeding: %w", err)
|
|
}
|
|
|
|
if checkProveCommitExpired(pci.PreCommitEpoch, msd, ts.Height()) {
|
|
return nil, 0, allocated, xerrors.Errorf("ticket expired for precommitted sector")
|
|
}
|
|
}
|
|
|
|
if pci == nil && allocated { // allocated is true, sector precommitted but expired, will SectorCommitFailed or SectorRemove
|
|
return nil, 0, allocated, xerrors.Errorf("sector %s precommitted but expired", sector.SectorNumber)
|
|
}
|
|
|
|
rand, err := m.Api.StateGetRandomnessFromTickets(ctx.Context(), crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes(), ts.Key())
|
|
if err != nil {
|
|
return nil, 0, allocated, err
|
|
}
|
|
|
|
return abi.SealRandomness(rand), ticketEpoch, allocated, nil
|
|
}
|
|
|
|
func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) error {
|
|
ticketValue, ticketEpoch, allocated, err := m.getTicket(ctx, sector)
|
|
if err != nil {
|
|
if allocated {
|
|
if sector.CommitMessage != nil {
|
|
// Some recovery paths with unfortunate timing lead here
|
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector %s is committed but got into the GetTicket state", sector.SectorNumber)})
|
|
}
|
|
|
|
log.Errorf("Sector %s precommitted but expired", sector.SectorNumber)
|
|
return ctx.Send(SectorRemove{})
|
|
}
|
|
|
|
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("getting ticket failed: %w", err)})
|
|
}
|
|
|
|
return ctx.Send(SectorTicket{
|
|
TicketValue: ticketValue,
|
|
TicketEpoch: ticketEpoch,
|
|
})
|
|
}
|
|
|
|
var SoftErrRetryWait = 5 * time.Second
|
|
|
|
func retrySoftErr(ctx context.Context, cb func() error) error {
|
|
for {
|
|
err := cb()
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
var cerr storiface.WorkError
|
|
|
|
if errors.As(err, &cerr) {
|
|
switch cerr.ErrCode() {
|
|
case storiface.ErrTempWorkerRestart:
|
|
fallthrough
|
|
case storiface.ErrTempAllocateSpace:
|
|
// retry
|
|
log.Errorw("retrying soft error", "err", err, "code", cerr.ErrCode())
|
|
default:
|
|
// non-temp error
|
|
return err
|
|
}
|
|
|
|
// check if the context got cancelled early
|
|
if ctx.Err() != nil {
|
|
return ctx.Err()
|
|
}
|
|
|
|
// retry
|
|
time.Sleep(SoftErrRetryWait)
|
|
} else {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
|
|
if err := checkPieces(ctx.Context(), m.maddr, sector.SectorNumber, sector.Pieces, m.Api, false); err != nil { // Sanity check state
|
|
switch err.(type) {
|
|
case *ErrApi:
|
|
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
|
|
return nil
|
|
case *ErrInvalidDeals:
|
|
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
|
|
return ctx.Send(SectorInvalidDealIDs{Return: RetPreCommit1})
|
|
case *ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector?
|
|
return ctx.Send(SectorDealsExpired{xerrors.Errorf("expired dealIDs in sector: %w", err)})
|
|
default:
|
|
return xerrors.Errorf("checkPieces sanity check error: %w", err)
|
|
}
|
|
}
|
|
|
|
ts, err := m.Api.ChainHead(ctx.Context())
|
|
if err != nil {
|
|
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
|
|
return nil
|
|
}
|
|
|
|
if checkTicketExpired(sector.TicketEpoch, ts.Height()) {
|
|
pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key())
|
|
if err != nil {
|
|
log.Errorf("handlePreCommit1: StateSectorPreCommitInfo: api error, not proceeding: %+v", err)
|
|
return nil
|
|
}
|
|
|
|
if pci == nil {
|
|
return ctx.Send(SectorOldTicket{}) // go get new ticket
|
|
}
|
|
|
|
nv, err := m.Api.StateNetworkVersion(ctx.Context(), ts.Key())
|
|
if err != nil {
|
|
log.Errorf("handlePreCommit1: StateNetworkVersion: api error, not proceeding: %+v", err)
|
|
return nil
|
|
}
|
|
|
|
av, err := actorstypes.VersionForNetwork(nv)
|
|
if err != nil {
|
|
log.Errorf("handlePreCommit1: VersionForNetwork error, not proceeding: %w", err)
|
|
return nil
|
|
}
|
|
msd, err := policy.GetMaxProveCommitDuration(av, sector.SectorType)
|
|
if err != nil {
|
|
log.Errorf("handlePreCommit1: GetMaxProveCommitDuration error, not proceeding: %w", err)
|
|
return nil
|
|
}
|
|
|
|
// if height > PreCommitEpoch + msd, there is no need to recalculate
|
|
if checkProveCommitExpired(pci.PreCommitEpoch, msd, ts.Height()) {
|
|
return ctx.Send(SectorOldTicket{}) // will be removed
|
|
}
|
|
}
|
|
|
|
var pc1o storiface.PreCommit1Out
|
|
err = retrySoftErr(ctx.Context(), func() (err error) {
|
|
pc1o, err = m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
|
|
return err
|
|
})
|
|
if err != nil {
|
|
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
|
|
}
|
|
|
|
return ctx.Send(SectorPreCommit1{
|
|
PreCommit1Out: pc1o,
|
|
})
|
|
}
|
|
|
|
func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo) error {
|
|
var cids storiface.SectorCids
|
|
|
|
err := retrySoftErr(ctx.Context(), func() (err error) {
|
|
cids, err = m.sealer.SealPreCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.PreCommit1Out)
|
|
return err
|
|
})
|
|
if err != nil {
|
|
return ctx.Send(SectorSealPreCommit2Failed{xerrors.Errorf("seal pre commit(2) failed: %w", err)})
|
|
}
|
|
|
|
if cids.Unsealed == cid.Undef {
|
|
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("seal pre commit(2) returned undefined CommD")})
|
|
}
|
|
|
|
return ctx.Send(SectorPreCommit2{
|
|
Unsealed: cids.Unsealed,
|
|
Sealed: cids.Sealed,
|
|
})
|
|
}
|
|
|
|
func (m *Sealing) preCommitInfo(ctx statemachine.Context, sector SectorInfo) (*miner.SectorPreCommitInfo, big.Int, types.TipSetKey, error) {
|
|
ts, err := m.Api.ChainHead(ctx.Context())
|
|
if err != nil {
|
|
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
|
|
return nil, big.Zero(), types.EmptyTSK, nil
|
|
}
|
|
|
|
if err := checkPrecommit(ctx.Context(), m.Address(), sector, ts.Key(), ts.Height(), m.Api); err != nil {
|
|
switch err := err.(type) {
|
|
case *ErrApi:
|
|
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
|
|
return nil, big.Zero(), types.EmptyTSK, nil
|
|
case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handlePreCommit1 will do that too)
|
|
return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad CommD error: %w", err)})
|
|
case *ErrExpiredTicket:
|
|
return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("ticket expired: %w", err)})
|
|
case *ErrBadTicket:
|
|
return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)})
|
|
case *ErrInvalidDeals:
|
|
log.Warnf("invalid deals in sector %d: %v", sector.SectorNumber, err)
|
|
return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorInvalidDealIDs{Return: RetPreCommitting})
|
|
case *ErrExpiredDeals:
|
|
return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorDealsExpired{xerrors.Errorf("sector deals expired: %w", err)})
|
|
case *ErrPrecommitOnChain:
|
|
return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorPreCommitLanded{TipSet: ts.Key()}) // we re-did precommit
|
|
case *ErrSectorNumberAllocated:
|
|
log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err)
|
|
// TODO: check if the sector is committed (not sure how we'd end up here)
|
|
return nil, big.Zero(), types.EmptyTSK, nil
|
|
default:
|
|
return nil, big.Zero(), types.EmptyTSK, xerrors.Errorf("checkPrecommit sanity check error: %w", err)
|
|
}
|
|
}
|
|
|
|
expiration, err := m.pcp.Expiration(ctx.Context(), sector.Pieces...)
|
|
if err != nil {
|
|
return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("handlePreCommitting: failed to compute pre-commit expiry: %w", err)})
|
|
}
|
|
|
|
nv, err := m.Api.StateNetworkVersion(ctx.Context(), ts.Key())
|
|
if err != nil {
|
|
return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("failed to get network version: %w", err)})
|
|
}
|
|
|
|
av, err := actorstypes.VersionForNetwork(nv)
|
|
if err != nil {
|
|
return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("failed to get actors version: %w", err)})
|
|
}
|
|
msd, err := policy.GetMaxProveCommitDuration(av, sector.SectorType)
|
|
if err != nil {
|
|
return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("failed to get max prove commit duration: %w", err)})
|
|
}
|
|
|
|
if minExpiration := sector.TicketEpoch + policy.MaxPreCommitRandomnessLookback + msd + miner.MinSectorExpiration; expiration < minExpiration {
|
|
expiration = minExpiration
|
|
}
|
|
|
|
// Assume: both precommit msg & commit msg land on chain as early as possible
|
|
maxExtension, err := policy.GetMaxSectorExpirationExtension(nv)
|
|
if err != nil {
|
|
return nil, big.Zero(), types.EmptyTSK, ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("failed to get max extension: %w", err)})
|
|
}
|
|
|
|
maxExpiration := ts.Height() + policy.GetPreCommitChallengeDelay() + maxExtension
|
|
if expiration > maxExpiration {
|
|
expiration = maxExpiration
|
|
}
|
|
|
|
params := &miner.SectorPreCommitInfo{
|
|
Expiration: expiration,
|
|
SectorNumber: sector.SectorNumber,
|
|
SealProof: sector.SectorType,
|
|
|
|
SealedCID: *sector.CommR,
|
|
SealRandEpoch: sector.TicketEpoch,
|
|
}
|
|
|
|
if sector.hasData() {
|
|
// only CC sectors don't have UnsealedCID
|
|
params.UnsealedCid = sector.CommD
|
|
|
|
// true when the sector has non-builtin-marked data
|
|
sectorIsDDO := false
|
|
|
|
for _, piece := range sector.Pieces {
|
|
err := piece.handleDealInfo(handleDealInfoParams{
|
|
FillerHandler: func(info UniversalPieceInfo) error {
|
|
return nil // ignore
|
|
},
|
|
BuiltinMarketHandler: func(info UniversalPieceInfo) error {
|
|
if sectorIsDDO {
|
|
return nil // will be passed later in the Commit message
|
|
}
|
|
params.DealIDs = append(params.DealIDs, info.Impl().DealID)
|
|
return nil
|
|
},
|
|
DDOHandler: func(info UniversalPieceInfo) error {
|
|
if nv < MinDDONetworkVersion {
|
|
return xerrors.Errorf("DDO sectors are not supported on network version %d", nv)
|
|
}
|
|
|
|
log.Infow("DDO piece in sector", "sector", sector.SectorNumber, "piece", info.String())
|
|
|
|
sectorIsDDO = true
|
|
|
|
// DDO sectors don't carry DealIDs, we will pass those
|
|
// deals in the Commit message later
|
|
params.DealIDs = nil
|
|
return nil
|
|
},
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, big.Zero(), types.EmptyTSK, xerrors.Errorf("handleDealInfo: %w", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
collateral, err := m.Api.StateMinerPreCommitDepositForPower(ctx.Context(), m.maddr, *params, ts.Key())
|
|
if err != nil {
|
|
return nil, big.Zero(), types.EmptyTSK, xerrors.Errorf("getting initial pledge collateral: %w", err)
|
|
}
|
|
|
|
return params, collateral, ts.Key(), nil
|
|
}
|
|
|
|
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
|
// note: this is a legacy state handler, normally new sectors won't enter this state
|
|
// but we keep this handler in order to not break existing sector state machines.
|
|
// todo: drop after nv21
|
|
return ctx.Send(SectorPreCommitBatch{})
|
|
}
|
|
|
|
func (m *Sealing) handleSubmitPreCommitBatch(ctx statemachine.Context, sector SectorInfo) error {
|
|
if sector.CommD == nil || sector.CommR == nil {
|
|
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("sector had nil commR or commD")})
|
|
}
|
|
|
|
params, deposit, _, err := m.preCommitInfo(ctx, sector)
|
|
if err != nil {
|
|
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("preCommitInfo: %w", err)})
|
|
}
|
|
if params == nil {
|
|
return nil // event was sent in preCommitInfo
|
|
}
|
|
|
|
res, err := m.precommiter.AddPreCommit(ctx.Context(), sector, deposit, params)
|
|
if err != nil {
|
|
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("queuing precommit batch failed: %w", err)})
|
|
}
|
|
|
|
if res.Error != "" {
|
|
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("precommit batch error: %s", res.Error)})
|
|
}
|
|
|
|
if res.Msg == nil {
|
|
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("batch message was nil")})
|
|
}
|
|
|
|
return ctx.Send(SectorPreCommitBatchSent{*res.Msg})
|
|
}
|
|
|
|
func (m *Sealing) handlePreCommitWait(ctx statemachine.Context, sector SectorInfo) error {
|
|
if sector.PreCommitMessage == nil {
|
|
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("precommit message was nil")})
|
|
}
|
|
|
|
// would be ideal to just use the events.Called handler, but it wouldn't be able to handle individual message timeouts
|
|
log.Info("Sector precommitted: ", sector.SectorNumber)
|
|
mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage, build.MessageConfidence, api.LookbackNoLimit, true)
|
|
if err != nil {
|
|
return ctx.Send(SectorChainPreCommitFailed{err})
|
|
}
|
|
|
|
switch mw.Receipt.ExitCode {
|
|
case exitcode.Ok:
|
|
// this is what we expect
|
|
case exitcode.SysErrInsufficientFunds:
|
|
fallthrough
|
|
case exitcode.SysErrOutOfGas:
|
|
// gas estimator guessed a wrong number / out of funds:
|
|
return ctx.Send(SectorRetryPreCommit{})
|
|
default:
|
|
log.Error("sector precommit failed: ", mw.Receipt.ExitCode)
|
|
err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode)
|
|
return ctx.Send(SectorChainPreCommitFailed{err})
|
|
}
|
|
|
|
log.Info("precommit message landed on chain: ", sector.SectorNumber)
|
|
|
|
return ctx.Send(SectorPreCommitLanded{TipSet: mw.TipSet})
|
|
}
|
|
|
|
func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) error {
|
|
ts, err := m.Api.ChainHead(ctx.Context())
|
|
if err != nil {
|
|
log.Errorf("handleWaitSeed: api error, not proceeding: %+v", err)
|
|
return nil
|
|
}
|
|
|
|
pci, err := m.Api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, ts.Key())
|
|
if err != nil {
|
|
return xerrors.Errorf("getting precommit info: %w", err)
|
|
}
|
|
if pci == nil {
|
|
return ctx.Send(SectorChainPreCommitFailed{error: xerrors.Errorf("precommit info not found on chain")})
|
|
}
|
|
|
|
randHeight := pci.PreCommitEpoch + policy.GetPreCommitChallengeDelay()
|
|
|
|
err = m.events.ChainAt(context.Background(), func(ectx context.Context, _ *types.TipSet, curH abi.ChainEpoch) error {
|
|
// in case of null blocks the randomness can land after the tipset we
|
|
// get from the events API
|
|
ts, err := m.Api.ChainHead(ctx.Context())
|
|
if err != nil {
|
|
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
|
|
return nil
|
|
}
|
|
|
|
buf := new(bytes.Buffer)
|
|
if err := m.maddr.MarshalCBOR(buf); err != nil {
|
|
return err
|
|
}
|
|
rand, err := m.Api.StateGetRandomnessFromBeacon(ectx, crypto.DomainSeparationTag_InteractiveSealChallengeSeed, randHeight, buf.Bytes(), ts.Key())
|
|
if err != nil {
|
|
err = xerrors.Errorf("failed to get randomness for computing seal proof (ch %d; rh %d; tsk %x): %w", curH, randHeight, ts.Key(), err)
|
|
|
|
_ = ctx.Send(SectorChainPreCommitFailed{error: err})
|
|
return err
|
|
}
|
|
|
|
_ = ctx.Send(SectorSeedReady{SeedValue: abi.InteractiveSealRandomness(rand), SeedEpoch: randHeight})
|
|
|
|
return nil
|
|
}, func(ctx context.Context, ts *types.TipSet) error {
|
|
log.Warn("revert in interactive commit sector step")
|
|
// TODO: need to cancel running process and restart...
|
|
return nil
|
|
}, InteractivePoRepConfidence, randHeight)
|
|
if err != nil {
|
|
log.Warn("waitForPreCommitMessage ChainAt errored: ", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
|
if sector.CommitMessage != nil {
|
|
log.Warnf("sector %d entered committing state with a commit message cid", sector.SectorNumber)
|
|
|
|
ml, err := m.Api.StateSearchMsg(ctx.Context(), types.EmptyTSK, *sector.CommitMessage, api.LookbackNoLimit, true)
|
|
if err != nil {
|
|
log.Warnf("sector %d searching existing commit message %s: %+v", sector.SectorNumber, *sector.CommitMessage, err)
|
|
}
|
|
|
|
if ml != nil {
|
|
// some weird retry paths can lead here
|
|
return ctx.Send(SectorRetryCommitWait{})
|
|
}
|
|
}
|
|
|
|
cfg, err := m.getConfig()
|
|
if err != nil {
|
|
return xerrors.Errorf("getting config: %w", err)
|
|
}
|
|
|
|
if sector.CommD == nil || sector.CommR == nil {
|
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")})
|
|
}
|
|
|
|
var c2in storiface.Commit1Out
|
|
if sector.RemoteCommit1Endpoint == "" {
|
|
// Local Commit1
|
|
cids := storiface.SectorCids{
|
|
Unsealed: *sector.CommD,
|
|
Sealed: *sector.CommR,
|
|
}
|
|
c2in, err = m.sealer.SealCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.SeedValue, sector.pieceInfos(), cids)
|
|
if err != nil {
|
|
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(1): %w", err)})
|
|
}
|
|
} else {
|
|
// Remote Commit1
|
|
|
|
reqData := api.RemoteCommit1Params{
|
|
Ticket: sector.TicketValue,
|
|
Seed: sector.SeedValue,
|
|
Unsealed: *sector.CommD,
|
|
Sealed: *sector.CommR,
|
|
ProofType: sector.SectorType,
|
|
}
|
|
reqBody, err := json.Marshal(&reqData)
|
|
if err != nil {
|
|
return xerrors.Errorf("marshaling remote commit1 request: %w", err)
|
|
}
|
|
|
|
req, err := http.NewRequest("POST", sector.RemoteCommit1Endpoint, bytes.NewReader(reqBody))
|
|
if err != nil {
|
|
return ctx.Send(SectorRemoteCommit1Failed{xerrors.Errorf("creating new remote commit1 request: %w", err)})
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req = req.WithContext(ctx.Context())
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
return ctx.Send(SectorRemoteCommit1Failed{xerrors.Errorf("requesting remote commit1: %w", err)})
|
|
}
|
|
|
|
defer resp.Body.Close() //nolint:errcheck
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return ctx.Send(SectorRemoteCommit1Failed{xerrors.Errorf("remote commit1 received non-200 http response %s", resp.Status)})
|
|
}
|
|
|
|
c2in, err = io.ReadAll(resp.Body) // todo some len constraint
|
|
if err != nil {
|
|
return ctx.Send(SectorRemoteCommit1Failed{xerrors.Errorf("reading commit1 response: %w", err)})
|
|
}
|
|
}
|
|
|
|
var porepProof storiface.Proof
|
|
|
|
if sector.RemoteCommit2Endpoint == "" {
|
|
// Local Commit2
|
|
|
|
porepProof, err = m.sealer.SealCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), c2in)
|
|
if err != nil {
|
|
log.Errorw("Commit2 error", "error", err)
|
|
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed(2): %w", err)})
|
|
}
|
|
} else {
|
|
// Remote Commit2
|
|
|
|
reqData := api.RemoteCommit2Params{
|
|
ProofType: sector.SectorType,
|
|
Sector: m.minerSectorID(sector.SectorNumber),
|
|
|
|
Commit1Out: c2in,
|
|
}
|
|
reqBody, err := json.Marshal(&reqData)
|
|
if err != nil {
|
|
return xerrors.Errorf("marshaling remote commit2 request: %w", err)
|
|
}
|
|
|
|
req, err := http.NewRequest("POST", sector.RemoteCommit2Endpoint, bytes.NewReader(reqBody))
|
|
if err != nil {
|
|
return ctx.Send(SectorRemoteCommit2Failed{xerrors.Errorf("creating new remote commit2 request: %w", err)})
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req = req.WithContext(ctx.Context())
|
|
resp, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
return ctx.Send(SectorRemoteCommit2Failed{xerrors.Errorf("requesting remote commit2: %w", err)})
|
|
}
|
|
|
|
defer resp.Body.Close() //nolint:errcheck
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return ctx.Send(SectorRemoteCommit2Failed{xerrors.Errorf("remote commit2 received non-200 http response %s", resp.Status)})
|
|
}
|
|
|
|
porepProof, err = io.ReadAll(resp.Body) // todo some len constraint
|
|
if err != nil {
|
|
return ctx.Send(SectorRemoteCommit2Failed{xerrors.Errorf("reading commit2 response: %w", err)})
|
|
}
|
|
}
|
|
|
|
{
|
|
ts, err := m.Api.ChainHead(ctx.Context())
|
|
if err != nil {
|
|
log.Errorf("handleCommitting: api error, not proceeding: %+v", err)
|
|
return nil
|
|
}
|
|
|
|
if err := m.checkCommit(ctx.Context(), sector, porepProof, ts.Key()); err != nil {
|
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
|
|
}
|
|
}
|
|
|
|
if cfg.FinalizeEarly {
|
|
return ctx.Send(SectorProofReady{
|
|
Proof: porepProof,
|
|
})
|
|
}
|
|
|
|
return ctx.Send(SectorCommitted{
|
|
Proof: porepProof,
|
|
})
|
|
}
|
|
|
|
func (m *Sealing) handleSubmitCommit(ctx statemachine.Context, sector SectorInfo) error {
|
|
// like precommit this is a deprecated state, but we keep it around for
|
|
// existing state machines
|
|
// todo: drop after nv21
|
|
return ctx.Send(SectorSubmitCommitAggregate{})
|
|
}
|
|
|
|
// processPieces returns either:
|
|
// - a list of piece activation manifests
|
|
// - a list of deal IDs, if all non-filler pieces are deal-id pieces
|
|
func (m *Sealing) processPieces(ctx context.Context, sector SectorInfo) ([]miner.PieceActivationManifest, []abi.DealID, error) {
|
|
pams := make([]miner.PieceActivationManifest, 0, len(sector.Pieces))
|
|
dealIDs := make([]abi.DealID, 0, len(sector.Pieces))
|
|
var hasDDO bool
|
|
|
|
for _, piece := range sector.Pieces {
|
|
piece := piece
|
|
|
|
// first figure out if this is a ddo sector
|
|
err := piece.handleDealInfo(handleDealInfoParams{
|
|
FillerHandler: func(info UniversalPieceInfo) error {
|
|
// Fillers are implicit (todo review: Are they??)
|
|
return nil
|
|
},
|
|
BuiltinMarketHandler: func(info UniversalPieceInfo) error {
|
|
return nil
|
|
},
|
|
DDOHandler: func(info UniversalPieceInfo) error {
|
|
hasDDO = true
|
|
return nil
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, nil, xerrors.Errorf("handleDealInfo: %w", err)
|
|
}
|
|
}
|
|
for _, piece := range sector.Pieces {
|
|
piece := piece
|
|
|
|
err := piece.handleDealInfo(handleDealInfoParams{
|
|
FillerHandler: func(info UniversalPieceInfo) error {
|
|
// Fillers are implicit (todo review: Are they??)
|
|
return nil
|
|
},
|
|
BuiltinMarketHandler: func(info UniversalPieceInfo) error {
|
|
if hasDDO {
|
|
alloc, err := m.Api.StateGetAllocationIdForPendingDeal(ctx, info.Impl().DealID, types.EmptyTSK)
|
|
if err != nil {
|
|
return xerrors.Errorf("getting allocation for deal %d: %w", info.Impl().DealID, err)
|
|
}
|
|
clid, err := m.Api.StateLookupID(ctx, info.Impl().DealProposal.Client, types.EmptyTSK)
|
|
if err != nil {
|
|
return xerrors.Errorf("getting client address for deal %d: %w", info.Impl().DealID, err)
|
|
}
|
|
|
|
clientId, err := address.IDFromAddress(clid)
|
|
if err != nil {
|
|
return xerrors.Errorf("getting client address for deal %d: %w", info.Impl().DealID, err)
|
|
}
|
|
|
|
var vac *miner2.VerifiedAllocationKey
|
|
if alloc != verifreg.NoAllocationID {
|
|
vac = &miner2.VerifiedAllocationKey{
|
|
Client: abi.ActorID(clientId),
|
|
ID: verifreg13.AllocationId(alloc),
|
|
}
|
|
}
|
|
|
|
payload, err := cborutil.Dump(info.Impl().DealID)
|
|
if err != nil {
|
|
return xerrors.Errorf("serializing deal id: %w", err)
|
|
}
|
|
|
|
pams = append(pams, miner.PieceActivationManifest{
|
|
CID: piece.Piece().PieceCID,
|
|
Size: piece.Piece().Size,
|
|
VerifiedAllocationKey: vac,
|
|
Notify: []miner2.DataActivationNotification{
|
|
{
|
|
Address: market.Address,
|
|
Payload: payload,
|
|
},
|
|
},
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
dealIDs = append(dealIDs, info.Impl().DealID)
|
|
return nil
|
|
},
|
|
DDOHandler: func(info UniversalPieceInfo) error {
|
|
pams = append(pams, *piece.Impl().PieceActivationManifest)
|
|
return nil
|
|
},
|
|
})
|
|
if err != nil {
|
|
return nil, nil, xerrors.Errorf("handleDealInfo: %w", err)
|
|
}
|
|
}
|
|
|
|
return pams, dealIDs, nil
|
|
}
|
|
|
|
func (m *Sealing) handleSubmitCommitAggregate(ctx statemachine.Context, sector SectorInfo) error {
|
|
if sector.CommD == nil || sector.CommR == nil {
|
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector had nil commR or commD")})
|
|
}
|
|
|
|
pams, dealIDs, err := m.processPieces(ctx.Context(), sector)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
res, err := m.commiter.AddCommit(ctx.Context(), sector, AggregateInput{
|
|
Info: proof.AggregateSealVerifyInfo{
|
|
Number: sector.SectorNumber,
|
|
Randomness: sector.TicketValue,
|
|
InteractiveRandomness: sector.SeedValue,
|
|
SealedCID: *sector.CommR,
|
|
UnsealedCID: *sector.CommD,
|
|
},
|
|
Proof: sector.Proof,
|
|
Spt: sector.SectorType,
|
|
|
|
ActivationManifest: miner2.SectorActivationManifest{
|
|
SectorNumber: sector.SectorNumber,
|
|
Pieces: pams,
|
|
},
|
|
DealIDPrecommit: len(dealIDs) > 0,
|
|
})
|
|
|
|
if err != nil || res.Error != "" {
|
|
ts, err := m.Api.ChainHead(ctx.Context())
|
|
if err != nil {
|
|
log.Errorf("handleSubmitCommit: api error, not proceeding: %+v", err)
|
|
return nil
|
|
}
|
|
|
|
if err := m.checkCommit(ctx.Context(), sector, sector.Proof, ts.Key()); err != nil {
|
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
|
|
}
|
|
|
|
return ctx.Send(SectorRetrySubmitCommit{})
|
|
}
|
|
|
|
if e, found := res.FailedSectors[sector.SectorNumber]; found {
|
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("sector failed in aggregate processing: %s", e)})
|
|
}
|
|
|
|
if res.Msg == nil {
|
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("aggregate message was nil")})
|
|
}
|
|
|
|
return ctx.Send(SectorCommitAggregateSent{*res.Msg})
|
|
}
|
|
|
|
func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error {
|
|
if sector.CommitMessage == nil {
|
|
log.Errorf("sector %d entered commit wait state without a message cid", sector.SectorNumber)
|
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("entered commit wait with no commit cid")})
|
|
}
|
|
|
|
mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.CommitMessage, build.MessageConfidence, api.LookbackNoLimit, true)
|
|
if err != nil {
|
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to wait for porep inclusion: %w", err)})
|
|
}
|
|
|
|
switch mw.Receipt.ExitCode {
|
|
case exitcode.Ok:
|
|
// this is what we expect
|
|
case exitcode.SysErrInsufficientFunds:
|
|
fallthrough
|
|
case exitcode.SysErrOutOfGas:
|
|
// gas estimator guessed a wrong number / out of funds
|
|
return ctx.Send(SectorRetrySubmitCommit{})
|
|
default:
|
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.TicketValue, sector.SeedValue, sector.SeedEpoch, sector.Proof)})
|
|
}
|
|
|
|
si, err := m.Api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, mw.TipSet)
|
|
if err != nil {
|
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("proof validation failed, calling StateSectorGetInfo: %w", err)})
|
|
}
|
|
if si == nil {
|
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("proof validation failed, sector not found in sector set after cron")})
|
|
}
|
|
|
|
return ctx.Send(SectorProving{})
|
|
}
|
|
|
|
func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error {
|
|
// TODO: Maybe wait for some finality
|
|
|
|
cfg, err := m.getConfig()
|
|
if err != nil {
|
|
return xerrors.Errorf("getting sealing config: %w", err)
|
|
}
|
|
|
|
if err := m.sealer.ReleaseUnsealed(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(sector.Pieces, false, cfg.AlwaysKeepUnsealedCopy)); err != nil {
|
|
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("release unsealed: %w", err)})
|
|
}
|
|
|
|
if err := m.sealer.FinalizeSector(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil {
|
|
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
|
|
}
|
|
|
|
if cfg.MakeCCSectorsAvailable && !sector.hasData() {
|
|
return ctx.Send(SectorFinalizedAvailable{})
|
|
}
|
|
return ctx.Send(SectorFinalized{})
|
|
}
|