sectorstorage: get new work tracker to run
This commit is contained in:
parent
b1361aaf8b
commit
5e09581256
@ -444,6 +444,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
|
|||||||
}
|
}
|
||||||
|
|
||||||
wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix))
|
wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix))
|
||||||
|
smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix))
|
||||||
|
|
||||||
smgr, err := sectorstorage.New(ctx, lr, stores.NewIndex(), &ffiwrapper.Config{
|
smgr, err := sectorstorage.New(ctx, lr, stores.NewIndex(), &ffiwrapper.Config{
|
||||||
SealProofType: spt,
|
SealProofType: spt,
|
||||||
@ -454,7 +455,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
|
|||||||
AllowPreCommit2: true,
|
AllowPreCommit2: true,
|
||||||
AllowCommit: true,
|
AllowCommit: true,
|
||||||
AllowUnseal: true,
|
AllowUnseal: true,
|
||||||
}, nil, sa, wsts)
|
}, nil, sa, wsts, smsts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
148
extern/sector-storage/cbor_gen.go
vendored
148
extern/sector-storage/cbor_gen.go
vendored
@ -143,3 +143,151 @@ func (t *Call) UnmarshalCBOR(r io.Reader) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
func (t *WorkState) MarshalCBOR(w io.Writer) error {
|
||||||
|
if t == nil {
|
||||||
|
_, err := w.Write(cbg.CborNull)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := w.Write([]byte{163}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
scratch := make([]byte, 9)
|
||||||
|
|
||||||
|
// t.Status (sectorstorage.WorkStatus) (string)
|
||||||
|
if len("Status") > cbg.MaxLength {
|
||||||
|
return xerrors.Errorf("Value in field \"Status\" was too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Status"))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := io.WriteString(w, string("Status")); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(t.Status) > cbg.MaxLength {
|
||||||
|
return xerrors.Errorf("Value in field t.Status was too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.Status))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := io.WriteString(w, string(t.Status)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// t.WorkerCall (storiface.CallID) (struct)
|
||||||
|
if len("WorkerCall") > cbg.MaxLength {
|
||||||
|
return xerrors.Errorf("Value in field \"WorkerCall\" was too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("WorkerCall"))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := io.WriteString(w, string("WorkerCall")); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := t.WorkerCall.MarshalCBOR(w); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// t.WorkError (string) (string)
|
||||||
|
if len("WorkError") > cbg.MaxLength {
|
||||||
|
return xerrors.Errorf("Value in field \"WorkError\" was too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("WorkError"))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := io.WriteString(w, string("WorkError")); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(t.WorkError) > cbg.MaxLength {
|
||||||
|
return xerrors.Errorf("Value in field t.WorkError was too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len(t.WorkError))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := io.WriteString(w, string(t.WorkError)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *WorkState) UnmarshalCBOR(r io.Reader) error {
|
||||||
|
*t = WorkState{}
|
||||||
|
|
||||||
|
br := cbg.GetPeeker(r)
|
||||||
|
scratch := make([]byte, 8)
|
||||||
|
|
||||||
|
maj, extra, err := cbg.CborReadHeaderBuf(br, scratch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if maj != cbg.MajMap {
|
||||||
|
return fmt.Errorf("cbor input should be of type map")
|
||||||
|
}
|
||||||
|
|
||||||
|
if extra > cbg.MaxLength {
|
||||||
|
return fmt.Errorf("WorkState: map struct too large (%d)", extra)
|
||||||
|
}
|
||||||
|
|
||||||
|
var name string
|
||||||
|
n := extra
|
||||||
|
|
||||||
|
for i := uint64(0); i < n; i++ {
|
||||||
|
|
||||||
|
{
|
||||||
|
sval, err := cbg.ReadStringBuf(br, scratch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
name = string(sval)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch name {
|
||||||
|
// t.Status (sectorstorage.WorkStatus) (string)
|
||||||
|
case "Status":
|
||||||
|
|
||||||
|
{
|
||||||
|
sval, err := cbg.ReadStringBuf(br, scratch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Status = WorkStatus(sval)
|
||||||
|
}
|
||||||
|
// t.WorkerCall (storiface.CallID) (struct)
|
||||||
|
case "WorkerCall":
|
||||||
|
|
||||||
|
{
|
||||||
|
|
||||||
|
if err := t.WorkerCall.UnmarshalCBOR(br); err != nil {
|
||||||
|
return xerrors.Errorf("unmarshaling t.WorkerCall: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
// t.WorkError (string) (string)
|
||||||
|
case "WorkError":
|
||||||
|
|
||||||
|
{
|
||||||
|
sval, err := cbg.ReadStringBuf(br, scratch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
t.WorkError = string(sval)
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown struct field %d: '%s'", i, name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
111
extern/sector-storage/manager.go
vendored
111
extern/sector-storage/manager.go
vendored
@ -130,11 +130,11 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
|
|||||||
|
|
||||||
Prover: prover,
|
Prover: prover,
|
||||||
|
|
||||||
work: mss,
|
work: mss,
|
||||||
workWait: map[workID]*sync.Cond{},
|
|
||||||
callToWork: map[storiface.CallID]workID{},
|
callToWork: map[storiface.CallID]workID{},
|
||||||
results: map[workID]result{},
|
callRes: map[storiface.CallID]chan result{},
|
||||||
waitRes: map[workID]chan struct{}{},
|
results: map[workID]result{},
|
||||||
|
waitRes: map[workID]chan struct{}{},
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: remove all non-running work from the work tracker
|
// TODO: remove all non-running work from the work tracker
|
||||||
@ -223,14 +223,14 @@ func schedNop(context.Context, Worker) error {
|
|||||||
|
|
||||||
func (m *Manager) schedFetch(sector abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error {
|
func (m *Manager) schedFetch(sector abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error {
|
||||||
return func(ctx context.Context, worker Worker) error {
|
return func(ctx context.Context, worker Worker) error {
|
||||||
_, err := m.startWork(ctx)(worker.Fetch(ctx, sector, ft, ptype, am))
|
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, ft, ptype, am))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) readPiece(sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, rok *bool) func(ctx context.Context, w Worker) error {
|
func (m *Manager) readPiece(sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, rok *bool) func(ctx context.Context, w Worker) error {
|
||||||
return func(ctx context.Context, w Worker) error {
|
return func(ctx context.Context, w Worker) error {
|
||||||
r, err := m.startWork(ctx)(w.ReadPiece(ctx, sink, sector, offset, size))
|
r, err := m.waitSimpleCall(ctx)(w.ReadPiece(ctx, sink, sector, offset, size))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -263,7 +263,7 @@ func (m *Manager) tryReadUnsealedPiece(ctx context.Context, sink io.Writer, sect
|
|||||||
|
|
||||||
selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false)
|
selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false)
|
||||||
|
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(m.startWork, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove),
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove),
|
||||||
m.readPiece(sink, sector, offset, size, &readOk))
|
m.readPiece(sink, sector, offset, size, &readOk))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
returnErr = xerrors.Errorf("reading piece from sealed sector: %w", err)
|
returnErr = xerrors.Errorf("reading piece from sealed sector: %w", err)
|
||||||
@ -290,12 +290,12 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
|
|||||||
}
|
}
|
||||||
|
|
||||||
unsealFetch := func(ctx context.Context, worker Worker) error {
|
unsealFetch := func(ctx context.Context, worker Worker) error {
|
||||||
if _, err := m.startWork(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil {
|
if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy)); err != nil {
|
||||||
return xerrors.Errorf("copy sealed/cache sector data: %w", err)
|
return xerrors.Errorf("copy sealed/cache sector data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if foundUnsealed {
|
if foundUnsealed {
|
||||||
if _, err := m.startWork(ctx)(worker.Fetch(ctx, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove)); err != nil {
|
if _, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove)); err != nil {
|
||||||
return xerrors.Errorf("copy unsealed sector data: %w", err)
|
return xerrors.Errorf("copy unsealed sector data: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -306,7 +306,8 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
|
|||||||
return xerrors.Errorf("cannot unseal piece (sector: %d, offset: %d size: %d) - unsealed cid is undefined", sector, offset, size)
|
return xerrors.Errorf("cannot unseal piece (sector: %d, offset: %d size: %d) - unsealed cid is undefined", sector, offset, size)
|
||||||
}
|
}
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, unsealFetch, func(ctx context.Context, w Worker) error {
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTUnseal, selector, unsealFetch, func(ctx context.Context, w Worker) error {
|
||||||
_, err := m.startWork(ctx)(w.UnsealPiece(ctx, sector, offset, size, ticket, unsealed))
|
// TODO: make restartable
|
||||||
|
_, err := m.waitSimpleCall(ctx)(w.UnsealPiece(ctx, sector, offset, size, ticket, unsealed))
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -315,7 +316,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
|
|||||||
|
|
||||||
selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false)
|
selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false)
|
||||||
|
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(m.startWork, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove),
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, m.schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove),
|
||||||
m.readPiece(sink, sector, offset, size, &readOk))
|
m.readPiece(sink, sector, offset, size, &readOk))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("reading piece from sealed sector: %w", err)
|
return xerrors.Errorf("reading piece from sealed sector: %w", err)
|
||||||
@ -351,7 +352,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
|
|||||||
|
|
||||||
var out abi.PieceInfo
|
var out abi.PieceInfo
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTAddPiece, selector, schedNop, func(ctx context.Context, w Worker) error {
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTAddPiece, selector, schedNop, func(ctx context.Context, w Worker) error {
|
||||||
p, err := m.startWork(ctx)(w.AddPiece(ctx, sector, existingPieces, sz, r))
|
p, err := m.waitSimpleCall(ctx)(w.AddPiece(ctx, sector, existingPieces, sz, r))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -398,8 +399,8 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
waitRes()
|
|
||||||
|
|
||||||
|
waitRes()
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -410,18 +411,38 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
|
|||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
wk, wait, err := m.getWork(ctx, "PreCommit2", sector, phase1Out)
|
||||||
|
if err != nil {
|
||||||
|
return storage.SectorCids{}, xerrors.Errorf("getWork: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
waitRes := func() {
|
||||||
|
p, werr := m.waitWork(ctx, wk)
|
||||||
|
if werr != nil {
|
||||||
|
err = werr
|
||||||
|
return
|
||||||
|
}
|
||||||
|
out = p.(storage.SectorCids)
|
||||||
|
}
|
||||||
|
|
||||||
|
if wait { // already in progress
|
||||||
|
waitRes()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if err := m.index.StorageLock(ctx, sector, storiface.FTSealed, storiface.FTCache); err != nil {
|
if err := m.index.StorageLock(ctx, sector, storiface.FTSealed, storiface.FTCache); err != nil {
|
||||||
return storage.SectorCids{}, xerrors.Errorf("acquiring sector lock: %w", err)
|
return storage.SectorCids{}, xerrors.Errorf("acquiring sector lock: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, true)
|
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, true)
|
||||||
|
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(m.startWork, sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
|
||||||
p, err := m.startWork(ctx)(w.SealPreCommit2(ctx, sector, phase1Out))
|
err := m.startWork(ctx, wk)(w.SealPreCommit2(ctx, sector, phase1Out))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
out = p.(storage.SectorCids)
|
|
||||||
|
waitRes()
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
return out, err
|
return out, err
|
||||||
@ -431,6 +452,25 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
|
|||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
wk, wait, err := m.getWork(ctx, "Commit1", sector, ticket, seed, pieces, cids)
|
||||||
|
if err != nil {
|
||||||
|
return storage.Commit1Out{}, xerrors.Errorf("getWork: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
waitRes := func() {
|
||||||
|
p, werr := m.waitWork(ctx, wk)
|
||||||
|
if werr != nil {
|
||||||
|
err = werr
|
||||||
|
return
|
||||||
|
}
|
||||||
|
out = p.(storage.Commit1Out)
|
||||||
|
}
|
||||||
|
|
||||||
|
if wait { // already in progress
|
||||||
|
waitRes()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if err := m.index.StorageLock(ctx, sector, storiface.FTSealed, storiface.FTCache); err != nil {
|
if err := m.index.StorageLock(ctx, sector, storiface.FTSealed, storiface.FTCache); err != nil {
|
||||||
return storage.Commit1Out{}, xerrors.Errorf("acquiring sector lock: %w", err)
|
return storage.Commit1Out{}, xerrors.Errorf("acquiring sector lock: %w", err)
|
||||||
}
|
}
|
||||||
@ -440,26 +480,47 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
|
|||||||
// generally very cheap / fast, and transferring data is not worth the effort
|
// generally very cheap / fast, and transferring data is not worth the effort
|
||||||
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false)
|
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false)
|
||||||
|
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(m.startWork, sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, m.schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
|
||||||
p, err := m.startWork(ctx)(w.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
|
err := m.startWork(ctx, wk)(w.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
out = p.(storage.Commit1Out)
|
|
||||||
|
waitRes()
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
return out, err
|
return out, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (out storage.Proof, err error) {
|
func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (out storage.Proof, err error) {
|
||||||
|
wk, wait, err := m.getWork(ctx, "Commit2", sector, phase1Out)
|
||||||
|
if err != nil {
|
||||||
|
return storage.Proof{}, xerrors.Errorf("getWork: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
waitRes := func() {
|
||||||
|
p, werr := m.waitWork(ctx, wk)
|
||||||
|
if werr != nil {
|
||||||
|
err = werr
|
||||||
|
return
|
||||||
|
}
|
||||||
|
out = p.(storage.Proof)
|
||||||
|
}
|
||||||
|
|
||||||
|
if wait { // already in progress
|
||||||
|
waitRes()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
selector := newTaskSelector()
|
selector := newTaskSelector()
|
||||||
|
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit2, selector, schedNop, func(ctx context.Context, w Worker) error {
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit2, selector, schedNop, func(ctx context.Context, w Worker) error {
|
||||||
p, err := m.startWork(ctx)(w.SealCommit2(ctx, sector, phase1Out))
|
err := m.startWork(ctx, wk)(w.SealCommit2(ctx, sector, phase1Out))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
out = p.(storage.Proof)
|
|
||||||
|
waitRes()
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -489,9 +550,9 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
|
|||||||
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false)
|
selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false)
|
||||||
|
|
||||||
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
||||||
schedFetch(m.startWork, sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove),
|
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove),
|
||||||
func(ctx context.Context, w Worker) error {
|
func(ctx context.Context, w Worker) error {
|
||||||
_, err := m.startWork(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed))
|
_, err := m.waitSimpleCall(ctx)(w.FinalizeSector(ctx, sector, keepUnsealed))
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -507,9 +568,9 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
|
|||||||
}
|
}
|
||||||
|
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
|
||||||
schedFetch(m.startWork, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
|
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
|
||||||
func(ctx context.Context, w Worker) error {
|
func(ctx context.Context, w Worker) error {
|
||||||
_, err := m.startWork(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed))
|
_, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed))
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
27
extern/sector-storage/manager_calltracker.go
vendored
27
extern/sector-storage/manager_calltracker.go
vendored
@ -2,11 +2,11 @@ package sectorstorage
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
"io"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||||
)
|
)
|
||||||
@ -16,7 +16,7 @@ type workID struct {
|
|||||||
Params string // json [...params]
|
Params string // json [...params]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *workID) String() string {
|
func (w workID) String() string {
|
||||||
return fmt.Sprintf("%s(%s)", w.Method, w.Params)
|
return fmt.Sprintf("%s(%s)", w.Method, w.Params)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -36,16 +36,17 @@ type WorkState struct {
|
|||||||
WorkError string // Status = wsDone, set when failed to start work
|
WorkError string // Status = wsDone, set when failed to start work
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WorkState) UnmarshalCBOR(reader io.Reader) error {
|
|
||||||
panic("implement me")
|
|
||||||
}
|
|
||||||
|
|
||||||
func newWorkID(method string, params ...interface{}) (workID, error) {
|
func newWorkID(method string, params ...interface{}) (workID, error) {
|
||||||
pb, err := json.Marshal(params)
|
pb, err := json.Marshal(params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return workID{}, xerrors.Errorf("marshaling work params: %w", err)
|
return workID{}, xerrors.Errorf("marshaling work params: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(pb) > 256 {
|
||||||
|
s := sha256.Sum256(pb)
|
||||||
|
pb = s[:]
|
||||||
|
}
|
||||||
|
|
||||||
return workID{
|
return workID{
|
||||||
Method: method,
|
Method: method,
|
||||||
Params: string(pb),
|
Params: string(pb),
|
||||||
@ -68,7 +69,7 @@ func (m *Manager) getWork(ctx context.Context, method string, params ...interfac
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !have {
|
if !have {
|
||||||
err := m.work.Begin(wid, WorkState{
|
err := m.work.Begin(wid, &WorkState{
|
||||||
Status: wsStarted,
|
Status: wsStarted,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -194,6 +195,16 @@ func (m *Manager) waitWork(ctx context.Context, wid workID) (interface{}, error)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Manager) waitSimpleCall(ctx context.Context) func(callID storiface.CallID, err error) (interface{}, error) {
|
||||||
|
return func(callID storiface.CallID, err error) (interface{}, error) {
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return m.waitCall(ctx, callID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Manager) waitCall(ctx context.Context, callID storiface.CallID) (interface{}, error) {
|
func (m *Manager) waitCall(ctx context.Context, callID storiface.CallID) (interface{}, error) {
|
||||||
m.workLk.Lock()
|
m.workLk.Lock()
|
||||||
_, ok := m.callToWork[callID]
|
_, ok := m.callToWork[callID]
|
||||||
@ -204,7 +215,7 @@ func (m *Manager) waitCall(ctx context.Context, callID storiface.CallID) (interf
|
|||||||
|
|
||||||
ch, ok := m.callRes[callID]
|
ch, ok := m.callRes[callID]
|
||||||
if !ok {
|
if !ok {
|
||||||
ch = make(chan result)
|
ch = make(chan result, 1)
|
||||||
m.callRes[callID] = ch
|
m.callRes[callID] = ch
|
||||||
}
|
}
|
||||||
m.workLk.Unlock()
|
m.workLk.Unlock()
|
||||||
|
21
extern/sector-storage/manager_test.go
vendored
21
extern/sector-storage/manager_test.go
vendored
@ -11,17 +11,19 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/ipfs/go-datastore"
|
||||||
|
logging "github.com/ipfs/go-log"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
"github.com/filecoin-project/go-statestore"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
|
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
|
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
|
||||||
|
|
||||||
"github.com/google/uuid"
|
|
||||||
logging "github.com/ipfs/go-log"
|
|
||||||
"github.com/stretchr/testify/require"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -111,8 +113,11 @@ func newTestMgr(ctx context.Context, t *testing.T) (*Manager, *stores.Local, *st
|
|||||||
|
|
||||||
Prover: prover,
|
Prover: prover,
|
||||||
|
|
||||||
results: map[storiface.CallID]result{},
|
work: statestore.New(datastore.NewMapDatastore()),
|
||||||
waitRes: map[storiface.CallID]chan struct{}{},
|
callToWork: map[storiface.CallID]workID{},
|
||||||
|
callRes: map[storiface.CallID]chan result{},
|
||||||
|
results: map[workID]result{},
|
||||||
|
waitRes: map[workID]chan struct{}{},
|
||||||
}
|
}
|
||||||
|
|
||||||
go m.sched.runSched()
|
go m.sched.runSched()
|
||||||
|
142
extern/sector-storage/storiface/cbor_gen.go
vendored
Normal file
142
extern/sector-storage/storiface/cbor_gen.go
vendored
Normal file
@ -0,0 +1,142 @@
|
|||||||
|
// Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT.
|
||||||
|
|
||||||
|
package storiface
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
cbg "github.com/whyrusleeping/cbor-gen"
|
||||||
|
xerrors "golang.org/x/xerrors"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = xerrors.Errorf
|
||||||
|
|
||||||
|
func (t *CallID) MarshalCBOR(w io.Writer) error {
|
||||||
|
if t == nil {
|
||||||
|
_, err := w.Write(cbg.CborNull)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := w.Write([]byte{162}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
scratch := make([]byte, 9)
|
||||||
|
|
||||||
|
// t.Sector (abi.SectorID) (struct)
|
||||||
|
if len("Sector") > cbg.MaxLength {
|
||||||
|
return xerrors.Errorf("Value in field \"Sector\" was too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("Sector"))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := io.WriteString(w, string("Sector")); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := t.Sector.MarshalCBOR(w); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// t.ID (uuid.UUID) (array)
|
||||||
|
if len("ID") > cbg.MaxLength {
|
||||||
|
return xerrors.Errorf("Value in field \"ID\" was too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("ID"))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := io.WriteString(w, string("ID")); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(t.ID) > cbg.ByteArrayMaxLen {
|
||||||
|
return xerrors.Errorf("Byte array in field t.ID was too long")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajByteString, uint64(len(t.ID))); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := w.Write(t.ID[:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *CallID) UnmarshalCBOR(r io.Reader) error {
|
||||||
|
*t = CallID{}
|
||||||
|
|
||||||
|
br := cbg.GetPeeker(r)
|
||||||
|
scratch := make([]byte, 8)
|
||||||
|
|
||||||
|
maj, extra, err := cbg.CborReadHeaderBuf(br, scratch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if maj != cbg.MajMap {
|
||||||
|
return fmt.Errorf("cbor input should be of type map")
|
||||||
|
}
|
||||||
|
|
||||||
|
if extra > cbg.MaxLength {
|
||||||
|
return fmt.Errorf("CallID: map struct too large (%d)", extra)
|
||||||
|
}
|
||||||
|
|
||||||
|
var name string
|
||||||
|
n := extra
|
||||||
|
|
||||||
|
for i := uint64(0); i < n; i++ {
|
||||||
|
|
||||||
|
{
|
||||||
|
sval, err := cbg.ReadStringBuf(br, scratch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
name = string(sval)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch name {
|
||||||
|
// t.Sector (abi.SectorID) (struct)
|
||||||
|
case "Sector":
|
||||||
|
|
||||||
|
{
|
||||||
|
|
||||||
|
if err := t.Sector.UnmarshalCBOR(br); err != nil {
|
||||||
|
return xerrors.Errorf("unmarshaling t.Sector: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
// t.ID (uuid.UUID) (array)
|
||||||
|
case "ID":
|
||||||
|
|
||||||
|
maj, extra, err = cbg.CborReadHeaderBuf(br, scratch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if extra > cbg.ByteArrayMaxLen {
|
||||||
|
return fmt.Errorf("t.ID: byte array too large (%d)", extra)
|
||||||
|
}
|
||||||
|
if maj != cbg.MajByteString {
|
||||||
|
return fmt.Errorf("expected byte array")
|
||||||
|
}
|
||||||
|
|
||||||
|
if extra != 16 {
|
||||||
|
return fmt.Errorf("expected array to have 16 elements")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.ID = [16]uint8{}
|
||||||
|
|
||||||
|
if _, err := io.ReadFull(br, t.ID[:]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unknown struct field %d: '%s'", i, name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
11
gen/main.go
11
gen/main.go
@ -2,7 +2,6 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
gen "github.com/whyrusleeping/cbor-gen"
|
gen "github.com/whyrusleeping/cbor-gen"
|
||||||
@ -10,6 +9,8 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/api"
|
"github.com/filecoin-project/lotus/api"
|
||||||
"github.com/filecoin-project/lotus/chain/exchange"
|
"github.com/filecoin-project/lotus/chain/exchange"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
||||||
|
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||||
"github.com/filecoin-project/lotus/node/hello"
|
"github.com/filecoin-project/lotus/node/hello"
|
||||||
"github.com/filecoin-project/lotus/paychmgr"
|
"github.com/filecoin-project/lotus/paychmgr"
|
||||||
)
|
)
|
||||||
@ -75,6 +76,14 @@ func main() {
|
|||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = gen.WriteMapEncodersToFile("./extern/sector-storage/storiface/cbor_gen.go", "storiface",
|
||||||
|
storiface.CallID{},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
err = gen.WriteMapEncodersToFile("./extern/sector-storage/cbor_gen.go", "sectorstorage",
|
err = gen.WriteMapEncodersToFile("./extern/sector-storage/cbor_gen.go", "sectorstorage",
|
||||||
sectorstorage.Call{},
|
sectorstorage.Call{},
|
||||||
sectorstorage.WorkState{},
|
sectorstorage.WorkState{},
|
||||||
|
@ -23,7 +23,8 @@
|
|||||||
"AddSigner",
|
"AddSigner",
|
||||||
"RemoveSigner",
|
"RemoveSigner",
|
||||||
"SwapSigner",
|
"SwapSigner",
|
||||||
"ChangeNumApprovalsThreshold"
|
"ChangeNumApprovalsThreshold",
|
||||||
|
"LockBalance"
|
||||||
],
|
],
|
||||||
"fil/1/paymentchannel": [
|
"fil/1/paymentchannel": [
|
||||||
"Send",
|
"Send",
|
||||||
|
@ -517,13 +517,15 @@ func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.S
|
|||||||
}
|
}
|
||||||
|
|
||||||
var WorkerCallsPrefix = datastore.NewKey("/worker/calls")
|
var WorkerCallsPrefix = datastore.NewKey("/worker/calls")
|
||||||
|
var ManagerWorkPrefix = datastore.NewKey("/stmgr/calls")
|
||||||
|
|
||||||
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc sectorstorage.SealerConfig, urls sectorstorage.URLs, sa sectorstorage.StorageAuth, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) {
|
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, cfg *ffiwrapper.Config, sc sectorstorage.SealerConfig, urls sectorstorage.URLs, sa sectorstorage.StorageAuth, ds dtypes.MetadataDS) (*sectorstorage.Manager, error) {
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
|
||||||
wsts := statestore.New(namespace.Wrap(ds, WorkerCallsPrefix))
|
wsts := statestore.New(namespace.Wrap(ds, WorkerCallsPrefix))
|
||||||
|
smsts := statestore.New(namespace.Wrap(ds, ManagerWorkPrefix))
|
||||||
|
|
||||||
sst, err := sectorstorage.New(ctx, ls, si, cfg, sc, urls, sa, wsts)
|
sst, err := sectorstorage.New(ctx, ls, si, cfg, sc, urls, sa, wsts, smsts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user