stores: Simple health reporting
This commit is contained in:
parent
8c47b13d9e
commit
fc637552b7
@ -6,6 +6,7 @@ import (
|
|||||||
gopath "path"
|
gopath "path"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
@ -13,6 +14,9 @@ import (
|
|||||||
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
"github.com/filecoin-project/specs-actors/actors/abi/big"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// HeartBeatInterval is the base period between storage health reports;
// reportHealth derives its wait interval from this value.
var HeartBeatInterval = 10 * time.Second
|
||||||
|
// SkippedHeartbeatThresh is five heartbeat intervals. It is computed once from
// the initial HeartBeatInterval, so later changes to that variable do not
// affect it.
// NOTE(review): presumably the heartbeat age after which a store is treated as
// having skipped heartbeats; no consumer is visible here, confirm.
var SkippedHeartbeatThresh = HeartBeatInterval * 5
|
||||||
|
|
||||||
// ID identifies sector storage by UUID. One sector storage should map to one
|
// ID identifies sector storage by UUID. One sector storage should map to one
|
||||||
// filesystem, local or networked / shared by multiple machines
|
// filesystem, local or networked / shared by multiple machines
|
||||||
type ID string
|
type ID string
|
||||||
@ -24,12 +28,20 @@ type StorageInfo struct {
|
|||||||
|
|
||||||
CanSeal bool
|
CanSeal bool
|
||||||
CanStore bool
|
CanStore bool
|
||||||
|
|
||||||
|
LastHeartbeat time.Time
|
||||||
|
HeartbeatErr error
|
||||||
|
}
|
||||||
|
|
||||||
|
// HealthReport is the payload passed to StorageReportHealth for one storage
// path.
type HealthReport struct {
|
||||||
|
// Stat is the filesystem stat taken for the path; the index stores it as the
// entry's current FsStat.
Stat FsStat
|
||||||
|
// Err is the error returned alongside that stat, if any.
Err error
|
||||||
}
|
}
|
||||||
|
|
||||||
type SectorIndex interface { // part of storage-miner api
|
type SectorIndex interface { // part of storage-miner api
|
||||||
StorageAttach(context.Context, StorageInfo, FsStat) error
|
StorageAttach(context.Context, StorageInfo, FsStat) error
|
||||||
StorageInfo(context.Context, ID) (StorageInfo, error)
|
StorageInfo(context.Context, ID) (StorageInfo, error)
|
||||||
// TODO: StorageUpdateStats(FsStat)
|
StorageReportHealth(context.Context, ID, HealthReport) error
|
||||||
|
|
||||||
StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error
|
StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error
|
||||||
StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error
|
StorageDropSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error
|
||||||
@ -46,6 +58,9 @@ type Decl struct {
|
|||||||
// storageEntry is the index's record for one attached storage.
type storageEntry struct {
|
type storageEntry struct {
|
||||||
// info points at the StorageInfo supplied to StorageAttach.
info *StorageInfo
|
info *StorageInfo
|
||||||
// fsi is the filesystem stat supplied at attach time, overwritten by each
// health report.
fsi FsStat
|
fsi FsStat
|
||||||
|
|
||||||
|
// lastHeartbeat is set to time.Now() on attach and on every health report.
lastHeartbeat time.Time
|
||||||
|
// heartbeatErr is the Err carried by the most recent health report.
heartbeatErr error
|
||||||
}
|
}
|
||||||
|
|
||||||
type Index struct {
|
type Index struct {
|
||||||
@ -120,10 +135,28 @@ func (i *Index) StorageAttach(ctx context.Context, si StorageInfo, st FsStat) er
|
|||||||
i.stores[si.ID] = &storageEntry{
|
i.stores[si.ID] = &storageEntry{
|
||||||
info: &si,
|
info: &si,
|
||||||
fsi: st,
|
fsi: st,
|
||||||
|
|
||||||
|
lastHeartbeat: time.Now(),
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (i *Index) StorageReportHealth(ctx context.Context, id ID, report HealthReport) error {
|
||||||
|
i.lk.Lock()
|
||||||
|
defer i.lk.Unlock()
|
||||||
|
|
||||||
|
ent, ok := i.stores[id]
|
||||||
|
if !ok {
|
||||||
|
return xerrors.Errorf("health report for unknown storage: %s", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
ent.fsi = report.Stat
|
||||||
|
ent.heartbeatErr = report.Err
|
||||||
|
ent.lastHeartbeat = time.Now()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error {
|
func (i *Index) StorageDeclareSector(ctx context.Context, storageId ID, s abi.SectorID, ft SectorFileType) error {
|
||||||
i.lk.Lock()
|
i.lk.Lock()
|
||||||
defer i.lk.Unlock()
|
defer i.lk.Unlock()
|
||||||
|
@ -5,9 +5,11 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math/bits"
|
"math/bits"
|
||||||
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
@ -155,9 +157,44 @@ func (st *Local) open(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
go st.reportHealth(ctx)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (st *Local) reportHealth(ctx context.Context) {
|
||||||
|
// randomize interval by ~10%
|
||||||
|
interval := (HeartBeatInterval*100_000 + time.Duration(rand.Int63n(10_000))) / 100_000
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-time.After(interval):
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
st.localLk.RLock()
|
||||||
|
|
||||||
|
toReport := map[ID]HealthReport{}
|
||||||
|
for id, p := range st.paths {
|
||||||
|
stat, err := Stat(p.local)
|
||||||
|
|
||||||
|
toReport[id] = HealthReport{
|
||||||
|
Stat: stat,
|
||||||
|
Err: err,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
st.localLk.RUnlock()
|
||||||
|
|
||||||
|
for id, report := range toReport {
|
||||||
|
if err := st.index.StorageReportHealth(ctx, id, report); err != nil {
|
||||||
|
log.Warnf("error reporting storage health for %s: %+v", id, report)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) {
|
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, existing SectorFileType, allocate SectorFileType, sealing bool) (SectorPaths, SectorPaths, func(), error) {
|
||||||
if existing|allocate != existing^allocate {
|
if existing|allocate != existing^allocate {
|
||||||
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
|
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
|
||||||
|
Loading…
Reference in New Issue
Block a user