sealing: DataCid on workers
This commit is contained in:
parent
48ffb15901
commit
1aed631fe5
@ -124,6 +124,7 @@ type StorageMiner interface {
|
||||
WorkerJobs(context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) //perm:admin
|
||||
|
||||
//storiface.WorkerReturn
|
||||
ReturnDataCid(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error //perm:admin retry:true
|
||||
|
@ -34,6 +34,7 @@ type Worker interface {
|
||||
Info(context.Context) (storiface.WorkerInfo, error) //perm:admin
|
||||
|
||||
// storiface.WorkerCalls
|
||||
DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) //perm:admin
|
||||
AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) //perm:admin
|
||||
SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
|
||||
SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) //perm:admin
|
||||
|
@ -743,6 +743,8 @@ type StorageMinerStruct struct {
|
||||
|
||||
ReturnAddPiece func(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error `perm:"admin"`
|
||||
|
||||
ReturnDataCid func(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error `perm:"admin"`
|
||||
|
||||
ReturnFetch func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
|
||||
|
||||
ReturnFinalizeReplicaUpdate func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
|
||||
@ -894,6 +896,8 @@ type WorkerStruct struct {
|
||||
Internal struct {
|
||||
AddPiece func(p0 context.Context, p1 storage.SectorRef, p2 []abi.UnpaddedPieceSize, p3 abi.UnpaddedPieceSize, p4 storage.Data) (storiface.CallID, error) `perm:"admin"`
|
||||
|
||||
DataCid func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (storiface.CallID, error) `perm:"admin"`
|
||||
|
||||
Enabled func(p0 context.Context) (bool, error) `perm:"admin"`
|
||||
|
||||
Fetch func(p0 context.Context, p1 storage.SectorRef, p2 storiface.SectorFileType, p3 storiface.PathType, p4 storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"`
|
||||
@ -4422,6 +4426,17 @@ func (s *StorageMinerStub) ReturnAddPiece(p0 context.Context, p1 storiface.CallI
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *StorageMinerStruct) ReturnDataCid(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error {
|
||||
if s.Internal.ReturnDataCid == nil {
|
||||
return ErrNotSupported
|
||||
}
|
||||
return s.Internal.ReturnDataCid(p0, p1, p2, p3)
|
||||
}
|
||||
|
||||
func (s *StorageMinerStub) ReturnDataCid(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error {
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *StorageMinerStruct) ReturnFetch(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error {
|
||||
if s.Internal.ReturnFetch == nil {
|
||||
return ErrNotSupported
|
||||
@ -5159,6 +5174,17 @@ func (s *WorkerStub) AddPiece(p0 context.Context, p1 storage.SectorRef, p2 []abi
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *WorkerStruct) DataCid(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (storiface.CallID, error) {
|
||||
if s.Internal.DataCid == nil {
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
}
|
||||
return s.Internal.DataCid(p0, p1, p2)
|
||||
}
|
||||
|
||||
func (s *WorkerStub) DataCid(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (storiface.CallID, error) {
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *WorkerStruct) Enabled(p0 context.Context) (bool, error) {
|
||||
if s.Internal.Enabled == nil {
|
||||
return false, ErrNotSupported
|
||||
|
0
build/builtin-actors/builtin-actors-v7.car
Normal file
0
build/builtin-actors/builtin-actors-v7.car
Normal file
BIN
build/builtin-actors/builtin-actors-v8.car
Normal file
BIN
build/builtin-actors/builtin-actors-v8.car
Normal file
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -313,7 +313,7 @@ var runCmd = &cli.Command{
|
||||
}
|
||||
|
||||
if (workerType == sealtasks.WorkerSealing || cctx.IsSet("addpiece")) && cctx.Bool("addpiece") {
|
||||
taskTypes = append(taskTypes, sealtasks.TTAddPiece)
|
||||
taskTypes = append(taskTypes, sealtasks.TTAddPiece, sealtasks.TTDataCid)
|
||||
}
|
||||
if (workerType == sealtasks.WorkerSealing || cctx.IsSet("precommit1")) && cctx.Bool("precommit1") {
|
||||
taskTypes = append(taskTypes, sealtasks.TTPreCommit1)
|
||||
|
@ -23,6 +23,7 @@ var tasksCmd = &cli.Command{
|
||||
|
||||
var allowSetting = map[sealtasks.TaskType]struct{}{
|
||||
sealtasks.TTAddPiece: {},
|
||||
sealtasks.TTDataCid: {},
|
||||
sealtasks.TTPreCommit1: {},
|
||||
sealtasks.TTPreCommit2: {},
|
||||
sealtasks.TTCommit2: {},
|
||||
|
@ -105,6 +105,7 @@
|
||||
* [PledgeSector](#PledgeSector)
|
||||
* [Return](#Return)
|
||||
* [ReturnAddPiece](#ReturnAddPiece)
|
||||
* [ReturnDataCid](#ReturnDataCid)
|
||||
* [ReturnFetch](#ReturnFetch)
|
||||
* [ReturnFinalizeReplicaUpdate](#ReturnFinalizeReplicaUpdate)
|
||||
* [ReturnFinalizeSector](#ReturnFinalizeSector)
|
||||
@ -2195,6 +2196,36 @@ Response:
|
||||
|
||||
|
||||
### ReturnAddPiece
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
{
|
||||
"Sector": {
|
||||
"Miner": 1000,
|
||||
"Number": 9
|
||||
},
|
||||
"ID": "07070707-0707-0707-0707-070707070707"
|
||||
},
|
||||
{
|
||||
"Size": 1032,
|
||||
"PieceCID": {
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
}
|
||||
},
|
||||
{
|
||||
"Code": 0,
|
||||
"Message": "string value"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### ReturnDataCid
|
||||
storiface.WorkerReturn
|
||||
|
||||
|
||||
@ -4020,6 +4051,88 @@ Response:
|
||||
"BaseMinMemory": 68719476736
|
||||
}
|
||||
},
|
||||
"seal/v0/datacid": {
|
||||
"0": {
|
||||
"MinMemory": 2048,
|
||||
"MaxMemory": 2048,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 2048
|
||||
},
|
||||
"1": {
|
||||
"MinMemory": 8388608,
|
||||
"MaxMemory": 8388608,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 8388608
|
||||
},
|
||||
"2": {
|
||||
"MinMemory": 1073741824,
|
||||
"MaxMemory": 1073741824,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 1073741824
|
||||
},
|
||||
"3": {
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 1073741824
|
||||
},
|
||||
"4": {
|
||||
"MinMemory": 8589934592,
|
||||
"MaxMemory": 8589934592,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 1073741824
|
||||
},
|
||||
"5": {
|
||||
"MinMemory": 2048,
|
||||
"MaxMemory": 2048,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 2048
|
||||
},
|
||||
"6": {
|
||||
"MinMemory": 8388608,
|
||||
"MaxMemory": 8388608,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 8388608
|
||||
},
|
||||
"7": {
|
||||
"MinMemory": 1073741824,
|
||||
"MaxMemory": 1073741824,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 1073741824
|
||||
},
|
||||
"8": {
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 1073741824
|
||||
},
|
||||
"9": {
|
||||
"MinMemory": 8589934592,
|
||||
"MaxMemory": 8589934592,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 1073741824
|
||||
}
|
||||
},
|
||||
"seal/v0/fetch": {
|
||||
"0": {
|
||||
"MinMemory": 1048576,
|
||||
|
@ -9,6 +9,8 @@
|
||||
* [Version](#Version)
|
||||
* [Add](#Add)
|
||||
* [AddPiece](#AddPiece)
|
||||
* [Data](#Data)
|
||||
* [DataCid](#DataCid)
|
||||
* [Finalize](#Finalize)
|
||||
* [FinalizeReplicaUpdate](#FinalizeReplicaUpdate)
|
||||
* [FinalizeSector](#FinalizeSector)
|
||||
@ -520,6 +522,88 @@ Response:
|
||||
"BaseMinMemory": 68719476736
|
||||
}
|
||||
},
|
||||
"seal/v0/datacid": {
|
||||
"0": {
|
||||
"MinMemory": 2048,
|
||||
"MaxMemory": 2048,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 2048
|
||||
},
|
||||
"1": {
|
||||
"MinMemory": 8388608,
|
||||
"MaxMemory": 8388608,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 8388608
|
||||
},
|
||||
"2": {
|
||||
"MinMemory": 1073741824,
|
||||
"MaxMemory": 1073741824,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 1073741824
|
||||
},
|
||||
"3": {
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 1073741824
|
||||
},
|
||||
"4": {
|
||||
"MinMemory": 8589934592,
|
||||
"MaxMemory": 8589934592,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 1073741824
|
||||
},
|
||||
"5": {
|
||||
"MinMemory": 2048,
|
||||
"MaxMemory": 2048,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 2048
|
||||
},
|
||||
"6": {
|
||||
"MinMemory": 8388608,
|
||||
"MaxMemory": 8388608,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 8388608
|
||||
},
|
||||
"7": {
|
||||
"MinMemory": 1073741824,
|
||||
"MaxMemory": 1073741824,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 1073741824
|
||||
},
|
||||
"8": {
|
||||
"MinMemory": 4294967296,
|
||||
"MaxMemory": 4294967296,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 1073741824
|
||||
},
|
||||
"9": {
|
||||
"MinMemory": 8589934592,
|
||||
"MaxMemory": 8589934592,
|
||||
"GPUUtilization": 0,
|
||||
"MaxParallelism": 1,
|
||||
"MaxParallelismGPU": 0,
|
||||
"BaseMinMemory": 1073741824
|
||||
}
|
||||
},
|
||||
"seal/v0/fetch": {
|
||||
"0": {
|
||||
"MinMemory": 1048576,
|
||||
@ -1242,7 +1326,6 @@ Response: `131584`
|
||||
|
||||
|
||||
### AddPiece
|
||||
storiface.WorkerCalls
|
||||
|
||||
|
||||
Perms: admin
|
||||
@ -1276,6 +1359,34 @@ Response:
|
||||
}
|
||||
```
|
||||
|
||||
## Data
|
||||
|
||||
|
||||
### DataCid
|
||||
storiface.WorkerCalls
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
1024,
|
||||
{}
|
||||
]
|
||||
```
|
||||
|
||||
Response:
|
||||
```json
|
||||
{
|
||||
"Sector": {
|
||||
"Miner": 1000,
|
||||
"Number": 9
|
||||
},
|
||||
"ID": "07070707-0707-0707-0707-070707070707"
|
||||
}
|
||||
```
|
||||
|
||||
## Finalize
|
||||
|
||||
|
||||
|
@ -173,7 +173,7 @@ NAME:
|
||||
lotus-worker tasks enable - Enable a task type
|
||||
|
||||
USAGE:
|
||||
lotus-worker tasks enable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|GSK]
|
||||
lotus-worker tasks enable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|DC|GSK]
|
||||
|
||||
OPTIONS:
|
||||
--help, -h show help (default: false)
|
||||
@ -186,7 +186,7 @@ NAME:
|
||||
lotus-worker tasks disable - Disable a task type
|
||||
|
||||
USAGE:
|
||||
lotus-worker tasks disable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|GSK]
|
||||
lotus-worker tasks disable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|DC|GSK]
|
||||
|
||||
OPTIONS:
|
||||
--help, -h show help (default: false)
|
||||
|
122
extern/sector-storage/ffiwrapper/sealer_cgo.go
vendored
122
extern/sector-storage/ffiwrapper/sealer_cgo.go
vendored
@ -51,6 +51,128 @@ func (sb *Sealer) NewSector(ctx context.Context, sector storage.SectorRef) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sb *Sealer) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
|
||||
// TODO: allow tuning those:
|
||||
chunk := abi.PaddedPieceSize(4 << 20)
|
||||
parallel := runtime.NumCPU()
|
||||
|
||||
maxSizeSpt := abi.RegisteredSealProof_StackedDrg64GiBV1_1
|
||||
|
||||
var done func()
|
||||
|
||||
defer func() {
|
||||
if done != nil {
|
||||
done()
|
||||
}
|
||||
}()
|
||||
|
||||
throttle := make(chan []byte, parallel)
|
||||
piecePromises := make([]func() (abi.PieceInfo, error), 0)
|
||||
|
||||
buf := make([]byte, chunk.Unpadded())
|
||||
for i := 0; i < parallel; i++ {
|
||||
if abi.UnpaddedPieceSize(i)*chunk.Unpadded() >= pieceSize {
|
||||
break // won't use this many buffers
|
||||
}
|
||||
throttle <- make([]byte, chunk.Unpadded())
|
||||
}
|
||||
|
||||
for {
|
||||
var read int
|
||||
for rbuf := buf; len(rbuf) > 0; {
|
||||
n, err := pieceData.Read(rbuf)
|
||||
if err != nil && err != io.EOF {
|
||||
return abi.PieceInfo{}, xerrors.Errorf("pr read error: %w", err)
|
||||
}
|
||||
|
||||
rbuf = rbuf[n:]
|
||||
read += n
|
||||
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
}
|
||||
if read == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
done := make(chan struct {
|
||||
cid.Cid
|
||||
error
|
||||
}, 1)
|
||||
pbuf := <-throttle
|
||||
copy(pbuf, buf[:read])
|
||||
|
||||
go func(read int) {
|
||||
defer func() {
|
||||
throttle <- pbuf
|
||||
}()
|
||||
|
||||
c, err := sb.pieceCid(maxSizeSpt, pbuf[:read])
|
||||
done <- struct {
|
||||
cid.Cid
|
||||
error
|
||||
}{c, err}
|
||||
}(read)
|
||||
|
||||
piecePromises = append(piecePromises, func() (abi.PieceInfo, error) {
|
||||
select {
|
||||
case e := <-done:
|
||||
if e.error != nil {
|
||||
return abi.PieceInfo{}, e.error
|
||||
}
|
||||
|
||||
return abi.PieceInfo{
|
||||
Size: abi.UnpaddedPieceSize(len(buf[:read])).Padded(),
|
||||
PieceCID: e.Cid,
|
||||
}, nil
|
||||
case <-ctx.Done():
|
||||
return abi.PieceInfo{}, ctx.Err()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
if len(piecePromises) == 1 {
|
||||
return piecePromises[0]()
|
||||
}
|
||||
|
||||
var payloadRoundedBytes abi.PaddedPieceSize
|
||||
pieceCids := make([]abi.PieceInfo, len(piecePromises))
|
||||
for i, promise := range piecePromises {
|
||||
pinfo, err := promise()
|
||||
if err != nil {
|
||||
return abi.PieceInfo{}, err
|
||||
}
|
||||
|
||||
pieceCids[i] = pinfo
|
||||
payloadRoundedBytes += pinfo.Size
|
||||
}
|
||||
|
||||
pieceCID, err := ffi.GenerateUnsealedCID(maxSizeSpt, pieceCids)
|
||||
if err != nil {
|
||||
return abi.PieceInfo{}, xerrors.Errorf("generate unsealed CID: %w", err)
|
||||
}
|
||||
|
||||
// validate that the pieceCID was properly formed
|
||||
if _, err := commcid.CIDToPieceCommitmentV1(pieceCID); err != nil {
|
||||
return abi.PieceInfo{}, err
|
||||
}
|
||||
|
||||
if payloadRoundedBytes < pieceSize.Padded() {
|
||||
paddedCid, err := commpffi.ZeroPadPieceCommitment(pieceCID, payloadRoundedBytes.Unpadded(), pieceSize)
|
||||
if err != nil {
|
||||
return abi.PieceInfo{}, xerrors.Errorf("failed to pad data: %w", err)
|
||||
}
|
||||
|
||||
pieceCID = paddedCid
|
||||
}
|
||||
|
||||
return abi.PieceInfo{
|
||||
Size: pieceSize.Padded(),
|
||||
PieceCID: pieceCID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) {
|
||||
// TODO: allow tuning those:
|
||||
chunk := abi.PaddedPieceSize(4 << 20)
|
||||
|
27
extern/sector-storage/manager.go
vendored
27
extern/sector-storage/manager.go
vendored
@ -165,7 +165,7 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.
|
||||
sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTFinalizeReplicaUpdate,
|
||||
}
|
||||
if sc.AllowAddPiece {
|
||||
localTasks = append(localTasks, sealtasks.TTAddPiece)
|
||||
localTasks = append(localTasks, sealtasks.TTAddPiece, sealtasks.TTDataCid)
|
||||
}
|
||||
if sc.AllowPreCommit1 {
|
||||
localTasks = append(localTasks, sealtasks.TTPreCommit1)
|
||||
@ -327,6 +327,27 @@ func (m *Manager) NewSector(ctx context.Context, sector storage.SectorRef) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
selector := newTaskSelector()
|
||||
|
||||
var out abi.PieceInfo
|
||||
err := m.sched.Schedule(ctx, storage.NoSectorRef, sealtasks.TTDataCid, selector, schedNop, func(ctx context.Context, w Worker) error {
|
||||
p, err := m.waitSimpleCall(ctx)(w.DataCid(ctx, pieceSize, pieceData))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if p != nil {
|
||||
out = p.(abi.PieceInfo)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return out, err
|
||||
}
|
||||
|
||||
func (m *Manager) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
@ -975,6 +996,10 @@ func (m *Manager) ProveReplicaUpdate2(ctx context.Context, sector storage.Sector
|
||||
return out, waitErr
|
||||
}
|
||||
|
||||
func (m *Manager) ReturnDataCid(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
|
||||
return m.returnResult(ctx, callID, pi, err)
|
||||
}
|
||||
|
||||
func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
|
||||
return m.returnResult(ctx, callID, pi, err)
|
||||
}
|
||||
|
5
extern/sector-storage/sealtasks/task.go
vendored
5
extern/sector-storage/sealtasks/task.go
vendored
@ -3,6 +3,7 @@ package sealtasks
|
||||
type TaskType string
|
||||
|
||||
const (
|
||||
TTDataCid TaskType = "seal/v0/datacid"
|
||||
TTAddPiece TaskType = "seal/v0/addpiece"
|
||||
TTPreCommit1 TaskType = "seal/v0/precommit/1"
|
||||
TTPreCommit2 TaskType = "seal/v0/precommit/2"
|
||||
@ -25,7 +26,8 @@ const (
|
||||
)
|
||||
|
||||
var order = map[TaskType]int{
|
||||
TTRegenSectorKey: 10, // least priority
|
||||
TTRegenSectorKey: 11, // least priority
|
||||
TTDataCid: 10,
|
||||
TTAddPiece: 9,
|
||||
TTReplicaUpdate: 8,
|
||||
TTProveReplicaUpdate2: 7,
|
||||
@ -44,6 +46,7 @@ var order = map[TaskType]int{
|
||||
}
|
||||
|
||||
var shortNames = map[TaskType]string{
|
||||
TTDataCid: "DC",
|
||||
TTAddPiece: "AP",
|
||||
|
||||
TTPreCommit1: "PC1",
|
||||
|
1
extern/sector-storage/storiface/resources.go
vendored
1
extern/sector-storage/storiface/resources.go
vendored
@ -569,6 +569,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
|
||||
func init() {
|
||||
ResourceTable[sealtasks.TTUnseal] = ResourceTable[sealtasks.TTPreCommit1] // TODO: measure accurately
|
||||
ResourceTable[sealtasks.TTRegenSectorKey] = ResourceTable[sealtasks.TTReplicaUpdate]
|
||||
ResourceTable[sealtasks.TTDataCid] = ResourceTable[sealtasks.TTAddPiece]
|
||||
|
||||
// V1_1 is the same as V1
|
||||
for _, m := range ResourceTable {
|
||||
|
2
extern/sector-storage/storiface/worker.go
vendored
2
extern/sector-storage/storiface/worker.go
vendored
@ -117,6 +117,7 @@ var UndefCall CallID
|
||||
|
||||
type WorkerCalls interface {
|
||||
// async
|
||||
DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (CallID, error)
|
||||
AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (CallID, error)
|
||||
SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (CallID, error)
|
||||
SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (CallID, error)
|
||||
@ -197,6 +198,7 @@ func Err(code ErrorCode, sub error) *CallError {
|
||||
}
|
||||
|
||||
type WorkerReturn interface {
|
||||
ReturnDataCid(ctx context.Context, callID CallID, pi abi.PieceInfo, err *CallError) error
|
||||
ReturnAddPiece(ctx context.Context, callID CallID, pi abi.PieceInfo, err *CallError) error
|
||||
ReturnSealPreCommit1(ctx context.Context, callID CallID, p1o storage.PreCommit1Out, err *CallError) error
|
||||
ReturnSealPreCommit2(ctx context.Context, callID CallID, sealed storage.SectorCids, err *CallError) error
|
||||
|
13
extern/sector-storage/worker_local.go
vendored
13
extern/sector-storage/worker_local.go
vendored
@ -180,6 +180,7 @@ func (l *LocalWorker) ffiExec() (ffiwrapper.Storage, error) {
|
||||
type ReturnType string
|
||||
|
||||
const (
|
||||
DataCid ReturnType = "DataCid"
|
||||
AddPiece ReturnType = "AddPiece"
|
||||
SealPreCommit1 ReturnType = "SealPreCommit1"
|
||||
SealPreCommit2 ReturnType = "SealPreCommit2"
|
||||
@ -232,6 +233,7 @@ func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.Wor
|
||||
}
|
||||
|
||||
var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, *storiface.CallError) error{
|
||||
DataCid: rfunc(storiface.WorkerReturn.ReturnDataCid),
|
||||
AddPiece: rfunc(storiface.WorkerReturn.ReturnAddPiece),
|
||||
SealPreCommit1: rfunc(storiface.WorkerReturn.ReturnSealPreCommit1),
|
||||
SealPreCommit2: rfunc(storiface.WorkerReturn.ReturnSealPreCommit2),
|
||||
@ -341,6 +343,17 @@ func (l *LocalWorker) NewSector(ctx context.Context, sector storage.SectorRef) e
|
||||
return sb.NewSector(ctx, sector)
|
||||
}
|
||||
|
||||
func (l *LocalWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
|
||||
sb, err := l.executor()
|
||||
if err != nil {
|
||||
return storiface.UndefCall, err
|
||||
}
|
||||
|
||||
return l.asyncCall(ctx, storage.NoSectorRef, DataCid, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
return sb.DataCid(ctx, pieceSize, pieceData)
|
||||
})
|
||||
}
|
||||
|
||||
func (l *LocalWorker) AddPiece(ctx context.Context, sector storage.SectorRef, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (storiface.CallID, error) {
|
||||
sb, err := l.executor()
|
||||
if err != nil {
|
||||
|
6
extern/sector-storage/worker_tracked.go
vendored
6
extern/sector-storage/worker_tracked.go
vendored
@ -186,6 +186,12 @@ func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.Secto
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize, func() (storiface.CallID, error) { return t.Worker.FinalizeSector(ctx, sector, keepUnsealed) })
|
||||
}
|
||||
|
||||
func (t *trackedWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, storage.NoSectorRef, sealtasks.TTDataCid, func() (storiface.CallID, error) {
|
||||
return t.Worker.DataCid(ctx, pieceSize, pieceData)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece, func() (storiface.CallID, error) {
|
||||
return t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
|
||||
|
2
go.mod
2
go.mod
@ -53,7 +53,7 @@ require (
|
||||
github.com/filecoin-project/specs-actors/v5 v5.0.4
|
||||
github.com/filecoin-project/specs-actors/v6 v6.0.1
|
||||
github.com/filecoin-project/specs-actors/v7 v7.0.0
|
||||
github.com/filecoin-project/specs-storage v0.2.2
|
||||
github.com/filecoin-project/specs-storage v0.2.3-0.20220426183226-1a0a63c5990f
|
||||
github.com/filecoin-project/test-vectors/schema v0.0.5
|
||||
github.com/gbrlsnchs/jwt/v3 v3.0.1
|
||||
github.com/gdamore/tcell/v2 v2.2.0
|
||||
|
4
go.sum
4
go.sum
@ -394,8 +394,8 @@ github.com/filecoin-project/specs-actors/v6 v6.0.1/go.mod h1:V1AYfi5GkHXipx1mnVi
|
||||
github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1.0.20220118005651-2470cb39827e/go.mod h1:TA5FwCna+Yi36POaT7SLKXsgEDvJwc0V/L6ZsO19B9M=
|
||||
github.com/filecoin-project/specs-actors/v7 v7.0.0 h1:FQN7tjt3o68hfb3qLFSJBoLMuOFY0REkFVLO/zXj8RU=
|
||||
github.com/filecoin-project/specs-actors/v7 v7.0.0/go.mod h1:TA5FwCna+Yi36POaT7SLKXsgEDvJwc0V/L6ZsO19B9M=
|
||||
github.com/filecoin-project/specs-storage v0.2.2 h1:6ugbtKQ6LTcTEnEIX9HkeCtTp1PCYO497P/bokF5tF4=
|
||||
github.com/filecoin-project/specs-storage v0.2.2/go.mod h1:6cc/lncmAxMUocPi0z1EPCX63beIX7F7UnlmUZ3hLQo=
|
||||
github.com/filecoin-project/specs-storage v0.2.3-0.20220426183226-1a0a63c5990f h1:+suJFu4RJt7aZRXvE+Innrpacap+Z8N87y6a1Cgkuqc=
|
||||
github.com/filecoin-project/specs-storage v0.2.3-0.20220426183226-1a0a63c5990f/go.mod h1:6cc/lncmAxMUocPi0z1EPCX63beIX7F7UnlmUZ3hLQo=
|
||||
github.com/filecoin-project/storetheindex v0.3.5 h1:KoS9TvjPm6zIZfUH8atAHJbVHOO7GTP1MdTG+v0eE+Q=
|
||||
github.com/filecoin-project/storetheindex v0.3.5/go.mod h1:0r3d0kSpK63O6AvLr1CjAINLi+nWD49clzcnKV+GLpI=
|
||||
github.com/filecoin-project/test-vectors/schema v0.0.5 h1:w3zHQhzM4pYxJDl21avXjOKBLF8egrvwUwjpT8TquDg=
|
||||
|
Loading…
Reference in New Issue
Block a user