diff --git a/cmd/lotus-seal-worker/rpc.go b/cmd/lotus-seal-worker/rpc.go index 5380fe432..3c0a1f2ce 100644 --- a/cmd/lotus-seal-worker/rpc.go +++ b/cmd/lotus-seal-worker/rpc.go @@ -6,11 +6,10 @@ import ( "github.com/mitchellh/go-homedir" "golang.org/x/xerrors" - "github.com/filecoin-project/specs-storage/storage" - "github.com/filecoin-project/lotus/build" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/sector-storage/stores" + "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) type worker struct { @@ -43,4 +42,4 @@ func (w *worker) StorageAddLocal(ctx context.Context, path string) error { return nil } -var _ storage.Sealer = &worker{} +var _ storiface.WorkerCalls = &worker{} diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index da700cdc0..8891d83ab 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -52,6 +52,7 @@ type SectorManager interface { ffiwrapper.StorageSealer storage.Prover + storiface.WorkerReturn FaultTracker } @@ -70,13 +71,13 @@ type Manager struct { storage.Prover - resLk sync.Mutex + resLk sync.Mutex results map[storiface.CallID]result waitRes map[storiface.CallID]chan struct{} } type result struct { - r interface{} + r interface{} err error } @@ -179,7 +180,8 @@ func (m *Manager) AddWorker(ctx context.Context, w Worker) error { m.sched.newWorkers <- &workerHandle{ w: w, wt: &workTracker{ - running: map[uint64]storiface.WorkerJob{}, + done: map[storiface.CallID]struct{}{}, + running: map[storiface.CallID]storiface.WorkerJob{}, }, info: info, preparing: &activeResources{}, diff --git a/extern/sector-storage/sched_test.go b/extern/sector-storage/sched_test.go index 834a9f6dd..a95d4b3fa 100644 --- a/extern/sector-storage/sched_test.go +++ b/extern/sector-storage/sched_test.go @@ -166,7 +166,8 @@ func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name str sched.newWorkers <- &workerHandle{ w: w, wt: &workTracker{ - running: map[uint64]storiface.WorkerJob{}, + done: map[storiface.CallID]struct{}{}, + running: map[storiface.CallID]storiface.WorkerJob{}, }, info: info, preparing: &activeResources{}, diff --git a/extern/sector-storage/stats.go b/extern/sector-storage/stats.go index 7f95e3bc3..f9063cbec 100644 --- a/extern/sector-storage/stats.go +++ b/extern/sector-storage/stats.go @@ -36,7 +36,7 @@ func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob { for wi, window := range handle.activeWindows { for _, request := range window.todo { out[uint64(id)] = append(out[uint64(id)], storiface.WorkerJob{ - ID: 0, + ID: storiface.UndefCall, Sector: request.sector, Task: request.taskType, RunWait: wi + 1, diff --git a/extern/sector-storage/storiface/worker.go b/extern/sector-storage/storiface/worker.go index dac22aba0..839ac14ad 100644 --- a/extern/sector-storage/storiface/worker.go +++ b/extern/sector-storage/storiface/worker.go @@ -40,7 +40,7 @@ type WorkerStats struct { } type WorkerJob struct { - ID uint64 + ID CallID Sector abi.SectorID Task sealtasks.TaskType diff --git a/extern/sector-storage/work_tracker.go b/extern/sector-storage/work_tracker.go index 53f79af90..57e136af0 100644 --- a/extern/sector-storage/work_tracker.go +++ b/extern/sector-storage/work_tracker.go @@ -18,31 +18,58 @@ import ( type workTracker struct { lk sync.Mutex - ctr uint64 - running map[uint64]storiface.WorkerJob + done map[storiface.CallID]struct{} + running map[storiface.CallID]storiface.WorkerJob // TODO: done, aggregate stats, queue stats, scheduler feedback } -func (wt *workTracker) track(sid abi.SectorID, task sealtasks.TaskType) func() { +// TODO: CALL THIS! +// TODO: CALL THIS! +// TODO: CALL THIS! +// TODO: CALL THIS! +// TODO: CALL THIS! +// TODO: CALL THIS! +// TODO: CALL THIS! +// TODO: CALL THIS! +// TODO: CALL THIS! +// TODO: CALL THIS! +func (wt *workTracker) onDone(callID storiface.CallID) { wt.lk.Lock() defer wt.lk.Unlock() - id := wt.ctr - wt.ctr++ - - wt.running[id] = storiface.WorkerJob{ - ID: id, - Sector: sid, - Task: task, - Start: time.Now(), + _, ok := wt.running[callID] + if !ok { + wt.done[callID] = struct{}{} + return } - return func() { + delete(wt.running, callID) +} + +func (wt *workTracker) track(sid abi.SectorID, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) { + return func(callID storiface.CallID, err error) (storiface.CallID, error) { + if err != nil { + return callID, err + } + wt.lk.Lock() defer wt.lk.Unlock() - delete(wt.running, id) + _, done := wt.done[callID] + if done { + delete(wt.done, callID) + return callID, err + } + + wt.running[callID] = storiface.WorkerJob{ + ID: callID, + Sector: sid, + Task: task, + Start: time.Now(), + } + + return callID, err } } @@ -71,58 +98,40 @@ type trackedWorker struct { tracker *workTracker } -func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) { - defer t.tracker.track(sector, sealtasks.TTPreCommit1)() - - return t.Worker.SealPreCommit1(ctx, sector, ticket, pieces) +func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) { + return t.tracker.track(sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces)) } -func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storage.SectorCids, error) { - defer t.tracker.track(sector, sealtasks.TTPreCommit2)() - - return t.Worker.SealPreCommit2(ctx, sector, pc1o) +func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) { + return t.tracker.track(sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o)) } -func (t *trackedWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) { - defer t.tracker.track(sector, sealtasks.TTCommit1)() - - return t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids) +func (t *trackedWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) { + return t.tracker.track(sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)) } -func (t *trackedWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storage.Proof, error) { - defer t.tracker.track(sector, sealtasks.TTCommit2)() - - return t.Worker.SealCommit2(ctx, sector, c1o) +func (t *trackedWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) { + return t.tracker.track(sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o)) } -func (t *trackedWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error { - defer t.tracker.track(sector, sealtasks.TTFinalize)() - - return t.Worker.FinalizeSector(ctx, sector, keepUnsealed) +func (t *trackedWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) { + return t.tracker.track(sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed)) } -func (t *trackedWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) { - defer t.tracker.track(sector, sealtasks.TTAddPiece)() - - return t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData) +func (t *trackedWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { + return t.tracker.track(sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)) } -func (t *trackedWorker) Fetch(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) error { - defer t.tracker.track(s, sealtasks.TTFetch)() - - return t.Worker.Fetch(ctx, s, ft, ptype, am) +func (t *trackedWorker) Fetch(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { + return t.tracker.track(s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am)) } -func (t *trackedWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error { - defer t.tracker.track(id, sealtasks.TTUnseal)() - - return t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid) +func (t *trackedWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) { + return t.tracker.track(id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)) } -func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { - defer t.tracker.track(id, sealtasks.TTReadUnsealed)() - - return t.Worker.ReadPiece(ctx, writer, id, index, size) +func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) { + return t.tracker.track(id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size)) } var _ Worker = &trackedWorker{} diff --git a/node/builder.go b/node/builder.go index 5b6966cd4..5c21a155d 100644 --- a/node/builder.go +++ b/node/builder.go @@ -43,6 +43,7 @@ import ( sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/stores" + "github.com/filecoin-project/lotus/extern/sector-storage/storiface" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/filecoin-project/lotus/lib/blockstore" "github.com/filecoin-project/lotus/lib/peermgr" @@ -298,6 +299,7 @@ func Online() Option { Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))), Override(new(storage2.Prover), From(new(sectorstorage.SectorManager))), + Override(new(storiface.WorkerReturn), From(new(sectorstorage.SectorManager))), Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), Override(new(*storage.Miner), modules.StorageMiner(config.DefaultStorageMiner().Fees)), diff --git a/node/impl/storminer.go b/node/impl/storminer.go index c688ff677..73e8eea06 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -53,6 +53,7 @@ type StorageMinerAPI struct { StorageMgr *sectorstorage.Manager `optional:"true"` IStorageMgr sectorstorage.SectorManager *stores.Index + storiface.WorkerReturn DataTransfer dtypes.ProviderDataTransfer Host host.Host