storage: Integrate async workers in sealing manager
This commit is contained in:
parent
5d73943929
commit
06e3852cef
178
extern/sector-storage/manager.go
vendored
178
extern/sector-storage/manager.go
vendored
@ -5,6 +5,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/hashicorp/go-multierror"
|
"github.com/hashicorp/go-multierror"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
@ -68,6 +69,15 @@ type Manager struct {
|
|||||||
sched *scheduler
|
sched *scheduler
|
||||||
|
|
||||||
storage.Prover
|
storage.Prover
|
||||||
|
|
||||||
|
resLk sync.Mutex
|
||||||
|
results map[storiface.CallID]result
|
||||||
|
waitRes map[storiface.CallID]chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type result struct {
|
||||||
|
r interface{}
|
||||||
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
type SealerConfig struct {
|
type SealerConfig struct {
|
||||||
@ -191,9 +201,10 @@ func schedNop(context.Context, Worker) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func schedFetch(sector abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error {
|
func schedFetch(wf waitFunc, sector abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error {
|
||||||
return func(ctx context.Context, worker Worker) error {
|
return func(ctx context.Context, worker Worker) error {
|
||||||
return worker.Fetch(ctx, sector, ft, ptype, am)
|
_, err := wf(ctx)(worker.Fetch(ctx, sector, ft, ptype, am))
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -220,15 +231,21 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
|
|||||||
|
|
||||||
var readOk bool
|
var readOk bool
|
||||||
|
|
||||||
|
readPiece := func(ctx context.Context, w Worker) error {
|
||||||
|
r, err := m.waitResult(ctx)(w.ReadPiece(ctx, sink, sector, offset, size))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
readOk = r.(bool)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if len(best) > 0 {
|
if len(best) > 0 {
|
||||||
// There is unsealed sector, see if we can read from it
|
// There is unsealed sector, see if we can read from it
|
||||||
|
|
||||||
selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false)
|
selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false)
|
||||||
|
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(m.waitResult, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), readPiece)
|
||||||
readOk, err = w.ReadPiece(ctx, sink, sector, offset, size)
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("reading piece from sealed sector: %w", err)
|
return xerrors.Errorf("reading piece from sealed sector: %w", err)
|
||||||
}
|
}
|
||||||
@ -239,12 +256,12 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
|
|||||||
}
|
}
|
||||||
|
|
||||||
unsealFetch := func(ctx context.Context, worker Worker) error {
|
unsealFetch := func(ctx context.Context, worker Worker) error {
|
||||||
if err := worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy); err != nil {
|
if _, err := m.waitResult(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil {
|
||||||
return xerrors.Errorf("copy sealed/cache sector data: %w", err)
|
return xerrors.Errorf("copy sealed/cache sector data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(best) > 0 {
|
if len(best) > 0 {
|
||||||
if err := worker.Fetch(ctx, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove); err != nil {
|
if _, err := m.waitResult(ctx)(worker.Fetch(ctx, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove)); err != nil {
|
||||||
return xerrors.Errorf("copy unsealed sector data: %w", err)
|
return xerrors.Errorf("copy unsealed sector data: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -252,7 +269,8 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
|
|||||||
}
|
}
|
||||||
|
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, unsealFetch, func(ctx context.Context, w Worker) error {
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, unsealFetch, func(ctx context.Context, w Worker) error {
|
||||||
return w.UnsealPiece(ctx, sector, offset, size, ticket, unsealed)
|
_, err := m.waitResult(ctx)(w.UnsealPiece(ctx, sector, offset, size, ticket, unsealed))
|
||||||
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -260,10 +278,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
|
|||||||
|
|
||||||
selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false)
|
selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false)
|
||||||
|
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(m.waitResult, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), readPiece)
|
||||||
readOk, err = w.ReadPiece(ctx, sink, sector, offset, size)
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("reading piece from sealed sector: %w", err)
|
return xerrors.Errorf("reading piece from sealed sector: %w", err)
|
||||||
}
|
}
|
||||||
@ -298,11 +313,11 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
|
|||||||
|
|
||||||
var out abi.PieceInfo
|
var out abi.PieceInfo
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTAddPiece, selector, schedNop, func(ctx context.Context, w Worker) error {
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTAddPiece, selector, schedNop, func(ctx context.Context, w Worker) error {
|
||||||
p, err := w.AddPiece(ctx, sector, existingPieces, sz, r)
|
p, err := m.waitResult(ctx)(w.AddPiece(ctx, sector, existingPieces, sz, r))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
out = p
|
out = p.(abi.PieceInfo)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -321,12 +336,12 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
|
|||||||
|
|
||||||
selector := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathSealing)
|
selector := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathSealing)
|
||||||
|
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(m.waitResult, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
|
||||||
p, err := w.SealPreCommit1(ctx, sector, ticket, pieces)
|
p, err := m.waitResult(ctx)(w.SealPreCommit1(ctx, sector, ticket, pieces))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
out = p
|
out = p.(storage.PreCommit1Out)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -343,12 +358,12 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
|
|||||||
|
|
||||||
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, true)
|
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, true)
|
||||||
|
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(m.waitResult, sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
|
||||||
p, err := w.SealPreCommit2(ctx, sector, phase1Out)
|
p, err := m.waitResult(ctx)(w.SealPreCommit2(ctx, sector, phase1Out))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
out = p
|
out = p.(storage.SectorCids)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
return out, err
|
return out, err
|
||||||
@ -367,12 +382,12 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
|
|||||||
// generally very cheap / fast, and transferring data is not worth the effort
|
// generally very cheap / fast, and transferring data is not worth the effort
|
||||||
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false)
|
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false)
|
||||||
|
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(m.waitResult, sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
|
||||||
p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
|
p, err := m.waitResult(ctx)(w.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
out = p
|
out = p.(storage.Commit1Out)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
return out, err
|
return out, err
|
||||||
@ -382,11 +397,11 @@ func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Ou
|
|||||||
selector := newTaskSelector()
|
selector := newTaskSelector()
|
||||||
|
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit2, selector, schedNop, func(ctx context.Context, w Worker) error {
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit2, selector, schedNop, func(ctx context.Context, w Worker) error {
|
||||||
p, err := w.SealCommit2(ctx, sector, phase1Out)
|
p, err := m.waitResult(ctx)(w.SealCommit2(ctx, sector, phase1Out))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
out = p
|
out = p.(storage.Proof)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -416,9 +431,10 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
|
|||||||
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false)
|
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false)
|
||||||
|
|
||||||
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
||||||
schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove),
|
schedFetch(m.waitResult, sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove),
|
||||||
func(ctx context.Context, w Worker) error {
|
func(ctx context.Context, w Worker) error {
|
||||||
return w.FinalizeSector(ctx, sector, keepUnsealed)
|
_, err := m.waitResult(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed))
|
||||||
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -433,9 +449,10 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
|
|||||||
}
|
}
|
||||||
|
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
|
||||||
schedFetch(sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
|
schedFetch(m.waitResult, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
|
||||||
func(ctx context.Context, w Worker) error {
|
func(ctx context.Context, w Worker) error {
|
||||||
return w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed)
|
_, err := m.waitResult(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed))
|
||||||
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("moving sector to storage: %w", err)
|
return xerrors.Errorf("moving sector to storage: %w", err)
|
||||||
@ -472,6 +489,107 @@ func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type waitFunc func(ctx context.Context) func(callID storiface.CallID, err error) (interface{}, error)
|
||||||
|
|
||||||
|
func (m *Manager) waitResult(ctx context.Context) func(callID storiface.CallID, err error) (interface{}, error) {
|
||||||
|
return func(callID storiface.CallID, err error) (interface{}, error) {
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
m.resLk.Lock()
|
||||||
|
res, ok := m.results[callID]
|
||||||
|
if ok {
|
||||||
|
m.resLk.Unlock()
|
||||||
|
return res.r, res.err
|
||||||
|
}
|
||||||
|
|
||||||
|
ch, ok := m.waitRes[callID]
|
||||||
|
if !ok {
|
||||||
|
ch = make(chan struct{})
|
||||||
|
m.waitRes[callID] = ch
|
||||||
|
}
|
||||||
|
m.resLk.Unlock()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
m.resLk.Lock()
|
||||||
|
defer m.resLk.Unlock()
|
||||||
|
|
||||||
|
res := m.results[callID]
|
||||||
|
delete(m.results, callID)
|
||||||
|
|
||||||
|
return res.r, res.err
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, xerrors.Errorf("waiting for result: %w", ctx.Err())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) returnResult(callID storiface.CallID, r interface{}, err string) error {
|
||||||
|
m.resLk.Lock()
|
||||||
|
defer m.resLk.Unlock()
|
||||||
|
|
||||||
|
_, ok := m.results[callID]
|
||||||
|
if ok {
|
||||||
|
return xerrors.Errorf("result for call %v already reported")
|
||||||
|
}
|
||||||
|
|
||||||
|
m.results[callID] = result{
|
||||||
|
r: r,
|
||||||
|
err: errors.New(err),
|
||||||
|
}
|
||||||
|
|
||||||
|
close(m.waitRes[callID])
|
||||||
|
delete(m.waitRes, callID)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error {
|
||||||
|
return m.returnResult(callID, pi, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error {
|
||||||
|
return m.returnResult(callID, p1o, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err string) error {
|
||||||
|
return m.returnResult(callID, sealed, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err string) error {
|
||||||
|
return m.returnResult(callID, out, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err string) error {
|
||||||
|
return m.returnResult(callID, proof, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err string) error {
|
||||||
|
return m.returnResult(callID, nil, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err string) error {
|
||||||
|
return m.returnResult(callID, nil, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err string) error {
|
||||||
|
return m.returnResult(callID, nil, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err string) error {
|
||||||
|
return m.returnResult(callID, nil, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err string) error {
|
||||||
|
return m.returnResult(callID, ok, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err string) error {
|
||||||
|
return m.returnResult(callID, nil, err)
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) {
|
func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) {
|
||||||
l, err := m.localStore.Local(ctx)
|
l, err := m.localStore.Local(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user