Merge pull request #8018 from filecoin-project/feat/fru

FinalizeReplicaUpdate
This commit is contained in:
Łukasz Magiera 2022-02-09 17:40:47 +00:00 committed by GitHub
commit e7038d286d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 454 additions and 61 deletions

View File

@ -130,6 +130,7 @@ type StorageMiner interface {
ReturnProveReplicaUpdate1(ctx context.Context, callID storiface.CallID, vanillaProofs storage.ReplicaVanillaProofs, err *storiface.CallError) error //perm:admin retry:true
ReturnProveReplicaUpdate2(ctx context.Context, callID storiface.CallID, proof storage.ReplicaUpdateProof, err *storiface.CallError) error //perm:admin retry:true
ReturnGenerateSectorKeyFromData(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnFinalizeReplicaUpdate(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true
ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error //perm:admin retry:true

View File

@ -39,6 +39,7 @@ type Worker interface {
SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) //perm:admin
SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) //perm:admin
FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) //perm:admin
FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) //perm:admin
ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) //perm:admin
ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) //perm:admin
ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) //perm:admin

View File

@ -723,6 +723,8 @@ type StorageMinerStruct struct {
ReturnFetch func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
ReturnFinalizeReplicaUpdate func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
ReturnFinalizeSector func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
ReturnGenerateSectorKeyFromData func(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error `perm:"admin"`
@ -872,6 +874,8 @@ type WorkerStruct struct {
Fetch func(p0 context.Context, p1 storage.SectorRef, p2 storiface.SectorFileType, p3 storiface.PathType, p4 storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"`
FinalizeReplicaUpdate func(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) `perm:"admin"`
FinalizeSector func(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) `perm:"admin"`
GenerateSectorKeyFromData func(p0 context.Context, p1 storage.SectorRef, p2 cid.Cid) (storiface.CallID, error) `perm:"admin"`
@ -4280,6 +4284,17 @@ func (s *StorageMinerStub) ReturnFetch(p0 context.Context, p1 storiface.CallID,
return ErrNotSupported
}
func (s *StorageMinerStruct) ReturnFinalizeReplicaUpdate(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error {
if s.Internal.ReturnFinalizeReplicaUpdate == nil {
return ErrNotSupported
}
return s.Internal.ReturnFinalizeReplicaUpdate(p0, p1, p2)
}
func (s *StorageMinerStub) ReturnFinalizeReplicaUpdate(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error {
return ErrNotSupported
}
func (s *StorageMinerStruct) ReturnFinalizeSector(p0 context.Context, p1 storiface.CallID, p2 *storiface.CallError) error {
if s.Internal.ReturnFinalizeSector == nil {
return ErrNotSupported
@ -5006,6 +5021,17 @@ func (s *WorkerStub) Fetch(p0 context.Context, p1 storage.SectorRef, p2 storifac
return *new(storiface.CallID), ErrNotSupported
}
func (s *WorkerStruct) FinalizeReplicaUpdate(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) {
if s.Internal.FinalizeReplicaUpdate == nil {
return *new(storiface.CallID), ErrNotSupported
}
return s.Internal.FinalizeReplicaUpdate(p0, p1, p2)
}
func (s *WorkerStub) FinalizeReplicaUpdate(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) {
return *new(storiface.CallID), ErrNotSupported
}
func (s *WorkerStruct) FinalizeSector(p0 context.Context, p1 storage.SectorRef, p2 []storage.Range) (storiface.CallID, error) {
if s.Internal.FinalizeSector == nil {
return *new(storiface.CallID), ErrNotSupported

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -466,6 +466,7 @@ var stateOrder = map[sealing.SectorState]stateMeta{}
var stateList = []stateMeta{
{col: 39, state: "Total"},
{col: color.FgGreen, state: sealing.Proving},
{col: color.FgGreen, state: sealing.UpdateActivating},
{col: color.FgBlue, state: sealing.Empty},
{col: color.FgBlue, state: sealing.WaitDeals},
@ -496,6 +497,7 @@ var stateList = []stateMeta{
{col: color.FgYellow, state: sealing.SubmitReplicaUpdate},
{col: color.FgYellow, state: sealing.ReplicaUpdateWait},
{col: color.FgYellow, state: sealing.FinalizeReplicaUpdate},
{col: color.FgYellow, state: sealing.ReleaseSectorKey},
{col: color.FgCyan, state: sealing.Terminating},
{col: color.FgCyan, state: sealing.TerminateWait},
@ -524,6 +526,7 @@ var stateList = []stateMeta{
{col: color.FgRed, state: sealing.SnapDealsAddPieceFailed},
{col: color.FgRed, state: sealing.SnapDealsDealsExpired},
{col: color.FgRed, state: sealing.ReplicaUpdateFailed},
{col: color.FgRed, state: sealing.ReleaseSectorKeyFailed},
}
func init() {

View File

@ -261,7 +261,7 @@ var runCmd = &cli.Command{
var taskTypes []sealtasks.TaskType
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize)
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFinalizeReplicaUpdate)
if cctx.Bool("addpiece") {
taskTypes = append(taskTypes, sealtasks.TTAddPiece)

View File

@ -97,6 +97,7 @@
* [Return](#Return)
* [ReturnAddPiece](#ReturnAddPiece)
* [ReturnFetch](#ReturnFetch)
* [ReturnFinalizeReplicaUpdate](#ReturnFinalizeReplicaUpdate)
* [ReturnFinalizeSector](#ReturnFinalizeSector)
* [ReturnGenerateSectorKeyFromData](#ReturnGenerateSectorKeyFromData)
* [ReturnMoveStorage](#ReturnMoveStorage)
@ -2054,6 +2055,30 @@ Response: `{}`
### ReturnFetch
Perms: admin
Inputs:
```json
[
{
"Sector": {
"Miner": 1000,
"Number": 9
},
"ID": "07070707-0707-0707-0707-070707070707"
},
{
"Code": 0,
"Message": "string value"
}
]
```
Response: `{}`
### ReturnFinalizeReplicaUpdate
Perms: admin
Inputs:

View File

@ -10,6 +10,7 @@
* [Add](#Add)
* [AddPiece](#AddPiece)
* [Finalize](#Finalize)
* [FinalizeReplicaUpdate](#FinalizeReplicaUpdate)
* [FinalizeSector](#FinalizeSector)
* [Generate](#Generate)
* [GenerateSectorKeyFromData](#GenerateSectorKeyFromData)
@ -1112,6 +1113,41 @@ Response:
## Finalize
### FinalizeReplicaUpdate
Perms: admin
Inputs:
```json
[
{
"ID": {
"Miner": 1000,
"Number": 9
},
"ProofType": 8
},
[
{
"Offset": 1024,
"Size": 1024
}
]
]
```
Response:
```json
{
"Sector": {
"Miner": 1000,
"Number": 9
},
"ID": "07070707-0707-0707-0707-070707070707"
}
```
### FinalizeSector

2
extern/filecoin-ffi vendored

@ -1 +1 @@
Subproject commit c03fc07aadf76b464644c785f59292b93542c0ee
Subproject commit 5ec5d805c01ea85224f6448dd6c6fa0a2a73c028

View File

@ -55,6 +55,25 @@ func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof,
return nil
}
// temporary hack to make the check work with snapdeals
// will go away in https://github.com/filecoin-project/lotus/pull/7971
if lp.Sealed == "" || lp.Cache == "" {
// maybe it's update
lockedUpdate, err := m.index.StorageTryLock(ctx, sector.ID, storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTNone)
if err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
}
if lockedUpdate {
lp, _, err = m.localStore.AcquireSector(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil {
log.Warnw("CheckProvable Sector FAULT: acquire sector in checkProvable", "sector", sector, "error", err)
bad[sector.ID] = fmt.Sprintf("acquire sector failed: %s", err)
return nil
}
lp.Sealed, lp.Cache = lp.Update, lp.UpdateCache
}
}
if lp.Sealed == "" || lp.Cache == "" {
log.Warnw("CheckProvable Sector FAULT: cache and/or sealed paths not found", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache)
bad[sector.ID] = fmt.Sprintf("cache and/or sealed paths not found, cache %q, sealed %q", lp.Cache, lp.Sealed)

View File

@ -769,7 +769,7 @@ func (sb *Sealer) ReleaseSealed(ctx context.Context, sector storage.SectorRef) e
return xerrors.Errorf("not supported at this layer")
}
func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
func (sb *Sealer) freeUnsealed(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
@ -834,6 +834,19 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef,
}
return nil
}
func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
}
if err := sb.freeUnsealed(ctx, sector, keepUnsealed); err != nil {
return err
}
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, 0, storiface.PathStorage)
if err != nil {
return xerrors.Errorf("acquiring sector cache path: %w", err)
@ -843,6 +856,43 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector storage.SectorRef,
return ffi.ClearCache(uint64(ssize), paths.Cache)
}
func (sb *Sealer) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return err
}
if err := sb.freeUnsealed(ctx, sector, keepUnsealed); err != nil {
return err
}
{
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, 0, storiface.PathStorage)
if err != nil {
return xerrors.Errorf("acquiring sector cache path: %w", err)
}
defer done()
if err := ffi.ClearCache(uint64(ssize), paths.Cache); err != nil {
return xerrors.Errorf("clear cache: %w", err)
}
}
{
paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUpdateCache, 0, storiface.PathStorage)
if err != nil {
return xerrors.Errorf("acquiring sector cache path: %w", err)
}
defer done()
if err := ffi.ClearCache(uint64(ssize), paths.UpdateCache); err != nil {
return xerrors.Errorf("clear cache: %w", err)
}
}
return nil
}
func (sb *Sealer) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error {
// This call is meant to mark storage as 'freeable'. Given that unsealing is
// very expensive, we don't remove data as soon as we can - instead we only

View File

@ -146,7 +146,7 @@ func New(ctx context.Context, lstor *stores.Local, stor *stores.Remote, ls store
go m.sched.runSched()
localTasks := []sealtasks.TaskType{
sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFetch,
sealtasks.TTCommit1, sealtasks.TTProveReplicaUpdate1, sealtasks.TTFinalize, sealtasks.TTFetch, sealtasks.TTFinalizeReplicaUpdate,
}
if sc.AllowAddPiece {
localTasks = append(localTasks, sealtasks.TTAddPiece)
@ -577,6 +577,74 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
return nil
}
func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
if err := m.index.StorageLock(ctx, sector.ID, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache|storiface.FTUpdate|storiface.FTUpdateCache); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err)
}
fts := storiface.FTUnsealed
{
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
if err != nil {
return xerrors.Errorf("finding unsealed sector: %w", err)
}
if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine
fts = storiface.FTNone
}
}
pathType := storiface.PathStorage
{
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUpdate, 0, false)
if err != nil {
return xerrors.Errorf("finding sealed sector: %w", err)
}
for _, store := range sealedStores {
if store.CanSeal {
pathType = storiface.PathSealing
break
}
}
}
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache, false)
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector,
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|fts, pathType, storiface.AcquireMove),
func(ctx context.Context, w Worker) error {
_, err := m.waitSimpleCall(ctx)(w.FinalizeReplicaUpdate(ctx, sector, keepUnsealed))
return err
})
if err != nil {
return err
}
fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathStorage)
moveUnsealed := fts
{
if len(keepUnsealed) == 0 {
moveUnsealed = storiface.FTNone
}
}
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
func(ctx context.Context, w Worker) error {
_, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|moveUnsealed))
return err
})
if err != nil {
return xerrors.Errorf("moving sector to storage: %w", err)
}
return nil
}
func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error {
return nil
}
@ -875,6 +943,10 @@ func (m *Manager) ReturnProveReplicaUpdate2(ctx context.Context, callID storifac
return m.returnResult(ctx, callID, proof, err)
}
func (m *Manager) ReturnFinalizeReplicaUpdate(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(ctx, callID, nil, err)
}
func (m *Manager) ReturnGenerateSectorKeyFromData(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(ctx, callID, nil, err)
}

View File

@ -477,6 +477,10 @@ func (mgr *SectorMgr) FinalizeSector(context.Context, storage.SectorRef, []stora
return nil
}
func (mgr *SectorMgr) FinalizeReplicaUpdate(context.Context, storage.SectorRef, []storage.Range) error {
return nil
}
func (mgr *SectorMgr) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error {
return nil
}
@ -577,6 +581,10 @@ func (mgr *SectorMgr) ReturnGenerateSectorKeyFromData(ctx context.Context, callI
panic("not supported")
}
func (mgr *SectorMgr) ReturnFinalizeReplicaUpdate(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
panic("not supported")
}
func (m mockVerifProver) VerifySeal(svi proof.SealVerifyInfo) (bool, error) {
plen, err := svi.SealProof.ProofSize()
if err != nil {

View File

@ -119,6 +119,10 @@ func (s *schedTestWorker) GenerateSectorKeyFromData(ctx context.Context, sector
panic("implement me")
}
func (s *schedTestWorker) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) {
panic("implement me")
}

View File

@ -14,10 +14,11 @@ const (
TTFetch TaskType = "seal/v0/fetch"
TTUnseal TaskType = "seal/v0/unseal"
TTReplicaUpdate TaskType = "seal/v0/replicaupdate"
TTProveReplicaUpdate1 TaskType = "seal/v0/provereplicaupdate/1"
TTProveReplicaUpdate2 TaskType = "seal/v0/provereplicaupdate/2"
TTRegenSectorKey TaskType = "seal/v0/regensectorkey"
TTReplicaUpdate TaskType = "seal/v0/replicaupdate"
TTProveReplicaUpdate1 TaskType = "seal/v0/provereplicaupdate/1"
TTProveReplicaUpdate2 TaskType = "seal/v0/provereplicaupdate/2"
TTRegenSectorKey TaskType = "seal/v0/regensectorkey"
TTFinalizeReplicaUpdate TaskType = "seal/v0/finalize/replicaupdate"
)
var order = map[TaskType]int{
@ -48,10 +49,11 @@ var shortNames = map[TaskType]string{
TTFetch: "GET",
TTUnseal: "UNS",
TTReplicaUpdate: "RU",
TTProveReplicaUpdate1: "PR1",
TTProveReplicaUpdate2: "PR2",
TTRegenSectorKey: "GSK",
TTReplicaUpdate: "RU",
TTProveReplicaUpdate1: "PR1",
TTProveReplicaUpdate2: "PR2",
TTRegenSectorKey: "GSK",
TTFinalizeReplicaUpdate: "FRU",
}
func (a TaskType) MuchLess(b TaskType) (bool, bool) {

View File

@ -120,6 +120,7 @@ type WorkerCalls interface {
SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (CallID, error)
SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (CallID, error)
FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (CallID, error)
FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (CallID, error)
ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (CallID, error)
ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (CallID, error)
ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (CallID, error)
@ -182,6 +183,7 @@ type WorkerReturn interface {
ReturnProveReplicaUpdate1(ctx context.Context, callID CallID, proofs storage.ReplicaVanillaProofs, err *CallError) error
ReturnProveReplicaUpdate2(ctx context.Context, callID CallID, proof storage.ReplicaUpdateProof, err *CallError) error
ReturnGenerateSectorKeyFromData(ctx context.Context, callID CallID, err *CallError) error
ReturnFinalizeReplicaUpdate(ctx context.Context, callID CallID, err *CallError) error
ReturnMoveStorage(ctx context.Context, callID CallID, err *CallError) error
ReturnUnsealPiece(ctx context.Context, callID CallID, err *CallError) error
ReturnReadPiece(ctx context.Context, callID CallID, ok bool, err *CallError) error

View File

@ -87,6 +87,10 @@ func (t *testExec) GenerateSectorKeyFromData(ctx context.Context, sector storage
panic("implement me")
}
func (t *testExec) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error {
panic("implement me")
}
func (t *testExec) NewSector(ctx context.Context, sector storage.SectorRef) error {
panic("implement me")
}

View File

@ -162,20 +162,21 @@ func (l *LocalWorker) ffiExec() (ffiwrapper.Storage, error) {
type ReturnType string
const (
AddPiece ReturnType = "AddPiece"
SealPreCommit1 ReturnType = "SealPreCommit1"
SealPreCommit2 ReturnType = "SealPreCommit2"
SealCommit1 ReturnType = "SealCommit1"
SealCommit2 ReturnType = "SealCommit2"
FinalizeSector ReturnType = "FinalizeSector"
ReplicaUpdate ReturnType = "ReplicaUpdate"
ProveReplicaUpdate1 ReturnType = "ProveReplicaUpdate1"
ProveReplicaUpdate2 ReturnType = "ProveReplicaUpdate2"
GenerateSectorKey ReturnType = "GenerateSectorKey"
ReleaseUnsealed ReturnType = "ReleaseUnsealed"
MoveStorage ReturnType = "MoveStorage"
UnsealPiece ReturnType = "UnsealPiece"
Fetch ReturnType = "Fetch"
AddPiece ReturnType = "AddPiece"
SealPreCommit1 ReturnType = "SealPreCommit1"
SealPreCommit2 ReturnType = "SealPreCommit2"
SealCommit1 ReturnType = "SealCommit1"
SealCommit2 ReturnType = "SealCommit2"
FinalizeSector ReturnType = "FinalizeSector"
FinalizeReplicaUpdate ReturnType = "FinalizeReplicaUpdate"
ReplicaUpdate ReturnType = "ReplicaUpdate"
ProveReplicaUpdate1 ReturnType = "ProveReplicaUpdate1"
ProveReplicaUpdate2 ReturnType = "ProveReplicaUpdate2"
GenerateSectorKey ReturnType = "GenerateSectorKey"
ReleaseUnsealed ReturnType = "ReleaseUnsealed"
MoveStorage ReturnType = "MoveStorage"
UnsealPiece ReturnType = "UnsealPiece"
Fetch ReturnType = "Fetch"
)
// in: func(WorkerReturn, context.Context, CallID, err string)
@ -213,20 +214,21 @@ func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.Wor
}
var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, *storiface.CallError) error{
AddPiece: rfunc(storiface.WorkerReturn.ReturnAddPiece),
SealPreCommit1: rfunc(storiface.WorkerReturn.ReturnSealPreCommit1),
SealPreCommit2: rfunc(storiface.WorkerReturn.ReturnSealPreCommit2),
SealCommit1: rfunc(storiface.WorkerReturn.ReturnSealCommit1),
SealCommit2: rfunc(storiface.WorkerReturn.ReturnSealCommit2),
FinalizeSector: rfunc(storiface.WorkerReturn.ReturnFinalizeSector),
ReleaseUnsealed: rfunc(storiface.WorkerReturn.ReturnReleaseUnsealed),
ReplicaUpdate: rfunc(storiface.WorkerReturn.ReturnReplicaUpdate),
ProveReplicaUpdate1: rfunc(storiface.WorkerReturn.ReturnProveReplicaUpdate1),
ProveReplicaUpdate2: rfunc(storiface.WorkerReturn.ReturnProveReplicaUpdate2),
GenerateSectorKey: rfunc(storiface.WorkerReturn.ReturnGenerateSectorKeyFromData),
MoveStorage: rfunc(storiface.WorkerReturn.ReturnMoveStorage),
UnsealPiece: rfunc(storiface.WorkerReturn.ReturnUnsealPiece),
Fetch: rfunc(storiface.WorkerReturn.ReturnFetch),
AddPiece: rfunc(storiface.WorkerReturn.ReturnAddPiece),
SealPreCommit1: rfunc(storiface.WorkerReturn.ReturnSealPreCommit1),
SealPreCommit2: rfunc(storiface.WorkerReturn.ReturnSealPreCommit2),
SealCommit1: rfunc(storiface.WorkerReturn.ReturnSealCommit1),
SealCommit2: rfunc(storiface.WorkerReturn.ReturnSealCommit2),
FinalizeSector: rfunc(storiface.WorkerReturn.ReturnFinalizeSector),
ReleaseUnsealed: rfunc(storiface.WorkerReturn.ReturnReleaseUnsealed),
ReplicaUpdate: rfunc(storiface.WorkerReturn.ReturnReplicaUpdate),
ProveReplicaUpdate1: rfunc(storiface.WorkerReturn.ReturnProveReplicaUpdate1),
ProveReplicaUpdate2: rfunc(storiface.WorkerReturn.ReturnProveReplicaUpdate2),
GenerateSectorKey: rfunc(storiface.WorkerReturn.ReturnGenerateSectorKeyFromData),
FinalizeReplicaUpdate: rfunc(storiface.WorkerReturn.ReturnFinalizeReplicaUpdate),
MoveStorage: rfunc(storiface.WorkerReturn.ReturnMoveStorage),
UnsealPiece: rfunc(storiface.WorkerReturn.ReturnUnsealPiece),
Fetch: rfunc(storiface.WorkerReturn.ReturnFetch),
}
func (l *LocalWorker) asyncCall(ctx context.Context, sector storage.SectorRef, rt ReturnType, work func(ctx context.Context, ci storiface.CallID) (interface{}, error)) (storiface.CallID, error) {
@ -456,6 +458,27 @@ func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storage.SectorR
})
}
func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
sb, err := l.executor()
if err != nil {
return storiface.UndefCall, err
}
return l.asyncCall(ctx, sector, FinalizeReplicaUpdate, func(ctx context.Context, ci storiface.CallID) (interface{}, error) {
if err := sb.FinalizeReplicaUpdate(ctx, sector, keepUnsealed); err != nil {
return nil, xerrors.Errorf("finalizing sector: %w", err)
}
if len(keepUnsealed) == 0 {
if err := l.storage.Remove(ctx, sector.ID, storiface.FTUnsealed, true, nil); err != nil {
return nil, xerrors.Errorf("removing unsealed data: %w", err)
}
}
return nil, err
})
}
func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) {
return storiface.UndefCall, xerrors.Errorf("implement me")
}

View File

@ -215,4 +215,8 @@ func (t *trackedWorker) ProveReplicaUpdate2(ctx context.Context, sector storage.
})
}
func (t *trackedWorker) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
return t.tracker.track(ctx, t.execute, t.wid, t.workerInfo, sector, sealtasks.TTFinalizeReplicaUpdate, func() (storiface.CallID, error) { return t.Worker.FinalizeReplicaUpdate(ctx, sector, keepUnsealed) })
}
var _ Worker = &trackedWorker{}

View File

@ -169,7 +169,14 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorAbortUpgrade{}, AbortUpgrade),
),
FinalizeReplicaUpdate: planOne(
on(SectorFinalized{}, Proving),
on(SectorFinalized{}, UpdateActivating),
),
UpdateActivating: planOne(
on(SectorUpdateActive{}, ReleaseSectorKey),
),
ReleaseSectorKey: planOne(
on(SectorKeyReleased{}, Proving),
on(SectorReleaseKeyFailed{}, ReleaseSectorKeyFailed),
),
// Sealing errors
@ -250,6 +257,9 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
on(SectorInvalidDealIDs{}, SnapDealsRecoverDealIDs),
on(SectorDealsExpired{}, SnapDealsDealsExpired),
),
ReleaseSectorKeyFailed: planOne(
on(SectorUpdateActive{}, ReleaseSectorKey),
),
// Post-seal
@ -477,6 +487,10 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return m.handleReplicaUpdateWait, processed, nil
case FinalizeReplicaUpdate:
return m.handleFinalizeReplicaUpdate, processed, nil
case UpdateActivating:
return m.handleUpdateActivating, processed, nil
case ReleaseSectorKey:
return m.handleReleaseSectorKey, processed, nil
// Handled failure modes
case AddPieceFailed:
@ -513,6 +527,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
return m.handleSnapDealsRecoverDealIDs, processed, nil
case ReplicaUpdateFailed:
return m.handleSubmitReplicaUpdateFailed, processed, nil
case ReleaseSectorKeyFailed:
return m.handleReleaseSectorKeyFailed, 0, err
case AbortUpgrade:
return m.handleAbortUpgrade, processed, nil

View File

@ -335,6 +335,14 @@ type SectorReplicaUpdateLanded struct{}
func (evt SectorReplicaUpdateLanded) apply(state *SectorInfo) {}
type SectorUpdateActive struct{}
func (evt SectorUpdateActive) apply(state *SectorInfo) {}
type SectorKeyReleased struct{}
func (evt SectorKeyReleased) apply(state *SectorInfo) {}
// Failed state recovery
type SectorRetrySealPreCommit1 struct{}
@ -445,6 +453,13 @@ type SectorSubmitReplicaUpdateFailed struct{}
func (evt SectorSubmitReplicaUpdateFailed) apply(state *SectorInfo) {}
type SectorReleaseKeyFailed struct{ error }
func (evt SectorReleaseKeyFailed) FormatError(xerrors.Printer) (next error) {
return evt.error
}
func (evt SectorReleaseKeyFailed) apply(state *SectorInfo) {}
// Faults
type SectorFaulty struct{}

View File

@ -52,11 +52,14 @@ var ExistSectorStateList = map[SectorState]struct{}{
ProveReplicaUpdate: {},
SubmitReplicaUpdate: {},
ReplicaUpdateWait: {},
UpdateActivating: {},
ReleaseSectorKey: {},
FinalizeReplicaUpdate: {},
SnapDealsAddPieceFailed: {},
SnapDealsDealsExpired: {},
SnapDealsRecoverDealIDs: {},
ReplicaUpdateFailed: {},
ReleaseSectorKeyFailed: {},
AbortUpgrade: {},
}
@ -104,6 +107,8 @@ const (
SubmitReplicaUpdate SectorState = "SubmitReplicaUpdate"
ReplicaUpdateWait SectorState = "ReplicaUpdateWait"
FinalizeReplicaUpdate SectorState = "FinalizeReplicaUpdate"
UpdateActivating SectorState = "UpdateActivating"
ReleaseSectorKey SectorState = "ReleaseSectorKey"
// error modes
FailedUnrecoverable SectorState = "FailedUnrecoverable"
@ -124,6 +129,7 @@ const (
SnapDealsRecoverDealIDs SectorState = "SnapDealsRecoverDealIDs"
AbortUpgrade SectorState = "AbortUpgrade"
ReplicaUpdateFailed SectorState = "ReplicaUpdateFailed"
ReleaseSectorKeyFailed SectorState = "ReleaseSectorKeyFailed"
Faulty SectorState = "Faulty" // sector is corrupted or gone for some reason
FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain
@ -153,7 +159,7 @@ func toStatState(st SectorState, finEarly bool) statSectorState {
return sstProving
}
return sstSealing
case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
case Proving, UpdateActivating, ReleaseSectorKey, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
return sstProving
}

View File

@ -255,6 +255,16 @@ func (m *Sealing) handleSubmitReplicaUpdateFailed(ctx statemachine.Context, sect
return ctx.Send(SectorRetrySubmitReplicaUpdate{})
}
func (m *Sealing) handleReleaseSectorKeyFailed(ctx statemachine.Context, sector SectorInfo) error {
// not much we can do, wait for a bit and try again
if err := failedCooldown(ctx, sector); err != nil {
return err
}
return ctx.Send(SectorUpdateActive{})
}
func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
tok, _, err := m.Api.ChainHead(ctx.Context())
if err != nil {

View File

@ -2,12 +2,16 @@ package sealing
import (
"bytes"
"context"
"time"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/exitcode"
statemachine "github.com/filecoin-project/go-statemachine"
api "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/actors/policy"
"golang.org/x/xerrors"
)
@ -218,9 +222,70 @@ func (m *Sealing) handleReplicaUpdateWait(ctx statemachine.Context, sector Secto
}
func (m *Sealing) handleFinalizeReplicaUpdate(ctx statemachine.Context, sector SectorInfo) error {
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting sealing config: %w", err)
}
if err := m.sealer.FinalizeReplicaUpdate(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.keepUnsealedRanges(false, cfg.AlwaysKeepUnsealedCopy)); err != nil {
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
}
return ctx.Send(SectorFinalized{})
}
func (m *Sealing) handleUpdateActivating(ctx statemachine.Context, sector SectorInfo) error {
try := func() error {
mw, err := m.Api.StateWaitMsg(ctx.Context(), *sector.ReplicaUpdateMessage)
if err != nil {
return err
}
tok, _, err := m.Api.ChainHead(ctx.Context())
if err != nil {
return err
}
nv, err := m.Api.StateNetworkVersion(ctx.Context(), tok)
if err != nil {
return err
}
lb := policy.GetWinningPoStSectorSetLookback(nv)
targetHeight := mw.Height + lb + InteractivePoRepConfidence
return m.events.ChainAt(func(context.Context, TipSetToken, abi.ChainEpoch) error {
return ctx.Send(SectorUpdateActive{})
}, func(ctx context.Context, ts TipSetToken) error {
log.Warn("revert in handleUpdateActivating")
return nil
}, InteractivePoRepConfidence, targetHeight)
}
for {
err := try()
if err == nil {
break
}
log.Errorw("error in handleUpdateActivating", "error", err)
// likely an API issue, sleep for a bit and retry
time.Sleep(time.Minute)
}
return nil
}
func (m *Sealing) handleReleaseSectorKey(ctx statemachine.Context, sector SectorInfo) error {
if err := m.sealer.ReleaseSectorKey(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil {
return ctx.Send(SectorReleaseKeyFailed{err})
}
return ctx.Send(SectorKeyReleased{})
}
func handleErrors(ctx statemachine.Context, err error, sector SectorInfo) error {
switch err.(type) {
case *ErrApi:

2
go.mod
View File

@ -51,7 +51,7 @@ require (
github.com/filecoin-project/specs-actors/v5 v5.0.4
github.com/filecoin-project/specs-actors/v6 v6.0.1
github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1
github.com/filecoin-project/specs-storage v0.1.1-0.20211228030229-6d460d25a0c9
github.com/filecoin-project/specs-storage v0.2.0
github.com/filecoin-project/test-vectors/schema v0.0.5
github.com/gbrlsnchs/jwt/v3 v3.0.1
github.com/gdamore/tcell/v2 v2.2.0

4
go.sum
View File

@ -382,8 +382,8 @@ github.com/filecoin-project/specs-actors/v7 v7.0.0-20211117170924-fd07a4c7dff9/g
github.com/filecoin-project/specs-actors/v7 v7.0.0-20211222192039-c83bea50c402/go.mod h1:p6LIOFezA1rgRLMewbvdi3Pp6SAu+q9FtJ9CAleSjrE=
github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1 h1:FuDaXIbcw2hRsFI8SDTmsGGCE+NumpF6aiBoU/2X5W4=
github.com/filecoin-project/specs-actors/v7 v7.0.0-rc1/go.mod h1:TA5FwCna+Yi36POaT7SLKXsgEDvJwc0V/L6ZsO19B9M=
github.com/filecoin-project/specs-storage v0.1.1-0.20211228030229-6d460d25a0c9 h1:oUYOvF7EvdXS0Zmk9mNkaB6Bu0l+WXBYPzVodKMiLug=
github.com/filecoin-project/specs-storage v0.1.1-0.20211228030229-6d460d25a0c9/go.mod h1:Tb88Zq+IBJbvAn3mS89GYj3jdRThBTE/771HCVZdRJU=
github.com/filecoin-project/specs-storage v0.2.0 h1:Y4UDv0apRQ3zI2GiPPubi8JblpUZZphEdaJUxCutfyg=
github.com/filecoin-project/specs-storage v0.2.0/go.mod h1:Tb88Zq+IBJbvAn3mS89GYj3jdRThBTE/771HCVZdRJU=
github.com/filecoin-project/test-vectors/schema v0.0.5 h1:w3zHQhzM4pYxJDl21avXjOKBLF8egrvwUwjpT8TquDg=
github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=

View File

@ -30,23 +30,14 @@ func TestCCUpgrade(t *testing.T) {
//stm: @MINER_SECTOR_LIST_001
kit.QuietMiningLogs()
for _, height := range []abi.ChainEpoch{
-1, // before
162, // while sealing
560, // after upgrade deal
} {
height := height // make linters happy by copying
t.Run(fmt.Sprintf("upgrade-%d", height), func(t *testing.T) {
runTestCCUpgrade(t, height)
})
}
runTestCCUpgrade(t)
}
func runTestCCUpgrade(t *testing.T, upgradeHeight abi.ChainEpoch) *kit.TestFullNode {
func runTestCCUpgrade(t *testing.T) *kit.TestFullNode {
ctx := context.Background()
blockTime := 1 * time.Millisecond
client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15))
client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC())
ens.InterconnectAll().BeginMiningMustPost(blockTime)
maddr, err := miner.ActorAddress(ctx)
@ -91,6 +82,11 @@ func runTestCCUpgrade(t *testing.T, upgradeHeight abi.ChainEpoch) *kit.TestFullN
status, err := miner.SectorsStatus(ctx, CCUpgrade, true)
require.NoError(t, err)
assert.Equal(t, 1, len(status.Deals))
miner.WaitSectorsProving(ctx, map[abi.SectorNumber]struct{}{
CCUpgrade: {},
})
return client
}
@ -137,7 +133,7 @@ func TestCCUpgradeAndPoSt(t *testing.T) {
kit.QuietMiningLogs()
t.Run("upgrade and then post", func(t *testing.T) {
ctx := context.Background()
n := runTestCCUpgrade(t, 100)
n := runTestCCUpgrade(t)
ts, err := n.ChainHead(ctx)
require.NoError(t, err)
start := ts.Height()

View File

@ -87,7 +87,10 @@ type TestMiner struct {
func (tm *TestMiner) PledgeSectors(ctx context.Context, n, existing int, blockNotif <-chan struct{}) {
toCheck := tm.StartPledge(ctx, n, existing, blockNotif)
tm.WaitSectorsProving(ctx, toCheck)
}
func (tm *TestMiner) WaitSectorsProving(ctx context.Context, toCheck map[abi.SectorNumber]struct{}) {
for len(toCheck) > 0 {
tm.FlushSealingBatches(ctx)
@ -105,9 +108,8 @@ func (tm *TestMiner) PledgeSectors(ctx context.Context, n, existing int, blockNo
}
build.Clock.Sleep(100 * time.Millisecond)
fmt.Printf("WaitSeal: %d %+v\n", len(toCheck), states)
fmt.Printf("WaitSectorsProving: %d %+v\n", len(toCheck), states)
}
}
func (tm *TestMiner) StartPledge(ctx context.Context, n, existing int, blockNotif <-chan struct{}) map[abi.SectorNumber]struct{} {

View File

@ -39,6 +39,7 @@ func fullRpc(t *testing.T, f *TestFullNode) *TestFullNode {
require.NoError(t, err)
srv, maddr := CreateRPCServer(t, handler, l)
fmt.Printf("FULLNODE RPC ENV FOR CLI DEBUGGING `export FULLNODE_API_INFO=%s`\n", "ws://"+srv.Listener.Addr().String())
cl, stop, err := client.NewFullNodeRPCV1(context.Background(), "ws://"+srv.Listener.Addr().String()+"/rpc/v1", nil)
require.NoError(t, err)
@ -54,7 +55,9 @@ func minerRpc(t *testing.T, m *TestMiner) *TestMiner {
srv, maddr := CreateRPCServer(t, handler, m.RemoteListener)
fmt.Println("creating RPC server for", m.ActorAddr, "at: ", srv.Listener.Addr().String())
fmt.Printf("creating RPC server for %s at %s\n", m.ActorAddr, srv.Listener.Addr().String())
fmt.Printf("SP RPC ENV FOR CLI DEBUGGING `export MINER_API_INFO=%s`\n", "ws://"+srv.Listener.Addr().String())
url := "ws://" + srv.Listener.Addr().String() + "/rpc/v0"
cl, stop, err := client.NewStorageMinerRPCV0(context.Background(), url, nil)
require.NoError(t, err)