remote: Limit parallel fetches
This commit is contained in:
parent
ed2e57dde6
commit
ff9ffddd57
@ -76,6 +76,8 @@ type Manager struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type SealerConfig struct {
|
type SealerConfig struct {
|
||||||
|
ParallelFetchLimit int
|
||||||
|
|
||||||
// Local worker config
|
// Local worker config
|
||||||
AllowPreCommit1 bool
|
AllowPreCommit1 bool
|
||||||
AllowPreCommit2 bool
|
AllowPreCommit2 bool
|
||||||
@ -96,7 +98,7 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
|
|||||||
return nil, xerrors.Errorf("creating prover instance: %w", err)
|
return nil, xerrors.Errorf("creating prover instance: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
stor := stores.NewRemote(lstor, si, http.Header(sa))
|
stor := stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit)
|
||||||
|
|
||||||
m := &Manager{
|
m := &Manager{
|
||||||
scfg: cfg,
|
scfg: cfg,
|
||||||
|
@ -95,7 +95,7 @@ func newTestMgr(ctx context.Context, t *testing.T) (*Manager, *stores.Local, *st
|
|||||||
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor}, cfg)
|
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor}, cfg)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
stor := stores.NewRemote(lstor, si, nil)
|
stor := stores.NewRemote(lstor, si, nil, 6000)
|
||||||
|
|
||||||
m := &Manager{
|
m := &Manager{
|
||||||
scfg: cfg,
|
scfg: cfg,
|
||||||
|
@ -29,6 +29,8 @@ type Remote struct {
|
|||||||
index SectorIndex
|
index SectorIndex
|
||||||
auth http.Header
|
auth http.Header
|
||||||
|
|
||||||
|
limit chan struct{}
|
||||||
|
|
||||||
fetchLk sync.Mutex
|
fetchLk sync.Mutex
|
||||||
fetching map[abi.SectorID]chan struct{}
|
fetching map[abi.SectorID]chan struct{}
|
||||||
}
|
}
|
||||||
@ -41,12 +43,14 @@ func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types SectorF
|
|||||||
return r.local.RemoveCopies(ctx, s, types)
|
return r.local.RemoveCopies(ctx, s, types)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote {
|
func NewRemote(local *Local, index SectorIndex, auth http.Header, fetchLimit int) *Remote {
|
||||||
return &Remote{
|
return &Remote{
|
||||||
local: local,
|
local: local,
|
||||||
index: index,
|
index: index,
|
||||||
auth: auth,
|
auth: auth,
|
||||||
|
|
||||||
|
limit: make(chan struct{}, fetchLimit),
|
||||||
|
|
||||||
fetching: map[abi.SectorID]chan struct{}{},
|
fetching: map[abi.SectorID]chan struct{}{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -165,6 +169,21 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, spt abi.
|
|||||||
func (r *Remote) fetch(ctx context.Context, url, outname string) error {
|
func (r *Remote) fetch(ctx context.Context, url, outname string) error {
|
||||||
log.Infof("Fetch %s -> %s", url, outname)
|
log.Infof("Fetch %s -> %s", url, outname)
|
||||||
|
|
||||||
|
if len(r.limit) >= cap(r.limit) {
|
||||||
|
log.Infof("Throttling fetch, %d already running", len(r.limit))
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Smarter throttling
|
||||||
|
// * Priority (just going sequentially is still pretty good)
|
||||||
|
// * Per interface
|
||||||
|
// * Aware of remote load
|
||||||
|
select {
|
||||||
|
case r.limit <- struct{}{}:
|
||||||
|
defer func() { <-r.limit }()
|
||||||
|
case <-ctx.Done():
|
||||||
|
return xerrors.Errorf("context error while waiting for fetch limiter: %w", ctx.Err())
|
||||||
|
}
|
||||||
|
|
||||||
req, err := http.NewRequest("GET", url, nil)
|
req, err := http.NewRequest("GET", url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("request: %w", err)
|
return xerrors.Errorf("request: %w", err)
|
||||||
|
Loading…
Reference in New Issue
Block a user