307 lines
11 KiB
Go
307 lines
11 KiB
Go
package sealing
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/filecoin-project/specs-actors/actors/crypto"
|
|
|
|
"github.com/filecoin-project/go-sectorbuilder/fs"
|
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
|
"github.com/filecoin-project/specs-actors/actors/builtin"
|
|
"github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/chain/actors"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
"github.com/filecoin-project/lotus/lib/statemachine"
|
|
)
|
|
|
|
func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) error {
|
|
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID)
|
|
|
|
var allocated abi.UnpaddedPieceSize
|
|
for _, piece := range sector.Pieces {
|
|
allocated += piece.Size
|
|
}
|
|
|
|
ubytes := abi.PaddedPieceSize(m.sb.SectorSize()).Unpadded()
|
|
|
|
if allocated > ubytes {
|
|
return xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes)
|
|
}
|
|
|
|
fillerSizes, err := fillersFromRem(ubytes - allocated)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(fillerSizes) > 0 {
|
|
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID)
|
|
}
|
|
|
|
pieces, err := m.pledgeSector(ctx.Context(), sector.SectorID, sector.existingPieces(), fillerSizes...)
|
|
if err != nil {
|
|
return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err)
|
|
}
|
|
|
|
return ctx.Send(SectorPacked{pieces: pieces})
|
|
}
|
|
|
|
func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error {
|
|
if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state
|
|
switch err.(type) {
|
|
case *ErrApi:
|
|
log.Errorf("handleUnsealed: api error, not proceeding: %+v", err)
|
|
return nil
|
|
case *ErrInvalidDeals:
|
|
return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid deals in sector: %w", err)})
|
|
case *ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector?
|
|
return ctx.Send(SectorPackingFailed{xerrors.Errorf("expired deals in sector: %w", err)})
|
|
default:
|
|
return xerrors.Errorf("checkPieces sanity check error: %w", err)
|
|
}
|
|
}
|
|
|
|
log.Infow("performing sector replication...", "sector", sector.SectorID)
|
|
ticket, err := m.tktFn(ctx.Context())
|
|
if err != nil {
|
|
return ctx.Send(SectorSealFailed{xerrors.Errorf("getting ticket failed: %w", err)})
|
|
}
|
|
|
|
sealed, unsealed, err := m.sb.SealPreCommit(ctx.Context(), sector.SectorID, ticket.TicketBytes, sector.pieceInfos())
|
|
if err != nil {
|
|
return ctx.Send(SectorSealFailed{xerrors.Errorf("seal pre commit failed: %w", err)})
|
|
}
|
|
|
|
return ctx.Send(SectorSealed{
|
|
commD: unsealed,
|
|
commR: sealed,
|
|
ticket: ticket,
|
|
})
|
|
}
|
|
|
|
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
|
if err := checkSeal(ctx.Context(), m.maddr, sector, m.api); err != nil {
|
|
switch err.(type) {
|
|
case *ErrApi:
|
|
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
|
|
return nil
|
|
case *ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handleUnsealed will do that too)
|
|
return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)})
|
|
case *ErrExpiredTicket:
|
|
return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)})
|
|
default:
|
|
return xerrors.Errorf("checkSeal sanity check error: %w", err)
|
|
}
|
|
}
|
|
|
|
params := &miner.SectorPreCommitInfo{
|
|
Expiration: 10000000, // TODO: implement
|
|
SectorNumber: sector.SectorID,
|
|
RegisteredProof: abi.RegisteredProof_StackedDRG32GiBSeal,
|
|
|
|
SealedCID: *sector.CommR,
|
|
SealRandEpoch: sector.Ticket.BlockHeight,
|
|
DealIDs: sector.deals(),
|
|
}
|
|
enc, aerr := actors.SerializeParams(params)
|
|
if aerr != nil {
|
|
return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)})
|
|
}
|
|
|
|
msg := &types.Message{
|
|
To: m.maddr,
|
|
From: m.worker,
|
|
Method: builtin.MethodsMiner.PreCommitSector,
|
|
Params: enc,
|
|
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
|
|
GasLimit: types.NewInt(1000000 /* i dont know help */),
|
|
GasPrice: types.NewInt(1),
|
|
}
|
|
|
|
log.Info("submitting precommit for sector: ", sector.SectorID)
|
|
smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg)
|
|
if err != nil {
|
|
return ctx.Send(SectorPreCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
|
|
}
|
|
|
|
return ctx.Send(SectorPreCommitted{message: smsg.Cid()})
|
|
}
|
|
|
|
func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) error {
|
|
// would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts
|
|
log.Info("Sector precommitted: ", sector.SectorID)
|
|
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.PreCommitMessage)
|
|
if err != nil {
|
|
return ctx.Send(SectorPreCommitFailed{err})
|
|
}
|
|
|
|
if mw.Receipt.ExitCode != 0 {
|
|
log.Error("sector precommit failed: ", mw.Receipt.ExitCode)
|
|
err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode)
|
|
return ctx.Send(SectorPreCommitFailed{err})
|
|
}
|
|
log.Info("precommit message landed on chain: ", sector.SectorID)
|
|
|
|
randHeight := mw.TipSet.Height() + miner.PreCommitChallengeDelay - 1 // -1 because of how the messages are applied
|
|
log.Infof("precommit for sector %d made it on chain, will start proof computation at height %d", sector.SectorID, randHeight)
|
|
|
|
err = m.events.ChainAt(func(ectx context.Context, ts *types.TipSet, curH abi.ChainEpoch) error {
|
|
rand, err := m.api.ChainGetRandomness(ectx, ts.Key(), crypto.DomainSeparationTag_InteractiveSealChallengeSeed, randHeight, nil)
|
|
if err != nil {
|
|
err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)
|
|
|
|
ctx.Send(SectorFatalError{error: err})
|
|
return err
|
|
}
|
|
|
|
ctx.Send(SectorSeedReady{seed: SealSeed{
|
|
BlockHeight: randHeight,
|
|
TicketBytes: abi.InteractiveSealRandomness(rand),
|
|
}})
|
|
|
|
return nil
|
|
}, func(ctx context.Context, ts *types.TipSet) error {
|
|
log.Warn("revert in interactive commit sector step")
|
|
// TODO: need to cancel running process and restart...
|
|
return nil
|
|
}, build.InteractivePoRepConfidence, mw.TipSet.Height()+miner.PreCommitChallengeDelay)
|
|
if err != nil {
|
|
log.Warn("waitForPreCommitMessage ChainAt errored: ", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
|
log.Info("scheduling seal proof computation...")
|
|
|
|
log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorID, sector.Ticket.TicketBytes, sector.Ticket.BlockHeight, sector.Seed.TicketBytes, sector.Seed.BlockHeight, sector.pieceInfos(), sector.CommR, sector.CommD)
|
|
|
|
proof, err := m.sb.SealCommit(ctx.Context(), sector.SectorID, sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.pieceInfos(), *sector.CommR, *sector.CommD)
|
|
if err != nil {
|
|
return ctx.Send(SectorComputeProofFailed{xerrors.Errorf("computing seal proof failed: %w", err)})
|
|
}
|
|
|
|
// TODO: Consider splitting states and persist proof for faster recovery
|
|
|
|
params := &miner.ProveCommitSectorParams{
|
|
SectorNumber: sector.SectorID,
|
|
Proof: proof,
|
|
}
|
|
|
|
enc, aerr := actors.SerializeParams(params)
|
|
if aerr != nil {
|
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("could not serialize commit sector parameters: %w", aerr)})
|
|
}
|
|
|
|
msg := &types.Message{
|
|
To: m.maddr,
|
|
From: m.worker,
|
|
Method: builtin.MethodsMiner.ProveCommitSector,
|
|
Params: enc,
|
|
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
|
|
GasLimit: types.NewInt(1000000 /* i dont know help */),
|
|
GasPrice: types.NewInt(1),
|
|
}
|
|
|
|
// TODO: check seed / ticket are up to date
|
|
|
|
smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg)
|
|
if err != nil {
|
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("pushing message to mpool: %w", err)})
|
|
}
|
|
|
|
return ctx.Send(SectorCommitted{
|
|
proof: proof,
|
|
message: smsg.Cid(),
|
|
})
|
|
}
|
|
|
|
func (m *Sealing) handleCommitWait(ctx statemachine.Context, sector SectorInfo) error {
|
|
if sector.CommitMessage == nil {
|
|
log.Errorf("sector %d entered commit wait state without a message cid", sector.SectorID)
|
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("entered commit wait with no commit cid")})
|
|
}
|
|
|
|
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.CommitMessage)
|
|
if err != nil {
|
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("failed to wait for porep inclusion: %w", err)})
|
|
}
|
|
|
|
if mw.Receipt.ExitCode != 0 {
|
|
return ctx.Send(SectorCommitFailed{xerrors.Errorf("submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, sector.Proof)})
|
|
}
|
|
|
|
return ctx.Send(SectorProving{})
|
|
}
|
|
|
|
func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorInfo) error {
|
|
// TODO: Maybe wait for some finality
|
|
|
|
if err := m.sb.FinalizeSector(ctx.Context(), sector.SectorID); err != nil {
|
|
if !xerrors.Is(err, fs.ErrNoSuitablePath) {
|
|
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
|
|
}
|
|
log.Warnf("finalize sector: %v", err)
|
|
}
|
|
|
|
if err := m.sb.DropStaged(ctx.Context(), sector.SectorID); err != nil {
|
|
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("drop staged: %w", err)})
|
|
}
|
|
|
|
return ctx.Send(SectorFinalized{})
|
|
}
|
|
|
|
func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error {
|
|
// TODO: check if the fault has already been reported, and that this sector is even valid
|
|
|
|
// TODO: coalesce faulty sector reporting
|
|
bf := abi.NewBitField()
|
|
bf.Set(uint64(sector.SectorID))
|
|
|
|
enc, aerr := actors.SerializeParams(&miner.DeclareTemporaryFaultsParams{
|
|
SectorNumbers: bf,
|
|
Duration: 99999999, // TODO: This is very unlikely to be the correct number
|
|
})
|
|
if aerr != nil {
|
|
return xerrors.Errorf("failed to serialize declare fault params: %w", aerr)
|
|
}
|
|
|
|
msg := &types.Message{
|
|
To: m.maddr,
|
|
From: m.worker,
|
|
Method: builtin.MethodsMiner.DeclareTemporaryFaults,
|
|
Params: enc,
|
|
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
|
|
GasLimit: types.NewInt(1000000 /* i dont know help */),
|
|
GasPrice: types.NewInt(1),
|
|
}
|
|
|
|
smsg, err := m.api.MpoolPushMessage(ctx.Context(), msg)
|
|
if err != nil {
|
|
return xerrors.Errorf("failed to push declare faults message to network: %w", err)
|
|
}
|
|
|
|
return ctx.Send(SectorFaultReported{reportMsg: smsg.Cid()})
|
|
}
|
|
|
|
func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInfo) error {
|
|
if sector.FaultReportMsg == nil {
|
|
return xerrors.Errorf("entered fault reported state without a FaultReportMsg cid")
|
|
}
|
|
|
|
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.FaultReportMsg)
|
|
if err != nil {
|
|
return xerrors.Errorf("failed to wait for fault declaration: %w", err)
|
|
}
|
|
|
|
if mw.Receipt.ExitCode != 0 {
|
|
log.Errorf("UNHANDLED: declaring sector fault failed (exit=%d, msg=%s) (id: %d)", mw.Receipt.ExitCode, *sector.FaultReportMsg, sector.SectorID)
|
|
return xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode)
|
|
}
|
|
|
|
return ctx.Send(SectorFaultedFinal{})
|
|
}
|