lotus/storage/sealing/states.go

296 lines
11 KiB
Go
Raw Normal View History

package sealing
2019-11-01 13:58:48 +00:00
import (
"bytes"
2019-11-01 13:58:48 +00:00
"context"
2020-02-27 00:42:39 +00:00
"golang.org/x/xerrors"
2020-04-06 20:23:37 +00:00
"github.com/filecoin-project/go-statemachine"
2020-02-08 02:18:32 +00:00
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/abi/big"
2020-02-12 07:44:20 +00:00
"github.com/filecoin-project/specs-actors/actors/builtin"
2020-02-11 03:31:28 +00:00
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/filecoin-project/specs-storage/storage"
2019-11-01 13:58:48 +00:00
)
func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error {
2019-11-06 23:09:48 +00:00
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID)
2020-02-08 02:18:32 +00:00
var allocated abi.UnpaddedPieceSize
2019-11-07 18:22:59 +00:00
for _, piece := range sector.Pieces {
allocated += piece.Size
}
2020-03-03 22:19:22 +00:00
ubytes := abi.PaddedPieceSize(m.sealer.SectorSize()).Unpadded()
2019-11-07 18:43:15 +00:00
if allocated > ubytes {
2020-01-10 02:11:00 +00:00
return xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes)
2019-11-07 18:22:59 +00:00
}
2019-11-07 18:43:15 +00:00
fillerSizes, err := fillersFromRem(ubytes - allocated)
2019-11-06 23:09:48 +00:00
if err != nil {
2020-01-10 02:11:00 +00:00
return err
2019-11-06 23:09:48 +00:00
}
if len(fillerSizes) > 0 {
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID)
}
2020-03-17 20:19:52 +00:00
pieces, err := m.pledgeSector(ctx.Context(), m.minerSector(sector.SectorID), sector.existingPieces(), fillerSizes...)
2019-11-06 23:09:48 +00:00
if err != nil {
2020-01-10 02:11:00 +00:00
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
2019-11-06 23:09:48 +00:00
}
2020-03-22 20:44:27 +00:00
return ctx.Send(SectorPacked{Pieces: pieces})
2019-11-06 23:09:48 +00:00
}
2020-04-03 16:54:01 +00:00
func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state
switch err.(type) {
2020-01-23 16:02:55 +00:00
case *ErrApi:
2020-04-03 16:54:01 +00:00
log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err)
return nil
2020-01-23 16:02:55 +00:00
case *ErrInvalidDeals:
return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid deals in sector: %w", err)})
2020-01-23 16:02:55 +00:00
case *ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector?
return ctx.Send(SectorPackingFailed{xerrors.Errorf("expired deals in sector: %w", err)})
default:
return xerrors.Errorf("checkPieces sanity check error: %w", err)
}
}
2019-11-01 13:58:48 +00:00
log.Infow("performing sector replication...", "sector", sector.SectorID)
ticketValue, ticketEpoch, err := m.tktFn(ctx.Context())
2019-11-07 18:22:59 +00:00
if err != nil {
2020-04-03 16:54:01 +00:00
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("getting ticket failed: %w", err)})
2019-11-07 18:22:59 +00:00
}
pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorID), ticketValue, sector.pieceInfos())
2020-03-03 22:19:22 +00:00
if err != nil {
2020-04-03 16:54:01 +00:00
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
2020-03-03 22:19:22 +00:00
}
2020-04-03 16:54:01 +00:00
return ctx.Send(SectorPreCommit1{
PreCommit1Out: pc1o,
TicketValue: ticketValue,
TicketEpoch: ticketEpoch,
2020-04-03 16:54:01 +00:00
})
}
func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo) error {
cids, err := m.sealer.SealPreCommit2(ctx.Context(), m.minerSector(sector.SectorID), sector.PreCommit1Out)
2019-11-01 13:58:48 +00:00
if err != nil {
2020-04-03 16:54:01 +00:00
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("seal pre commit(2) failed: %w", err)})
2019-11-01 13:58:48 +00:00
}
2020-04-03 16:54:01 +00:00
return ctx.Send(SectorPreCommit2{
2020-03-22 20:44:27 +00:00
Unsealed: cids.Unsealed,
Sealed: cids.Sealed,
})
2019-11-01 13:58:48 +00:00
}
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
if err := checkPrecommit(ctx.Context(), m.Address(), sector, m.api); err != nil {
switch err.(type) {
2020-01-23 16:02:55 +00:00
case *ErrApi:
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
return nil
2020-04-03 16:54:01 +00:00
case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handlePreCommit1 will do that too)
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("bad CommD error: %w", err)})
2020-01-23 16:02:55 +00:00
case *ErrExpiredTicket:
2020-04-03 16:54:01 +00:00
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("ticket expired: %w", err)})
default:
return xerrors.Errorf("checkPrecommit sanity check error: %w", err)
}
}
2020-02-14 00:24:24 +00:00
params := &miner.SectorPreCommitInfo{
2020-02-23 15:50:36 +00:00
Expiration: 10000000, // TODO: implement
SectorNumber: sector.SectorID,
2020-02-28 20:52:14 +00:00
RegisteredProof: sector.SectorType,
2020-02-12 00:58:55 +00:00
2020-02-27 00:42:39 +00:00
SealedCID: *sector.CommR,
SealRandEpoch: sector.TicketEpoch,
2020-02-23 20:32:14 +00:00
DealIDs: sector.deals(),
2020-02-14 21:38:30 +00:00
}
2019-11-01 13:58:48 +00:00
enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("could not serialize pre-commit sector parameters: %w", err)})
2019-11-01 13:58:48 +00:00
}
log.Info("submitting precommit for sector: ", sector.SectorID)
mcid, err := m.api.SendMsg(ctx.Context(), m.worker, m.maddr, builtin.MethodsMiner.PreCommitSector, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes())
2019-11-01 13:58:48 +00:00
if err != nil {
2020-04-03 16:54:01 +00:00
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
2019-11-01 13:58:48 +00:00
}
return ctx.Send(SectorPreCommitted{Message: mcid})
2019-11-01 13:58:48 +00:00
}
2020-01-20 22:04:46 +00:00
func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) error {
2019-11-01 13:58:48 +00:00
// would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts
log.Info("Sector precommitted: ", sector.SectorID)
2020-01-10 02:11:00 +00:00
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage)
2019-11-01 13:58:48 +00:00
if err != nil {
2020-04-03 16:54:01 +00:00
return ctx.Send(SectorChainPreCommitFailed{err})
2019-11-01 13:58:48 +00:00
}
if mw.Receipt.ExitCode != 0 {
log.Error("sector precommit failed: ", mw.Receipt.ExitCode)
err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode)
2020-04-03 16:54:01 +00:00
return ctx.Send(SectorChainPreCommitFailed{err})
2019-11-01 13:58:48 +00:00
}
log.Info("precommit message landed on chain: ", sector.SectorID)
2019-11-01 13:58:48 +00:00
2020-04-06 20:23:37 +00:00
pci, err := m.api.StateGetSectorPreCommitOnChainInfo(ctx.Context(), m.maddr, sector.SectorID, mw.TipSetTok)
if err != nil {
return xerrors.Errorf("getting precommit info: %w", err)
}
randHeight := pci.PreCommitEpoch + miner.PreCommitChallengeDelay
2019-11-01 13:58:48 +00:00
err = m.events.ChainAt(func(ectx context.Context, tok TipSetToken, curH abi.ChainEpoch) error {
rand, err := m.api.ChainGetRandomness(ectx, tok, crypto.DomainSeparationTag_InteractiveSealChallengeSeed, randHeight, nil)
2019-11-08 18:15:13 +00:00
if err != nil {
err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)
2019-11-08 18:15:13 +00:00
2020-04-06 20:23:37 +00:00
_ = ctx.Send(SectorFatalError{error: err})
return err
2019-11-01 13:58:48 +00:00
}
2020-04-06 20:23:37 +00:00
_ = ctx.Send(SectorSeedReady{SeedValue: abi.InteractiveSealRandomness(rand), SeedEpoch: randHeight})
2019-11-01 13:58:48 +00:00
return nil
}, func(ctx context.Context, ts TipSetToken) error {
2019-11-01 13:58:48 +00:00
log.Warn("revert in interactive commit sector step")
// TODO: need to cancel running process and restart...
2019-11-01 13:58:48 +00:00
return nil
2020-04-06 20:23:37 +00:00
}, InteractivePoRepConfidence, randHeight)
2019-11-01 13:58:48 +00:00
if err != nil {
log.Warn("waitForPreCommitMessage ChainAt errored: ", err)
}
return nil
2019-11-01 13:58:48 +00:00
}
func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) error {
2019-11-01 13:58:48 +00:00
log.Info("scheduling seal proof computation...")
log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorID, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD)
2020-02-23 20:32:14 +00:00
2020-03-17 20:19:52 +00:00
cids := storage.SectorCids{
Unsealed: *sector.CommD,
Sealed: *sector.CommR,
}
c2in, err := m.sealer.SealCommit1(ctx.Context(), m.minerSector(sector.SectorID), sector.TicketValue, sector.SeedValue, sector.pieceInfos(), cids)
2020-03-03 22:19:22 +00:00
if err != nil {
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
}
2020-03-17 20:19:52 +00:00
proof, err := m.sealer.SealCommit2(ctx.Context(), m.minerSector(sector.SectorID), c2in)
if err != nil {
2020-01-20 22:04:46 +00:00
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
}
if err := m.checkCommit(ctx.Context(), sector, proof); err != nil {
2020-04-04 01:50:05 +00:00
return ctx.Send(SectorCommitFailed{xerrors.Errorf("commit check error: %w", err)})
}
2019-11-08 18:15:13 +00:00
// TODO: Consider splitting states and persist proof for faster recovery
2020-02-12 07:44:20 +00:00
params := &miner.ProveCommitSectorParams{
SectorNumber: sector.SectorID,
Proof: proof,
2020-02-12 07:44:20 +00:00
}
2019-11-01 13:58:48 +00:00
enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", err)})
2019-11-01 13:58:48 +00:00
}
// TODO: check seed / ticket are up to date
mcid, err := m.api.SendMsg(ctx.Context(), m.worker, m.maddr, builtin.MethodsMiner.ProveCommitSector, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes())
2019-11-01 13:58:48 +00:00
if err != nil {
2020-01-10 02:11:00 +00:00
return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
2019-11-01 13:58:48 +00:00
}
2020-01-10 02:11:00 +00:00
return ctx.Send(SectorCommitted{
2020-03-22 20:44:27 +00:00
Proof: proof,
Message: mcid,
})
}
2019-11-01 13:58:48 +00:00
func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error {
if sector.CommitMessage == nil {
log.Errorf("sector %d entered commit wait state without a message cid", sector.SectorID)
2020-01-10 02:11:00 +00:00
return ctx.Send(SectorCommitFailed{xerrors.Errorf("entered commit wait with no commit cid")})
}
2020-01-10 02:11:00 +00:00
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.CommitMessage)
2019-11-01 13:58:48 +00:00
if err != nil {
2020-01-10 02:11:00 +00:00
return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to wait for porep inclusion: %w", err)})
2019-11-01 13:58:48 +00:00
}
if mw.Receipt.ExitCode != 0 {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.TicketValue, sector.SeedValue, sector.SeedEpoch, sector.Proof)})
2019-11-01 13:58:48 +00:00
}
2020-01-10 02:11:00 +00:00
return ctx.Send(SectorProving{})
2019-11-01 13:58:48 +00:00
}
2020-01-29 21:25:06 +00:00
func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error {
2020-01-29 22:37:31 +00:00
// TODO: Maybe wait for some finality
2020-03-17 20:19:52 +00:00
if err := m.sealer.FinalizeSector(ctx.Context(), m.minerSector(sector.SectorID)); err != nil {
2020-03-03 22:19:22 +00:00
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
2020-01-29 21:25:06 +00:00
}
return ctx.Send(SectorFinalized{})
}
func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error {
// TODO: check if the fault has already been reported, and that this sector is even valid
// TODO: coalesce faulty sector reporting
2020-02-14 00:24:24 +00:00
bf := abi.NewBitField()
bf.Set(uint64(sector.SectorID))
params := &miner.DeclareTemporaryFaultsParams{
2020-02-14 00:24:24 +00:00
SectorNumbers: bf,
2020-02-12 07:44:20 +00:00
Duration: 99999999, // TODO: This is very unlikely to be the correct number
}
enc := new(bytes.Buffer)
if err := params.MarshalCBOR(enc); err != nil {
return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to serialize declare fault params: %w", err)})
}
mcid, err := m.api.SendMsg(ctx.Context(), m.worker, m.maddr, builtin.MethodsMiner.DeclareTemporaryFaults, big.NewInt(0), big.NewInt(1), 1000000, enc.Bytes())
if err != nil {
2020-01-10 02:11:00 +00:00
return xerrors.Errorf("failed to push declare faults message to network: %w", err)
}
return ctx.Send(SectorFaultReported{reportMsg: mcid})
}
func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInfo) error {
if sector.FaultReportMsg == nil {
2020-01-10 02:11:00 +00:00
return xerrors.Errorf("entered fault reported state without a FaultReportMsg cid")
}
2020-01-10 02:11:00 +00:00
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.FaultReportMsg)
if err != nil {
2020-01-10 02:11:00 +00:00
return xerrors.Errorf("failed to wait for fault declaration: %w", err)
}
if mw.Receipt.ExitCode != 0 {
log.Errorf("UNHANDLED: declaring sector fault failed (exit=%d, msg=%s) (id: %d)", mw.Receipt.ExitCode, *sector.FaultReportMsg, sector.SectorID)
2020-01-10 02:11:00 +00:00
return xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode)
}
2020-01-10 02:11:00 +00:00
return ctx.Send(SectorFaultedFinal{})
}