diff --git a/extern/sector-storage/fsutil/filesize_unix.go b/extern/sector-storage/fsutil/filesize_unix.go
index 7df8dae4c..9c7f04ace 100644
--- a/extern/sector-storage/fsutil/filesize_unix.go
+++ b/extern/sector-storage/fsutil/filesize_unix.go
@@ -4,6 +4,7 @@ import (
 	"os"
 	"path/filepath"
 	"syscall"
+	"time"
 
 	"golang.org/x/xerrors"
 )
@@ -15,6 +16,8 @@ type SizeInfo struct {
 // FileSize returns bytes used by a file or directory on disk
 // NOTE: We care about the allocated bytes, not file or directory size
 func FileSize(path string) (SizeInfo, error) {
+	start := time.Now()
+
 	var size int64
 	err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
 		if err != nil {
@@ -32,6 +35,12 @@ func FileSize(path string) (SizeInfo, error) {
 		}
 		return err
 	})
+
+	// Read the clock once so the logged duration is the one that was tested.
+	if took := time.Since(start); took >= 3*time.Second {
+		log.Warnw("very slow file size check", "took", took, "path", path)
+	}
+
 	if err != nil {
 		if os.IsNotExist(err) {
 			return SizeInfo{}, os.ErrNotExist
diff --git a/extern/sector-storage/stores/local.go b/extern/sector-storage/stores/local.go
index 4efddca38..4a8c4e1b3 100644
--- a/extern/sector-storage/stores/local.go
+++ b/extern/sector-storage/stores/local.go
@@ -88,6 +88,8 @@ type path struct {
 }
 
 func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) {
+	start := time.Now()
+
 	stat, err := ls.Stat(p.local)
 	if err != nil {
 		return fsutil.FsStat{}, xerrors.Errorf("stat %s: %w", p.local, err)
@@ -155,6 +157,8 @@ func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) {
 		}
 	}
 
+	log.Infow("storage stat", "took", time.Since(start), "reservations", len(p.reservations))
+
 	return stat, err
 }
 
@@ -166,7 +170,7 @@ type URLs []string
 
 func NewLocal(ctx context.Context, ls LocalStorage, index SectorIndex, urls []string) (*Local, error) {
 	l := &Local{
-		localStorage: ls,
+		localStorage: newCachedLocalStorage(ls),
 		index:        index,
 		urls:         urls,
 
diff --git a/extern/sector-storage/stores/localstorage_cached.go
b/extern/sector-storage/stores/localstorage_cached.go
new file mode 100644
index 000000000..9ada06cbf
--- /dev/null
+++ b/extern/sector-storage/stores/localstorage_cached.go
@@ -0,0 +1,182 @@
+package stores
+
+import (
+	"sync"
+	"time"
+
+	lru "github.com/hashicorp/golang-lru"
+
+	"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
+)
+
+// StatTimeout is how long a cached Stat or DiskUsage result is served before
+// the underlying storage is queried again.
+var StatTimeout = 5 * time.Second
+
+// MaxDiskUsageDuration is the longest a DiskUsage call waits for a fresh
+// measurement before falling back to the previously recorded usage.
+var MaxDiskUsageDuration = time.Second
+
+// cachedLocalStorage wraps a LocalStorage and caches Stat and DiskUsage
+// results per path.
+type cachedLocalStorage struct {
+	base LocalStorage
+
+	// statLk guards both caches and the fields of every diskUsageEntry.
+	statLk  sync.Mutex
+	stats   *lru.Cache // path -> statEntry
+	pathDUs *lru.Cache // path -> *diskUsageEntry
+}
+
+// newCachedLocalStorage returns a caching wrapper around ls.
+func newCachedLocalStorage(ls LocalStorage) *cachedLocalStorage {
+	// lru.New only fails for a non-positive size, so the errors are safe to
+	// ignore for these constant sizes.
+	statCache, _ := lru.New(1024)
+	duCache, _ := lru.New(1024)
+
+	return &cachedLocalStorage{
+		base:    ls,
+		stats:   statCache,
+		pathDUs: duCache,
+	}
+}
+
+// statEntry is a cached Stat result and the time it was recorded.
+type statEntry struct {
+	stat fsutil.FsStat
+	time time.Time
+}
+
+// diskUsageEntry is the cached disk usage state of a single path.
+type diskUsageEntry struct {
+	// last is the most recently completed measurement; it is the zero value
+	// until the first measurement finishes.
+	last diskUsageResult
+
+	// pending is non-nil while a measurement is in flight; it is closed once
+	// that measurement has been stored in last.
+	pending chan struct{}
+}
+
+// diskUsageResult is a disk usage measurement and the time it completed.
+type diskUsageResult struct {
+	usage int64
+	time  time.Time
+}
+
+// GetStorage passes through to the wrapped LocalStorage.
+func (c *cachedLocalStorage) GetStorage() (StorageConfig, error) {
+	return c.base.GetStorage()
+}
+
+// SetStorage passes through to the wrapped LocalStorage.
+func (c *cachedLocalStorage) SetStorage(f func(*StorageConfig)) error {
+	return c.base.SetStorage(f)
+}
+
+// Stat returns the filesystem stat for path, reusing a cached value that is
+// younger than StatTimeout. Failed stats are returned but never cached.
+func (c *cachedLocalStorage) Stat(path string) (fsutil.FsStat, error) {
+	c.statLk.Lock()
+	defer c.statLk.Unlock()
+
+	if v, ok := c.stats.Get(path); ok {
+		if cached := v.(statEntry); time.Since(cached.time) < StatTimeout {
+			return cached.stat, nil
+		}
+	}
+
+	// no usable cached value, ask the underlying storage
+	st, err := c.base.Stat(path)
+	if err == nil {
+		c.stats.Add(path, statEntry{
+			stat: st,
+			time: time.Now(),
+		})
+	}
+
+	return st, err
+}
+
+// DiskUsage returns the disk usage of path, reusing a measurement that is
+// younger than StatTimeout. Otherwise it waits up to MaxDiskUsageDuration for
+// a background measurement and, if that is too slow, returns the previously
+// recorded usage (zero if there is none). Measurement errors are logged and
+// the returned error is always nil.
+//
+// statLk is released while waiting, so a slow path does not block Stat or
+// DiskUsage calls for other paths.
+func (c *cachedLocalStorage) DiskUsage(path string) (int64, error) {
+	c.statLk.Lock()
+
+	var entry *diskUsageEntry
+	if v, ok := c.pathDUs.Get(path); ok {
+		entry = v.(*diskUsageEntry)
+
+		// if we have recent cached entry, use that
+		if time.Since(entry.last.time) < StatTimeout {
+			usage := entry.last.usage
+			c.statLk.Unlock()
+			return usage, nil
+		}
+	} else {
+		entry = new(diskUsageEntry)
+		c.pathDUs.Add(path, entry)
+	}
+
+	// a measurement can take a while, so it runs in a goroutine; at most one
+	// is in flight per entry
+	if entry.pending == nil {
+		entry.pending = c.measureDiskUsage(path, entry)
+	}
+	pending := entry.pending
+	c.statLk.Unlock()
+
+	// wait for the measurement without holding the lock
+	timeout := time.NewTimer(MaxDiskUsageDuration)
+	defer timeout.Stop()
+
+	slow := false
+	select {
+	case <-pending:
+	case <-timeout.C:
+		slow = true
+	}
+
+	c.statLk.Lock()
+	defer c.statLk.Unlock()
+
+	if slow {
+		log.Warnw("getting usage is slow, falling back to previous usage",
+			"path", path,
+			"fallback", entry.last.usage,
+			"age", time.Since(entry.last.time))
+	}
+
+	return entry.last.usage, nil
+}
+
+// measureDiskUsage starts a background disk usage measurement for path and
+// returns a channel that is closed after the result has been stored in
+// entry. The caller must hold statLk.
+func (c *cachedLocalStorage) measureDiskUsage(path string, entry *diskUsageEntry) chan struct{} {
+	done := make(chan struct{})
+	go func() {
+		du, err := c.base.DiskUsage(path)
+		if err != nil {
+			log.Errorw("error getting disk usage", "path", path, "error", err)
+		}
+
+		c.statLk.Lock()
+		entry.last = diskUsageResult{usage: du, time: time.Now()}
+		entry.pending = nil
+		c.statLk.Unlock()
+
+		close(done)
+	}()
+
+	return done
+}
+
+var _ LocalStorage = &cachedLocalStorage{}
diff --git a/extern/sector-storage/stores/remote.go b/extern/sector-storage/stores/remote.go
index 441088228..e8e008139 100644
--- a/extern/sector-storage/stores/remote.go
+++ b/extern/sector-storage/stores/remote.go
@@ -95,6 +95,8 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin
 		return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector")
 	}
 
+	// First make sure that no other goroutines are trying to fetch this sector;
+	// wait if there are any.
for { r.fetchLk.Lock() @@ -122,6 +124,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin r.fetchLk.Unlock() }() + // Try to get the sector from local storage paths, stores, err := r.local.AcquireSector(ctx, s, existing, allocate, pathType, op) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("local acquire error: %w", err) @@ -148,6 +151,9 @@ func (r *Remote) AcquireSector(ctx context.Context, s storage.SectorRef, existin odt = storiface.FsOverheadFinalized } + // If any path types weren't found in local storage, try fetching them + + // First reserve storage releaseStorage, err := r.local.Reserve(ctx, s, toFetch, ids, odt) if err != nil { return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err)