WIP sector storage and integration test
This commit is contained in:
parent
9d4489360b
commit
7d2b3f05db
@ -118,17 +118,20 @@ type StorageMiner interface {
|
||||
WorkerJobs(context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) //perm:admin
|
||||
|
||||
//storiface.WorkerReturn
|
||||
ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnReplicaUpdate(ctx context.Context, callID storiface.CallID, out storage.ReplicaUpdateOut, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnProveReplicaUpdate1(ctx context.Context, callID storiface.CallID, vanillaProofs storage.ReplicaVanillaProofs, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnProveReplicaUpdate2(ctx context.Context, callID storiface.CallID, proof storage.ReplicaUpdateProof, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
|
||||
|
||||
// SealingSchedDiag dumps internal sealing scheduler state
|
||||
SealingSchedDiag(ctx context.Context, doSched bool) (interface{}, error) //perm:admin
|
||||
|
@ -39,6 +39,9 @@ type Worker interface {
|
||||
SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) //perm:admin
|
||||
SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) //perm:admin
|
||||
FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) //perm:admin
|
||||
ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
|
||||
ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) //perm:admin
|
||||
ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) //perm:admin
|
||||
ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) //perm:admin
|
||||
MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin
|
||||
UnsealPiece(context.Context, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin
|
||||
|
@ -709,10 +709,16 @@ type StorageMinerStruct struct {
|
||||
|
||||
ReturnMoveStorage func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
|
||||
|
||||
ReturnProveReplicaUpdate1 func(p0 context.Context, p1 storiface.CallID, p2 storage.ReplicaVanillaProofs, p3 *storiface.CallError) error `perm:"admin"`
|
||||
|
||||
ReturnProveReplicaUpdate2 func(p0 context.Context, p1 storiface.CallID, p2 storage.ReplicaUpdateProof, p3 *storiface.CallError) error `perm:"admin"`
|
||||
|
||||
ReturnReadPiece func(p0 context.Context, p1 storiface.CallID, p2 bool, p3 *storiface.CallError) error `perm:"admin"`
|
||||
|
||||
ReturnReleaseUnsealed func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
|
||||
|
||||
ReturnReplicaUpdate func(p0 context.Context, p1 storiface.CallID, p2 storage.ReplicaUpdateOut, p3 *storiface.CallError) error `perm:"admin"`
|
||||
|
||||
ReturnSealCommit1 func(p0 context.Context, p1 storiface.CallID, p2 storage.Commit1Out, p3 *storiface.CallError) error `perm:"admin"`
|
||||
|
||||
ReturnSealCommit2 func(p0 context.Context, p1 storiface.CallID, p2 storage.Proof, p3 *storiface.CallError) error `perm:"admin"`
|
||||
@ -852,10 +858,16 @@ type WorkerStruct struct {
|
||||
|
||||
ProcessSession func(p0 context.Context) (uuid.UUID, error) `perm:"admin"`
|
||||
|
||||
ProveReplicaUpdate1 func(p0 context.Context, p1 storage.SectorRef, p2 cid.Cid, p3 cid.Cid, p4 cid.Cid) (storiface.CallID, error) `perm:"admin"`
|
||||
|
||||
ProveReplicaUpdate2 func(p0 context.Context, p1 storage.SectorRef, p2 cid.Cid, p3 cid.Cid, p4 cid.Cid, p5 storage.ReplicaVanillaProofs) (storiface.CallID, error) `perm:"admin"`
|
||||
|
||||
ReleaseUnsealed func(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) `perm:"admin"`
|
||||
|
||||
Remove func(p0 context.Context, p1 abi.SectorID) error `perm:"admin"`
|
||||
|
||||
ReplicaUpdate func(p0 context.Context, p1 storage.SectorRef, p2 []abi.PieceInfo) (storiface.CallID, error) `perm:"admin"`
|
||||
|
||||
SealCommit1 func(p0 context.Context, p1 storage.SectorRef, p2 abi.SealRandomness, p3 abi.InteractiveSealRandomness, p4 []abi.PieceInfo, p5 storage.SectorCids) (storiface.CallID, error) `perm:"admin"`
|
||||
|
||||
SealCommit2 func(p0 context.Context, p1 storage.SectorRef, p2 storage.Commit1Out) (storiface.CallID, error) `perm:"admin"`
|
||||
@ -4165,6 +4177,28 @@ func (s *StorageMinerStub) ReturnMoveStorage(p0 context.Context, p1 storiface.Ca
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *StorageMinerStruct) ReturnProveReplicaUpdate1(p0 context.Context, p1 storiface.CallID, p2 storage.ReplicaVanillaProofs, p3 *storiface.CallError) error {
|
||||
if s.Internal.ReturnProveReplicaUpdate1 == nil {
|
||||
return ErrNotSupported
|
||||
}
|
||||
return s.Internal.ReturnProveReplicaUpdate1(p0, p1, p2, p3)
|
||||
}
|
||||
|
||||
func (s *StorageMinerStub) ReturnProveReplicaUpdate1(p0 context.Context, p1 storiface.CallID, p2 storage.ReplicaVanillaProofs, p3 *storiface.CallError) error {
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *StorageMinerStruct) ReturnProveReplicaUpdate2(p0 context.Context, p1 storiface.CallID, p2 storage.ReplicaUpdateProof, p3 *storiface.CallError) error {
|
||||
if s.Internal.ReturnProveReplicaUpdate2 == nil {
|
||||
return ErrNotSupported
|
||||
}
|
||||
return s.Internal.ReturnProveReplicaUpdate2(p0, p1, p2, p3)
|
||||
}
|
||||
|
||||
func (s *StorageMinerStub) ReturnProveReplicaUpdate2(p0 context.Context, p1 storiface.CallID, p2 storage.ReplicaUpdateProof, p3 *storiface.CallError) error {
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *StorageMinerStruct) ReturnReadPiece(p0 context.Context, p1 storiface.CallID, p2 bool, p3 *storiface.CallError) error {
|
||||
if s.Internal.ReturnReadPiece == nil {
|
||||
return ErrNotSupported
|
||||
@ -4187,6 +4221,17 @@ func (s *StorageMinerStub) ReturnReleaseUnsealed(p0 context.Context, p1 storifac
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *StorageMinerStruct) ReturnReplicaUpdate(p0 context.Context, p1 storiface.CallID, p2 storage.ReplicaUpdateOut, p3 *storiface.CallError) error {
|
||||
if s.Internal.ReturnReplicaUpdate == nil {
|
||||
return ErrNotSupported
|
||||
}
|
||||
return s.Internal.ReturnReplicaUpdate(p0, p1, p2, p3)
|
||||
}
|
||||
|
||||
func (s *StorageMinerStub) ReturnReplicaUpdate(p0 context.Context, p1 storiface.CallID, p2 storage.ReplicaUpdateOut, p3 *storiface.CallError) error {
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *StorageMinerStruct) ReturnSealCommit1(p0 context.Context, p1 storiface.CallID, p2 storage.Commit1Out, p3 *storiface.CallError) error {
|
||||
if s.Internal.ReturnSealCommit1 == nil {
|
||||
return ErrNotSupported
|
||||
@ -4858,6 +4903,28 @@ func (s *WorkerStub) ProcessSession(p0 context.Context) (uuid.UUID, error) {
|
||||
return *new(uuid.UUID), ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *WorkerStruct) ProveReplicaUpdate1(p0 context.Context, p1 storage.SectorRef, p2 cid.Cid, p3 cid.Cid, p4 cid.Cid) (storiface.CallID, error) {
|
||||
if s.Internal.ProveReplicaUpdate1 == nil {
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
}
|
||||
return s.Internal.ProveReplicaUpdate1(p0, p1, p2, p3, p4)
|
||||
}
|
||||
|
||||
func (s *WorkerStub) ProveReplicaUpdate1(p0 context.Context, p1 storage.SectorRef, p2 cid.Cid, p3 cid.Cid, p4 cid.Cid) (storiface.CallID, error) {
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *WorkerStruct) ProveReplicaUpdate2(p0 context.Context, p1 storage.SectorRef, p2 cid.Cid, p3 cid.Cid, p4 cid.Cid, p5 storage.ReplicaVanillaProofs) (storiface.CallID, error) {
|
||||
if s.Internal.ProveReplicaUpdate2 == nil {
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
}
|
||||
return s.Internal.ProveReplicaUpdate2(p0, p1, p2, p3, p4, p5)
|
||||
}
|
||||
|
||||
func (s *WorkerStub) ProveReplicaUpdate2(p0 context.Context, p1 storage.SectorRef, p2 cid.Cid, p3 cid.Cid, p4 cid.Cid, p5 storage.ReplicaVanillaProofs) (storiface.CallID, error) {
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *WorkerStruct) ReleaseUnsealed(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) {
|
||||
if s.Internal.ReleaseUnsealed == nil {
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
@ -4880,6 +4947,17 @@ func (s *WorkerStub) Remove(p0 context.Context, p1 abi.SectorID) error {
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *WorkerStruct) ReplicaUpdate(p0 context.Context, p1 storage.SectorRef, p2 []abi.PieceInfo) (storiface.CallID, error) {
|
||||
if s.Internal.ReplicaUpdate == nil {
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
}
|
||||
return s.Internal.ReplicaUpdate(p0, p1, p2)
|
||||
}
|
||||
|
||||
func (s *WorkerStub) ReplicaUpdate(p0 context.Context, p1 storage.SectorRef, p2 []abi.PieceInfo) (storiface.CallID, error) {
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *WorkerStruct) SealCommit1(p0 context.Context, p1 storage.SectorRef, p2 abi.SealRandomness, p3 abi.InteractiveSealRandomness, p4 []abi.PieceInfo, p5 storage.SectorCids) (storiface.CallID, error) {
|
||||
if s.Internal.SealCommit1 == nil {
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -95,8 +95,11 @@
|
||||
* [ReturnFetch](#ReturnFetch)
|
||||
* [ReturnFinalizeSector](#ReturnFinalizeSector)
|
||||
* [ReturnMoveStorage](#ReturnMoveStorage)
|
||||
* [ReturnProveReplicaUpdate1](#ReturnProveReplicaUpdate1)
|
||||
* [ReturnProveReplicaUpdate2](#ReturnProveReplicaUpdate2)
|
||||
* [ReturnReadPiece](#ReturnReadPiece)
|
||||
* [ReturnReleaseUnsealed](#ReturnReleaseUnsealed)
|
||||
* [ReturnReplicaUpdate](#ReturnReplicaUpdate)
|
||||
* [ReturnSealCommit1](#ReturnSealCommit1)
|
||||
* [ReturnSealCommit2](#ReturnSealCommit2)
|
||||
* [ReturnSealPreCommit1](#ReturnSealPreCommit1)
|
||||
@ -1485,6 +1488,56 @@ Inputs:
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### ReturnProveReplicaUpdate1
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
{
|
||||
"Sector": {
|
||||
"Miner": 1000,
|
||||
"Number": 9
|
||||
},
|
||||
"ID": "07070707-0707-0707-0707-070707070707"
|
||||
},
|
||||
null,
|
||||
{
|
||||
"Code": 0,
|
||||
"Message": "string value"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### ReturnProveReplicaUpdate2
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
{
|
||||
"Sector": {
|
||||
"Miner": 1000,
|
||||
"Number": 9
|
||||
},
|
||||
"ID": "07070707-0707-0707-0707-070707070707"
|
||||
},
|
||||
null,
|
||||
{
|
||||
"Code": 0,
|
||||
"Message": "string value"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### ReturnReadPiece
|
||||
|
||||
|
||||
@ -1534,6 +1587,38 @@ Inputs:
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### ReturnReplicaUpdate
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
{
|
||||
"Sector": {
|
||||
"Miner": 1000,
|
||||
"Number": 9
|
||||
},
|
||||
"ID": "07070707-0707-0707-0707-070707070707"
|
||||
},
|
||||
{
|
||||
"NewSealed": {
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
},
|
||||
"NewUnsealed": {
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
}
|
||||
},
|
||||
{
|
||||
"Code": 0,
|
||||
"Message": "string value"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### ReturnSealCommit1
|
||||
|
||||
|
||||
|
@ -15,8 +15,13 @@
|
||||
* [MoveStorage](#MoveStorage)
|
||||
* [Process](#Process)
|
||||
* [ProcessSession](#ProcessSession)
|
||||
* [Prove](#Prove)
|
||||
* [ProveReplicaUpdate1](#ProveReplicaUpdate1)
|
||||
* [ProveReplicaUpdate2](#ProveReplicaUpdate2)
|
||||
* [Release](#Release)
|
||||
* [ReleaseUnsealed](#ReleaseUnsealed)
|
||||
* [Replica](#Replica)
|
||||
* [ReplicaUpdate](#ReplicaUpdate)
|
||||
* [Seal](#Seal)
|
||||
* [SealCommit1](#SealCommit1)
|
||||
* [SealCommit2](#SealCommit2)
|
||||
@ -262,12 +267,125 @@ Inputs: `null`
|
||||
|
||||
Response: `"07070707-0707-0707-0707-070707070707"`
|
||||
|
||||
## Prove
|
||||
|
||||
|
||||
### ProveReplicaUpdate1
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
{
|
||||
"ID": {
|
||||
"Miner": 1000,
|
||||
"Number": 9
|
||||
},
|
||||
"ProofType": 8
|
||||
},
|
||||
{
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
},
|
||||
{
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
},
|
||||
{
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
Response:
|
||||
```json
|
||||
{
|
||||
"Sector": {
|
||||
"Miner": 1000,
|
||||
"Number": 9
|
||||
},
|
||||
"ID": "07070707-0707-0707-0707-070707070707"
|
||||
}
|
||||
```
|
||||
|
||||
### ProveReplicaUpdate2
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
{
|
||||
"ID": {
|
||||
"Miner": 1000,
|
||||
"Number": 9
|
||||
},
|
||||
"ProofType": 8
|
||||
},
|
||||
{
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
},
|
||||
{
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
},
|
||||
{
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
},
|
||||
null
|
||||
]
|
||||
```
|
||||
|
||||
Response:
|
||||
```json
|
||||
{
|
||||
"Sector": {
|
||||
"Miner": 1000,
|
||||
"Number": 9
|
||||
},
|
||||
"ID": "07070707-0707-0707-0707-070707070707"
|
||||
}
|
||||
```
|
||||
|
||||
## Release
|
||||
|
||||
|
||||
### ReleaseUnsealed
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
{
|
||||
"ID": {
|
||||
"Miner": 1000,
|
||||
"Number": 9
|
||||
},
|
||||
"ProofType": 8
|
||||
},
|
||||
null
|
||||
]
|
||||
```
|
||||
|
||||
Response:
|
||||
```json
|
||||
{
|
||||
"Sector": {
|
||||
"Miner": 1000,
|
||||
"Number": 9
|
||||
},
|
||||
"ID": "07070707-0707-0707-0707-070707070707"
|
||||
}
|
||||
```
|
||||
|
||||
## Replica
|
||||
|
||||
|
||||
### ReplicaUpdate
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs:
|
||||
|
2
extern/filecoin-ffi
vendored
2
extern/filecoin-ffi
vendored
@ -1 +1 @@
|
||||
Subproject commit 58c014a42b7a21e73560879841a71e679126a852
|
||||
Subproject commit b1a66cfd12686a8af6030fccace49916849b1954
|
83
extern/sector-storage/ffiwrapper/sealer_cgo.go
vendored
83
extern/sector-storage/ffiwrapper/sealer_cgo.go
vendored
@ -21,6 +21,7 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
"github.com/detailyang/go-fallocate"
|
||||
commpffi "github.com/filecoin-project/go-commp-utils/ffiwrapper"
|
||||
"github.com/filecoin-project/go-commp-utils/zerocomm"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/fr32"
|
||||
@ -582,6 +583,88 @@ func (sb *Sealer) SealCommit2(ctx context.Context, sector storage.SectorRef, pha
|
||||
return ffi.SealCommitPhase2(phase1Out, sector.ID.Number, sector.ID.Miner)
|
||||
}
|
||||
|
||||
func (sb *Sealer) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storage.ReplicaUpdateOut, error) {
|
||||
empty := storage.ReplicaUpdateOut{}
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing)
|
||||
if err != nil {
|
||||
return empty, xerrors.Errorf("failed to acquire sector paths: %w", err)
|
||||
}
|
||||
defer done()
|
||||
|
||||
updateProofType := abi.SealProofInfos[sector.ProofType].UpdateProof
|
||||
|
||||
s, err := os.Stat(paths.Sealed)
|
||||
if err != nil {
|
||||
return empty, err
|
||||
}
|
||||
sealedSize := s.Size()
|
||||
|
||||
u, err := os.OpenFile(paths.Update, os.O_RDWR|os.O_CREATE, 0644) // nolint:gosec
|
||||
if err != nil {
|
||||
return empty, xerrors.Errorf("ensuring updated replica file exists: %w", err)
|
||||
}
|
||||
if err := fallocate.Fallocate(u, 0, sealedSize); err != nil {
|
||||
return empty, xerrors.Errorf("allocating space for replica update file: %w", err)
|
||||
}
|
||||
|
||||
if err := u.Close(); err != nil {
|
||||
return empty, err
|
||||
}
|
||||
|
||||
if err := os.Mkdir(paths.UpdateCache, 0755); err != nil { // nolint
|
||||
if os.IsExist(err) {
|
||||
log.Warnf("existing cache in %s; removing", paths.Cache)
|
||||
|
||||
if err := os.RemoveAll(paths.UpdateCache); err != nil {
|
||||
return empty, xerrors.Errorf("remove existing sector cache from %s (sector %d): %w", paths.Cache, sector, err)
|
||||
}
|
||||
|
||||
if err := os.Mkdir(paths.UpdateCache, 0755); err != nil { // nolint:gosec
|
||||
return empty, xerrors.Errorf("mkdir cache path after cleanup: %w", err)
|
||||
}
|
||||
} else {
|
||||
return empty, err
|
||||
}
|
||||
}
|
||||
|
||||
// XXX: we want to keep the stuff at the end
|
||||
if err := os.Truncate(paths.Unsealed, sealedSize); err != nil {
|
||||
return empty, xerrors.Errorf("failed to truncate unsealed data file: %w", err)
|
||||
}
|
||||
|
||||
sealed, unsealed, err := ffi.SectorUpdate.EncodeInto(updateProofType, paths.Update, paths.UpdateCache, paths.Sealed, paths.Cache, paths.Unsealed, pieces)
|
||||
if err != nil {
|
||||
return empty, xerrors.Errorf("failed to update replica %d with new deal data: %w", sector.ID.Number, err)
|
||||
}
|
||||
|
||||
return storage.ReplicaUpdateOut{NewSealed: sealed, NewUnsealed: unsealed}, nil
|
||||
}
|
||||
|
||||
func (sb *Sealer) ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storage.ReplicaVanillaProofs, error) {
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache|storiface.FTUpdateCache|storiface.FTUpdate, storiface.FTNone, storiface.PathSealing)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to acquire sector paths: %w", err)
|
||||
}
|
||||
defer done()
|
||||
|
||||
updateProofType := abi.SealProofInfos[sector.ProofType].UpdateProof
|
||||
|
||||
vanillaProofs, err := ffi.SectorUpdate.GenerateUpdateVanillaProofs(updateProofType, sectorKey, newSealed, newUnsealed, paths.Update, paths.UpdateCache, paths.Sealed, paths.Cache)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to generate proof of replica update for sector %d: %w", sector.ID.Number, err)
|
||||
}
|
||||
return vanillaProofs, nil
|
||||
}
|
||||
|
||||
func (sb *Sealer) ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storage.ReplicaUpdateProof, error) {
|
||||
updateProofType := abi.SealProofInfos[sector.ProofType].UpdateProof
|
||||
return ffi.SectorUpdate.GenerateUpdateProofWithVanilla(updateProofType, sectorKey, newSealed, newUnsealed, vanillaProofs)
|
||||
}
|
||||
|
||||
func (sb *Sealer) ReleaseSealed(ctx context.Context, sector storage.SectorRef) error {
|
||||
return xerrors.Errorf("not supported at this layer")
|
||||
}
|
||||
|
||||
func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
|
||||
ssize, err := sector.ProofType.SectorSize()
|
||||
if err != nil {
|
||||
|
@ -6,14 +6,13 @@ package ffiwrapper
|
||||
import (
|
||||
"context"
|
||||
|
||||
proof7 "github.com/filecoin-project/specs-actors/v7/actors/runtime/proof"
|
||||
|
||||
"go.opencensus.io/trace"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
ffi "github.com/filecoin-project/filecoin-ffi"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
proof5 "github.com/filecoin-project/specs-actors/v5/actors/runtime/proof"
|
||||
proof7 "github.com/filecoin-project/specs-actors/v7/actors/runtime/proof"
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||
|
161
extern/sector-storage/manager.go
vendored
161
extern/sector-storage/manager.go
vendored
@ -578,6 +578,10 @@ func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) ReleaseSealed(ctx context.Context, sector storage.SectorRef) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
@ -601,6 +605,151 @@ func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *Manager) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (out storage.ReplicaUpdateOut, err error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTReplicaUpdate, sector, pieces)
|
||||
if err != nil {
|
||||
return storage.ReplicaUpdateOut{}, xerrors.Errorf("getWork: %w", err)
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
var waitErr error
|
||||
waitRes := func() {
|
||||
p, werr := m.waitWork(ctx, wk)
|
||||
if werr != nil {
|
||||
waitErr = werr
|
||||
return
|
||||
}
|
||||
if p != nil {
|
||||
out = p.(storage.ReplicaUpdateOut)
|
||||
}
|
||||
}
|
||||
|
||||
if wait { // already in progress
|
||||
waitRes()
|
||||
return out, waitErr
|
||||
}
|
||||
|
||||
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed|storiface.FTCache, storiface.FTUpdate|storiface.FTUpdateCache); err != nil {
|
||||
return storage.ReplicaUpdateOut{}, xerrors.Errorf("acquiring sector lock: %w", err)
|
||||
}
|
||||
|
||||
selector := newAllocSelector(m.index, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing)
|
||||
|
||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTReplicaUpdate, selector, m.schedFetch(sector, storiface.FTSealed, storiface.PathSealing, storiface.AcquireCopy), func(ctx context.Context, w Worker) error {
|
||||
|
||||
err := m.startWork(ctx, w, wk)(w.ReplicaUpdate(ctx, sector, pieces))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
waitRes()
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return storage.ReplicaUpdateOut{}, err
|
||||
}
|
||||
return out, waitErr
|
||||
}
|
||||
|
||||
func (m *Manager) ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (out storage.ReplicaVanillaProofs, err error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTProveReplicaUpdate1, sector, sectorKey, newSealed, newUnsealed)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getWork: %w", err)
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
var waitErr error
|
||||
waitRes := func() {
|
||||
p, werr := m.waitWork(ctx, wk)
|
||||
if werr != nil {
|
||||
waitErr = werr
|
||||
return
|
||||
}
|
||||
if p != nil {
|
||||
out = p.(storage.ReplicaVanillaProofs)
|
||||
}
|
||||
}
|
||||
|
||||
if wait { // already in progress
|
||||
waitRes()
|
||||
return out, waitErr
|
||||
}
|
||||
|
||||
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTSealed|storiface.FTUpdate|storiface.FTCache|storiface.FTUpdateCache, storiface.FTNone); err != nil {
|
||||
return nil, xerrors.Errorf("acquiring sector lock: %w", err)
|
||||
}
|
||||
|
||||
selector := newExistingSelector(m.index, sector.ID, storiface.FTUpdate|storiface.FTUpdateCache|storiface.FTSealed|storiface.FTCache, true)
|
||||
|
||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTProveReplicaUpdate1, selector, m.schedFetch(sector, storiface.FTSealed, storiface.PathSealing, storiface.AcquireCopy), func(ctx context.Context, w Worker) error {
|
||||
|
||||
err := m.startWork(ctx, w, wk)(w.ProveReplicaUpdate1(ctx, sector, sectorKey, newSealed, newUnsealed))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
waitRes()
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return out, waitErr
|
||||
}
|
||||
|
||||
func (m *Manager) ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (out storage.ReplicaUpdateProof, err error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTProveReplicaUpdate2, sector, sectorKey, newSealed, newUnsealed, vanillaProofs)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getWork: %w", err)
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
var waitErr error
|
||||
waitRes := func() {
|
||||
p, werr := m.waitWork(ctx, wk)
|
||||
if werr != nil {
|
||||
waitErr = werr
|
||||
return
|
||||
}
|
||||
if p != nil {
|
||||
out = p.(storage.ReplicaUpdateProof)
|
||||
}
|
||||
}
|
||||
|
||||
if wait { // already in progress
|
||||
waitRes()
|
||||
return out, waitErr
|
||||
}
|
||||
|
||||
selector := newTaskSelector()
|
||||
|
||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTProveReplicaUpdate2, selector, schedNop, func(ctx context.Context, w Worker) error {
|
||||
err := m.startWork(ctx, w, wk)(w.ProveReplicaUpdate2(ctx, sector, sectorKey, newSealed, newUnsealed, vanillaProofs))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
waitRes()
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return out, waitErr
|
||||
}
|
||||
|
||||
func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
|
||||
return m.returnResult(ctx, callID, pi, err)
|
||||
}
|
||||
@ -629,6 +778,18 @@ func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.Ca
|
||||
return m.returnResult(ctx, callID, nil, err)
|
||||
}
|
||||
|
||||
func (m *Manager) ReturnReplicaUpdate(ctx context.Context, callID storiface.CallID, out storage.ReplicaUpdateOut, err *storiface.CallError) error {
|
||||
return m.returnResult(ctx, callID, out, err)
|
||||
}
|
||||
|
||||
func (m *Manager) ReturnProveReplicaUpdate1(ctx context.Context, callID storiface.CallID, out storage.ReplicaVanillaProofs, err *storiface.CallError) error {
|
||||
return m.returnResult(ctx, callID, out, err)
|
||||
}
|
||||
|
||||
func (m *Manager) ReturnProveReplicaUpdate2(ctx context.Context, callID storiface.CallID, proof storage.ReplicaUpdateProof, err *storiface.CallError) error {
|
||||
return m.returnResult(ctx, callID, proof, err)
|
||||
}
|
||||
|
||||
func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
|
||||
return m.returnResult(ctx, callID, nil, err)
|
||||
}
|
||||
|
1
extern/sector-storage/manager_calltracker.go
vendored
1
extern/sector-storage/manager_calltracker.go
vendored
@ -385,7 +385,6 @@ func (m *Manager) returnResult(ctx context.Context, callID storiface.CallID, r i
|
||||
if ok {
|
||||
return xerrors.Errorf("result for call %v already reported", wid)
|
||||
}
|
||||
|
||||
m.results[wid] = res
|
||||
|
||||
err := m.work.Get(wid).Mutate(func(ws *WorkState) error {
|
||||
|
139
extern/sector-storage/manager_test.go
vendored
139
extern/sector-storage/manager_test.go
vendored
@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@ -17,10 +18,12 @@ import (
|
||||
"github.com/google/uuid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-statestore"
|
||||
proof7 "github.com/filecoin-project/specs-actors/v7/actors/runtime/proof"
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||
@ -65,11 +68,11 @@ func newTestStorage(t *testing.T) *testStorage {
|
||||
}
|
||||
|
||||
func (t testStorage) cleanup() {
|
||||
for _, path := range t.StoragePaths {
|
||||
if err := os.RemoveAll(path.Path); err != nil {
|
||||
fmt.Println("Cleanup error:", err)
|
||||
}
|
||||
}
|
||||
// for _, path := range t.StoragePaths {
|
||||
// if err := os.RemoveAll(path.Path); err != nil {
|
||||
// fmt.Println("Cleanup error:", err)
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
func (t testStorage) GetStorage() (stores.StorageConfig, error) {
|
||||
@ -162,6 +165,132 @@ func TestSimple(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
type Reader struct{}
|
||||
|
||||
func (Reader) Read(out []byte) (int, error) {
|
||||
for i := range out {
|
||||
out[i] = 0
|
||||
}
|
||||
return len(out), nil
|
||||
}
|
||||
|
||||
type NullReader struct {
|
||||
*io.LimitedReader
|
||||
}
|
||||
|
||||
func NewNullReader(size abi.UnpaddedPieceSize) io.Reader {
|
||||
return &NullReader{(io.LimitReader(&Reader{}, int64(size))).(*io.LimitedReader)}
|
||||
}
|
||||
|
||||
func (m NullReader) NullBytes() int64 {
|
||||
return m.N
|
||||
}
|
||||
|
||||
func TestSnapDeals(t *testing.T) {
|
||||
logging.SetAllLoggers(logging.LevelWarn)
|
||||
ctx := context.Background()
|
||||
m, lstor, stor, idx, cleanup := newTestMgr(ctx, t, datastore.NewMapDatastore())
|
||||
defer cleanup()
|
||||
|
||||
localTasks := []sealtasks.TaskType{
|
||||
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit1, sealtasks.TTCommit2, sealtasks.TTFinalize,
|
||||
sealtasks.TTFetch, sealtasks.TTReplicaUpdate, sealtasks.TTProveReplicaUpdate1, sealtasks.TTProveReplicaUpdate2,
|
||||
}
|
||||
wds := datastore.NewMapDatastore()
|
||||
|
||||
w := NewLocalWorker(WorkerConfig{TaskTypes: localTasks}, stor, lstor, idx, m, statestore.New(wds))
|
||||
err := m.AddWorker(ctx, w)
|
||||
require.NoError(t, err)
|
||||
|
||||
proofType := abi.RegisteredSealProof_StackedDrg2KiBV1
|
||||
ptStr := os.Getenv("LOTUS_TEST_SNAP_DEALS_PROOF_TYPE")
|
||||
switch ptStr {
|
||||
case "2k":
|
||||
case "8M":
|
||||
proofType = abi.RegisteredSealProof_StackedDrg8MiBV1
|
||||
case "512M":
|
||||
proofType = abi.RegisteredSealProof_StackedDrg512MiBV1
|
||||
case "32G":
|
||||
proofType = abi.RegisteredSealProof_StackedDrg32GiBV1
|
||||
case "64G":
|
||||
proofType = abi.RegisteredSealProof_StackedDrg64GiBV1
|
||||
default:
|
||||
log.Warn("Unspecified proof type, make sure to set LOTUS_TEST_SNAP_DEALS_PROOF_TYPE to '2k', '8M', '512M', '32G' or '64G'")
|
||||
log.Warn("Continuing test with 2k sectors")
|
||||
}
|
||||
|
||||
sid := storage.SectorRef{
|
||||
ID: abi.SectorID{Miner: 1000, Number: 1},
|
||||
ProofType: proofType,
|
||||
}
|
||||
ss, err := proofType.SectorSize()
|
||||
require.NoError(t, err)
|
||||
|
||||
unpaddedSectorSize := abi.PaddedPieceSize(ss).Unpadded()
|
||||
|
||||
// Pack sector with no pieces
|
||||
p0, err := m.AddPiece(ctx, sid, nil, unpaddedSectorSize, NewNullReader(unpaddedSectorSize))
|
||||
require.NoError(t, err)
|
||||
ccPieces := []abi.PieceInfo{p0}
|
||||
|
||||
// Precommit and Seal a CC sector
|
||||
fmt.Printf("PC1\n")
|
||||
ticket := abi.SealRandomness{9, 9, 9, 9, 9, 9, 9, 9}
|
||||
pc1Out, err := m.SealPreCommit1(ctx, sid, ticket, ccPieces)
|
||||
require.NoError(t, err)
|
||||
fmt.Printf("PC2\n")
|
||||
pc2Out, err := m.SealPreCommit2(ctx, sid, pc1Out)
|
||||
|
||||
require.NoError(t, err)
|
||||
seed := abi.InteractiveSealRandomness{1, 1, 1, 1, 1, 1, 1}
|
||||
fmt.Printf("C1\n")
|
||||
c1Out, err := m.SealCommit1(ctx, sid, ticket, seed, nil, pc2Out)
|
||||
require.NoError(t, err)
|
||||
fmt.Printf("C2\n")
|
||||
_, err = m.SealCommit2(ctx, sid, c1Out)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Now do a snap deals replica update
|
||||
sectorKey := pc2Out.Sealed
|
||||
|
||||
// Two pieces each half the size of the sector
|
||||
unpaddedPieceSize := unpaddedSectorSize / 2
|
||||
p1, err := m.AddPiece(ctx, sid, nil, unpaddedPieceSize, strings.NewReader(strings.Repeat("k", int(unpaddedPieceSize))))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, unpaddedPieceSize.Padded(), p1.Size)
|
||||
|
||||
p2, err := m.AddPiece(ctx, sid, []abi.UnpaddedPieceSize{p1.Size.Unpadded()}, unpaddedPieceSize, strings.NewReader(strings.Repeat("j", int(unpaddedPieceSize))))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, unpaddedPieceSize.Padded(), p1.Size)
|
||||
|
||||
pieces := []abi.PieceInfo{p1, p2}
|
||||
fmt.Printf("RU\n")
|
||||
out, err := m.ReplicaUpdate(ctx, sid, pieces)
|
||||
require.NoError(t, err)
|
||||
updateProofType, err := sid.ProofType.RegisteredUpdateProof()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, out)
|
||||
fmt.Printf("PR1\n")
|
||||
vanillaProofs, err := m.ProveReplicaUpdate1(ctx, sid, sectorKey, out.NewSealed, out.NewUnsealed)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, vanillaProofs)
|
||||
fmt.Printf("PR2\n")
|
||||
proof, err := m.ProveReplicaUpdate2(ctx, sid, sectorKey, out.NewSealed, out.NewUnsealed, vanillaProofs)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, proof)
|
||||
|
||||
vInfo := proof7.ReplicaUpdateInfo{
|
||||
Proof: proof,
|
||||
UpdateProofType: updateProofType,
|
||||
OldSealedSectorCID: sectorKey,
|
||||
NewSealedSectorCID: out.NewSealed,
|
||||
NewUnsealedSectorCID: out.NewUnsealed,
|
||||
}
|
||||
pass, err := ffiwrapper.ProofVerifier.VerifyReplicaUpdate(vInfo)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, pass)
|
||||
}
|
||||
|
||||
func TestRedoPC1(t *testing.T) {
|
||||
logging.SetAllLoggers(logging.LevelDebug)
|
||||
|
||||
|
32
extern/sector-storage/mock/mock.go
vendored
32
extern/sector-storage/mock/mock.go
vendored
@ -263,6 +263,24 @@ func (mgr *SectorMgr) SealCommit2(ctx context.Context, sid storage.SectorRef, ph
|
||||
return out[:], nil
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) ReplicaUpdate(ctx context.Context, sid storage.SectorRef, pieces []abi.PieceInfo) (storage.ReplicaUpdateOut, error) {
|
||||
out := storage.ReplicaUpdateOut{}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storage.ReplicaVanillaProofs, error) {
|
||||
out := make([][]byte, 0)
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storage.ReplicaUpdateProof, error) {
|
||||
return make([]byte, 0), nil
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) ReleaseSealed(ctx context.Context, sid storage.SectorRef) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Test Instrumentation Methods
|
||||
|
||||
func (mgr *SectorMgr) MarkFailed(sid storage.SectorRef, failed bool) error {
|
||||
@ -458,6 +476,8 @@ func (mgr *SectorMgr) CheckProvable(ctx context.Context, pp abi.RegisteredPoStPr
|
||||
return bad, nil
|
||||
}
|
||||
|
||||
var _ storiface.WorkerReturn = &SectorMgr{}
|
||||
|
||||
func (mgr *SectorMgr) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
|
||||
panic("not supported")
|
||||
}
|
||||
@ -502,6 +522,18 @@ func (mgr *SectorMgr) ReturnFetch(ctx context.Context, callID storiface.CallID,
|
||||
panic("not supported")
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) ReturnReplicaUpdate(ctx context.Context, callID storiface.CallID, out storage.ReplicaUpdateOut, err *storiface.CallError) error {
|
||||
panic("not supported")
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) ReturnProveReplicaUpdate1(ctx context.Context, callID storiface.CallID, out storage.ReplicaVanillaProofs, err *storiface.CallError) error {
|
||||
panic("not supported")
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) ReturnProveReplicaUpdate2(ctx context.Context, callID storiface.CallID, out storage.ReplicaUpdateProof, err *storiface.CallError) error {
|
||||
panic("not supported")
|
||||
}
|
||||
|
||||
func (m mockVerifProver) VerifySeal(svi proof5.SealVerifyInfo) (bool, error) {
|
||||
plen, err := svi.SealProof.ProofSize()
|
||||
if err != nil {
|
||||
|
12
extern/sector-storage/sched_test.go
vendored
12
extern/sector-storage/sched_test.go
vendored
@ -100,6 +100,18 @@ func (s *schedTestWorker) AddPiece(ctx context.Context, sector storage.SectorRef
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *schedTestWorker) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, peices []abi.PieceInfo) (storiface.CallID, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *schedTestWorker) ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *schedTestWorker) ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *schedTestWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
27
extern/sector-storage/sealtasks/task.go
vendored
27
extern/sector-storage/sealtasks/task.go
vendored
@ -13,17 +13,24 @@ const (
|
||||
|
||||
TTFetch TaskType = "seal/v0/fetch"
|
||||
TTUnseal TaskType = "seal/v0/unseal"
|
||||
|
||||
TTReplicaUpdate TaskType = "seal/v0/replicaupdate"
|
||||
TTProveReplicaUpdate1 TaskType = "seal/v0/provereplicaupdate/1"
|
||||
TTProveReplicaUpdate2 TaskType = "seal/v0/provereplicaupdate/2"
|
||||
)
|
||||
|
||||
var order = map[TaskType]int{
|
||||
TTAddPiece: 6, // least priority
|
||||
TTPreCommit1: 5,
|
||||
TTPreCommit2: 4,
|
||||
TTCommit2: 3,
|
||||
TTCommit1: 2,
|
||||
TTUnseal: 1,
|
||||
TTFetch: -1,
|
||||
TTFinalize: -2, // most priority
|
||||
TTAddPiece: 9, // least priority
|
||||
TTReplicaUpdate: 8,
|
||||
TTProveReplicaUpdate2: 7,
|
||||
TTProveReplicaUpdate1: 6,
|
||||
TTPreCommit1: 5,
|
||||
TTPreCommit2: 4,
|
||||
TTCommit2: 3,
|
||||
TTCommit1: 2,
|
||||
TTUnseal: 1,
|
||||
TTFetch: -1,
|
||||
TTFinalize: -2, // most priority
|
||||
}
|
||||
|
||||
var shortNames = map[TaskType]string{
|
||||
@ -38,6 +45,10 @@ var shortNames = map[TaskType]string{
|
||||
|
||||
TTFetch: "GET",
|
||||
TTUnseal: "UNS",
|
||||
|
||||
TTReplicaUpdate: "RU",
|
||||
TTProveReplicaUpdate1: "PR1",
|
||||
TTProveReplicaUpdate2: "PR2",
|
||||
}
|
||||
|
||||
func (a TaskType) MuchLess(b TaskType) (bool, bool) {
|
||||
|
42
extern/sector-storage/storiface/filetype.go
vendored
42
extern/sector-storage/storiface/filetype.go
vendored
@ -12,11 +12,13 @@ const (
|
||||
FTUnsealed SectorFileType = 1 << iota
|
||||
FTSealed
|
||||
FTCache
|
||||
FTUpdate
|
||||
FTUpdateCache
|
||||
|
||||
FileTypes = iota
|
||||
)
|
||||
|
||||
var PathTypes = []SectorFileType{FTUnsealed, FTSealed, FTCache}
|
||||
var PathTypes = []SectorFileType{FTUnsealed, FTSealed, FTCache, FTUpdate, FTUpdateCache}
|
||||
|
||||
const (
|
||||
FTNone SectorFileType = 0
|
||||
@ -25,15 +27,21 @@ const (
|
||||
const FSOverheadDen = 10
|
||||
|
||||
var FSOverheadSeal = map[SectorFileType]int{ // 10x overheads
|
||||
FTUnsealed: FSOverheadDen,
|
||||
FTSealed: FSOverheadDen,
|
||||
FTCache: 141, // 11 layers + D(2x ssize) + C + R
|
||||
FTUnsealed: FSOverheadDen,
|
||||
FTSealed: FSOverheadDen,
|
||||
FTUpdate: FSOverheadDen,
|
||||
FTUpdateCache: FSOverheadDen * 2,
|
||||
FTCache: 141, // 11 layers + D(2x ssize) + C + R'
|
||||
}
|
||||
|
||||
// sector size * disk / fs overhead. FSOverheadDen is like the unit of sector size
|
||||
|
||||
var FsOverheadFinalized = map[SectorFileType]int{
|
||||
FTUnsealed: FSOverheadDen,
|
||||
FTSealed: FSOverheadDen,
|
||||
FTCache: 2,
|
||||
FTUnsealed: FSOverheadDen,
|
||||
FTSealed: FSOverheadDen,
|
||||
FTUpdate: FSOverheadDen * 2, // XXX: we should clear the update cache on Finalize???
|
||||
FTUpdateCache: FSOverheadDen,
|
||||
FTCache: 2,
|
||||
}
|
||||
|
||||
type SectorFileType int
|
||||
@ -46,6 +54,10 @@ func (t SectorFileType) String() string {
|
||||
return "sealed"
|
||||
case FTCache:
|
||||
return "cache"
|
||||
case FTUpdate:
|
||||
return "update"
|
||||
case FTUpdateCache:
|
||||
return "update-cache"
|
||||
default:
|
||||
return fmt.Sprintf("<unknown %d>", t)
|
||||
}
|
||||
@ -104,9 +116,11 @@ func (t SectorFileType) All() [FileTypes]bool {
|
||||
type SectorPaths struct {
|
||||
ID abi.SectorID
|
||||
|
||||
Unsealed string
|
||||
Sealed string
|
||||
Cache string
|
||||
Unsealed string
|
||||
Sealed string
|
||||
Cache string
|
||||
Update string
|
||||
UpdateCache string
|
||||
}
|
||||
|
||||
func ParseSectorID(baseName string) (abi.SectorID, error) {
|
||||
@ -139,6 +153,10 @@ func PathByType(sps SectorPaths, fileType SectorFileType) string {
|
||||
return sps.Sealed
|
||||
case FTCache:
|
||||
return sps.Cache
|
||||
case FTUpdate:
|
||||
return sps.Update
|
||||
case FTUpdateCache:
|
||||
return sps.UpdateCache
|
||||
}
|
||||
|
||||
panic("requested unknown path type")
|
||||
@ -152,5 +170,9 @@ func SetPathByType(sps *SectorPaths, fileType SectorFileType, p string) {
|
||||
sps.Sealed = p
|
||||
case FTCache:
|
||||
sps.Cache = p
|
||||
case FTUpdate:
|
||||
sps.Update = p
|
||||
case FTUpdateCache:
|
||||
sps.UpdateCache = p
|
||||
}
|
||||
}
|
||||
|
6
extern/sector-storage/storiface/worker.go
vendored
6
extern/sector-storage/storiface/worker.go
vendored
@ -92,6 +92,9 @@ type WorkerCalls interface {
|
||||
SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (CallID, error)
|
||||
FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (CallID, error)
|
||||
ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (CallID, error)
|
||||
ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (CallID, error)
|
||||
ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (CallID, error)
|
||||
ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (CallID, error)
|
||||
MoveStorage(ctx context.Context, sector storage.SectorRef, types SectorFileType) (CallID, error)
|
||||
UnsealPiece(context.Context, storage.SectorRef, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (CallID, error)
|
||||
Fetch(context.Context, storage.SectorRef, SectorFileType, PathType, AcquireMode) (CallID, error)
|
||||
@ -145,6 +148,9 @@ type WorkerReturn interface {
|
||||
ReturnSealCommit2(ctx context.Context, callID CallID, proof storage.Proof, err *CallError) error
|
||||
ReturnFinalizeSector(ctx context.Context, callID CallID, err *CallError) error
|
||||
ReturnReleaseUnsealed(ctx context.Context, callID CallID, err *CallError) error
|
||||
ReturnReplicaUpdate(ctx context.Context, callID CallID, out storage.ReplicaUpdateOut, err *CallError) error
|
||||
ReturnProveReplicaUpdate1(ctx context.Context, callID CallID, proofs storage.ReplicaVanillaProofs, err *CallError) error
|
||||
ReturnProveReplicaUpdate2(ctx context.Context, callID CallID, proof storage.ReplicaUpdateProof, err *CallError) error
|
||||
ReturnMoveStorage(ctx context.Context, callID CallID, err *CallError) error
|
||||
ReturnUnsealPiece(ctx context.Context, callID CallID, err *CallError) error
|
||||
ReturnReadPiece(ctx context.Context, callID CallID, ok bool, err *CallError) error
|
||||
|
16
extern/sector-storage/teststorage_test.go
vendored
16
extern/sector-storage/teststorage_test.go
vendored
@ -55,10 +55,26 @@ func (t *testExec) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testExec) ReleaseSealed(ctx context.Context, sector storage.SectorRef) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testExec) Remove(ctx context.Context, sector storage.SectorRef) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testExec) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storage.ReplicaUpdateOut, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testExec) ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storage.ReplicaVanillaProofs, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testExec) ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storage.ReplicaUpdateProof, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testExec) NewSector(ctx context.Context, sector storage.SectorRef) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
28
extern/sector-storage/testworker_test.go
vendored
28
extern/sector-storage/testworker_test.go
vendored
@ -7,6 +7,7 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
"github.com/google/uuid"
|
||||
cid "github.com/ipfs/go-cid"
|
||||
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/mock"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
|
||||
@ -67,6 +68,33 @@ func (t *testWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pie
|
||||
})
|
||||
}
|
||||
|
||||
func (t *testWorker) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) {
|
||||
return t.asyncCall(sector, func(ci storiface.CallID) {
|
||||
out, err := t.mockSeal.ReplicaUpdate(ctx, sector, pieces)
|
||||
if err := t.ret.ReturnReplicaUpdate(ctx, ci, out, toCallError(err)); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (t *testWorker) ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) {
|
||||
return t.asyncCall(sector, func(ci storiface.CallID) {
|
||||
vanillaProofs, err := t.mockSeal.ProveReplicaUpdate1(ctx, sector, sectorKey, newSealed, newUnsealed)
|
||||
if err := t.ret.ReturnProveReplicaUpdate1(ctx, ci, vanillaProofs, toCallError(err)); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (t *testWorker) ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) {
|
||||
return t.asyncCall(sector, func(ci storiface.CallID) {
|
||||
proof, err := t.mockSeal.ProveReplicaUpdate2(ctx, sector, sectorKey, newSealed, newUnsealed, vanillaProofs)
|
||||
if err := t.ret.ReturnProveReplicaUpdate2(ctx, ci, proof, toCallError(err)); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (t *testWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
|
||||
return t.asyncCall(sector, func(ci storiface.CallID) {
|
||||
t.pc1s++
|
||||
|
85
extern/sector-storage/worker_local.go
vendored
85
extern/sector-storage/worker_local.go
vendored
@ -28,7 +28,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||
)
|
||||
|
||||
var pathTypes = []storiface.SectorFileType{storiface.FTUnsealed, storiface.FTSealed, storiface.FTCache}
|
||||
var pathTypes = []storiface.SectorFileType{storiface.FTUnsealed, storiface.FTSealed, storiface.FTCache, storiface.FTUpdate, storiface.FTUpdateCache}
|
||||
|
||||
type WorkerConfig struct {
|
||||
TaskTypes []sealtasks.TaskType
|
||||
@ -145,7 +145,6 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector stor
|
||||
}
|
||||
|
||||
sid := storiface.PathByType(storageIDs, fileType)
|
||||
|
||||
if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector.ID, fileType, l.op == storiface.AcquireMove); err != nil {
|
||||
log.Errorf("declare sector error: %+v", err)
|
||||
}
|
||||
@ -160,16 +159,19 @@ func (l *LocalWorker) ffiExec() (ffiwrapper.Storage, error) {
|
||||
type ReturnType string
|
||||
|
||||
const (
|
||||
AddPiece ReturnType = "AddPiece"
|
||||
SealPreCommit1 ReturnType = "SealPreCommit1"
|
||||
SealPreCommit2 ReturnType = "SealPreCommit2"
|
||||
SealCommit1 ReturnType = "SealCommit1"
|
||||
SealCommit2 ReturnType = "SealCommit2"
|
||||
FinalizeSector ReturnType = "FinalizeSector"
|
||||
ReleaseUnsealed ReturnType = "ReleaseUnsealed"
|
||||
MoveStorage ReturnType = "MoveStorage"
|
||||
UnsealPiece ReturnType = "UnsealPiece"
|
||||
Fetch ReturnType = "Fetch"
|
||||
AddPiece ReturnType = "AddPiece"
|
||||
SealPreCommit1 ReturnType = "SealPreCommit1"
|
||||
SealPreCommit2 ReturnType = "SealPreCommit2"
|
||||
SealCommit1 ReturnType = "SealCommit1"
|
||||
SealCommit2 ReturnType = "SealCommit2"
|
||||
FinalizeSector ReturnType = "FinalizeSector"
|
||||
ReplicaUpdate ReturnType = "ReplicaUpdate"
|
||||
ProveReplicaUpdate1 ReturnType = "ProveReplicaUpdate1"
|
||||
ProveReplicaUpdate2 ReturnType = "ProveReplicaUpdate2"
|
||||
ReleaseUnsealed ReturnType = "ReleaseUnsealed"
|
||||
MoveStorage ReturnType = "MoveStorage"
|
||||
UnsealPiece ReturnType = "UnsealPiece"
|
||||
Fetch ReturnType = "Fetch"
|
||||
)
|
||||
|
||||
// in: func(WorkerReturn, context.Context, CallID, err string)
|
||||
@ -207,16 +209,19 @@ func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.Wor
|
||||
}
|
||||
|
||||
var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, *storiface.CallError) error{
|
||||
AddPiece: rfunc(storiface.WorkerReturn.ReturnAddPiece),
|
||||
SealPreCommit1: rfunc(storiface.WorkerReturn.ReturnSealPreCommit1),
|
||||
SealPreCommit2: rfunc(storiface.WorkerReturn.ReturnSealPreCommit2),
|
||||
SealCommit1: rfunc(storiface.WorkerReturn.ReturnSealCommit1),
|
||||
SealCommit2: rfunc(storiface.WorkerReturn.ReturnSealCommit2),
|
||||
FinalizeSector: rfunc(storiface.WorkerReturn.ReturnFinalizeSector),
|
||||
ReleaseUnsealed: rfunc(storiface.WorkerReturn.ReturnReleaseUnsealed),
|
||||
MoveStorage: rfunc(storiface.WorkerReturn.ReturnMoveStorage),
|
||||
UnsealPiece: rfunc(storiface.WorkerReturn.ReturnUnsealPiece),
|
||||
Fetch: rfunc(storiface.WorkerReturn.ReturnFetch),
|
||||
AddPiece: rfunc(storiface.WorkerReturn.ReturnAddPiece),
|
||||
SealPreCommit1: rfunc(storiface.WorkerReturn.ReturnSealPreCommit1),
|
||||
SealPreCommit2: rfunc(storiface.WorkerReturn.ReturnSealPreCommit2),
|
||||
SealCommit1: rfunc(storiface.WorkerReturn.ReturnSealCommit1),
|
||||
SealCommit2: rfunc(storiface.WorkerReturn.ReturnSealCommit2),
|
||||
FinalizeSector: rfunc(storiface.WorkerReturn.ReturnFinalizeSector),
|
||||
ReleaseUnsealed: rfunc(storiface.WorkerReturn.ReturnReleaseUnsealed),
|
||||
ReplicaUpdate: rfunc(storiface.WorkerReturn.ReturnReplicaUpdate),
|
||||
ProveReplicaUpdate1: rfunc(storiface.WorkerReturn.ReturnProveReplicaUpdate1),
|
||||
ProveReplicaUpdate2: rfunc(storiface.WorkerReturn.ReturnProveReplicaUpdate2),
|
||||
MoveStorage: rfunc(storiface.WorkerReturn.ReturnMoveStorage),
|
||||
UnsealPiece: rfunc(storiface.WorkerReturn.ReturnUnsealPiece),
|
||||
Fetch: rfunc(storiface.WorkerReturn.ReturnFetch),
|
||||
}
|
||||
|
||||
func (l *LocalWorker) asyncCall(ctx context.Context, sector storage.SectorRef, rt ReturnType, work func(ctx context.Context, ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) {
|
||||
@ -240,7 +245,6 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector storage.SectorRef, r
|
||||
}
|
||||
|
||||
res, err := work(ctx, ci)
|
||||
|
||||
if err != nil {
|
||||
rb, err := json.Marshal(res)
|
||||
if err != nil {
|
||||
@ -258,7 +262,6 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector storage.SectorRef, r
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return ci, nil
|
||||
}
|
||||
|
||||
@ -382,6 +385,40 @@ func (l *LocalWorker) SealCommit2(ctx context.Context, sector storage.SectorRef,
|
||||
})
|
||||
}
|
||||
|
||||
func (l *LocalWorker) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) {
|
||||
sb, err := l.executor()
|
||||
if err != nil {
|
||||
return storiface.UndefCall, err
|
||||
}
|
||||
|
||||
return l.asyncCall(ctx, sector, ReplicaUpdate, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
sealerOut, err := sb.ReplicaUpdate(ctx, sector, pieces)
|
||||
return sealerOut, err
|
||||
})
|
||||
}
|
||||
|
||||
func (l *LocalWorker) ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) {
|
||||
sb, err := l.executor()
|
||||
if err != nil {
|
||||
return storiface.UndefCall, err
|
||||
}
|
||||
|
||||
return l.asyncCall(ctx, sector, ProveReplicaUpdate1, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
return sb.ProveReplicaUpdate1(ctx, sector, sectorKey, newSealed, newUnsealed)
|
||||
})
|
||||
}
|
||||
|
||||
func (l *LocalWorker) ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) {
|
||||
sb, err := l.executor()
|
||||
if err != nil {
|
||||
return storiface.UndefCall, err
|
||||
}
|
||||
|
||||
return l.asyncCall(ctx, sector, ProveReplicaUpdate2, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
return sb.ProveReplicaUpdate2(ctx, sector, sectorKey, newSealed, newUnsealed, vanillaProofs)
|
||||
})
|
||||
}
|
||||
|
||||
func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
|
||||
sb, err := l.executor()
|
||||
if err != nil {
|
||||
|
19
extern/sector-storage/worker_tracked.go
vendored
19
extern/sector-storage/worker_tracked.go
vendored
@ -98,7 +98,6 @@ func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid Worke
|
||||
wt.lk.Lock()
|
||||
delete(wt.prepared, prepID)
|
||||
}
|
||||
|
||||
callID, err := cb()
|
||||
if err != nil {
|
||||
return callID, err
|
||||
@ -198,4 +197,22 @@ func (t *trackedWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, i
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, id, sealtasks.TTUnseal, func() (storiface.CallID, error) { return t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid) })
|
||||
}
|
||||
|
||||
func (t *trackedWorker) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTReplicaUpdate, func() (storiface.CallID, error) {
|
||||
return t.Worker.ReplicaUpdate(ctx, sector, pieces)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trackedWorker) ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTProveReplicaUpdate1, func() (storiface.CallID, error) {
|
||||
return t.Worker.ProveReplicaUpdate1(ctx, sector, sectorKey, newSealed, newUnsealed)
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trackedWorker) ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) {
|
||||
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTProveReplicaUpdate2, func() (storiface.CallID, error) {
|
||||
return t.Worker.ProveReplicaUpdate2(ctx, sector, sectorKey, newSealed, newUnsealed, vanillaProofs)
|
||||
})
|
||||
}
|
||||
|
||||
var _ Worker = &trackedWorker{}
|
||||
|
4
go.mod
4
go.mod
@ -51,7 +51,7 @@ require (
|
||||
github.com/filecoin-project/specs-actors/v5 v5.0.4
|
||||
github.com/filecoin-project/specs-actors/v6 v6.0.1
|
||||
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211118013026-3dce48197cec
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20201105051918-5188d9774506
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20211123153428-712cb8da07a3
|
||||
github.com/filecoin-project/test-vectors/schema v0.0.5
|
||||
github.com/gbrlsnchs/jwt/v3 v3.0.1
|
||||
github.com/gdamore/tcell/v2 v2.2.0
|
||||
@ -156,7 +156,7 @@ require (
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
|
||||
golang.org/x/sys v0.0.0-20210917161153-d61c044b1678
|
||||
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
|
||||
golang.org/x/tools v0.1.5
|
||||
golang.org/x/tools v0.1.7
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
|
||||
gopkg.in/cheggaaa/pb.v1 v1.0.28
|
||||
gotest.tools v2.2.0+incompatible
|
||||
|
10
go.sum
10
go.sum
@ -396,8 +396,8 @@ github.com/filecoin-project/specs-actors/v6 v6.0.1/go.mod h1:V1AYfi5GkHXipx1mnVi
|
||||
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211117170924-fd07a4c7dff9/go.mod h1:p6LIOFezA1rgRLMewbvdi3Pp6SAu+q9FtJ9CAleSjrE=
|
||||
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211118013026-3dce48197cec h1:KV9vE+Sl2Y3qKsrpba4HcE7wHwK7v6O5U/S0xHbje6A=
|
||||
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211118013026-3dce48197cec/go.mod h1:p6LIOFezA1rgRLMewbvdi3Pp6SAu+q9FtJ9CAleSjrE=
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20201105051918-5188d9774506 h1:Ur/l2+6qN+lQiqjozWWc5p9UDaAMDZKTlDS98oRnlIw=
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20201105051918-5188d9774506/go.mod h1:nJRRM7Aa9XVvygr3W9k6xGF46RWzr2zxF/iGoAIfA/g=
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20211123153428-712cb8da07a3 h1:FLPxD2ksWwGc/sbnFLWep2p8ViP93VCAwFaVxrtVCyo=
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20211123153428-712cb8da07a3/go.mod h1:nJRRM7Aa9XVvygr3W9k6xGF46RWzr2zxF/iGoAIfA/g=
|
||||
github.com/filecoin-project/test-vectors/schema v0.0.5 h1:w3zHQhzM4pYxJDl21avXjOKBLF8egrvwUwjpT8TquDg=
|
||||
github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
|
||||
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
|
||||
@ -1872,6 +1872,7 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
|
||||
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
|
||||
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
|
||||
github.com/zondax/hid v0.9.0 h1:eiT3P6vNxAEVxXMw66eZUAAnU2zD33JBkfG/EnfAKl8=
|
||||
github.com/zondax/hid v0.9.0/go.mod h1:l5wttcP0jwtdLjqjMMWFVEE7d1zO0jvSPA9OPZxWpEM=
|
||||
github.com/zondax/ledger-go v0.12.1 h1:hYRcyznPRJp+5mzF2sazTLP2nGvGjYDD2VzhHhFomLU=
|
||||
@ -2104,6 +2105,7 @@ golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT
|
||||
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
|
||||
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf h1:R150MpwJIv1MpS0N/pc+NhTM8ajzvlmxlY5OYsrevXQ=
|
||||
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
@ -2227,6 +2229,7 @@ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210917161153-d61c044b1678 h1:J27LZFQBFoihqXoegpscI10HpjZ7B5WQLLKL2FZXQKw=
|
||||
golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
@ -2318,8 +2321,9 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
|
||||
golang.org/x/tools v0.1.1-0.20210225150353-54dc8c5edb56/go.mod h1:9bzcO0MWcOuT0tm1iBGzDVPshzfwoVvREIui8C+MHqU=
|
||||
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
|
||||
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/tools v0.1.7 h1:6j8CgantCy3yc8JGBqkDLMKWqZ0RDU2g1HVgacojGWQ=
|
||||
golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
Loading…
Reference in New Issue
Block a user