diff --git a/stores/index.go b/stores/index.go
index c3271225e..f0ff22a01 100644
--- a/stores/index.go
+++ b/stores/index.go
@@ -6,6 +6,7 @@ import (
 	gopath "path"
 	"sort"
 	"sync"
+	"time"
 
 	"golang.org/x/xerrors"
 
@@ -13,6 +14,14 @@ import (
 	"github.com/filecoin-project/specs-actors/actors/abi/big"
 )
 
+// HeartBeatInterval is the base period between storage health reports.
+var HeartBeatInterval = 10 * time.Second
+
+// SkippedHeartbeatThresh spans five heartbeat intervals.
+// NOTE(review): presumably the heartbeat age past which a storage is treated
+// as unresponsive; nothing in this change reads it yet - confirm.
+var SkippedHeartbeatThresh = HeartBeatInterval * 5
+
 // ID identifies sector storage by UUID. One sector storage should map to one
 // filesystem, local or networked / shared by multiple machines
 type ID string
@@ -24,12 +33,24 @@ type StorageInfo struct {
 
 	CanSeal  bool
 	CanStore bool
+
+	// NOTE(review): nothing in this change fills in the two fields below;
+	// presumably they mirror the heartbeat state kept per storageEntry - confirm.
+	LastHeartbeat time.Time
+	HeartbeatErr  error
+}
+
+// HealthReport is the payload of StorageReportHealth: the filesystem stats of
+// a storage path together with the error, if any, returned while gathering them.
+type HealthReport struct {
+	Stat FsStat
+	Err  error
 }
 
 type SectorIndex interface { // part of storage-miner api
 	StorageAttach(context.Context, StorageInfo, FsStat) error
 	StorageInfo(context.Context, ID) (StorageInfo, error)
-	// TODO: StorageUpdateStats(FsStat)
+	StorageReportHealth(context.Context, ID, HealthReport) error
 
 	StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error
 	StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error
@@ -46,6 +67,9 @@ type Decl struct {
 type storageEntry struct {
 	info *StorageInfo
 	fsi  FsStat
+
+	lastHeartbeat time.Time // stamped on attach and on every health report
+	heartbeatErr  error     // error carried by the most recent health report
 }
 
 type Index struct {
@@ -120,10 +144,31 @@ func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st FsStat) er
 	i.stores[si.ID] = &storageEntry{
 		info: &si,
 		fsi:  st,
+
+		lastHeartbeat: time.Now(),
 	}
 	return nil
 }
 
+// StorageReportHealth records a health report for the storage identified by id:
+// it replaces the entry's filesystem stats and heartbeat error and stamps the
+// heartbeat with the current time. It fails when id is not in the index.
+func (i *Index) StorageReportHealth(ctx context.Context, id ID, report HealthReport) error {
+	i.lk.Lock()
+	defer i.lk.Unlock()
+
+	ent, ok := i.stores[id]
+	if !ok {
+		return xerrors.Errorf("health report for unknown storage: %s", id)
+	}
+
+	ent.fsi = report.Stat
+	ent.heartbeatErr = report.Err
+	ent.lastHeartbeat = time.Now()
+
+	return nil
+}
+
 func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error {
 	i.lk.Lock()
 	defer i.lk.Unlock()
diff --git a/stores/local.go b/stores/local.go
index 281475b1c..673583c6a 100644
--- a/stores/local.go
+++ b/stores/local.go
@@ -5,9 +5,11 @@ import (
 	"encoding/json"
 	"io/ioutil"
 	"math/bits"
+	"math/rand"
 	"os"
 	"path/filepath"
 	"sync"
+	"time"
 
 	"golang.org/x/xerrors"
 
@@ -155,9 +157,48 @@ func (st *Local) open(ctx context.Context) error {
 		}
 	}
 
+	go st.reportHealth(ctx)
+
 	return nil
 }
 
+// reportHealth runs until ctx is done. After every interval it stats each
+// local storage path and forwards the result to the index through
+// StorageReportHealth.
+func (st *Local) reportHealth(ctx context.Context) {
+	// randomize interval by ~10%: scale it by a factor in [1.0, 1.1)
+	interval := HeartBeatInterval * time.Duration(100_000+rand.Int63n(10_000)) / 100_000
+
+	for {
+		select {
+		case <-time.After(interval):
+		case <-ctx.Done():
+			return
+		}
+
+		st.localLk.RLock()
+
+		toReport := map[ID]HealthReport{}
+		for id, p := range st.paths {
+			stat, err := Stat(p.local)
+
+			toReport[id] = HealthReport{
+				Stat: stat,
+				Err:  err,
+			}
+		}
+
+		st.localLk.RUnlock()
+
+		// report after releasing the lock so index calls never run under it
+		for id, report := range toReport {
+			if err := st.index.StorageReportHealth(ctx, id, report); err != nil {
+				log.Warnf("error reporting storage health for %s: %+v", id, err)
+			}
+		}
+	}
+}
+
 func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) {
 	if existing|allocate != existing^allocate {
 		return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")