diff --git a/cmd/lotus-miner/info.go b/cmd/lotus-miner/info.go index e8cbfd8b7..ecce9c6c2 100644 --- a/cmd/lotus-miner/info.go +++ b/cmd/lotus-miner/info.go @@ -498,6 +498,8 @@ var stateList = []stateMeta{ {col: color.FgGreen, state: sealing.Available}, {col: color.FgGreen, state: sealing.UpdateActivating}, + {col: color.FgMagenta, state: sealing.ReceiveSector}, + {col: color.FgBlue, state: sealing.Empty}, {col: color.FgBlue, state: sealing.WaitDeals}, {col: color.FgBlue, state: sealing.AddPiece}, diff --git a/storage/pipeline/fsm.go b/storage/pipeline/fsm.go index 3df57a501..59e99bc59 100644 --- a/storage/pipeline/fsm.go +++ b/storage/pipeline/fsm.go @@ -40,6 +40,11 @@ func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface } var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) (uint64, error){ + // external import + ReceiveSector: planOne( + onReturning(SectorReceived{}), + ), + // Sealing UndefinedSectorState: planOne( @@ -457,6 +462,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta } switch state.State { + case ReceiveSector: + return m.handleReceiveSector, processed, nil + // Happy path case Empty: fallthrough diff --git a/storage/pipeline/fsm_events.go b/storage/pipeline/fsm_events.go index ac9c5775d..ec87f8e38 100644 --- a/storage/pipeline/fsm_events.go +++ b/storage/pipeline/fsm_events.go @@ -526,3 +526,15 @@ type SectorRemoveFailed struct{ error } func (evt SectorRemoveFailed) FormatError(xerrors.Printer) (next error) { return evt.error } func (evt SectorRemoveFailed) apply(*SectorInfo) {} + +type SectorReceive struct { + State SectorInfo +} + +func (evt SectorReceive) apply(state *SectorInfo) { + *state = evt.State +} + +type SectorReceived struct{} + +func (evt SectorReceived) apply(state *SectorInfo) {} diff --git a/storage/pipeline/receive.go b/storage/pipeline/receive.go index 1d6813ae8..4a70932a5 100644 --- a/storage/pipeline/receive.go +++ b/storage/pipeline/receive.go @@ -2,9 +2,7 @@ package sealing import ( "context" - "errors" - cbg "github.com/whyrusleeping/cbor-gen" - + "github.com/filecoin-project/go-statemachine" "github.com/ipfs/go-datastore" "golang.org/x/xerrors" @@ -15,23 +13,30 @@ import ( ) func (m *Sealing) Receive(ctx context.Context, meta api.RemoteSectorMeta) error { - if err := m.checkSectorMeta(ctx, meta); err != nil { + si, err := m.checkSectorMeta(ctx, meta) + if err != nil { return err } - err := m.sectors.Get(uint64(meta.Sector.Number)).Get(&cbg.Deferred{}) - if errors.Is(err, datastore.ErrNotFound) { - - } else if err != nil { + exists, err := m.sectors.Has(uint64(meta.Sector.Number)) + if err != nil { return xerrors.Errorf("checking if sector exists: %w", err) - } else if err == nil { + } + if exists { return xerrors.Errorf("sector %d state already exists", meta.Sector.Number) } - panic("impl me") + err = m.sectors.Send(uint64(meta.Sector.Number), SectorReceive{ + State: si, + }) + if err != nil { + return xerrors.Errorf("receiving sector: %w", err) + } + + return nil } -func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta) error { +func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta) (SectorInfo, error) { { mid, err := address.IDFromAddress(m.maddr) if err != nil { @@ -39,7 +44,7 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta } if meta.Sector.Miner != abi.ActorID(mid) { - return xerrors.Errorf("sector for wrong actor - expected actor id %d, sector was for actor %d", mid, meta.Sector.Miner) + return SectorInfo{}, xerrors.Errorf("sector for wrong actor - expected actor id %d, sector was for actor %d", mid, meta.Sector.Miner) } } @@ -47,21 +52,21 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta // initial sanity check, doesn't prevent races _, err := m.GetSectorInfo(meta.Sector.Number) if err != nil && !xerrors.Is(err, datastore.ErrNotFound) { - return err + return SectorInfo{}, err } if err == nil { - return xerrors.Errorf("sector with ID %d already exists in the sealing pipeline", meta.Sector.Number) + return SectorInfo{}, xerrors.Errorf("sector with ID %d already exists in the sealing pipeline", meta.Sector.Number) } } { spt, err := m.currentSealProof(ctx) if err != nil { - return err + return SectorInfo{}, err } if meta.Type != spt { - return xerrors.Errorf("sector seal proof type doesn't match current seal proof type (%d!=%d)", meta.Type, spt) + return SectorInfo{}, xerrors.Errorf("sector seal proof type doesn't match current seal proof type (%d!=%d)", meta.Type, spt) } } @@ -101,17 +106,25 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta fallthrough case Packing: // todo check num free - info.State = SectorState(meta.State) // todo dedupe states + info.Return = ReturnState(meta.State) // todo dedupe states + info.State = ReceiveSector + info.SectorNumber = meta.Sector.Number info.Pieces = meta.Pieces info.SectorType = meta.Type if err := checkPieces(ctx, m.maddr, meta.Sector.Number, meta.Pieces, m.Api, false); err != nil { - return xerrors.Errorf("checking pieces: %w", err) + return SectorInfo{}, xerrors.Errorf("checking pieces: %w", err) } - return nil + return info, nil default: - return xerrors.Errorf("imported sector State in not supported") + return SectorInfo{}, xerrors.Errorf("imported sector State in not supported") } } + +func (m *Sealing) handleReceiveSector(ctx statemachine.Context, sector SectorInfo) error { + // todo fetch stuff + + return ctx.Send(SectorReceived{}) +} diff --git a/storage/pipeline/sector_state.go b/storage/pipeline/sector_state.go index 4f81f5544..ad77fdbd5 100644 --- a/storage/pipeline/sector_state.go +++ b/storage/pipeline/sector_state.go @@ -63,6 +63,7 @@ var ExistSectorStateList = map[SectorState]struct{}{ ReleaseSectorKeyFailed: {}, FinalizeReplicaUpdateFailed: {}, AbortUpgrade: {}, + ReceiveSector: {}, } // cmd/lotus-miner/info.go defines CLI colors corresponding to these states @@ -113,6 +114,9 @@ const ( UpdateActivating SectorState = "UpdateActivating" ReleaseSectorKey SectorState = "ReleaseSectorKey" + // external import + ReceiveSector SectorState = "ReceiveSector" + // error modes FailedUnrecoverable SectorState = "FailedUnrecoverable" AddPieceFailed SectorState = "AddPieceFailed" @@ -153,7 +157,7 @@ func toStatState(st SectorState, finEarly bool) statSectorState { switch st { case UndefinedSectorState, Empty, WaitDeals, AddPiece, AddPieceFailed, SnapDealsWaitDeals, SnapDealsAddPiece: return sstStaging - case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, FinalizeSector, SnapDealsPacking, UpdateReplica, ProveReplicaUpdate, FinalizeReplicaUpdate: + case Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, SubmitPreCommitBatch, PreCommitBatchWait, WaitSeed, Committing, CommitFinalize, FinalizeSector, SnapDealsPacking, UpdateReplica, ProveReplicaUpdate, FinalizeReplicaUpdate, ReceiveSector: return sstSealing case SubmitCommit, CommitWait, SubmitCommitAggregate, CommitAggregateWait, SubmitReplicaUpdate, ReplicaUpdateWait: if finEarly { diff --git a/storage/pipeline/types.go b/storage/pipeline/types.go index 4f175ae61..0321d113f 100644 --- a/storage/pipeline/types.go +++ b/storage/pipeline/types.go @@ -86,7 +86,7 @@ type SectorInfo struct { // Faults FaultReportMsg *cid.Cid - // Recovery + // Recovery / Import Return ReturnState // Termination