sector import: Put the imported sector into the sealing pipeline

This commit is contained in:
Łukasz Magiera 2022-08-31 12:43:40 +02:00
parent 2b644525f8
commit 39e4845f42
6 changed files with 61 additions and 22 deletions

View File

@ -498,6 +498,8 @@ var stateList = []stateMeta{
{col: color.FgGreen, state: sealing.Available}, {col: color.FgGreen, state: sealing.Available},
{col: color.FgGreen, state: sealing.UpdateActivating}, {col: color.FgGreen, state: sealing.UpdateActivating},
{col: color.FgMagenta, state: sealing.ReceiveSector},
{col: color.FgBlue, state: sealing.Empty}, {col: color.FgBlue, state: sealing.Empty},
{col: color.FgBlue, state: sealing.WaitDeals}, {col: color.FgBlue, state: sealing.WaitDeals},
{col: color.FgBlue, state: sealing.AddPiece}, {col: color.FgBlue, state: sealing.AddPiece},

View File

@ -40,6 +40,11 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface
} }
var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) (uint64, error){ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) (uint64, error){
// external import
ReceiveSector: planOne(
onReturning(SectorReceived{}),
),
// Sealing // Sealing
UndefinedSectorState: planOne( UndefinedSectorState: planOne(
@ -457,6 +462,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
} }
switch state.State { switch state.State {
case ReceiveSector:
return m.handleReceiveSector, processed, nil
// Happy path // Happy path
case Empty: case Empty:
fallthrough fallthrough

View File

@ -526,3 +526,15 @@ type SectorRemoveFailed struct{ error }
func (evt SectorRemoveFailed) FormatError(xerrors.Printer) (next error) { return evt.error } func (evt SectorRemoveFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
func (evt SectorRemoveFailed) apply(*SectorInfo) {} func (evt SectorRemoveFailed) apply(*SectorInfo) {}
type SectorReceive struct {
State SectorInfo
}
func (evt SectorReceive) apply(state *SectorInfo) {
*state = evt.State
}
type SectorReceived struct{}
func (evt SectorReceived) apply(state *SectorInfo) {}

View File

@ -2,9 +2,7 @@ package sealing
import ( import (
"context" "context"
"errors" "github.com/filecoin-project/go-statemachine"
cbg "github.com/whyrusleeping/cbor-gen"
"github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -15,23 +13,30 @@ import (
) )
func (m *Sealing) Receive(ctx context.Context, meta api.RemoteSectorMeta) error { func (m *Sealing) Receive(ctx context.Context, meta api.RemoteSectorMeta) error {
if err := m.checkSectorMeta(ctx, meta); err != nil { si, err := m.checkSectorMeta(ctx, meta)
if err != nil {
return err return err
} }
err := m.sectors.Get(uint64(meta.Sector.Number)).Get(&cbg.Deferred{}) exists, err := m.sectors.Has(uint64(meta.Sector.Number))
if errors.Is(err, datastore.ErrNotFound) { if err != nil {
} else if err != nil {
return xerrors.Errorf("checking if sector exists: %w", err) return xerrors.Errorf("checking if sector exists: %w", err)
} else if err == nil { }
if exists {
return xerrors.Errorf("sector %d state already exists", meta.Sector.Number) return xerrors.Errorf("sector %d state already exists", meta.Sector.Number)
} }
panic("impl me") err = m.sectors.Send(uint64(meta.Sector.Number), SectorReceive{
State: si,
})
if err != nil {
return xerrors.Errorf("receiving sector: %w", err)
}
return nil
} }
func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta) error { func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta) (SectorInfo, error) {
{ {
mid, err := address.IDFromAddress(m.maddr) mid, err := address.IDFromAddress(m.maddr)
if err != nil { if err != nil {
@ -39,7 +44,7 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta
} }
if meta.Sector.Miner != abi.ActorID(mid) { if meta.Sector.Miner != abi.ActorID(mid) {
return xerrors.Errorf("sector for wrong actor - expected actor id %d, sector was for actor %d", mid, meta.Sector.Miner) return SectorInfo{}, xerrors.Errorf("sector for wrong actor - expected actor id %d, sector was for actor %d", mid, meta.Sector.Miner)
} }
} }
@ -47,21 +52,21 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta
// initial sanity check, doesn't prevent races // initial sanity check, doesn't prevent races
_, err := m.GetSectorInfo(meta.Sector.Number) _, err := m.GetSectorInfo(meta.Sector.Number)
if err != nil && !xerrors.Is(err, datastore.ErrNotFound) { if err != nil && !xerrors.Is(err, datastore.ErrNotFound) {
return err return SectorInfo{}, err
} }
if err == nil { if err == nil {
return xerrors.Errorf("sector with ID %d already exists in the sealing pipeline", meta.Sector.Number) return SectorInfo{}, xerrors.Errorf("sector with ID %d already exists in the sealing pipeline", meta.Sector.Number)
} }
} }
{ {
spt, err := m.currentSealProof(ctx) spt, err := m.currentSealProof(ctx)
if err != nil { if err != nil {
return err return SectorInfo{}, err
} }
if meta.Type != spt { if meta.Type != spt {
return xerrors.Errorf("sector seal proof type doesn't match current seal proof type (%d!=%d)", meta.Type, spt) return SectorInfo{}, xerrors.Errorf("sector seal proof type doesn't match current seal proof type (%d!=%d)", meta.Type, spt)
} }
} }
@ -101,17 +106,25 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta
fallthrough fallthrough
case Packing: case Packing:
// todo check num free // todo check num free
info.State = SectorState(meta.State) // todo dedupe states info.Return = ReturnState(meta.State) // todo dedupe states
info.State = ReceiveSector
info.SectorNumber = meta.Sector.Number info.SectorNumber = meta.Sector.Number
info.Pieces = meta.Pieces info.Pieces = meta.Pieces
info.SectorType = meta.Type info.SectorType = meta.Type
if err := checkPieces(ctx, m.maddr, meta.Sector.Number, meta.Pieces, m.Api, false); err != nil { if err := checkPieces(ctx, m.maddr, meta.Sector.Number, meta.Pieces, m.Api, false); err != nil {
return xerrors.Errorf("checking pieces: %w", err) return SectorInfo{}, xerrors.Errorf("checking pieces: %w", err)
} }
return nil return info, nil
default: default:
return xerrors.Errorf("imported sector State in not supported") return SectorInfo{}, xerrors.Errorf("imported sector State in not supported")
} }
} }
func (m *Sealing) handleReceiveSector(ctx statemachine.Context, sector SectorInfo) error {
// todo fetch stuff
return ctx.Send(SectorReceived{})
}

View File

@ -63,6 +63,7 @@ var ExistSectorStateList = map[SectorState]struct{}{
ReleaseSectorKeyFailed: {}, ReleaseSectorKeyFailed: {},
FinalizeReplicaUpdateFailed: {}, FinalizeReplicaUpdateFailed: {},
AbortUpgrade: {}, AbortUpgrade: {},
ReceiveSector: {},
} }
// cmd/lotus-miner/info.go defines CLI colors corresponding to these states // cmd/lotus-miner/info.go defines CLI colors corresponding to these states
@ -113,6 +114,9 @@ const (
UpdateActivating SectorState = "UpdateActivating" UpdateActivating SectorState = "UpdateActivating"
ReleaseSectorKey SectorState = "ReleaseSectorKey" ReleaseSectorKey SectorState = "ReleaseSectorKey"
// external import
ReceiveSector SectorState = "ReceiveSector"
// error modes // error modes
FailedUnrecoverable SectorState = "FailedUnrecoverable" FailedUnrecoverable SectorState = "FailedUnrecoverable"
AddPieceFailed SectorState = "AddPieceFailed" AddPieceFailed SectorState = "AddPieceFailed"
@ -153,7 +157,7 @@ func toStatState(st SectorState, finEarly bool) statSectorState {
switch st { switch st {
case UndefinedSectorState, Empty, WaitDeals, AddPiece, AddPieceFailed, SnapDealsWaitDeals, SnapDealsAddPiece: case UndefinedSectorState, Empty, WaitDeals, AddPiece, AddPieceFailed, SnapDealsWaitDeals, SnapDealsAddPiece:
return sstStaging return sstStaging
case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, FinalizeSector, SnapDealsPacking, UpdateReplica, ProveReplicaUpdate, FinalizeReplicaUpdate: case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, FinalizeSector, SnapDealsPacking, UpdateReplica, ProveReplicaUpdate, FinalizeReplicaUpdate, ReceiveSector:
return sstSealing return sstSealing
case SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, SubmitReplicaUpdate, ReplicaUpdateWait: case SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, SubmitReplicaUpdate, ReplicaUpdateWait:
if finEarly { if finEarly {

View File

@ -86,7 +86,7 @@ type SectorInfo struct {
// Faults // Faults
FaultReportMsg *cid.Cid FaultReportMsg *cid.Cid
// Recovery // Recovery / Import
Return ReturnState Return ReturnState
// Termination // Termination