diff --git a/api/api_storage.go b/api/api_storage.go index 0411af537..a86e0e210 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -124,6 +124,7 @@ type StorageMiner interface { WorkerJobs(context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) //perm:admin //storiface.WorkerReturn + ReturnDataCid(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error //perm:admin retry:true ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error //perm:admin retry:true ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error //perm:admin retry:true ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error //perm:admin retry:true diff --git a/api/api_worker.go b/api/api_worker.go index 0c4fb3d14..cd4cde151 100644 --- a/api/api_worker.go +++ b/api/api_worker.go @@ -34,6 +34,7 @@ type Worker interface { Info(context.Context) (storiface.WorkerInfo, error) //perm:admin // storiface.WorkerCalls + DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) //perm:admin AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) //perm:admin SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) //perm:admin diff --git a/api/proxy_gen.go b/api/proxy_gen.go index b5bc36ade..4e07ab54c 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -743,6 +743,8 @@ type StorageMinerStruct struct { ReturnAddPiece func(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error `perm:"admin"` + ReturnDataCid func(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error `perm:"admin"` + ReturnFetch func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"` ReturnFinalizeReplicaUpdate func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"` @@ -894,6 +896,8 @@ type WorkerStruct struct { Internal struct { AddPiece func(p0 context.Context, p1 storage.SectorRef, p2 []abi.UnpaddedPieceSize, p3 abi.UnpaddedPieceSize, p4 storage.Data) (storiface.CallID, error) `perm:"admin"` + DataCid func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (storiface.CallID, error) `perm:"admin"` + Enabled func(p0 context.Context) (bool, error) `perm:"admin"` Fetch func(p0 context.Context, p1 storage.SectorRef, p2 storiface.SectorFileType, p3 storiface.PathType, p4 storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"` @@ -4422,6 +4426,17 @@ func (s *StorageMinerStub) ReturnAddPiece(p0 context.Context, p1 storiface.CallI return ErrNotSupported } +func (s *StorageMinerStruct) ReturnDataCid(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error { + if s.Internal.ReturnDataCid == nil { + return ErrNotSupported + } + return s.Internal.ReturnDataCid(p0, p1, p2, p3) +} + +func (s *StorageMinerStub) ReturnDataCid(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error { + return ErrNotSupported +} + func (s *StorageMinerStruct) ReturnFetch(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error { if s.Internal.ReturnFetch == nil { return ErrNotSupported @@ -5159,6 +5174,17 @@ func (s *WorkerStub) AddPiece(p0 context.Context, p1 storage.SectorRef, p2 []abi return *new(storiface.CallID), ErrNotSupported } +func (s *WorkerStruct) DataCid(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (storiface.CallID, error) { + if s.Internal.DataCid == nil { + return *new(storiface.CallID), ErrNotSupported + } + return s.Internal.DataCid(p0, p1, p2) +} + +func (s *WorkerStub) DataCid(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (storiface.CallID, error) { + return *new(storiface.CallID), ErrNotSupported +} + func (s *WorkerStruct) Enabled(p0 context.Context) (bool, error) { if s.Internal.Enabled == nil { return false, ErrNotSupported diff --git a/build/builtin-actors/builtin-actors-v7.car b/build/builtin-actors/builtin-actors-v7.car new file mode 100644 index 000000000..e69de29bb diff --git a/build/builtin-actors/builtin-actors-v8.car b/build/builtin-actors/builtin-actors-v8.car new file mode 100644 index 000000000..1193b93b3 Binary files /dev/null and b/build/builtin-actors/builtin-actors-v8.car differ diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 1eaaf6a84..574109472 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 7b1191138..a0ce8af64 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index 554082fd6..2e28ed734 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/cmd/lotus-worker/main.go b/cmd/lotus-worker/main.go index 12c5f8dc8..c341f52c2 100644 --- a/cmd/lotus-worker/main.go +++ b/cmd/lotus-worker/main.go @@ -313,7 +313,7 @@ var runCmd = &cli.Command{ } if (workerType == sealtasks.WorkerSealing || cctx.IsSet("addpiece")) && cctx.Bool("addpiece") { - taskTypes = append(taskTypes, sealtasks.TTAddPiece) + taskTypes = append(taskTypes, sealtasks.TTAddPiece, sealtasks.TTDataCid) } if (workerType == sealtasks.WorkerSealing || cctx.IsSet("precommit1")) && cctx.Bool("precommit1") { taskTypes = append(taskTypes, sealtasks.TTPreCommit1) diff --git a/cmd/lotus-worker/tasks.go b/cmd/lotus-worker/tasks.go index 52133d09d..880381fd2 100644 --- a/cmd/lotus-worker/tasks.go +++ b/cmd/lotus-worker/tasks.go @@ -23,6 +23,7 @@ var tasksCmd = &cli.Command{ var allowSetting = map[sealtasks.TaskType]struct{}{ sealtasks.TTAddPiece: {}, + sealtasks.TTDataCid: {}, sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}, sealtasks.TTCommit2: {}, diff --git a/documentation/en/api-v0-methods-miner.md b/documentation/en/api-v0-methods-miner.md index 64d09971b..2357058de 100644 --- a/documentation/en/api-v0-methods-miner.md +++ b/documentation/en/api-v0-methods-miner.md @@ -105,6 +105,7 @@ * [PledgeSector](#PledgeSector) * [Return](#Return) * [ReturnAddPiece](#ReturnAddPiece) + * [ReturnDataCid](#ReturnDataCid) * [ReturnFetch](#ReturnFetch) * [ReturnFinalizeReplicaUpdate](#ReturnFinalizeReplicaUpdate) * [ReturnFinalizeSector](#ReturnFinalizeSector) @@ -2195,6 +2196,36 @@ Response: ### ReturnAddPiece + + +Perms: admin + +Inputs: +```json +[ + { + "Sector": { + "Miner": 1000, + "Number": 9 + }, + "ID": "07070707-0707-0707-0707-070707070707" + }, + { + "Size": 1032, + "PieceCID": { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + } + }, + { + "Code": 0, + "Message": "string value" + } +] +``` + +Response: `{}` + +### ReturnDataCid storiface.WorkerReturn @@ -4020,6 +4051,88 @@ Response: "BaseMinMemory": 68719476736 } }, + "seal/v0/datacid": { + "0": { + "MinMemory": 2048, + "MaxMemory": 2048, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 2048 + }, + "1": { + "MinMemory": 8388608, + "MaxMemory": 8388608, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 8388608 + }, + "2": { + "MinMemory": 1073741824, + "MaxMemory": 1073741824, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "3": { + "MinMemory": 4294967296, + "MaxMemory": 4294967296, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "4": { + "MinMemory": 8589934592, + "MaxMemory": 8589934592, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "5": { + "MinMemory": 2048, + "MaxMemory": 2048, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 2048 + }, + "6": { + "MinMemory": 8388608, + "MaxMemory": 8388608, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 8388608 + }, + "7": { + "MinMemory": 1073741824, + "MaxMemory": 1073741824, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "8": { + "MinMemory": 4294967296, + "MaxMemory": 4294967296, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "9": { + "MinMemory": 8589934592, + "MaxMemory": 8589934592, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + } + }, "seal/v0/fetch": { "0": { "MinMemory": 1048576, diff --git a/documentation/en/api-v0-methods-worker.md b/documentation/en/api-v0-methods-worker.md index 4a09e5301..57a371008 100644 --- a/documentation/en/api-v0-methods-worker.md +++ b/documentation/en/api-v0-methods-worker.md @@ -9,6 +9,8 @@ * [Version](#Version) * [Add](#Add) * [AddPiece](#AddPiece) +* [Data](#Data) + * [DataCid](#DataCid) * [Finalize](#Finalize) * [FinalizeReplicaUpdate](#FinalizeReplicaUpdate) * [FinalizeSector](#FinalizeSector) @@ -520,6 +522,88 @@ Response: "BaseMinMemory": 68719476736 } }, + "seal/v0/datacid": { + "0": { + "MinMemory": 2048, + "MaxMemory": 2048, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 2048 + }, + "1": { + "MinMemory": 8388608, + "MaxMemory": 8388608, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 8388608 + }, + "2": { + "MinMemory": 1073741824, + "MaxMemory": 1073741824, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "3": { + "MinMemory": 4294967296, + "MaxMemory": 4294967296, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "4": { + "MinMemory": 8589934592, + "MaxMemory": 8589934592, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "5": { + "MinMemory": 2048, + "MaxMemory": 2048, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 2048 + }, + "6": { + "MinMemory": 8388608, + "MaxMemory": 8388608, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 8388608 + }, + "7": { + "MinMemory": 1073741824, + "MaxMemory": 1073741824, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "8": { + "MinMemory": 4294967296, + "MaxMemory": 4294967296, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + }, + "9": { + "MinMemory": 8589934592, + "MaxMemory": 8589934592, + "GPUUtilization": 0, + "MaxParallelism": 1, + "MaxParallelismGPU": 0, + "BaseMinMemory": 1073741824 + } + }, "seal/v0/fetch": { "0": { "MinMemory": 1048576, @@ -1242,7 +1326,6 @@ Response: `131584` ### AddPiece -storiface.WorkerCalls Perms: admin @@ -1276,6 +1359,34 @@ Response: } ``` +## Data + + +### DataCid +storiface.WorkerCalls + + +Perms: admin + +Inputs: +```json +[ + 1024, + {} +] +``` + +Response: +```json +{ + "Sector": { + "Miner": 1000, + "Number": 9 + }, + "ID": "07070707-0707-0707-0707-070707070707" +} +``` + ## Finalize diff --git a/documentation/en/cli-lotus-worker.md b/documentation/en/cli-lotus-worker.md index 9212d59cf..68d0d45c2 100644 --- a/documentation/en/cli-lotus-worker.md +++ b/documentation/en/cli-lotus-worker.md @@ -173,7 +173,7 @@ NAME: lotus-worker tasks enable - Enable a task type USAGE: - lotus-worker tasks enable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|GSK] + lotus-worker tasks enable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|DC|GSK] OPTIONS: --help, -h show help (default: false) @@ -186,7 +186,7 @@ NAME: lotus-worker tasks disable - Disable a task type USAGE: - lotus-worker tasks disable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|GSK] + lotus-worker tasks disable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|DC|GSK] OPTIONS: --help, -h show help (default: false) diff --git a/extern/sector-storage/ffiwrapper/sealer_cgo.go b/extern/sector-storage/ffiwrapper/sealer_cgo.go index 3f596d250..816a025e9 100644 --- a/extern/sector-storage/ffiwrapper/sealer_cgo.go +++ b/extern/sector-storage/ffiwrapper/sealer_cgo.go @@ -51,6 +51,128 @@ func (sb *Sealer) NewSector(ctx context.Context, sector storage.SectorRef) error return nil } +func (sb *Sealer) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) { + // TODO: allow tuning those: + chunk := abi.PaddedPieceSize(4 << 20) + parallel := runtime.NumCPU() + + maxSizeSpt := abi.RegisteredSealProof_StackedDrg64GiBV1_1 + + var done func() + + defer func() { + if done != nil { + done() + } + }() + + throttle := make(chan []byte, parallel) + piecePromises := make([]func() (abi.PieceInfo, error), 0) + + buf := make([]byte, chunk.Unpadded()) + for i := 0; i < parallel; i++ { + if abi.UnpaddedPieceSize(i)*chunk.Unpadded() >= pieceSize { + break // won't use this many buffers + } + throttle <- make([]byte, chunk.Unpadded()) + } + + for { + var read int + for rbuf := buf; len(rbuf) > 0; { + n, err := pieceData.Read(rbuf) + if err != nil && err != io.EOF { + return abi.PieceInfo{}, xerrors.Errorf("pr read error: %w", err) + } + + rbuf = rbuf[n:] + read += n + + if err == io.EOF { + break + } + } + if read == 0 { + break + } + + done := make(chan struct { + cid.Cid + error + }, 1) + pbuf := <-throttle + copy(pbuf, buf[:read]) + + go func(read int) { + defer func() { + throttle <- pbuf + }() + + c, err := sb.pieceCid(maxSizeSpt, pbuf[:read]) + done <- struct { + cid.Cid + error + }{c, err} + }(read) + + piecePromises = append(piecePromises, func() (abi.PieceInfo, error) { + select { + case e := <-done: + if e.error != nil { + return abi.PieceInfo{}, e.error + } + + return abi.PieceInfo{ + Size: abi.UnpaddedPieceSize(len(buf[:read])).Padded(), + PieceCID: e.Cid, + }, nil + case <-ctx.Done(): + return abi.PieceInfo{}, ctx.Err() + } + }) + } + + if len(piecePromises) == 1 { + return piecePromises[0]() + } + + var payloadRoundedBytes abi.PaddedPieceSize + pieceCids := make([]abi.PieceInfo, len(piecePromises)) + for i, promise := range piecePromises { + pinfo, err := promise() + if err != nil { + return abi.PieceInfo{}, err + } + + pieceCids[i] = pinfo + payloadRoundedBytes += pinfo.Size + } + + pieceCID, err := ffi.GenerateUnsealedCID(maxSizeSpt, pieceCids) + if err != nil { + return abi.PieceInfo{}, xerrors.Errorf("generate unsealed CID: %w", err) + } + + // validate that the pieceCID was properly formed + if _, err := commcid.CIDToPieceCommitmentV1(pieceCID); err != nil { + return abi.PieceInfo{}, err + } + + if payloadRoundedBytes < pieceSize.Padded() { + paddedCid, err := commpffi.ZeroPadPieceCommitment(pieceCID, payloadRoundedBytes.Unpadded(), pieceSize) + if err != nil { + return abi.PieceInfo{}, xerrors.Errorf("failed to pad data: %w", err) + } + + pieceCID = paddedCid + } + + return abi.PieceInfo{ + Size: pieceSize.Padded(), + PieceCID: pieceCID, + }, nil +} + func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) { // TODO: allow tuning those: chunk := abi.PaddedPieceSize(4 << 20) diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 8bea96cca..4b52f9a1d 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -165,7 +165,7 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores. sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTFinalizeReplicaUpdate, } if sc.AllowAddPiece { - localTasks = append(localTasks, sealtasks.TTAddPiece) + localTasks = append(localTasks, sealtasks.TTAddPiece, sealtasks.TTDataCid) } if sc.AllowPreCommit1 { localTasks = append(localTasks, sealtasks.TTPreCommit1) @@ -327,6 +327,27 @@ func (m *Manager) NewSector(ctx context.Context, sector storage.SectorRef) error return nil } +func (m *Manager) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + selector := newTaskSelector() + + var out abi.PieceInfo + err := m.sched.Schedule(ctx, storage.NoSectorRef, sealtasks.TTDataCid, selector, schedNop, func(ctx context.Context, w Worker) error { + p, err := m.waitSimpleCall(ctx)(w.DataCid(ctx, pieceSize, pieceData)) + if err != nil { + return err + } + if p != nil { + out = p.(abi.PieceInfo) + } + return nil + }) + + return out, err +} + func (m *Manager) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -975,6 +996,10 @@ func (m *Manager) ProveReplicaUpdate2(ctx context.Context, sector storage.Sector return out, waitErr } +func (m *Manager) ReturnDataCid(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error { + return m.returnResult(ctx, callID, pi, err) +} + func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error { return m.returnResult(ctx, callID, pi, err) } diff --git a/extern/sector-storage/sealtasks/task.go b/extern/sector-storage/sealtasks/task.go index 1d3d3c1b5..e8a156291 100644 --- a/extern/sector-storage/sealtasks/task.go +++ b/extern/sector-storage/sealtasks/task.go @@ -3,6 +3,7 @@ package sealtasks type TaskType string const ( + TTDataCid TaskType = "seal/v0/datacid" TTAddPiece TaskType = "seal/v0/addpiece" TTPreCommit1 TaskType = "seal/v0/precommit/1" TTPreCommit2 TaskType = "seal/v0/precommit/2" @@ -25,7 +26,8 @@ const ( ) var order = map[TaskType]int{ - TTRegenSectorKey: 10, // least priority + TTRegenSectorKey: 11, // least priority + TTDataCid: 10, TTAddPiece: 9, TTReplicaUpdate: 8, TTProveReplicaUpdate2: 7, @@ -44,6 +46,7 @@ var order = map[TaskType]int{ } var shortNames = map[TaskType]string{ + TTDataCid: "DC", TTAddPiece: "AP", TTPreCommit1: "PC1", diff --git a/extern/sector-storage/storiface/resources.go b/extern/sector-storage/storiface/resources.go index ce533e2c0..71fd9e30c 100644 --- a/extern/sector-storage/storiface/resources.go +++ b/extern/sector-storage/storiface/resources.go @@ -569,6 +569,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources func init() { ResourceTable[sealtasks.TTUnseal] = ResourceTable[sealtasks.TTPreCommit1] // TODO: measure accurately ResourceTable[sealtasks.TTRegenSectorKey] = ResourceTable[sealtasks.TTReplicaUpdate] + ResourceTable[sealtasks.TTDataCid] = ResourceTable[sealtasks.TTAddPiece] // V1_1 is the same as V1 for _, m := range ResourceTable { diff --git a/extern/sector-storage/storiface/worker.go b/extern/sector-storage/storiface/worker.go index e37df31b5..5b4fabf02 100644 --- a/extern/sector-storage/storiface/worker.go +++ b/extern/sector-storage/storiface/worker.go @@ -117,6 +117,7 @@ var UndefCall CallID type WorkerCalls interface { // async + DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (CallID, error) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (CallID, error) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (CallID, error) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (CallID, error) @@ -197,6 +198,7 @@ func Err(code ErrorCode, sub error) *CallError { } type WorkerReturn interface { + ReturnDataCid(ctx context.Context, callID CallID, pi abi.PieceInfo, err *CallError) error ReturnAddPiece(ctx context.Context, callID CallID, pi abi.PieceInfo, err *CallError) error ReturnSealPreCommit1(ctx context.Context, callID CallID, p1o storage.PreCommit1Out, err *CallError) error ReturnSealPreCommit2(ctx context.Context, callID CallID, sealed storage.SectorCids, err *CallError) error diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go index 46464caf6..9a14e42b5 100644 --- a/extern/sector-storage/worker_local.go +++ b/extern/sector-storage/worker_local.go @@ -180,6 +180,7 @@ func (l *LocalWorker) ffiExec() (ffiwrapper.Storage, error) { type ReturnType string const ( + DataCid ReturnType = "DataCid" AddPiece ReturnType = "AddPiece" SealPreCommit1 ReturnType = "SealPreCommit1" SealPreCommit2 ReturnType = "SealPreCommit2" @@ -232,6 +233,7 @@ func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.Wor } var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, *storiface.CallError) error{ + DataCid: rfunc(storiface.WorkerReturn.ReturnDataCid), AddPiece: rfunc(storiface.WorkerReturn.ReturnAddPiece), SealPreCommit1: rfunc(storiface.WorkerReturn.ReturnSealPreCommit1), SealPreCommit2: rfunc(storiface.WorkerReturn.ReturnSealPreCommit2), @@ -341,6 +343,17 @@ func (l *LocalWorker) NewSector(ctx context.Context, sector storage.SectorRef) e return sb.NewSector(ctx, sector) } +func (l *LocalWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { + sb, err := l.executor() + if err != nil { + return storiface.UndefCall, err + } + + return l.asyncCall(ctx, storage.NoSectorRef, DataCid, func(ctx context.Context, ci storiface.CallID) (interface{}, error) { + return sb.DataCid(ctx, pieceSize, pieceData) + }) +} + func (l *LocalWorker) AddPiece(ctx context.Context, sector storage.SectorRef, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (storiface.CallID, error) { sb, err := l.executor() if err != nil { diff --git a/extern/sector-storage/worker_tracked.go b/extern/sector-storage/worker_tracked.go index 1d92579a5..e3ce0a46a 100644 --- a/extern/sector-storage/worker_tracked.go +++ b/extern/sector-storage/worker_tracked.go @@ -186,6 +186,12 @@ func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.Secto return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize, func() (storiface.CallID, error) { return t.Worker.FinalizeSector(ctx, sector, keepUnsealed) }) } +func (t *trackedWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { + return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, storage.NoSectorRef, sealtasks.TTDataCid, func() (storiface.CallID, error) { + return t.Worker.DataCid(ctx, pieceSize, pieceData) + }) +} + func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece, func() (storiface.CallID, error) { return t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData) diff --git a/go.mod b/go.mod index f5b141d09..8cc4f32d1 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/filecoin-project/specs-actors/v5 v5.0.4 github.com/filecoin-project/specs-actors/v6 v6.0.1 github.com/filecoin-project/specs-actors/v7 v7.0.0 - github.com/filecoin-project/specs-storage v0.2.2 + github.com/filecoin-project/specs-storage v0.2.3-0.20220426183226-1a0a63c5990f github.com/filecoin-project/test-vectors/schema v0.0.5 github.com/gbrlsnchs/jwt/v3 v3.0.1 github.com/gdamore/tcell/v2 v2.2.0 diff --git a/go.sum b/go.sum index 5cc8b4693..9ec39ebc3 100644 --- a/go.sum +++ b/go.sum @@ -394,8 +394,8 @@ github.com/filecoin-project/specs-actors/v6 v6.0.1/go.mod h1:V1AYfi5GkHXipx1mnVi github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1.0.20220118005651-2470cb39827e/go.mod h1:TA5FwCna+Yi36POaT7SLKXsgEDvJwc0V/L6ZsO19B9M= github.com/filecoin-project/specs-actors/v7 v7.0.0 h1:FQN7tjt3o68hfb3qLFSJBoLMuOFY0REkFVLO/zXj8RU= github.com/filecoin-project/specs-actors/v7 v7.0.0/go.mod h1:TA5FwCna+Yi36POaT7SLKXsgEDvJwc0V/L6ZsO19B9M= -github.com/filecoin-project/specs-storage v0.2.2 h1:6ugbtKQ6LTcTEnEIX9HkeCtTp1PCYO497P/bokF5tF4= -github.com/filecoin-project/specs-storage v0.2.2/go.mod h1:6cc/lncmAxMUocPi0z1EPCX63beIX7F7UnlmUZ3hLQo= +github.com/filecoin-project/specs-storage v0.2.3-0.20220426183226-1a0a63c5990f h1:+suJFu4RJt7aZRXvE+Innrpacap+Z8N87y6a1Cgkuqc= +github.com/filecoin-project/specs-storage v0.2.3-0.20220426183226-1a0a63c5990f/go.mod h1:6cc/lncmAxMUocPi0z1EPCX63beIX7F7UnlmUZ3hLQo= github.com/filecoin-project/storetheindex v0.3.5 h1:KoS9TvjPm6zIZfUH8atAHJbVHOO7GTP1MdTG+v0eE+Q= github.com/filecoin-project/storetheindex v0.3.5/go.mod h1:0r3d0kSpK63O6AvLr1CjAINLi+nWD49clzcnKV+GLpI= github.com/filecoin-project/test-vectors/schema v0.0.5 h1:w3zHQhzM4pYxJDl21avXjOKBLF8egrvwUwjpT8TquDg=