diff --git a/api/api_storage.go b/api/api_storage.go index 32309f388..7ae95cde2 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -3,7 +3,6 @@ package api import ( "bytes" "context" - "net/http" "time" "github.com/google/uuid" @@ -170,6 +169,7 @@ type StorageMiner interface { ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error //perm:admin retry:true + ReturnDownloadSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true // SealingSchedDiag dumps internal sealing scheduler state @@ -563,11 +563,11 @@ type RemoteSectorMeta struct { // Sector urls - lotus will use those for fetching files into local storage // Required in all states - DataUnsealed *SectorData + DataUnsealed *storiface.SectorData // Required in PreCommitting and later - DataSealed *SectorData - DataCache *SectorData + DataSealed *storiface.SectorData + DataCache *storiface.SectorData //////// // SEALING SERVICE HOOKS @@ -575,20 +575,3 @@ type RemoteSectorMeta struct { // todo Commit1Provider // todo OnDone / OnStateChange } - -type SectorData struct { - // Local when set to true indicates to lotus that sector data is already - // available locally; When set lotus will skip fetching sector data, and - // only check that sector data exists in sector storage - Local bool - - // URL to the sector data - // For sealed/unsealed sector, lotus expects octet-stream - // For cache, lotus expects a tar archive with cache files (todo maybe use not-tar; specify what files with what paths must be present) - // Valid schemas: - // - http:// / https:// - URL string - - // optional http headers to use when requesting sector data - Headers http.Header -} diff --git a/api/api_worker.go b/api/api_worker.go index 609cb4271..4b56d1154 100644 --- a/api/api_worker.go +++ b/api/api_worker.go @@ -49,6 +49,7 @@ type Worker interface { MoveStorage(ctx context.Context, sector storiface.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin UnsealPiece(context.Context, storiface.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin Fetch(context.Context, storiface.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) //perm:admin + DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) (storiface.CallID, error) //perm:admin GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error) //perm:admin GenerateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness) (storiface.WindowPoStResult, error) //perm:admin diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go index 47192bfd9..e078cbe0f 100644 --- a/api/docgen/docgen.go +++ b/api/docgen/docgen.go @@ -344,6 +344,14 @@ func init() { addExample(http.Header{ "Authorization": []string{"Bearer ey.."}, }) + + addExample(map[storiface.SectorFileType]storiface.SectorData{ + storiface.FTSealed: { + Local: false, + URL: "https://example.com/sealingservice/sectors/s-f0123-12345", + Headers: nil, + }, + }) } func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) { diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 6cb86ce54..04e79c2de 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -778,6 +778,8 @@ type StorageMinerStruct struct { ReturnDataCid func(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error `perm:"admin"` + ReturnDownloadSector func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"` + ReturnFetch func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"` ReturnFinalizeReplicaUpdate func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"` @@ -953,6 +955,8 @@ type WorkerStruct struct { DataCid func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storiface.Data) (storiface.CallID, error) `perm:"admin"` + DownloadSectorData func(p0 context.Context, p1 storiface.SectorRef, p2 bool, p3 map[storiface.SectorFileType]storiface.SectorData) (storiface.CallID, error) `perm:"admin"` + Enabled func(p0 context.Context) (bool, error) `perm:"admin"` Fetch func(p0 context.Context, p1 storiface.SectorRef, p2 storiface.SectorFileType, p3 storiface.PathType, p4 storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"` @@ -4667,6 +4671,17 @@ func (s *StorageMinerStub) ReturnDataCid(p0 context.Context, p1 storiface.CallID return ErrNotSupported } +func (s *StorageMinerStruct) ReturnDownloadSector(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error { + if s.Internal.ReturnDownloadSector == nil { + return ErrNotSupported + } + return s.Internal.ReturnDownloadSector(p0, p1, p2) +} + +func (s *StorageMinerStub) ReturnDownloadSector(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error { + return ErrNotSupported +} + func (s *StorageMinerStruct) ReturnFetch(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error { if s.Internal.ReturnFetch == nil { return ErrNotSupported @@ -5536,6 +5551,17 @@ func (s *WorkerStub) DataCid(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 st return *new(storiface.CallID), ErrNotSupported } +func (s *WorkerStruct) DownloadSectorData(p0 context.Context, p1 storiface.SectorRef, p2 bool, p3 map[storiface.SectorFileType]storiface.SectorData) (storiface.CallID, error) { + if s.Internal.DownloadSectorData == nil { + return *new(storiface.CallID), ErrNotSupported + } + return s.Internal.DownloadSectorData(p0, p1, p2, p3) +} + +func (s *WorkerStub) DownloadSectorData(p0 context.Context, p1 storiface.SectorRef, p2 bool, p3 map[storiface.SectorFileType]storiface.SectorData) (storiface.CallID, error) { + return *new(storiface.CallID), ErrNotSupported +} + func (s *WorkerStruct) Enabled(p0 context.Context) (bool, error) { if s.Internal.Enabled == nil { return false, ErrNotSupported diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 4d823b135..942fb190a 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/gateway.json.gz b/build/openrpc/gateway.json.gz index 570715c79..b3db3736c 100644 Binary files a/build/openrpc/gateway.json.gz and b/build/openrpc/gateway.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 4912f8306..2f9273ea7 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index f0d2db3c6..2a35ab134 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/documentation/en/api-v0-methods-miner.md b/documentation/en/api-v0-methods-miner.md index 8960857bc..4034c4407 100644 --- a/documentation/en/api-v0-methods-miner.md +++ b/documentation/en/api-v0-methods-miner.md @@ -113,6 +113,7 @@ * [Return](#Return) * [ReturnAddPiece](#ReturnAddPiece) * [ReturnDataCid](#ReturnDataCid) + * [ReturnDownloadSector](#ReturnDownloadSector) * [ReturnFetch](#ReturnFetch) * [ReturnFinalizeReplicaUpdate](#ReturnFinalizeReplicaUpdate) * [ReturnFinalizeSector](#ReturnFinalizeSector) @@ -2389,6 +2390,30 @@ Inputs: Response: `{}` +### ReturnDownloadSector + + +Perms: admin + +Inputs: +```json +[ + { + "Sector": { + "Miner": 1000, + "Number": 9 + }, + "ID": "07070707-0707-0707-0707-070707070707" + }, + { + "Code": 0, + "Message": "string value" + } +] +``` + +Response: `{}` + ### ReturnFetch diff --git a/documentation/en/api-v0-methods-worker.md b/documentation/en/api-v0-methods-worker.md index 554f8666d..e53211089 100644 --- a/documentation/en/api-v0-methods-worker.md +++ b/documentation/en/api-v0-methods-worker.md @@ -12,6 +12,8 @@ * [AddPiece](#AddPiece) * [Data](#Data) * [DataCid](#DataCid) +* [Download](#Download) + * [DownloadSectorData](#DownloadSectorData) * [Finalize](#Finalize) * [FinalizeReplicaUpdate](#FinalizeReplicaUpdate) * [FinalizeSector](#FinalizeSector) @@ -1542,6 +1544,46 @@ Response: } ``` +## Download + + +### DownloadSectorData + + +Perms: admin + +Inputs: +```json +[ + { + "ID": { + "Miner": 1000, + "Number": 9 + }, + "ProofType": 8 + }, + true, + { + "2": { + "Local": false, + "URL": "https://example.com/sealingservice/sectors/s-f0123-12345", + "Headers": null + } + } +] +``` + +Response: +```json +{ + "Sector": { + "Miner": 1000, + "Number": 9 + }, + "ID": "07070707-0707-0707-0707-070707070707" +} +``` + ## Finalize diff --git a/storage/pipeline/receive.go b/storage/pipeline/receive.go index 4a70932a5..b21829772 100644 --- a/storage/pipeline/receive.go +++ b/storage/pipeline/receive.go @@ -2,12 +2,13 @@ package sealing import ( "context" - "github.com/filecoin-project/go-statemachine" + "github.com/ipfs/go-datastore" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/lotus/api" ) @@ -125,6 +126,7 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta func (m *Sealing) handleReceiveSector(ctx statemachine.Context, sector SectorInfo) error { // todo fetch stuff - + // m.sealer.DownloadSectorData(ctx, m.minerSector(sector.SectorType, sector.SectorNumber), ) + panic("todo") return ctx.Send(SectorReceived{}) } diff --git a/storage/sealer/ffiwrapper/sealer_cgo.go b/storage/sealer/ffiwrapper/sealer_cgo.go index cf4eddff0..c1d557713 100644 --- a/storage/sealer/ffiwrapper/sealer_cgo.go +++ b/storage/sealer/ffiwrapper/sealer_cgo.go @@ -1127,6 +1127,10 @@ func (sb *Sealer) Remove(ctx context.Context, sector storiface.SectorRef) error return xerrors.Errorf("not supported at this layer") // happens in localworker } +func (sb *Sealer) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) error { + panic("todo") +} + func GetRequiredPadding(oldLength abi.PaddedPieceSize, newPieceLength abi.PaddedPieceSize) ([]abi.PaddedPieceSize, abi.PaddedPieceSize) { padPieces := make([]abi.PaddedPieceSize, 0) diff --git a/storage/sealer/manager.go b/storage/sealer/manager.go index 6733674ff..78c0622f4 100644 --- a/storage/sealer/manager.go +++ b/storage/sealer/manager.go @@ -5,6 +5,7 @@ import ( "errors" "io" "net/http" + "sort" "sync" "github.com/google/uuid" @@ -1084,6 +1085,78 @@ func (m *Manager) ProveReplicaUpdate2(ctx context.Context, sector storiface.Sect return out, waitErr } +func (m *Manager) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var toFetch storiface.SectorFileType + + // get a sorted list of sectors files to make a consistent work key from + ents := make([]struct { + T storiface.SectorFileType + S storiface.SectorData + }, 0, len(src)) + for fileType, data := range src { + if len(fileType.AllSet()) != 1 { + return xerrors.Errorf("sector data entry must be for a single file type") + } + + toFetch |= fileType + + ents = append(ents, struct { + T storiface.SectorFileType + S storiface.SectorData + }{T: fileType, S: data}) + } + sort.Slice(ents, func(i, j int) bool { + return ents[i].T < ents[j].T + }) + + // get a work key + wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTDownloadSector, sector, ents) + if err != nil { + return xerrors.Errorf("getWork: %w", err) + } + defer cancel() + + var waitErr error + waitRes := func() { + _, werr := m.waitWork(ctx, wk) + if werr != nil { + waitErr = werr + return + } + } + + if wait { // already in progress + waitRes() + return waitErr + } + + ptype := storiface.PathSealing + if finalized { + ptype = storiface.PathStorage + } + + selector := newAllocSelector(m.index, toFetch, ptype) + + err = m.sched.Schedule(ctx, sector, sealtasks.TTDownloadSector, selector, schedNop, func(ctx context.Context, w Worker) error { + err := m.startWork(ctx, w, wk)(w.DownloadSectorData(ctx, sector, finalized, src)) + if err != nil { + return err + } + + waitRes() + return nil + }) + + if err != nil { + return err + } + + return waitErr +} + func (m *Manager) ReturnDataCid(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error { return m.returnResult(ctx, callID, pi, err) } @@ -1148,6 +1221,10 @@ func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, return m.returnResult(ctx, callID, ok, err) } +func (m *Manager) ReturnDownloadSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { + return m.returnResult(ctx, callID, nil, err) +} + func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { return m.returnResult(ctx, callID, nil, err) } diff --git a/storage/sealer/sealtasks/task.go b/storage/sealer/sealtasks/task.go index 53aa1cbc7..1a06f7d34 100644 --- a/storage/sealer/sealtasks/task.go +++ b/storage/sealer/sealtasks/task.go @@ -31,6 +31,8 @@ const ( TTRegenSectorKey TaskType = "seal/v0/regensectorkey" TTFinalizeReplicaUpdate TaskType = "seal/v0/finalize/replicaupdate" + TTDownloadSector TaskType = "seal/v0/download/sector" + TTGenerateWindowPoSt TaskType = "post/v0/windowproof" TTGenerateWinningPoSt TaskType = "post/v0/winningproof" ) @@ -48,11 +50,12 @@ var order = map[TaskType]int{ TTCommit1: 2, TTUnseal: 1, - TTFetch: -1, - TTFinalize: -2, + TTFetch: -1, + TTDownloadSector: -2, + TTFinalize: -3, - TTGenerateWindowPoSt: -3, - TTGenerateWinningPoSt: -4, // most priority + TTGenerateWindowPoSt: -4, + TTGenerateWinningPoSt: -5, // most priority } var shortNames = map[TaskType]string{ @@ -75,6 +78,8 @@ var shortNames = map[TaskType]string{ TTRegenSectorKey: "GSK", TTFinalizeReplicaUpdate: "FRU", + TTDownloadSector: "DL", + TTGenerateWindowPoSt: "WDP", TTGenerateWinningPoSt: "WNP", } diff --git a/storage/sealer/storiface/storage.go b/storage/sealer/storiface/storage.go index 2271c95b0..4248cd71e 100644 --- a/storage/sealer/storiface/storage.go +++ b/storage/sealer/storiface/storage.go @@ -3,6 +3,7 @@ package storiface import ( "context" "io" + "net/http" "github.com/ipfs/go-cid" @@ -85,6 +86,8 @@ type Sealer interface { GenerateSectorKeyFromData(ctx context.Context, sector SectorRef, unsealed cid.Cid) error FinalizeReplicaUpdate(ctx context.Context, sector SectorRef, keepUnsealed []Range) error + + DownloadSectorData(ctx context.Context, sector SectorRef, finalized bool, src map[SectorFileType]SectorData) error } type Unsealer interface { @@ -119,3 +122,20 @@ type Prover interface { AggregateSealProofs(aggregateInfo proof.AggregateSealVerifyProofAndInfos, proofs [][]byte) ([]byte, error) } + +type SectorData struct { + // Local when set to true indicates to lotus that sector data is already + // available locally; When set lotus will skip fetching sector data, and + // only check that sector data exists in sector storage + Local bool + + // URL to the sector data + // For sealed/unsealed sector, lotus expects octet-stream + // For cache, lotus expects a tar archive with cache files (todo maybe use not-tar; specify what files with what paths must be present) + // Valid schemas: + // - http:// / https:// + URL string + + // optional http headers to use when requesting sector data + Headers http.Header +} diff --git a/storage/sealer/storiface/worker.go b/storage/sealer/storiface/worker.go index 5d2781ec8..11a4d265d 100644 --- a/storage/sealer/storiface/worker.go +++ b/storage/sealer/storiface/worker.go @@ -134,6 +134,7 @@ type WorkerCalls interface { MoveStorage(ctx context.Context, sector SectorRef, types SectorFileType) (CallID, error) UnsealPiece(context.Context, SectorRef, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (CallID, error) Fetch(context.Context, SectorRef, SectorFileType, PathType, AcquireMode) (CallID, error) + DownloadSectorData(ctx context.Context, sector SectorRef, finalized bool, src map[SectorFileType]SectorData) (CallID, error) // sync GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error) @@ -215,5 +216,6 @@ type WorkerReturn interface { ReturnMoveStorage(ctx context.Context, callID CallID, err *CallError) error ReturnUnsealPiece(ctx context.Context, callID CallID, err *CallError) error ReturnReadPiece(ctx context.Context, callID CallID, ok bool, err *CallError) error + ReturnDownloadSector(ctx context.Context, callID CallID, err *CallError) error ReturnFetch(ctx context.Context, callID CallID, err *CallError) error } diff --git a/storage/sealer/worker_local.go b/storage/sealer/worker_local.go index 1311f2f2c..2e8846ab1 100644 --- a/storage/sealer/worker_local.go +++ b/storage/sealer/worker_local.go @@ -202,6 +202,7 @@ const ( ReleaseUnsealed ReturnType = "ReleaseUnsealed" MoveStorage ReturnType = "MoveStorage" UnsealPiece ReturnType = "UnsealPiece" + DownloadSector ReturnType = "DownloadSector" Fetch ReturnType = "Fetch" ) @@ -255,6 +256,7 @@ var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storifac FinalizeReplicaUpdate: rfunc(storiface.WorkerReturn.ReturnFinalizeReplicaUpdate), MoveStorage: rfunc(storiface.WorkerReturn.ReturnMoveStorage), UnsealPiece: rfunc(storiface.WorkerReturn.ReturnUnsealPiece), + DownloadSector: rfunc(storiface.WorkerReturn.ReturnDownloadSector), Fetch: rfunc(storiface.WorkerReturn.ReturnFetch), } @@ -586,6 +588,17 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storiface.SectorRe }) } +func (l *LocalWorker) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorData) (storiface.CallID, error) { + sb, err := l.executor() + if err != nil { + return storiface.UndefCall, err + } + + return l.asyncCall(ctx, sector, DownloadSector, func(ctx context.Context, ci storiface.CallID) (interface{}, error) { + return nil, sb.DownloadSectorData(ctx, sector, finalized, src) + }) +} + func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error) { sb, err := l.executor() if err != nil {