lotus/storage/sector_states.go

290 lines
9.6 KiB
Go
Raw Normal View History

2019-11-01 13:58:48 +00:00
package storage
import (
"context"
sectorbuilder "github.com/filecoin-project/go-sectorbuilder"
2019-11-01 13:58:48 +00:00
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api"
2019-11-08 20:11:56 +00:00
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/types"
2019-11-01 13:58:48 +00:00
)
type providerHandlerFunc func(ctx context.Context, deal SectorInfo) *sectorUpdate
2019-11-01 13:58:48 +00:00
func (m *Miner) handleSectorUpdate(ctx context.Context, sector SectorInfo, cb providerHandlerFunc) {
2019-11-01 13:58:48 +00:00
go func() {
update := cb(ctx, sector)
2019-11-01 13:58:48 +00:00
if update == nil {
return // async
2019-11-01 13:58:48 +00:00
}
select {
case m.sectorUpdated <- *update:
2019-11-01 13:58:48 +00:00
case <-m.stop:
}
}()
}
func (m *Miner) handlePacking(ctx context.Context, sector SectorInfo) *sectorUpdate {
2019-11-06 23:09:48 +00:00
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorID)
2019-11-07 18:22:59 +00:00
var allocated uint64
for _, piece := range sector.Pieces {
allocated += piece.Size
}
2019-11-07 18:43:15 +00:00
ubytes := sectorbuilder.UserBytesForSectorSize(m.sb.SectorSize())
if allocated > ubytes {
return sector.upd().fatal(xerrors.Errorf("too much data in sector: %d > %d", allocated, ubytes))
2019-11-07 18:22:59 +00:00
}
2019-11-07 18:43:15 +00:00
fillerSizes, err := fillersFromRem(ubytes - allocated)
2019-11-06 23:09:48 +00:00
if err != nil {
return sector.upd().fatal(err)
2019-11-06 23:09:48 +00:00
}
if len(fillerSizes) > 0 {
log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorID)
}
pieces, err := m.pledgeSector(ctx, sector.SectorID, sector.existingPieces(), fillerSizes...)
2019-11-06 23:09:48 +00:00
if err != nil {
return sector.upd().fatal(xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err))
2019-11-06 23:09:48 +00:00
}
return sector.upd().to(api.Unsealed).state(func(info *SectorInfo) {
2019-11-07 18:22:59 +00:00
info.Pieces = append(info.Pieces, pieces...)
})
2019-11-06 23:09:48 +00:00
}
func (m *Miner) handleUnsealed(ctx context.Context, sector SectorInfo) *sectorUpdate {
2019-11-01 13:58:48 +00:00
log.Infow("performing sector replication...", "sector", sector.SectorID)
2019-11-07 18:22:59 +00:00
ticket, err := m.tktFn(ctx)
if err != nil {
return sector.upd().fatal(err)
2019-11-07 18:22:59 +00:00
}
rspco, err := m.sb.SealPreCommit(sector.SectorID, *ticket, sector.pieceInfos())
2019-11-01 13:58:48 +00:00
if err != nil {
return sector.upd().to(api.SealFailed).error(xerrors.Errorf("seal pre commit failed: %w", err))
2019-11-01 13:58:48 +00:00
}
return sector.upd().to(api.PreCommitting).state(func(info *SectorInfo) {
2019-11-07 18:22:59 +00:00
info.CommD = rspco.CommD[:]
info.CommR = rspco.CommR[:]
2019-11-01 13:58:48 +00:00
info.Ticket = SealTicket{
2019-11-07 18:22:59 +00:00
BlockHeight: ticket.BlockHeight,
TicketBytes: ticket.TicketBytes[:],
2019-11-01 13:58:48 +00:00
}
})
2019-11-01 13:58:48 +00:00
}
func (m *Miner) handlePreCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate {
2019-11-07 12:03:18 +00:00
params := &actors.SectorPreCommitInfo{
2019-11-01 13:58:48 +00:00
SectorNumber: sector.SectorID,
2019-11-07 12:03:18 +00:00
CommR: sector.CommR,
SealEpoch: sector.Ticket.BlockHeight,
2019-11-07 18:22:59 +00:00
DealIDs: sector.deals(),
2019-11-01 13:58:48 +00:00
}
enc, aerr := actors.SerializeParams(params)
if aerr != nil {
return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr))
2019-11-01 13:58:48 +00:00
}
msg := &types.Message{
To: m.maddr,
From: m.worker,
Method: actors.MAMethods.PreCommitSector,
Params: enc,
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
GasLimit: types.NewInt(1000000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
log.Info("submitting precommit for sector: ", sector.SectorID)
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
return sector.upd().to(api.PreCommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err))
2019-11-01 13:58:48 +00:00
}
return sector.upd().to(api.PreCommitted).state(func(info *SectorInfo) {
2019-11-01 13:58:48 +00:00
mcid := smsg.Cid()
info.PreCommitMessage = &mcid
})
2019-11-01 13:58:48 +00:00
}
func (m *Miner) handlePreCommitted(ctx context.Context, sector SectorInfo) *sectorUpdate {
2019-11-01 13:58:48 +00:00
// would be ideal to just use the events.Called handler, but it wouldnt be able to handle individual message timeouts
log.Info("Sector precommitted: ", sector.SectorID)
2019-11-01 13:58:48 +00:00
mw, err := m.api.StateWaitMsg(ctx, *sector.PreCommitMessage)
if err != nil {
return sector.upd().to(api.PreCommitFailed).error(err)
2019-11-01 13:58:48 +00:00
}
if mw.Receipt.ExitCode != 0 {
log.Error("sector precommit failed: ", mw.Receipt.ExitCode)
err := xerrors.Errorf("sector precommit failed: %d", mw.Receipt.ExitCode)
return sector.upd().to(api.PreCommitFailed).error(err)
2019-11-01 13:58:48 +00:00
}
log.Info("precommit message landed on chain: ", sector.SectorID)
2019-11-01 13:58:48 +00:00
2019-11-02 15:07:18 +00:00
randHeight := mw.TipSet.Height() + build.InteractivePoRepDelay - 1 // -1 because of how the messages are applied
2019-11-05 14:03:59 +00:00
log.Infof("precommit for sector %d made it on chain, will start proof computation at height %d", sector.SectorID, randHeight)
2019-11-01 13:58:48 +00:00
updateNonce := sector.Nonce
2019-11-05 14:03:59 +00:00
err = m.events.ChainAt(func(ctx context.Context, ts *types.TipSet, curH uint64) error {
rand, err := m.api.ChainGetRandomness(ctx, ts.Key(), int64(randHeight))
2019-11-08 18:15:13 +00:00
if err != nil {
err = xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)
2019-11-08 18:15:13 +00:00
m.sectorUpdated <- *sector.upd().fatal(err)
return err
2019-11-01 13:58:48 +00:00
}
m.sectorUpdated <- *sector.upd().to(api.Committing).setNonce(updateNonce).state(func(info *SectorInfo) {
info.Seed = SealSeed{
BlockHeight: randHeight,
TicketBytes: rand,
}
})
updateNonce++
2019-11-01 13:58:48 +00:00
return nil
2019-11-05 14:03:59 +00:00
}, func(ctx context.Context, ts *types.TipSet) error {
2019-11-01 13:58:48 +00:00
log.Warn("revert in interactive commit sector step")
// TODO: need to cancel running process and restart...
2019-11-01 13:58:48 +00:00
return nil
}, build.InteractivePoRepConfidence, mw.TipSet.Height()+build.InteractivePoRepDelay)
2019-11-01 13:58:48 +00:00
if err != nil {
log.Warn("waitForPreCommitMessage ChainAt errored: ", err)
}
return nil
2019-11-01 13:58:48 +00:00
}
func (m *Miner) handleCommitting(ctx context.Context, sector SectorInfo) *sectorUpdate {
2019-11-01 13:58:48 +00:00
log.Info("scheduling seal proof computation...")
proof, err := m.sb.SealCommit(sector.SectorID, sector.Ticket.SB(), sector.Seed.SB(), sector.pieceInfos(), sector.rspco())
if err != nil {
return sector.upd().to(api.SealCommitFailed).error(xerrors.Errorf("computing seal proof failed: %w", err))
}
2019-11-08 18:15:13 +00:00
// TODO: Consider splitting states and persist proof for faster recovery
2019-11-01 13:58:48 +00:00
params := &actors.SectorProveCommitInfo{
Proof: proof,
SectorID: sector.SectorID,
2019-11-07 18:22:59 +00:00
DealIDs: sector.deals(),
2019-11-01 13:58:48 +00:00
}
enc, aerr := actors.SerializeParams(params)
2019-11-01 13:58:48 +00:00
if aerr != nil {
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("could not serialize commit sector parameters: %w", aerr))
2019-11-01 13:58:48 +00:00
}
msg := &types.Message{
To: m.maddr,
From: m.worker,
Method: actors.MAMethods.ProveCommitSector,
Params: enc,
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
GasLimit: types.NewInt(1000000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("pushing message to mpool: %w", err))
2019-11-01 13:58:48 +00:00
}
// TODO: Separate state before this wait, so we persist message cid?
return sector.upd().to(api.CommitWait).state(func(info *SectorInfo) {
mcid := smsg.Cid()
info.CommitMessage = &mcid
info.Proof = proof
})
}
2019-11-01 13:58:48 +00:00
func (m *Miner) handleCommitWait(ctx context.Context, sector SectorInfo) *sectorUpdate {
if sector.CommitMessage == nil {
log.Errorf("sector %d entered commit wait state without a message cid", sector.SectorID)
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("entered commit wait with no commit cid"))
}
mw, err := m.api.StateWaitMsg(ctx, *sector.CommitMessage)
2019-11-01 13:58:48 +00:00
if err != nil {
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("failed to wait for porep inclusion: %w", err))
2019-11-01 13:58:48 +00:00
}
if mw.Receipt.ExitCode != 0 {
log.Errorf("UNHANDLED: submitting sector proof failed (exit=%d, msg=%s) (t:%x; s:%x(%d); p:%x)", mw.Receipt.ExitCode, sector.CommitMessage, sector.Ticket.TicketBytes, sector.Seed.TicketBytes, sector.Seed.BlockHeight, sector.Proof)
2019-12-08 22:39:08 +00:00
return sector.upd().fatal(xerrors.Errorf("UNHANDLED: submitting sector proof failed (exit: %d)", mw.Receipt.ExitCode))
2019-11-01 13:58:48 +00:00
}
return sector.upd().to(api.Proving).state(func(info *SectorInfo) {
})
2019-11-01 13:58:48 +00:00
}
func (m *Miner) handleFaulty(ctx context.Context, sector SectorInfo) *sectorUpdate {
// TODO: check if the fault has already been reported, and that this sector is even valid
// TODO: coalesce faulty sector reporting
bf := types.NewBitField()
bf.Set(sector.SectorID)
fp := &actors.DeclareFaultsParams{bf}
_ = fp
enc, aerr := actors.SerializeParams(nil)
if aerr != nil {
return sector.upd().fatal(xerrors.Errorf("failed to serialize declare fault params: %w", aerr))
}
msg := &types.Message{
To: m.maddr,
From: m.worker,
Method: actors.MAMethods.DeclareFaults,
Params: enc,
Value: types.NewInt(0), // TODO: need to ensure sufficient collateral
GasLimit: types.NewInt(1000000 /* i dont know help */),
GasPrice: types.NewInt(1),
}
smsg, err := m.api.MpoolPushMessage(ctx, msg)
if err != nil {
return sector.upd().to(api.FailedUnrecoverable).error(xerrors.Errorf("failed to push declare faults message to network: %w", err))
}
return sector.upd().to(api.FaultReported).state(func(info *SectorInfo) {
c := smsg.Cid()
info.FaultReportMsg = &c
})
}
func (m *Miner) handleFaultReported(ctx context.Context, sector SectorInfo) *sectorUpdate {
if sector.FaultReportMsg == nil {
return sector.upd().to(api.FailedUnrecoverable).error(xerrors.Errorf("entered fault reported state without a FaultReportMsg cid"))
}
mw, err := m.api.StateWaitMsg(ctx, *sector.FaultReportMsg)
if err != nil {
return sector.upd().to(api.CommitFailed).error(xerrors.Errorf("failed to wait for fault declaration: %w", err))
}
if mw.Receipt.ExitCode != 0 {
log.Errorf("UNHANDLED: declaring sector fault failed (exit=%d, msg=%s) (id: %d)", mw.Receipt.ExitCode, *sector.FaultReportMsg, sector.SectorID)
return sector.upd().fatal(xerrors.Errorf("UNHANDLED: submitting fault declaration failed (exit %d)", mw.Receipt.ExitCode))
}
return sector.upd().to(api.FaultedFinal).state(func(info *SectorInfo) {})
}