From ff9ffddd5769696b6769e9a89aa7869985ad3b03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 24 Jul 2020 16:43:41 +0200 Subject: [PATCH] remote: Limit parallel fetches --- manager.go | 4 +++- manager_test.go | 2 +- stores/remote.go | 21 ++++++++++++++++++++- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/manager.go b/manager.go index 063456fa9..64dd2dcbc 100644 --- a/manager.go +++ b/manager.go @@ -76,6 +76,8 @@ type Manager struct { } type SealerConfig struct { + ParallelFetchLimit int + // Local worker config AllowPreCommit1 bool AllowPreCommit2 bool @@ -96,7 +98,7 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg return nil, xerrors.Errorf("creating prover instance: %w", err) } - stor := stores.NewRemote(lstor, si, http.Header(sa)) + stor := stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit) m := &Manager{ scfg: cfg, diff --git a/manager_test.go b/manager_test.go index 9cee303c5..10e6a5020 100644 --- a/manager_test.go +++ b/manager_test.go @@ -95,7 +95,7 @@ func newTestMgr(ctx context.Context, t *testing.T) (*Manager, *stores.Local, *st prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor}, cfg) require.NoError(t, err) - stor := stores.NewRemote(lstor, si, nil) + stor := stores.NewRemote(lstor, si, nil, 6000) m := &Manager{ scfg: cfg, diff --git a/stores/remote.go b/stores/remote.go index c78f026f4..ee68b5ef6 100644 --- a/stores/remote.go +++ b/stores/remote.go @@ -29,6 +29,8 @@ type Remote struct { index SectorIndex auth http.Header + limit chan struct{} + fetchLk sync.Mutex fetching map[abi.SectorID]chan struct{} } @@ -41,12 +43,14 @@ func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types SectorF return r.local.RemoveCopies(ctx, s, types) } -func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote { +func NewRemote(local *Local, index SectorIndex, auth http.Header, fetchLimit int) *Remote { return &Remote{ local: local, index: index, auth: auth, + limit: make(chan struct{}, fetchLimit), + fetching: map[abi.SectorID]chan struct{}{}, } } @@ -165,6 +169,21 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi. func (r *Remote) fetch(ctx context.Context, url, outname string) error { log.Infof("Fetch %s -> %s", url, outname) + if len(r.limit) >= cap(r.limit) { + log.Infof("Throttling fetch, %d already running", len(r.limit)) + } + + // TODO: Smarter throttling + // * Priority (just going sequentially is still pretty good) + // * Per interface + // * Aware of remote load + select { + case r.limit <- struct{}{}: + defer func() { <-r.limit }() + case <-ctx.Done(): + return xerrors.Errorf("context error while waiting for fetch limiter: %w", ctx.Err()) + } + req, err := http.NewRequest("GET", url, nil) if err != nil { return xerrors.Errorf("request: %w", err)