stores: Cache disk usage results for a few seconds
This commit is contained in:
parent
3c9a8a42d9
commit
086908e535
49
extern/sector-storage/stores/local.go
vendored
49
extern/sector-storage/stores/local.go
vendored
@ -85,56 +85,9 @@ type path struct {
|
|||||||
|
|
||||||
reserved int64
|
reserved int64
|
||||||
reservations map[abi.SectorID]storiface.SectorFileType
|
reservations map[abi.SectorID]storiface.SectorFileType
|
||||||
|
|
||||||
statLk sync.Mutex
|
|
||||||
statDone chan struct{}
|
|
||||||
|
|
||||||
lastStat *fsutil.FsStat // nil if no stat / was error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) {
|
func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) {
|
||||||
for {
|
|
||||||
p.statLk.Lock()
|
|
||||||
if p.statDone == nil {
|
|
||||||
p.statDone = make(chan struct{})
|
|
||||||
p.statLk.Unlock()
|
|
||||||
|
|
||||||
st, err := p.doStat(ls)
|
|
||||||
|
|
||||||
p.statLk.Lock()
|
|
||||||
p.lastStat = nil
|
|
||||||
if err == nil {
|
|
||||||
p.lastStat = &st
|
|
||||||
}
|
|
||||||
close(p.statDone)
|
|
||||||
p.statDone = nil
|
|
||||||
p.statLk.Unlock()
|
|
||||||
return st, err
|
|
||||||
}
|
|
||||||
|
|
||||||
doneCh := p.statDone
|
|
||||||
p.statLk.Unlock()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-doneCh:
|
|
||||||
// todo context?
|
|
||||||
}
|
|
||||||
|
|
||||||
p.statLk.Lock()
|
|
||||||
if p.lastStat == nil {
|
|
||||||
p.statLk.Unlock()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
st := *p.lastStat
|
|
||||||
|
|
||||||
p.statLk.Unlock()
|
|
||||||
|
|
||||||
return st, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *path) doStat(ls LocalStorage) (fsutil.FsStat, error) {
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
stat, err := ls.Stat(p.local)
|
stat, err := ls.Stat(p.local)
|
||||||
@ -217,7 +170,7 @@ type URLs []string
|
|||||||
|
|
||||||
func NewLocal(ctx context.Context, ls LocalStorage, index SectorIndex, urls []string) (*Local, error) {
|
func NewLocal(ctx context.Context, ls LocalStorage, index SectorIndex, urls []string) (*Local, error) {
|
||||||
l := &Local{
|
l := &Local{
|
||||||
localStorage: ls,
|
localStorage: newCachedLocalStorage(ls),
|
||||||
index: index,
|
index: index,
|
||||||
urls: urls,
|
urls: urls,
|
||||||
|
|
||||||
|
131
extern/sector-storage/stores/localstorage_cached.go
vendored
Normal file
131
extern/sector-storage/stores/localstorage_cached.go
vendored
Normal file
@ -0,0 +1,131 @@
|
|||||||
|
package stores
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
lru "github.com/hashicorp/golang-lru"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
var StatTimeout = 5 * time.Second
|
||||||
|
var MaxDiskUsageDuration = time.Second
|
||||||
|
|
||||||
|
type cachedLocalStorage struct {
|
||||||
|
base LocalStorage
|
||||||
|
|
||||||
|
statLk sync.Mutex
|
||||||
|
stats *lru.Cache // path -> statEntry
|
||||||
|
pathDUs *lru.Cache // path -> *diskUsageEntry
|
||||||
|
}
|
||||||
|
|
||||||
|
func newCachedLocalStorage(ls LocalStorage) *cachedLocalStorage {
|
||||||
|
statCache, _ := lru.New(1024)
|
||||||
|
duCache, _ := lru.New(1024)
|
||||||
|
|
||||||
|
return &cachedLocalStorage{
|
||||||
|
base: ls,
|
||||||
|
stats: statCache,
|
||||||
|
pathDUs: duCache,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type statEntry struct {
|
||||||
|
stat fsutil.FsStat
|
||||||
|
time time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
type diskUsageEntry struct {
|
||||||
|
last diskUsageResult
|
||||||
|
|
||||||
|
usagePromise <-chan diskUsageResult
|
||||||
|
}
|
||||||
|
|
||||||
|
type diskUsageResult struct {
|
||||||
|
usage int64
|
||||||
|
time time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cachedLocalStorage) GetStorage() (StorageConfig, error) {
|
||||||
|
return c.base.GetStorage()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cachedLocalStorage) SetStorage(f func(*StorageConfig)) error {
|
||||||
|
return c.base.SetStorage(f)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cachedLocalStorage) Stat(path string) (fsutil.FsStat, error) {
|
||||||
|
c.statLk.Lock()
|
||||||
|
defer c.statLk.Unlock()
|
||||||
|
|
||||||
|
if v, ok := c.stats.Get(path); ok && time.Now().Sub(v.(statEntry).time) < StatTimeout {
|
||||||
|
return v.(statEntry).stat, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we don't, get the stat
|
||||||
|
st, err := c.base.Stat(path)
|
||||||
|
if err == nil {
|
||||||
|
c.stats.Add(path, statEntry{
|
||||||
|
stat: st,
|
||||||
|
time: time.Now(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return st, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cachedLocalStorage) DiskUsage(path string) (int64, error) {
|
||||||
|
c.statLk.Lock()
|
||||||
|
defer c.statLk.Unlock()
|
||||||
|
|
||||||
|
var entry *diskUsageEntry
|
||||||
|
|
||||||
|
if v, ok := c.pathDUs.Get(path); ok {
|
||||||
|
entry = v.(*diskUsageEntry)
|
||||||
|
|
||||||
|
// if we have recent cached entry, use that
|
||||||
|
if time.Now().Sub(entry.last.time) < StatTimeout {
|
||||||
|
return entry.last.usage, nil
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
entry = new(diskUsageEntry)
|
||||||
|
c.pathDUs.Add(path, entry)
|
||||||
|
}
|
||||||
|
|
||||||
|
// start a new disk usage request; this can take a while so start a
|
||||||
|
// goroutine, and if it doesn't return quickly, return either the
|
||||||
|
// previous value, or zero - that's better than potentially blocking
|
||||||
|
// here for a long time.
|
||||||
|
if entry.usagePromise == nil {
|
||||||
|
resCh := make(chan diskUsageResult, 1)
|
||||||
|
go func() {
|
||||||
|
du, err := c.base.DiskUsage(path)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorw("error getting disk usage", "path", path, "error", err)
|
||||||
|
}
|
||||||
|
resCh <- diskUsageResult{
|
||||||
|
usage: du,
|
||||||
|
time: time.Now(),
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
entry.usagePromise = resCh
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for the disk usage result; if it doesn't come, fallback on
|
||||||
|
// previous usage
|
||||||
|
select {
|
||||||
|
case du := <-entry.usagePromise:
|
||||||
|
entry.usagePromise = nil
|
||||||
|
entry.last = du
|
||||||
|
case <-time.After(MaxDiskUsageDuration):
|
||||||
|
log.Warnw("getting usage is slow, falling back to previous usage",
|
||||||
|
"path", path,
|
||||||
|
"fallback", entry.last.usage,
|
||||||
|
"age", time.Now().Sub(entry.last.time))
|
||||||
|
}
|
||||||
|
|
||||||
|
return entry.last.usage, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ LocalStorage = &cachedLocalStorage{}
|
6
extern/sector-storage/stores/remote.go
vendored
6
extern/sector-storage/stores/remote.go
vendored
@ -95,6 +95,8 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin
|
|||||||
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector")
|
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// First make sure that no other goroutines are trying to fetch this sector;
|
||||||
|
// wait if there are any.
|
||||||
for {
|
for {
|
||||||
r.fetchLk.Lock()
|
r.fetchLk.Lock()
|
||||||
|
|
||||||
@ -122,6 +124,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin
|
|||||||
r.fetchLk.Unlock()
|
r.fetchLk.Unlock()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// Try to get the sector from local storage
|
||||||
paths, stores, err := r.local.AcquireSector(ctx, s, existing, allocate, pathType, op)
|
paths, stores, err := r.local.AcquireSector(ctx, s, existing, allocate, pathType, op)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("local acquire error: %w", err)
|
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("local acquire error: %w", err)
|
||||||
@ -148,6 +151,9 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin
|
|||||||
odt = storiface.FsOverheadFinalized
|
odt = storiface.FsOverheadFinalized
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If any path types weren't found in local storage, try fetching them
|
||||||
|
|
||||||
|
// First reserve storage
|
||||||
releaseStorage, err := r.local.Reserve(ctx, s, toFetch, ids, odt)
|
releaseStorage, err := r.local.Reserve(ctx, s, toFetch, ids, odt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err)
|
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err)
|
||||||
|
Loading…
Reference in New Issue
Block a user