storage: Fix build

This commit is contained in:
Łukasz Magiera 2020-09-07 16:12:46 +02:00
parent 06e3852cef
commit 9e6f974f3c
8 changed files with 72 additions and 58 deletions

View File

@ -6,11 +6,10 @@ import (
"github.com/mitchellh/go-homedir" "github.com/mitchellh/go-homedir"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
) )
type worker struct { type worker struct {
@ -43,4 +42,4 @@ func (w *worker) StorageAddLocal(ctx context.Context, path string) error {
return nil return nil
} }
var _ storage.Sealer = &worker{} var _ storiface.WorkerCalls = &worker{}

View File

@ -52,6 +52,7 @@ type SectorManager interface {
ffiwrapper.StorageSealer ffiwrapper.StorageSealer
storage.Prover storage.Prover
storiface.WorkerReturn
FaultTracker FaultTracker
} }
@ -179,7 +180,8 @@ func (m *Manager) AddWorker(ctx context.Context, w Worker) error {
m.sched.newWorkers <- &workerHandle{ m.sched.newWorkers <- &workerHandle{
w: w, w: w,
wt: &workTracker{ wt: &workTracker{
running: map[uint64]storiface.WorkerJob{}, done: map[storiface.CallID]struct{}{},
running: map[storiface.CallID]storiface.WorkerJob{},
}, },
info: info, info: info,
preparing: &activeResources{}, preparing: &activeResources{},

View File

@ -166,7 +166,8 @@ func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name str
sched.newWorkers <- &workerHandle{ sched.newWorkers <- &workerHandle{
w: w, w: w,
wt: &workTracker{ wt: &workTracker{
running: map[uint64]storiface.WorkerJob{}, done: map[storiface.CallID]struct{}{},
running: map[storiface.CallID]storiface.WorkerJob{},
}, },
info: info, info: info,
preparing: &activeResources{}, preparing: &activeResources{},

View File

@ -36,7 +36,7 @@ func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob {
for wi, window := range handle.activeWindows { for wi, window := range handle.activeWindows {
for _, request := range window.todo { for _, request := range window.todo {
out[uint64(id)] = append(out[uint64(id)], storiface.WorkerJob{ out[uint64(id)] = append(out[uint64(id)], storiface.WorkerJob{
ID: 0, ID: storiface.UndefCall,
Sector: request.sector, Sector: request.sector,
Task: request.taskType, Task: request.taskType,
RunWait: wi + 1, RunWait: wi + 1,

View File

@ -40,7 +40,7 @@ type WorkerStats struct {
} }
type WorkerJob struct { type WorkerJob struct {
ID uint64 ID CallID
Sector abi.SectorID Sector abi.SectorID
Task sealtasks.TaskType Task sealtasks.TaskType

View File

@ -18,31 +18,58 @@ import (
type workTracker struct { type workTracker struct {
lk sync.Mutex lk sync.Mutex
ctr uint64 done map[storiface.CallID]struct{}
running map[uint64]storiface.WorkerJob running map[storiface.CallID]storiface.WorkerJob
// TODO: done, aggregate stats, queue stats, scheduler feedback // TODO: done, aggregate stats, queue stats, scheduler feedback
} }
func (wt *workTracker) track(sid abi.SectorID, task sealtasks.TaskType) func() { // TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
// TODO: CALL THIS!
func (wt *workTracker) onDone(callID storiface.CallID) {
wt.lk.Lock() wt.lk.Lock()
defer wt.lk.Unlock() defer wt.lk.Unlock()
id := wt.ctr _, ok := wt.running[callID]
wt.ctr++ if !ok {
wt.done[callID] = struct{}{}
return
}
wt.running[id] = storiface.WorkerJob{ delete(wt.running, callID)
ID: id, }
func (wt *workTracker) track(sid abi.SectorID, task sealtasks.TaskType) func(storiface.CallID, error) (storiface.CallID, error) {
return func(callID storiface.CallID, err error) (storiface.CallID, error) {
if err != nil {
return callID, err
}
wt.lk.Lock()
defer wt.lk.Unlock()
_, done := wt.done[callID]
if done {
delete(wt.done, callID)
return callID, err
}
wt.running[callID] = storiface.WorkerJob{
ID: callID,
Sector: sid, Sector: sid,
Task: task, Task: task,
Start: time.Now(), Start: time.Now(),
} }
return func() { return callID, err
wt.lk.Lock()
defer wt.lk.Unlock()
delete(wt.running, id)
} }
} }
@ -71,58 +98,40 @@ type trackedWorker struct {
tracker *workTracker tracker *workTracker
} }
func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) { func (t *trackedWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
defer t.tracker.track(sector, sealtasks.TTPreCommit1)() return t.tracker.track(sector, sealtasks.TTPreCommit1)(t.Worker.SealPreCommit1(ctx, sector, ticket, pieces))
return t.Worker.SealPreCommit1(ctx, sector, ticket, pieces)
} }
func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storage.SectorCids, error) { func (t *trackedWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
defer t.tracker.track(sector, sealtasks.TTPreCommit2)() return t.tracker.track(sector, sealtasks.TTPreCommit2)(t.Worker.SealPreCommit2(ctx, sector, pc1o))
return t.Worker.SealPreCommit2(ctx, sector, pc1o)
} }
func (t *trackedWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) { func (t *trackedWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
defer t.tracker.track(sector, sealtasks.TTCommit1)() return t.tracker.track(sector, sealtasks.TTCommit1)(t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids))
return t.Worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
} }
func (t *trackedWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storage.Proof, error) { func (t *trackedWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) {
defer t.tracker.track(sector, sealtasks.TTCommit2)() return t.tracker.track(sector, sealtasks.TTCommit2)(t.Worker.SealCommit2(ctx, sector, c1o))
return t.Worker.SealCommit2(ctx, sector, c1o)
} }
func (t *trackedWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error { func (t *trackedWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) {
defer t.tracker.track(sector, sealtasks.TTFinalize)() return t.tracker.track(sector, sealtasks.TTFinalize)(t.Worker.FinalizeSector(ctx, sector, keepUnsealed))
return t.Worker.FinalizeSector(ctx, sector, keepUnsealed)
} }
func (t *trackedWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) { func (t *trackedWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
defer t.tracker.track(sector, sealtasks.TTAddPiece)() return t.tracker.track(sector, sealtasks.TTAddPiece)(t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData))
return t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
} }
func (t *trackedWorker) Fetch(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) error { func (t *trackedWorker) Fetch(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
defer t.tracker.track(s, sealtasks.TTFetch)() return t.tracker.track(s, sealtasks.TTFetch)(t.Worker.Fetch(ctx, s, ft, ptype, am))
return t.Worker.Fetch(ctx, s, ft, ptype, am)
} }
func (t *trackedWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error { func (t *trackedWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
defer t.tracker.track(id, sealtasks.TTUnseal)() return t.tracker.track(id, sealtasks.TTUnseal)(t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid))
return t.Worker.UnsealPiece(ctx, id, index, size, randomness, cid)
} }
func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { func (t *trackedWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
defer t.tracker.track(id, sealtasks.TTReadUnsealed)() return t.tracker.track(id, sealtasks.TTReadUnsealed)(t.Worker.ReadPiece(ctx, writer, id, index, size))
return t.Worker.ReadPiece(ctx, writer, id, index, size)
} }
var _ Worker = &trackedWorker{} var _ Worker = &trackedWorker{}

View File

@ -43,6 +43,7 @@ import (
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing" sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/lib/blockstore" "github.com/filecoin-project/lotus/lib/blockstore"
"github.com/filecoin-project/lotus/lib/peermgr" "github.com/filecoin-project/lotus/lib/peermgr"
@ -298,6 +299,7 @@ func Online() Option {
Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))), Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))),
Override(new(storage2.Prover), From(new(sectorstorage.SectorManager))), Override(new(storage2.Prover), From(new(sectorstorage.SectorManager))),
Override(new(storiface.WorkerReturn), From(new(sectorstorage.SectorManager))),
Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks), Override(new(*sectorblocks.SectorBlocks), sectorblocks.NewSectorBlocks),
Override(new(*storage.Miner), modules.StorageMiner(config.DefaultStorageMiner().Fees)), Override(new(*storage.Miner), modules.StorageMiner(config.DefaultStorageMiner().Fees)),

View File

@ -53,6 +53,7 @@ type StorageMinerAPI struct {
StorageMgr *sectorstorage.Manager `optional:"true"` StorageMgr *sectorstorage.Manager `optional:"true"`
IStorageMgr sectorstorage.SectorManager IStorageMgr sectorstorage.SectorManager
*stores.Index *stores.Index
storiface.WorkerReturn
DataTransfer dtypes.ProviderDataTransfer DataTransfer dtypes.ProviderDataTransfer
Host host.Host Host host.Host