diff --git a/storage/pipeline/receive.go b/storage/pipeline/receive.go index b21829772..a9e56a845 100644 --- a/storage/pipeline/receive.go +++ b/storage/pipeline/receive.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/storage/sealer/storiface" ) func (m *Sealing) Receive(ctx context.Context, meta api.RemoteSectorMeta) error { @@ -102,6 +103,19 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta info.CommD = meta.CommD // todo check cid prefixes info.CommR = meta.CommR + if meta.DataSealed == nil { + return SectorInfo{}, xerrors.Errorf("expected DataSealed to be set") + } + if meta.DataCache == nil { + return SectorInfo{}, xerrors.Errorf("expected DataCache to be set") + } + info.RemoteDataSealed = meta.DataSealed + info.RemoteDataCache = meta.DataCache + + // If we get a sector after PC2, assume that we're getting finalized sector data + // todo: maybe only set if C1 provider is set? + info.RemoteDataFinalized = true + fallthrough case GetTicket: fallthrough @@ -118,6 +132,11 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta return SectorInfo{}, xerrors.Errorf("checking pieces: %w", err) } + if meta.DataUnsealed == nil { + return SectorInfo{}, xerrors.Errorf("expected DataUnsealed to be set") + } + info.RemoteDataUnsealed = meta.DataUnsealed + return info, nil default: return SectorInfo{}, xerrors.Errorf("imported sector State in not supported") @@ -125,8 +144,32 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta } func (m *Sealing) handleReceiveSector(ctx statemachine.Context, sector SectorInfo) error { - // todo fetch stuff - // m.sealer.DownloadSectorData(ctx, m.minerSector(sector.SectorType, sector.SectorNumber), ) - panic("todo") + toFetch := map[storiface.SectorFileType]storiface.SectorData{} + + for fileType, data := range map[storiface.SectorFileType]*storiface.SectorData{ + storiface.FTUnsealed: sector.RemoteDataUnsealed, + storiface.FTSealed: sector.RemoteDataSealed, + storiface.FTCache: sector.RemoteDataCache, + } { + if data == nil { + continue + } + + if data.Local { + // todo check exists + continue + } + + toFetch[fileType] = *data + } + + if len(toFetch) > 0 { + if err := m.sealer.DownloadSectorData(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber), sector.RemoteDataFinalized, toFetch); err != nil { + return xerrors.Errorf("downloading sector data: %w", err) // todo send err event + } + } + + // todo data checks? + return ctx.Send(SectorReceived{}) } diff --git a/storage/pipeline/types.go b/storage/pipeline/types.go index 0321d113f..025b5a768 100644 --- a/storage/pipeline/types.go +++ b/storage/pipeline/types.go @@ -93,6 +93,12 @@ type SectorInfo struct { TerminateMessage *cid.Cid TerminatedAt abi.ChainEpoch + // Remote import + RemoteDataUnsealed *storiface.SectorData + RemoteDataSealed *storiface.SectorData + RemoteDataCache *storiface.SectorData + RemoteDataFinalized bool + // Debug LastErr string