remote store: Handle parallel fetches
This commit is contained in:
parent
dd23715942
commit
7f1c9c89e7
@ -27,9 +27,8 @@ type Remote struct {
|
|||||||
index SectorIndex
|
index SectorIndex
|
||||||
auth http.Header
|
auth http.Header
|
||||||
|
|
||||||
fetchLk sync.Mutex // TODO: this can be much smarter
|
fetchLk sync.Mutex
|
||||||
// TODO: allow multiple parallel fetches
|
fetching map[abi.SectorID]chan struct{}
|
||||||
// (make sure to not fetch the same sector data twice)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote {
|
func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote {
|
||||||
@ -37,6 +36,8 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote {
|
|||||||
local: local,
|
local: local,
|
||||||
index: index,
|
index: index,
|
||||||
auth: auth,
|
auth: auth,
|
||||||
|
|
||||||
|
fetching: map[abi.SectorID]chan struct{}{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -45,8 +46,32 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing Sec
|
|||||||
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
|
return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
|
||||||
}
|
}
|
||||||
|
|
||||||
r.fetchLk.Lock()
|
for {
|
||||||
defer r.fetchLk.Unlock()
|
r.fetchLk.Lock()
|
||||||
|
|
||||||
|
c, locked := r.fetching[s]
|
||||||
|
if !locked {
|
||||||
|
r.fetching[s] = make(chan struct{})
|
||||||
|
r.fetchLk.Unlock()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
r.fetchLk.Unlock()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-c:
|
||||||
|
continue
|
||||||
|
case <-ctx.Done():
|
||||||
|
return SectorPaths{}, SectorPaths{}, nil, ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
r.fetchLk.Lock()
|
||||||
|
close(r.fetching[s])
|
||||||
|
delete(r.fetching, s)
|
||||||
|
r.fetchLk.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
paths, stores, done, err := r.local.AcquireSector(ctx, s, existing, allocate, sealing)
|
paths, stores, done, err := r.local.AcquireSector(ctx, s, existing, allocate, sealing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user