Improve fault checker
This commit is contained in:
parent
4b9317d1f0
commit
1a5af8cafd
33
faults.go
33
faults.go
@ -2,6 +2,8 @@ package sectorstorage
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
@ -19,9 +21,22 @@ func (m *Manager) CheckProvable(ctx context.Context, spt abi.RegisteredProof, se
|
|||||||
var bad []abi.SectorID
|
var bad []abi.SectorID
|
||||||
|
|
||||||
// TODO: More better checks
|
// TODO: More better checks
|
||||||
// TODO: Use proper locking
|
|
||||||
for _, sector := range sectors {
|
for _, sector := range sectors {
|
||||||
err := func() error {
|
err := func() error {
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
locked, err := m.index.StorageTryLock(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTNone)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("acquiring sector lock: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !locked {
|
||||||
|
log.Warnw("CheckProvable Sector FAULT: can't acquire read lock", "sector", sector, "sealed")
|
||||||
|
bad = append(bad, sector)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
lp, _, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, false, stores.AcquireMove)
|
lp, _, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, false, stores.AcquireMove)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("acquire sector in checkProvable: %w", err)
|
return xerrors.Errorf("acquire sector in checkProvable: %w", err)
|
||||||
@ -33,7 +48,21 @@ func (m *Manager) CheckProvable(ctx context.Context, spt abi.RegisteredProof, se
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// must be fine
|
toCheck := []string{
|
||||||
|
lp.Sealed,
|
||||||
|
filepath.Join(lp.Cache, "t_aux"),
|
||||||
|
filepath.Join(lp.Cache, "p_aux"),
|
||||||
|
filepath.Join(lp.Cache, "sc-02-data-tree-r-last.dat"),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, p := range toCheck {
|
||||||
|
_, err := os.Stat(p)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnw("CheckProvable Sector FAULT: sector file stat error", "sector", sector, "sealed", lp.Sealed, "cache", lp.Cache, "file", p)
|
||||||
|
bad = append(bad, sector)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}()
|
}()
|
||||||
|
@ -62,6 +62,7 @@ type SectorIndex interface { // part of storage-miner api
|
|||||||
|
|
||||||
// atomically acquire locks on all sector file types. close ctx to unlock
|
// atomically acquire locks on all sector file types. close ctx to unlock
|
||||||
StorageLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) error
|
StorageLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) error
|
||||||
|
StorageTryLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Decl struct {
|
type Decl struct {
|
||||||
|
@ -45,17 +45,26 @@ func (l *sectorLock) tryLock(read SectorFileType, write SectorFileType) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *sectorLock) lock(ctx context.Context, read SectorFileType, write SectorFileType) error {
|
type lockFn func(l *sectorLock, ctx context.Context, read SectorFileType, write SectorFileType) (bool, error)
|
||||||
|
|
||||||
|
func (l *sectorLock) tryLockSafe(ctx context.Context, read SectorFileType, write SectorFileType) (bool, error) {
|
||||||
|
l.cond.L.Lock()
|
||||||
|
defer l.cond.L.Unlock()
|
||||||
|
|
||||||
|
return l.tryLock(read, write), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *sectorLock) lock(ctx context.Context, read SectorFileType, write SectorFileType) (bool, error) {
|
||||||
l.cond.L.Lock()
|
l.cond.L.Lock()
|
||||||
defer l.cond.L.Unlock()
|
defer l.cond.L.Unlock()
|
||||||
|
|
||||||
for !l.tryLock(read, write) {
|
for !l.tryLock(read, write) {
|
||||||
if err := l.cond.Wait(ctx); err != nil {
|
if err := l.cond.Wait(ctx); err != nil {
|
||||||
return err
|
return false, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *sectorLock) unlock(read SectorFileType, write SectorFileType) {
|
func (l *sectorLock) unlock(read SectorFileType, write SectorFileType) {
|
||||||
@ -79,13 +88,13 @@ type indexLocks struct {
|
|||||||
locks map[abi.SectorID]*sectorLock
|
locks map[abi.SectorID]*sectorLock
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *indexLocks) StorageLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) error {
|
func (i *indexLocks) lockWith(ctx context.Context, lockFn lockFn, sector abi.SectorID, read SectorFileType, write SectorFileType) (bool, error) {
|
||||||
if read|write == 0 {
|
if read|write == 0 {
|
||||||
return nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if read|write > (1<<FileTypes)-1 {
|
if read|write > (1<<FileTypes)-1 {
|
||||||
return xerrors.Errorf("unknown file types specified")
|
return false, xerrors.Errorf("unknown file types specified")
|
||||||
}
|
}
|
||||||
|
|
||||||
i.lk.Lock()
|
i.lk.Lock()
|
||||||
@ -100,8 +109,12 @@ func (i *indexLocks) StorageLock(ctx context.Context, sector abi.SectorID, read
|
|||||||
|
|
||||||
i.lk.Unlock()
|
i.lk.Unlock()
|
||||||
|
|
||||||
if err := slk.lock(ctx, read, write); err != nil {
|
locked, err := lockFn(slk, ctx, read, write)
|
||||||
return err
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if !locked {
|
||||||
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
@ -120,5 +133,22 @@ func (i *indexLocks) StorageLock(ctx context.Context, sector abi.SectorID, read
|
|||||||
i.lk.Unlock()
|
i.lk.Unlock()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *indexLocks) StorageLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) error {
|
||||||
|
ok, err := i.lockWith(ctx, (*sectorLock).lock, sector, read, write)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return xerrors.Errorf("failed to acquire lock")
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (i *indexLocks) StorageTryLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) (bool, error) {
|
||||||
|
return i.lockWith(ctx, (*sectorLock).tryLockSafe, sector, read, write)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user