RemoveData and Decode
- Unsealing replica update with sector key works and tested - Sector key generation added and tested
This commit is contained in:
parent
5c9a10617b
commit
d1480c36c0
@ -127,6 +127,7 @@ type StorageMiner interface {
|
||||
ReturnReplicaUpdate(ctx context.Context, callID storiface.CallID, out storage.ReplicaUpdateOut, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnProveReplicaUpdate1(ctx context.Context, callID storiface.CallID, vanillaProofs storage.ReplicaVanillaProofs, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnProveReplicaUpdate2(ctx context.Context, callID storiface.CallID, proof storage.ReplicaUpdateProof, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnGenerateSectorKeyFromData(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
|
||||
ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
|
||||
|
@ -42,6 +42,7 @@ type Worker interface {
|
||||
ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
|
||||
ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) //perm:admin
|
||||
ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) //perm:admin
|
||||
GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) (storiface.CallID, error) //perm:admin
|
||||
ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) //perm:admin
|
||||
MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) //perm:admin
|
||||
UnsealPiece(context.Context, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) //perm:admin
|
||||
|
@ -716,6 +716,8 @@ type StorageMinerStruct struct {
|
||||
|
||||
ReturnFinalizeSector func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
|
||||
|
||||
ReturnGenerateSectorKeyFromData func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
|
||||
|
||||
ReturnMoveStorage func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
|
||||
|
||||
ReturnProveReplicaUpdate1 func(p0 context.Context, p1 storiface.CallID, p2 storage.ReplicaVanillaProofs, p3 *storiface.CallError) error `perm:"admin"`
|
||||
@ -859,6 +861,8 @@ type WorkerStruct struct {
|
||||
|
||||
FinalizeSector func(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) `perm:"admin"`
|
||||
|
||||
GenerateSectorKeyFromData func(p0 context.Context, p1 storage.SectorRef, p2 cid.Cid) (storiface.CallID, error) `perm:"admin"`
|
||||
|
||||
Info func(p0 context.Context) (storiface.WorkerInfo, error) `perm:"admin"`
|
||||
|
||||
MoveStorage func(p0 context.Context, p1 storage.SectorRef, p2 storiface.SectorFileType) (storiface.CallID, error) `perm:"admin"`
|
||||
@ -4230,6 +4234,17 @@ func (s *StorageMinerStub) ReturnFinalizeSector(p0 context.Context, p1 storiface
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *StorageMinerStruct) ReturnGenerateSectorKeyFromData(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error {
|
||||
if s.Internal.ReturnGenerateSectorKeyFromData == nil {
|
||||
return ErrNotSupported
|
||||
}
|
||||
return s.Internal.ReturnGenerateSectorKeyFromData(p0, p1, p2)
|
||||
}
|
||||
|
||||
func (s *StorageMinerStub) ReturnGenerateSectorKeyFromData(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error {
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *StorageMinerStruct) ReturnMoveStorage(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error {
|
||||
if s.Internal.ReturnMoveStorage == nil {
|
||||
return ErrNotSupported
|
||||
@ -4923,6 +4938,17 @@ func (s *WorkerStub) FinalizeSector(p0 context.Context, p1 storage.SectorRef, p2
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *WorkerStruct) GenerateSectorKeyFromData(p0 context.Context, p1 storage.SectorRef, p2 cid.Cid) (storiface.CallID, error) {
|
||||
if s.Internal.GenerateSectorKeyFromData == nil {
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
}
|
||||
return s.Internal.GenerateSectorKeyFromData(p0, p1, p2)
|
||||
}
|
||||
|
||||
func (s *WorkerStub) GenerateSectorKeyFromData(p0 context.Context, p1 storage.SectorRef, p2 cid.Cid) (storiface.CallID, error) {
|
||||
return *new(storiface.CallID), ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *WorkerStruct) Info(p0 context.Context) (storiface.WorkerInfo, error) {
|
||||
if s.Internal.Info == nil {
|
||||
return *new(storiface.WorkerInfo), ErrNotSupported
|
||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -94,6 +94,7 @@
|
||||
* [ReturnAddPiece](#ReturnAddPiece)
|
||||
* [ReturnFetch](#ReturnFetch)
|
||||
* [ReturnFinalizeSector](#ReturnFinalizeSector)
|
||||
* [ReturnGenerateSectorKeyFromData](#ReturnGenerateSectorKeyFromData)
|
||||
* [ReturnMoveStorage](#ReturnMoveStorage)
|
||||
* [ReturnProveReplicaUpdate1](#ReturnProveReplicaUpdate1)
|
||||
* [ReturnProveReplicaUpdate2](#ReturnProveReplicaUpdate2)
|
||||
@ -1443,6 +1444,30 @@ Response: `{}`
|
||||
### ReturnFinalizeSector
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
{
|
||||
"Sector": {
|
||||
"Miner": 1000,
|
||||
"Number": 9
|
||||
},
|
||||
"ID": "07070707-0707-0707-0707-070707070707"
|
||||
},
|
||||
{
|
||||
"Code": 0,
|
||||
"Message": "string value"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### ReturnGenerateSectorKeyFromData
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs:
|
||||
|
@ -11,6 +11,8 @@
|
||||
* [AddPiece](#AddPiece)
|
||||
* [Finalize](#Finalize)
|
||||
* [FinalizeSector](#FinalizeSector)
|
||||
* [Generate](#Generate)
|
||||
* [GenerateSectorKeyFromData](#GenerateSectorKeyFromData)
|
||||
* [Move](#Move)
|
||||
* [MoveStorage](#MoveStorage)
|
||||
* [Process](#Process)
|
||||
@ -797,6 +799,41 @@ Response:
|
||||
}
|
||||
```
|
||||
|
||||
## Generate
|
||||
|
||||
|
||||
### GenerateSectorKeyFromData
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
{
|
||||
"ID": {
|
||||
"Miner": 1000,
|
||||
"Number": 9
|
||||
},
|
||||
"ProofType": 8
|
||||
},
|
||||
{
|
||||
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
|
||||
}
|
||||
]
|
||||
```
|
||||
|
||||
Response:
|
||||
```json
|
||||
{
|
||||
"Sector": {
|
||||
"Miner": 1000,
|
||||
"Number": 9
|
||||
},
|
||||
"ID": "07070707-0707-0707-0707-070707070707"
|
||||
}
|
||||
```
|
||||
|
||||
## Move
|
||||
|
||||
|
||||
|
2
extern/filecoin-ffi
vendored
2
extern/filecoin-ffi
vendored
@ -1 +1 @@
|
||||
Subproject commit ce7083b3d187ec3bc41a68ab66567bd4f3be6dfc
|
||||
Subproject commit 428503c87d917cc5e3e637983b43b4c260863bf0
|
60
extern/sector-storage/ffiwrapper/sealer_cgo.go
vendored
60
extern/sector-storage/ffiwrapper/sealer_cgo.go
vendored
@ -254,6 +254,23 @@ func (sb *Sealer) pieceCid(spt abi.RegisteredSealProof, in []byte) (cid.Cid, err
|
||||
return pieceCID, werr()
|
||||
}
|
||||
|
||||
func (sb *Sealer) tryDecodeUpdatedReplica(ctx context.Context, sector storage.SectorRef, commD cid.Cid, unsealedPath string) (bool, error) {
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUpdate|storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage)
|
||||
if xerrors.Is(err, storiface.ErrSectorNotFound) {
|
||||
return false, nil
|
||||
} else if err != nil {
|
||||
return false, xerrors.Errorf("reading updated replica: %w", err)
|
||||
}
|
||||
defer done()
|
||||
|
||||
// Sector data stored in replica update
|
||||
updateProof, err := sector.ProofType.RegisteredUpdateProof()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, ffi.SectorUpdate.DecodeFrom(updateProof, unsealedPath, paths.Update, paths.Sealed, paths.Cache, commD)
|
||||
}
|
||||
|
||||
func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd cid.Cid) error {
|
||||
ssize, err := sector.ProofType.SectorSize()
|
||||
if err != nil {
|
||||
@ -304,6 +321,16 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector storage.SectorRef, off
|
||||
return nil
|
||||
}
|
||||
|
||||
// If piece data stored in updated replica decode whole sector
|
||||
decoded, err := sb.tryDecodeUpdatedReplica(ctx, sector, commd, unsealedPath.Unsealed)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("decoding sector from replica: %w", err)
|
||||
}
|
||||
if decoded {
|
||||
return pf.MarkAllocated(0, maxPieceSize)
|
||||
}
|
||||
|
||||
// Piece data sealed in sector
|
||||
srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathStorage)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("acquire sealed sector paths: %w", err)
|
||||
@ -683,12 +710,6 @@ func (sb *Sealer) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, p
|
||||
return empty, err
|
||||
}
|
||||
}
|
||||
|
||||
// XXX: we want to keep the stuff at the end
|
||||
if err := os.Truncate(paths.Unsealed, sealedSize); err != nil {
|
||||
return empty, xerrors.Errorf("failed to truncate unsealed data file: %w", err)
|
||||
}
|
||||
|
||||
sealed, unsealed, err := ffi.SectorUpdate.EncodeInto(updateProofType, paths.Update, paths.UpdateCache, paths.Sealed, paths.Cache, paths.Unsealed, pieces)
|
||||
if err != nil {
|
||||
return empty, xerrors.Errorf("failed to update replica %d with new deal data: %w", sector.ID.Number, err)
|
||||
@ -718,6 +739,33 @@ func (sb *Sealer) ProveReplicaUpdate2(ctx context.Context, sector storage.Sector
|
||||
return ffi.SectorUpdate.GenerateUpdateProofWithVanilla(updateProofType, sectorKey, newSealed, newUnsealed, vanillaProofs)
|
||||
}
|
||||
|
||||
func (sb *Sealer) GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) error {
|
||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTSealed, storiface.PathSealing)
|
||||
defer done()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to acquire sector paths: %w", err)
|
||||
}
|
||||
|
||||
s, err := os.Stat(paths.Update)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("measuring update file size: %w", err)
|
||||
}
|
||||
sealedSize := s.Size()
|
||||
e, err := os.OpenFile(paths.Sealed, os.O_RDWR|os.O_CREATE, 0644) // nolint:gosec
|
||||
if err != nil {
|
||||
return xerrors.Errorf("ensuring sector key file exists: %w", err)
|
||||
}
|
||||
if err := fallocate.Fallocate(e, 0, sealedSize); err != nil {
|
||||
return xerrors.Errorf("allocating space for sector key file: %w", err)
|
||||
}
|
||||
if err := e.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
updateProofType := abi.SealProofInfos[sector.ProofType].UpdateProof
|
||||
return ffi.SectorUpdate.RemoveData(updateProofType, paths.Sealed, paths.Cache, paths.Update, paths.UpdateCache, paths.Unsealed, commD)
|
||||
}
|
||||
|
||||
func (sb *Sealer) ReleaseSealed(ctx context.Context, sector storage.SectorRef) error {
|
||||
return xerrors.Errorf("not supported at this layer")
|
||||
}
|
||||
|
73
extern/sector-storage/manager.go
vendored
73
extern/sector-storage/manager.go
vendored
@ -573,15 +573,72 @@ func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) ReleaseSealed(ctx context.Context, sector storage.SectorRef) error {
|
||||
return nil
|
||||
func (m *Manager) ReleaseSectorKey(ctx context.Context, sector storage.SectorRef) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTSealed); err != nil {
|
||||
return xerrors.Errorf("acquiring sector lock: %w", err)
|
||||
}
|
||||
|
||||
return m.storage.Remove(ctx, sector.ID, storiface.FTSealed, true, nil)
|
||||
}
|
||||
|
||||
func (m *Manager) GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) error {
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTRegenSectorKey, sector, commD)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getWork: %w", err)
|
||||
}
|
||||
defer cancel()
|
||||
|
||||
var waitErr error
|
||||
waitRes := func() {
|
||||
_, werr := m.waitWork(ctx, wk)
|
||||
if werr != nil {
|
||||
waitErr = werr
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if wait { // already in progress
|
||||
waitRes()
|
||||
return waitErr
|
||||
}
|
||||
|
||||
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTUnsealed|storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTSealed|storiface.FTCache); err != nil {
|
||||
return xerrors.Errorf("acquiring sector lock: %w", err)
|
||||
}
|
||||
|
||||
// NOTE: We set allowFetch to false in so that we always execute on a worker
|
||||
// with direct access to the data. We want to do that because this step is
|
||||
// generally very cheap / fast, and transferring data is not worth the effort
|
||||
selector := newExistingSelector(m.index, sector.ID, storiface.FTUnsealed|storiface.FTUpdate|storiface.FTUpdateCache|storiface.FTCache, true)
|
||||
|
||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTRegenSectorKey, selector, m.schedFetch(sector, storiface.FTUpdate|storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
|
||||
err := m.startWork(ctx, w, wk)(w.GenerateSectorKeyFromData(ctx, sector, commD))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
waitRes()
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return waitErr
|
||||
}
|
||||
|
||||
func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache); err != nil {
|
||||
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache); err != nil {
|
||||
return xerrors.Errorf("acquiring sector lock: %w", err)
|
||||
}
|
||||
|
||||
@ -596,6 +653,12 @@ func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error {
|
||||
if rerr := m.storage.Remove(ctx, sector.ID, storiface.FTUnsealed, true, nil); rerr != nil {
|
||||
err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr))
|
||||
}
|
||||
if rerr := m.storage.Remove(ctx, sector.ID, storiface.FTUpdate, true, nil); rerr != nil {
|
||||
err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr))
|
||||
}
|
||||
if rerr := m.storage.Remove(ctx, sector.ID, storiface.FTUpdateCache, true, nil); rerr != nil {
|
||||
err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr))
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
@ -785,6 +848,10 @@ func (m *Manager) ReturnProveReplicaUpdate2(ctx context.Context, callID storifac
|
||||
return m.returnResult(ctx, callID, proof, err)
|
||||
}
|
||||
|
||||
func (m *Manager) ReturnGenerateSectorKeyFromData(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
|
||||
return m.returnResult(ctx, callID, nil, err)
|
||||
}
|
||||
|
||||
func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
|
||||
return m.returnResult(ctx, callID, nil, err)
|
||||
}
|
||||
|
36
extern/sector-storage/manager_test.go
vendored
36
extern/sector-storage/manager_test.go
vendored
@ -199,7 +199,8 @@ func TestSnapDeals(t *testing.T) {
|
||||
|
||||
localTasks := []sealtasks.TaskType{
|
||||
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit1, sealtasks.TTCommit2, sealtasks.TTFinalize,
|
||||
sealtasks.TTFetch, sealtasks.TTReplicaUpdate, sealtasks.TTProveReplicaUpdate1, sealtasks.TTProveReplicaUpdate2,
|
||||
sealtasks.TTFetch, sealtasks.TTReplicaUpdate, sealtasks.TTProveReplicaUpdate1, sealtasks.TTProveReplicaUpdate2, sealtasks.TTUnseal,
|
||||
sealtasks.TTRegenSectorKey,
|
||||
}
|
||||
wds := datastore.NewMapDatastore()
|
||||
|
||||
@ -245,14 +246,6 @@ func TestSnapDeals(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
fmt.Printf("PC2\n")
|
||||
pc2Out, err := m.SealPreCommit2(ctx, sid, pc1Out)
|
||||
|
||||
require.NoError(t, err)
|
||||
seed := abi.InteractiveSealRandomness{1, 1, 1, 1, 1, 1, 1}
|
||||
fmt.Printf("C1\n")
|
||||
c1Out, err := m.SealCommit1(ctx, sid, ticket, seed, nil, pc2Out)
|
||||
require.NoError(t, err)
|
||||
fmt.Printf("C2\n")
|
||||
_, err = m.SealCommit2(ctx, sid, c1Out)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Now do a snap deals replica update
|
||||
@ -270,19 +263,26 @@ func TestSnapDeals(t *testing.T) {
|
||||
|
||||
pieces := []abi.PieceInfo{p1, p2}
|
||||
fmt.Printf("RU\n")
|
||||
startRU := time.Now()
|
||||
out, err := m.ReplicaUpdate(ctx, sid, pieces)
|
||||
require.NoError(t, err)
|
||||
fmt.Printf("RU duration (%s): %s\n", ss.ShortString(), time.Since(startRU))
|
||||
|
||||
updateProofType, err := sid.ProofType.RegisteredUpdateProof()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, out)
|
||||
fmt.Printf("PR1\n")
|
||||
startPR1 := time.Now()
|
||||
vanillaProofs, err := m.ProveReplicaUpdate1(ctx, sid, sectorKey, out.NewSealed, out.NewUnsealed)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, vanillaProofs)
|
||||
fmt.Printf("PR1 duration (%s): %s\n", ss.ShortString(), time.Since(startPR1))
|
||||
fmt.Printf("PR2\n")
|
||||
startPR2 := time.Now()
|
||||
proof, err := m.ProveReplicaUpdate2(ctx, sid, sectorKey, out.NewSealed, out.NewUnsealed, vanillaProofs)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, proof)
|
||||
fmt.Printf("PR2 duration (%s): %s\n", ss.ShortString(), time.Since(startPR2))
|
||||
|
||||
vInfo := proof7.ReplicaUpdateInfo{
|
||||
Proof: proof,
|
||||
@ -294,6 +294,24 @@ func TestSnapDeals(t *testing.T) {
|
||||
pass, err := ffiwrapper.ProofVerifier.VerifyReplicaUpdate(vInfo)
|
||||
require.NoError(t, err)
|
||||
assert.True(t, pass)
|
||||
|
||||
fmt.Printf("Decode\n")
|
||||
// Remove unsealed data and decode for retrieval
|
||||
require.NoError(t, m.FinalizeSector(ctx, sid, nil))
|
||||
startDecode := time.Now()
|
||||
require.NoError(t, m.SectorsUnsealPiece(ctx, sid, 0, p1.Size.Unpadded(), ticket, &out.NewUnsealed))
|
||||
fmt.Printf("Decode duration (%s): %s\n", ss.ShortString(), time.Since(startDecode))
|
||||
|
||||
// Remove just the first piece and decode for retrieval
|
||||
require.NoError(t, m.FinalizeSector(ctx, sid, []storage.Range{{Offset: p1.Size.Unpadded(), Size: p2.Size.Unpadded()}}))
|
||||
require.NoError(t, m.SectorsUnsealPiece(ctx, sid, 0, p1.Size.Unpadded(), ticket, &out.NewUnsealed))
|
||||
|
||||
fmt.Printf("GSK\n")
|
||||
require.NoError(t, m.ReleaseSectorKey(ctx, sid))
|
||||
startGSK := time.Now()
|
||||
require.NoError(t, m.GenerateSectorKeyFromData(ctx, sid, out.NewUnsealed))
|
||||
fmt.Printf("GSK duration (%s): %s\n", ss.ShortString(), time.Since(startGSK))
|
||||
|
||||
}
|
||||
|
||||
func TestRedoPC1(t *testing.T) {
|
||||
|
8
extern/sector-storage/mock/mock.go
vendored
8
extern/sector-storage/mock/mock.go
vendored
@ -278,6 +278,10 @@ func (mgr *SectorMgr) ProveReplicaUpdate2(ctx context.Context, sector storage.Se
|
||||
return make([]byte, 0), nil
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) ReleaseSealed(ctx context.Context, sid storage.SectorRef) error {
|
||||
return nil
|
||||
}
|
||||
@ -545,6 +549,10 @@ func (mgr *SectorMgr) ReturnProveReplicaUpdate2(ctx context.Context, callID stor
|
||||
panic("not supported")
|
||||
}
|
||||
|
||||
func (mgr *SectorMgr) ReturnGenerateSectorKeyFromData(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
|
||||
panic("not supported")
|
||||
}
|
||||
|
||||
func (m mockVerifProver) VerifySeal(svi proof5.SealVerifyInfo) (bool, error) {
|
||||
plen, err := svi.SealProof.ProofSize()
|
||||
if err != nil {
|
||||
|
4
extern/sector-storage/sched_test.go
vendored
4
extern/sector-storage/sched_test.go
vendored
@ -114,6 +114,10 @@ func (s *schedTestWorker) ProveReplicaUpdate2(ctx context.Context, sector storag
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *schedTestWorker) GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) (storiface.CallID, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (s *schedTestWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
5
extern/sector-storage/sealtasks/task.go
vendored
5
extern/sector-storage/sealtasks/task.go
vendored
@ -17,10 +17,12 @@ const (
|
||||
TTReplicaUpdate TaskType = "seal/v0/replicaupdate"
|
||||
TTProveReplicaUpdate1 TaskType = "seal/v0/provereplicaupdate/1"
|
||||
TTProveReplicaUpdate2 TaskType = "seal/v0/provereplicaupdate/2"
|
||||
TTRegenSectorKey TaskType = "seal/v0/regensectorkey"
|
||||
)
|
||||
|
||||
var order = map[TaskType]int{
|
||||
TTAddPiece: 9, // least priority
|
||||
TTRegenSectorKey: 10, // least priority
|
||||
TTAddPiece: 9,
|
||||
TTReplicaUpdate: 8,
|
||||
TTProveReplicaUpdate2: 7,
|
||||
TTProveReplicaUpdate1: 6,
|
||||
@ -49,6 +51,7 @@ var shortNames = map[TaskType]string{
|
||||
TTReplicaUpdate: "RU",
|
||||
TTProveReplicaUpdate1: "PR1",
|
||||
TTProveReplicaUpdate2: "PR2",
|
||||
TTRegenSectorKey: "GSK",
|
||||
}
|
||||
|
||||
func (a TaskType) MuchLess(b TaskType) (bool, bool) {
|
||||
|
2
extern/sector-storage/storiface/worker.go
vendored
2
extern/sector-storage/storiface/worker.go
vendored
@ -124,6 +124,7 @@ type WorkerCalls interface {
|
||||
ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (CallID, error)
|
||||
ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (CallID, error)
|
||||
ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (CallID, error)
|
||||
GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) (CallID, error)
|
||||
MoveStorage(ctx context.Context, sector storage.SectorRef, types SectorFileType) (CallID, error)
|
||||
UnsealPiece(context.Context, storage.SectorRef, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (CallID, error)
|
||||
Fetch(context.Context, storage.SectorRef, SectorFileType, PathType, AcquireMode) (CallID, error)
|
||||
@ -180,6 +181,7 @@ type WorkerReturn interface {
|
||||
ReturnReplicaUpdate(ctx context.Context, callID CallID, out storage.ReplicaUpdateOut, err *CallError) error
|
||||
ReturnProveReplicaUpdate1(ctx context.Context, callID CallID, proofs storage.ReplicaVanillaProofs, err *CallError) error
|
||||
ReturnProveReplicaUpdate2(ctx context.Context, callID CallID, proof storage.ReplicaUpdateProof, err *CallError) error
|
||||
ReturnGenerateSectorKeyFromData(ctx context.Context, callID CallID, err *CallError) error
|
||||
ReturnMoveStorage(ctx context.Context, callID CallID, err *CallError) error
|
||||
ReturnUnsealPiece(ctx context.Context, callID CallID, err *CallError) error
|
||||
ReturnReadPiece(ctx context.Context, callID CallID, ok bool, err *CallError) error
|
||||
|
4
extern/sector-storage/teststorage_test.go
vendored
4
extern/sector-storage/teststorage_test.go
vendored
@ -75,6 +75,10 @@ func (t *testExec) ProveReplicaUpdate2(ctx context.Context, sector storage.Secto
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testExec) GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testExec) NewSector(ctx context.Context, sector storage.SectorRef) error {
|
||||
panic("implement me")
|
||||
}
|
||||
|
13
extern/sector-storage/worker_local.go
vendored
13
extern/sector-storage/worker_local.go
vendored
@ -171,6 +171,7 @@ const (
|
||||
ReplicaUpdate ReturnType = "ReplicaUpdate"
|
||||
ProveReplicaUpdate1 ReturnType = "ProveReplicaUpdate1"
|
||||
ProveReplicaUpdate2 ReturnType = "ProveReplicaUpdate2"
|
||||
GenerateSectorKey ReturnType = "GenerateSectorKey"
|
||||
ReleaseUnsealed ReturnType = "ReleaseUnsealed"
|
||||
MoveStorage ReturnType = "MoveStorage"
|
||||
UnsealPiece ReturnType = "UnsealPiece"
|
||||
@ -222,6 +223,7 @@ var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storifac
|
||||
ReplicaUpdate: rfunc(storiface.WorkerReturn.ReturnReplicaUpdate),
|
||||
ProveReplicaUpdate1: rfunc(storiface.WorkerReturn.ReturnProveReplicaUpdate1),
|
||||
ProveReplicaUpdate2: rfunc(storiface.WorkerReturn.ReturnProveReplicaUpdate2),
|
||||
GenerateSectorKey: rfunc(storiface.WorkerReturn.ReturnGenerateSectorKeyFromData),
|
||||
MoveStorage: rfunc(storiface.WorkerReturn.ReturnMoveStorage),
|
||||
UnsealPiece: rfunc(storiface.WorkerReturn.ReturnUnsealPiece),
|
||||
Fetch: rfunc(storiface.WorkerReturn.ReturnFetch),
|
||||
@ -422,6 +424,17 @@ func (l *LocalWorker) ProveReplicaUpdate2(ctx context.Context, sector storage.Se
|
||||
})
|
||||
}
|
||||
|
||||
func (l *LocalWorker) GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) (storiface.CallID, error) {
|
||||
sb, err := l.executor()
|
||||
if err != nil {
|
||||
return storiface.UndefCall, err
|
||||
}
|
||||
|
||||
return l.asyncCall(ctx, sector, GenerateSectorKey, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
|
||||
return nil, sb.GenerateSectorKeyFromData(ctx, sector, commD)
|
||||
})
|
||||
}
|
||||
|
||||
func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
|
||||
sb, err := l.executor()
|
||||
if err != nil {
|
||||
|
2
go.mod
2
go.mod
@ -52,7 +52,7 @@ require (
|
||||
github.com/filecoin-project/specs-actors/v5 v5.0.4
|
||||
github.com/filecoin-project/specs-actors/v6 v6.0.1
|
||||
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211118013026-3dce48197cec
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20211123153428-712cb8da07a3
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20211202151826-2e51da61d454
|
||||
github.com/filecoin-project/test-vectors/schema v0.0.5
|
||||
github.com/gbrlsnchs/jwt/v3 v3.0.1
|
||||
github.com/gdamore/tcell/v2 v2.2.0
|
||||
|
4
go.sum
4
go.sum
@ -397,8 +397,8 @@ github.com/filecoin-project/specs-actors/v6 v6.0.1/go.mod h1:V1AYfi5GkHXipx1mnVi
|
||||
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211117170924-fd07a4c7dff9/go.mod h1:p6LIOFezA1rgRLMewbvdi3Pp6SAu+q9FtJ9CAleSjrE=
|
||||
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211118013026-3dce48197cec h1:KV9vE+Sl2Y3qKsrpba4HcE7wHwK7v6O5U/S0xHbje6A=
|
||||
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211118013026-3dce48197cec/go.mod h1:p6LIOFezA1rgRLMewbvdi3Pp6SAu+q9FtJ9CAleSjrE=
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20211123153428-712cb8da07a3 h1:FLPxD2ksWwGc/sbnFLWep2p8ViP93VCAwFaVxrtVCyo=
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20211123153428-712cb8da07a3/go.mod h1:nJRRM7Aa9XVvygr3W9k6xGF46RWzr2zxF/iGoAIfA/g=
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20211202151826-2e51da61d454 h1:9II9Xf+jq5xAPQiS4rVoKIiALINa3loMC+ghyFYIrqQ=
|
||||
github.com/filecoin-project/specs-storage v0.1.1-0.20211202151826-2e51da61d454/go.mod h1:nJRRM7Aa9XVvygr3W9k6xGF46RWzr2zxF/iGoAIfA/g=
|
||||
github.com/filecoin-project/test-vectors/schema v0.0.5 h1:w3zHQhzM4pYxJDl21avXjOKBLF8egrvwUwjpT8TquDg=
|
||||
github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
|
||||
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
|
||||
|
Loading…
Reference in New Issue
Block a user