Merge pull request #8557 from filecoin-project/feat/worker-commp

sealing: DataCid on workers
This commit is contained in:
Łukasz Magiera 2022-04-28 17:45:32 +02:00 committed by GitHub
commit f9a4a400da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 515 additions and 9 deletions

View File

@ -54,6 +54,8 @@ type StorageMiner interface {
ComputeWindowPoSt(ctx context.Context, dlIdx uint64, tsk types.TipSetKey) ([]miner.SubmitWindowedPoStParams, error) //perm:admin
ComputeDataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) //perm:admin
// Temp api for testing
PledgeSector(context.Context) (abi.SectorID, error) //perm:write
@ -124,6 +126,7 @@ type StorageMiner interface {
WorkerJobs(context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) //perm:admin
//storiface.WorkerReturn
ReturnDataCid(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error //perm:admin retry:true
ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error //perm:admin retry:true
ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error //perm:admin retry:true
ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error //perm:admin retry:true

View File

@ -34,6 +34,7 @@ type Worker interface {
Info(context.Context) (storiface.WorkerInfo, error) //perm:admin
// storiface.WorkerCalls
DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) //perm:admin
AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) //perm:admin
SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) //perm:admin

View File

@ -641,6 +641,8 @@ type StorageMinerStruct struct {
CheckProvable func(p0 context.Context, p1 abi.RegisteredPoStProof, p2 []storage.SectorRef, p3 bool) (map[abi.SectorNumber]string, error) `perm:"admin"`
ComputeDataCid func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (abi.PieceInfo, error) `perm:"admin"`
ComputeProof func(p0 context.Context, p1 []builtin.ExtendedSectorInfo, p2 abi.PoStRandomness, p3 abi.ChainEpoch, p4 abinetwork.Version) ([]builtin.PoStProof, error) `perm:"read"`
ComputeWindowPoSt func(p0 context.Context, p1 uint64, p2 types.TipSetKey) ([]miner.SubmitWindowedPoStParams, error) `perm:"admin"`
@ -743,6 +745,8 @@ type StorageMinerStruct struct {
ReturnAddPiece func(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error `perm:"admin"`
ReturnDataCid func(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error `perm:"admin"`
ReturnFetch func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
ReturnFinalizeReplicaUpdate func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
@ -894,6 +898,8 @@ type WorkerStruct struct {
Internal struct {
AddPiece func(p0 context.Context, p1 storage.SectorRef, p2 []abi.UnpaddedPieceSize, p3 abi.UnpaddedPieceSize, p4 storage.Data) (storiface.CallID, error) `perm:"admin"`
DataCid func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (storiface.CallID, error) `perm:"admin"`
Enabled func(p0 context.Context) (bool, error) `perm:"admin"`
Fetch func(p0 context.Context, p1 storage.SectorRef, p2 storiface.SectorFileType, p3 storiface.PathType, p4 storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"`
@ -3861,6 +3867,17 @@ func (s *StorageMinerStub) CheckProvable(p0 context.Context, p1 abi.RegisteredPo
return *new(map[abi.SectorNumber]string), ErrNotSupported
}
func (s *StorageMinerStruct) ComputeDataCid(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (abi.PieceInfo, error) {
if s.Internal.ComputeDataCid == nil {
return *new(abi.PieceInfo), ErrNotSupported
}
return s.Internal.ComputeDataCid(p0, p1, p2)
}
func (s *StorageMinerStub) ComputeDataCid(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (abi.PieceInfo, error) {
return *new(abi.PieceInfo), ErrNotSupported
}
func (s *StorageMinerStruct) ComputeProof(p0 context.Context, p1 []builtin.ExtendedSectorInfo, p2 abi.PoStRandomness, p3 abi.ChainEpoch, p4 abinetwork.Version) ([]builtin.PoStProof, error) {
if s.Internal.ComputeProof == nil {
return *new([]builtin.PoStProof), ErrNotSupported
@ -4422,6 +4439,17 @@ func (s *StorageMinerStub) ReturnAddPiece(p0 context.Context, p1 storiface.CallI
return ErrNotSupported
}
func (s *StorageMinerStruct) ReturnDataCid(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error {
if s.Internal.ReturnDataCid == nil {
return ErrNotSupported
}
return s.Internal.ReturnDataCid(p0, p1, p2, p3)
}
func (s *StorageMinerStub) ReturnDataCid(p0 context.Context, p1 storiface.CallID, p2 abi.PieceInfo, p3 *storiface.CallError) error {
return ErrNotSupported
}
func (s *StorageMinerStruct) ReturnFetch(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error {
if s.Internal.ReturnFetch == nil {
return ErrNotSupported
@ -5159,6 +5187,17 @@ func (s *WorkerStub) AddPiece(p0 context.Context, p1 storage.SectorRef, p2 []abi
return *new(storiface.CallID), ErrNotSupported
}
func (s *WorkerStruct) DataCid(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (storiface.CallID, error) {
if s.Internal.DataCid == nil {
return *new(storiface.CallID), ErrNotSupported
}
return s.Internal.DataCid(p0, p1, p2)
}
func (s *WorkerStub) DataCid(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data) (storiface.CallID, error) {
return *new(storiface.CallID), ErrNotSupported
}
func (s *WorkerStruct) Enabled(p0 context.Context) (bool, error) {
if s.Internal.Enabled == nil {
return false, ErrNotSupported

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -313,7 +313,7 @@ var runCmd = &cli.Command{
}
if (workerType == sealtasks.WorkerSealing || cctx.IsSet("addpiece")) && cctx.Bool("addpiece") {
taskTypes = append(taskTypes, sealtasks.TTAddPiece)
taskTypes = append(taskTypes, sealtasks.TTAddPiece, sealtasks.TTDataCid)
}
if (workerType == sealtasks.WorkerSealing || cctx.IsSet("precommit1")) && cctx.Bool("precommit1") {
taskTypes = append(taskTypes, sealtasks.TTPreCommit1)

View File

@ -23,6 +23,7 @@ var tasksCmd = &cli.Command{
var allowSetting = map[sealtasks.TaskType]struct{}{
sealtasks.TTAddPiece: {},
sealtasks.TTDataCid: {},
sealtasks.TTPreCommit1: {},
sealtasks.TTPreCommit2: {},
sealtasks.TTCommit2: {},

View File

@ -15,6 +15,7 @@
* [Check](#Check)
* [CheckProvable](#CheckProvable)
* [Compute](#Compute)
* [ComputeDataCid](#ComputeDataCid)
* [ComputeProof](#ComputeProof)
* [ComputeWindowPoSt](#ComputeWindowPoSt)
* [Create](#Create)
@ -105,6 +106,7 @@
* [PledgeSector](#PledgeSector)
* [Return](#Return)
* [ReturnAddPiece](#ReturnAddPiece)
* [ReturnDataCid](#ReturnDataCid)
* [ReturnFetch](#ReturnFetch)
* [ReturnFinalizeReplicaUpdate](#ReturnFinalizeReplicaUpdate)
* [ReturnFinalizeSector](#ReturnFinalizeSector)
@ -361,6 +363,29 @@ Response:
## Compute
### ComputeDataCid
Perms: admin
Inputs:
```json
[
1024,
{}
]
```
Response:
```json
{
"Size": 1032,
"PieceCID": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}
}
```
### ComputeProof
@ -2195,6 +2220,36 @@ Response:
### ReturnAddPiece
Perms: admin
Inputs:
```json
[
{
"Sector": {
"Miner": 1000,
"Number": 9
},
"ID": "07070707-0707-0707-0707-070707070707"
},
{
"Size": 1032,
"PieceCID": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}
},
{
"Code": 0,
"Message": "string value"
}
]
```
Response: `{}`
### ReturnDataCid
storiface.WorkerReturn
@ -4020,6 +4075,88 @@ Response:
"BaseMinMemory": 68719476736
}
},
"seal/v0/datacid": {
"0": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"1": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"2": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"3": {
"MinMemory": 4294967296,
"MaxMemory": 4294967296,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"4": {
"MinMemory": 8589934592,
"MaxMemory": 8589934592,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"5": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"6": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"7": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"8": {
"MinMemory": 4294967296,
"MaxMemory": 4294967296,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"9": {
"MinMemory": 8589934592,
"MaxMemory": 8589934592,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
}
},
"seal/v0/fetch": {
"0": {
"MinMemory": 1048576,

View File

@ -9,6 +9,8 @@
* [Version](#Version)
* [Add](#Add)
* [AddPiece](#AddPiece)
* [Data](#Data)
* [DataCid](#DataCid)
* [Finalize](#Finalize)
* [FinalizeReplicaUpdate](#FinalizeReplicaUpdate)
* [FinalizeSector](#FinalizeSector)
@ -520,6 +522,88 @@ Response:
"BaseMinMemory": 68719476736
}
},
"seal/v0/datacid": {
"0": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"1": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"2": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"3": {
"MinMemory": 4294967296,
"MaxMemory": 4294967296,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"4": {
"MinMemory": 8589934592,
"MaxMemory": 8589934592,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"5": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"6": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"7": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"8": {
"MinMemory": 4294967296,
"MaxMemory": 4294967296,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"9": {
"MinMemory": 8589934592,
"MaxMemory": 8589934592,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
}
},
"seal/v0/fetch": {
"0": {
"MinMemory": 1048576,
@ -1242,7 +1326,6 @@ Response: `131584`
### AddPiece
storiface.WorkerCalls
Perms: admin
@ -1276,6 +1359,34 @@ Response:
}
```
## Data
### DataCid
storiface.WorkerCalls
Perms: admin
Inputs:
```json
[
1024,
{}
]
```
Response:
```json
{
"Sector": {
"Miner": 1000,
"Number": 9
},
"ID": "07070707-0707-0707-0707-070707070707"
}
```
## Finalize

View File

@ -173,7 +173,7 @@ NAME:
lotus-worker tasks enable - Enable a task type
USAGE:
lotus-worker tasks enable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|GSK]
lotus-worker tasks enable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|DC|GSK]
OPTIONS:
--help, -h show help (default: false)
@ -186,7 +186,7 @@ NAME:
lotus-worker tasks disable - Disable a task type
USAGE:
lotus-worker tasks disable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|GSK]
lotus-worker tasks disable [command options] [UNS|C2|PC2|PC1|PR2|RU|AP|DC|GSK]
OPTIONS:
--help, -h show help (default: false)

View File

@ -51,6 +51,120 @@ func (sb *Sealer) NewSector(ctx context.Context, sector storage.SectorRef) error
return nil
}
func (sb *Sealer) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
// TODO: allow tuning those:
chunk := abi.PaddedPieceSize(4 << 20)
parallel := runtime.NumCPU()
maxSizeSpt := abi.RegisteredSealProof_StackedDrg64GiBV1_1
throttle := make(chan []byte, parallel)
piecePromises := make([]func() (abi.PieceInfo, error), 0)
buf := make([]byte, chunk.Unpadded())
for i := 0; i < parallel; i++ {
if abi.UnpaddedPieceSize(i)*chunk.Unpadded() >= pieceSize {
break // won't use this many buffers
}
throttle <- make([]byte, chunk.Unpadded())
}
for {
var read int
for rbuf := buf; len(rbuf) > 0; {
n, err := pieceData.Read(rbuf)
if err != nil && err != io.EOF {
return abi.PieceInfo{}, xerrors.Errorf("pr read error: %w", err)
}
rbuf = rbuf[n:]
read += n
if err == io.EOF {
break
}
}
if read == 0 {
break
}
done := make(chan struct {
cid.Cid
error
}, 1)
pbuf := <-throttle
copy(pbuf, buf[:read])
go func(read int) {
defer func() {
throttle <- pbuf
}()
c, err := sb.pieceCid(maxSizeSpt, pbuf[:read])
done <- struct {
cid.Cid
error
}{c, err}
}(read)
piecePromises = append(piecePromises, func() (abi.PieceInfo, error) {
select {
case e := <-done:
if e.error != nil {
return abi.PieceInfo{}, e.error
}
return abi.PieceInfo{
Size: abi.UnpaddedPieceSize(read).Padded(),
PieceCID: e.Cid,
}, nil
case <-ctx.Done():
return abi.PieceInfo{}, ctx.Err()
}
})
}
if len(piecePromises) == 1 {
return piecePromises[0]()
}
var payloadRoundedBytes abi.PaddedPieceSize
pieceCids := make([]abi.PieceInfo, len(piecePromises))
for i, promise := range piecePromises {
pinfo, err := promise()
if err != nil {
return abi.PieceInfo{}, err
}
pieceCids[i] = pinfo
payloadRoundedBytes += pinfo.Size
}
pieceCID, err := ffi.GenerateUnsealedCID(maxSizeSpt, pieceCids)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("generate unsealed CID: %w", err)
}
// validate that the pieceCID was properly formed
if _, err := commcid.CIDToPieceCommitmentV1(pieceCID); err != nil {
return abi.PieceInfo{}, err
}
if payloadRoundedBytes < pieceSize.Padded() {
paddedCid, err := commpffi.ZeroPadPieceCommitment(pieceCID, payloadRoundedBytes.Unpadded(), pieceSize)
if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("failed to pad data: %w", err)
}
pieceCID = paddedCid
}
return abi.PieceInfo{
Size: pieceSize.Padded(),
PieceCID: pieceCID,
}, nil
}
func (sb *Sealer) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, pieceSize abi.UnpaddedPieceSize, file storage.Data) (abi.PieceInfo, error) {
// TODO: allow tuning those:
chunk := abi.PaddedPieceSize(4 << 20)

View File

@ -165,7 +165,7 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.
sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTFinalizeReplicaUpdate,
}
if sc.AllowAddPiece {
localTasks = append(localTasks, sealtasks.TTAddPiece)
localTasks = append(localTasks, sealtasks.TTAddPiece, sealtasks.TTDataCid)
}
if sc.AllowPreCommit1 {
localTasks = append(localTasks, sealtasks.TTPreCommit1)
@ -327,6 +327,27 @@ func (m *Manager) NewSector(ctx context.Context, sector storage.SectorRef) error
return nil
}
func (m *Manager) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
selector := newTaskSelector()
var out abi.PieceInfo
err := m.sched.Schedule(ctx, storage.NoSectorRef, sealtasks.TTDataCid, selector, schedNop, func(ctx context.Context, w Worker) error {
p, err := m.waitSimpleCall(ctx)(w.DataCid(ctx, pieceSize, pieceData))
if err != nil {
return err
}
if p != nil {
out = p.(abi.PieceInfo)
}
return nil
})
return out, err
}
func (m *Manager) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -975,6 +996,10 @@ func (m *Manager) ProveReplicaUpdate2(ctx context.Context, sector storage.Sector
return out, waitErr
}
func (m *Manager) ReturnDataCid(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
return m.returnResult(ctx, callID, pi, err)
}
func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
return m.returnResult(ctx, callID, pi, err)
}

View File

@ -80,6 +80,10 @@ func (mgr *SectorMgr) SectorsUnsealPiece(ctx context.Context, sector storage.Sec
panic("SectorMgr: unsealing piece: implement me")
}
func (mgr *SectorMgr) DataCid(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
panic("todo")
}
func (mgr *SectorMgr) AddPiece(ctx context.Context, sectorID storage.SectorRef, existingPieces []abi.UnpaddedPieceSize, size abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
log.Warn("Add piece: ", sectorID, size, sectorID.ProofType)
@ -537,6 +541,10 @@ func (mgr *SectorMgr) CheckProvable(ctx context.Context, pp abi.RegisteredPoStPr
var _ storiface.WorkerReturn = &SectorMgr{}
func (mgr *SectorMgr) ReturnDataCid(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
panic("not supported")
}

View File

@ -68,6 +68,10 @@ type schedTestWorker struct {
ignoreResources bool
}
func (s *schedTestWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
panic("implement me")
}

View File

@ -3,6 +3,7 @@ package sealtasks
type TaskType string
const (
TTDataCid TaskType = "seal/v0/datacid"
TTAddPiece TaskType = "seal/v0/addpiece"
TTPreCommit1 TaskType = "seal/v0/precommit/1"
TTPreCommit2 TaskType = "seal/v0/precommit/2"
@ -25,7 +26,8 @@ const (
)
var order = map[TaskType]int{
TTRegenSectorKey: 10, // least priority
TTRegenSectorKey: 11, // least priority
TTDataCid: 10,
TTAddPiece: 9,
TTReplicaUpdate: 8,
TTProveReplicaUpdate2: 7,
@ -44,6 +46,7 @@ var order = map[TaskType]int{
}
var shortNames = map[TaskType]string{
TTDataCid: "DC",
TTAddPiece: "AP",
TTPreCommit1: "PC1",

View File

@ -569,6 +569,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
func init() {
ResourceTable[sealtasks.TTUnseal] = ResourceTable[sealtasks.TTPreCommit1] // TODO: measure accurately
ResourceTable[sealtasks.TTRegenSectorKey] = ResourceTable[sealtasks.TTReplicaUpdate]
ResourceTable[sealtasks.TTDataCid] = ResourceTable[sealtasks.TTAddPiece]
// V1_1 is the same as V1
for _, m := range ResourceTable {

View File

@ -117,6 +117,7 @@ var UndefCall CallID
type WorkerCalls interface {
// async
DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (CallID, error)
AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (CallID, error)
SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (CallID, error)
SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (CallID, error)
@ -197,6 +198,7 @@ func Err(code ErrorCode, sub error) *CallError {
}
type WorkerReturn interface {
ReturnDataCid(ctx context.Context, callID CallID, pi abi.PieceInfo, err *CallError) error
ReturnAddPiece(ctx context.Context, callID CallID, pi abi.PieceInfo, err *CallError) error
ReturnSealPreCommit1(ctx context.Context, callID CallID, p1o storage.PreCommit1Out, err *CallError) error
ReturnSealPreCommit2(ctx context.Context, callID CallID, sealed storage.SectorCids, err *CallError) error

View File

@ -23,6 +23,10 @@ type testExec struct {
apch chan chan apres
}
func (t *testExec) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
panic("implement me")
}
func (t *testExec) GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.ExtendedSectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, error) {
panic("implement me")
}

View File

@ -180,6 +180,7 @@ func (l *LocalWorker) ffiExec() (ffiwrapper.Storage, error) {
type ReturnType string
const (
DataCid ReturnType = "DataCid"
AddPiece ReturnType = "AddPiece"
SealPreCommit1 ReturnType = "SealPreCommit1"
SealPreCommit2 ReturnType = "SealPreCommit2"
@ -232,6 +233,7 @@ func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.Wor
}
var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, *storiface.CallError) error{
DataCid: rfunc(storiface.WorkerReturn.ReturnDataCid),
AddPiece: rfunc(storiface.WorkerReturn.ReturnAddPiece),
SealPreCommit1: rfunc(storiface.WorkerReturn.ReturnSealPreCommit1),
SealPreCommit2: rfunc(storiface.WorkerReturn.ReturnSealPreCommit2),
@ -341,6 +343,17 @@ func (l *LocalWorker) NewSector(ctx context.Context, sector storage.SectorRef) e
return sb.NewSector(ctx, sector)
}
func (l *LocalWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
}
return l.asyncCall(ctx, storage.NoSectorRef, DataCid, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
return sb.DataCid(ctx, pieceSize, pieceData)
})
}
func (l *LocalWorker) AddPiece(ctx context.Context, sector storage.SectorRef, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (storiface.CallID, error) {
sb, err := l.executor()
if err != nil {

View File

@ -186,6 +186,12 @@ func (t *trackedWorker) FinalizeSector(ctx context.Context, sector storage.Secto
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalize, func() (storiface.CallID, error) { return t.Worker.FinalizeSector(ctx, sector, keepUnsealed) })
}
func (t *trackedWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, storage.NoSectorRef, sealtasks.TTDataCid, func() (storiface.CallID, error) {
return t.Worker.DataCid(ctx, pieceSize, pieceData)
})
}
func (t *trackedWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTAddPiece, func() (storiface.CallID, error) {
return t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)

2
go.mod
View File

@ -53,7 +53,7 @@ require (
github.com/filecoin-project/specs-actors/v5 v5.0.4
github.com/filecoin-project/specs-actors/v6 v6.0.1
github.com/filecoin-project/specs-actors/v7 v7.0.0
github.com/filecoin-project/specs-storage v0.2.2
github.com/filecoin-project/specs-storage v0.2.3-0.20220426183226-1a0a63c5990f
github.com/filecoin-project/test-vectors/schema v0.0.5
github.com/gbrlsnchs/jwt/v3 v3.0.1
github.com/gdamore/tcell/v2 v2.2.0

4
go.sum
View File

@ -402,8 +402,8 @@ github.com/filecoin-project/specs-actors/v6 v6.0.1/go.mod h1:V1AYfi5GkHXipx1mnVi
github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1.0.20220118005651-2470cb39827e/go.mod h1:TA5FwCna+Yi36POaT7SLKXsgEDvJwc0V/L6ZsO19B9M=
github.com/filecoin-project/specs-actors/v7 v7.0.0 h1:FQN7tjt3o68hfb3qLFSJBoLMuOFY0REkFVLO/zXj8RU=
github.com/filecoin-project/specs-actors/v7 v7.0.0/go.mod h1:TA5FwCna+Yi36POaT7SLKXsgEDvJwc0V/L6ZsO19B9M=
github.com/filecoin-project/specs-storage v0.2.2 h1:6ugbtKQ6LTcTEnEIX9HkeCtTp1PCYO497P/bokF5tF4=
github.com/filecoin-project/specs-storage v0.2.2/go.mod h1:6cc/lncmAxMUocPi0z1EPCX63beIX7F7UnlmUZ3hLQo=
github.com/filecoin-project/specs-storage v0.2.3-0.20220426183226-1a0a63c5990f h1:+suJFu4RJt7aZRXvE+Innrpacap+Z8N87y6a1Cgkuqc=
github.com/filecoin-project/specs-storage v0.2.3-0.20220426183226-1a0a63c5990f/go.mod h1:6cc/lncmAxMUocPi0z1EPCX63beIX7F7UnlmUZ3hLQo=
github.com/filecoin-project/storetheindex v0.3.5 h1:KoS9TvjPm6zIZfUH8atAHJbVHOO7GTP1MdTG+v0eE+Q=
github.com/filecoin-project/storetheindex v0.3.5/go.mod h1:0r3d0kSpK63O6AvLr1CjAINLi+nWD49clzcnKV+GLpI=
github.com/filecoin-project/test-vectors/schema v0.0.5 h1:w3zHQhzM4pYxJDl21avXjOKBLF8egrvwUwjpT8TquDg=

View File

@ -2,6 +2,7 @@ package itests
import (
"context"
"strings"
"sync/atomic"
"testing"
"time"
@ -40,6 +41,27 @@ func TestWorkerPledge(t *testing.T) {
miner.PledgeSectors(ctx, 1, 0, nil)
}
func TestWorkerDataCid(t *testing.T) {
ctx := context.Background()
_, miner, worker, _ := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTDataCid, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs
e, err := worker.Enabled(ctx)
require.NoError(t, err)
require.True(t, e)
/*
pi, err := miner.ComputeDataCid(ctx, 1016, strings.NewReader(strings.Repeat("a", 1016)))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), pi.Size)
require.Equal(t, "baga6ea4seaqlhznlutptgfwhffupyer6txswamerq5fc2jlwf2lys2mm5jtiaeq", pi.PieceCID.String())
*/
bigPiece := abi.PaddedPieceSize(16 << 20).Unpadded()
pi, err := miner.ComputeDataCid(ctx, bigPiece, strings.NewReader(strings.Repeat("a", int(bigPiece))))
require.NoError(t, err)
require.Equal(t, bigPiece.Padded(), pi.Size)
require.Equal(t, "baga6ea4seaqmhoxl2ybw5m2wyd3pt3h4zmp7j52yumzu2rar26twns3uocq7yfa", pi.PieceCID.String())
}
func TestWinningPostWorker(t *testing.T) {
prevIns := build.InsecurePoStValidation
build.InsecurePoStValidation = false

View File

@ -211,6 +211,7 @@ type RpcReader struct {
postBody io.ReadCloser // nil on initial head request
next chan *RpcReader // on head will get us the postBody after sending resStart
mustRedirect bool
eof bool
res chan readRes
beginOnce *sync.Once
@ -266,6 +267,10 @@ func (w *RpcReader) Read(p []byte) (int, error) {
w.beginPost()
})
if w.eof {
return 0, io.EOF
}
if w.mustRedirect {
return 0, ErrMustRedirect
}
@ -276,6 +281,9 @@ func (w *RpcReader) Read(p []byte) (int, error) {
n, err := w.postBody.Read(p)
if err != nil {
if err == io.EOF {
w.eof = true
}
w.closeOnce.Do(func() {
close(w.res)
})

View File

@ -425,6 +425,10 @@ func (sm *StorageMinerAPI) ComputeWindowPoSt(ctx context.Context, dlIdx uint64,
return sm.WdPoSt.ComputePoSt(ctx, dlIdx, ts)
}
func (sm *StorageMinerAPI) ComputeDataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData sto.Data) (abi.PieceInfo, error) {
return sm.StorageMgr.DataCid(ctx, pieceSize, pieceData)
}
func (sm *StorageMinerAPI) WorkerConnect(ctx context.Context, url string) error {
w, err := connectRemoteWorker(ctx, sm, url)
if err != nil {