diff --git a/ffiwrapper/sealer_cgo.go b/ffiwrapper/sealer_cgo.go index 6764c3d96..f90a6020e 100644 --- a/ffiwrapper/sealer_cgo.go +++ b/ffiwrapper/sealer_cgo.go @@ -108,33 +108,48 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie }, werr() } +type closerFunc func() error + +func (cf closerFunc) Close() error { + return cf() +} + func (sb *Sealer) ReadPieceFromSealedSector(ctx context.Context, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealedCID cid.Cid) (io.ReadCloser, error) { - path, doneUnsealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTUnsealed, false) - if err != nil { - return nil, xerrors.Errorf("acquire unsealed sector path: %w", err) - } - defer doneUnsealed() - f, err := os.OpenFile(path.Unsealed, os.O_RDONLY, 0644) - if err == nil { - if _, err := f.Seek(int64(offset), io.SeekStart); err != nil { - return nil, xerrors.Errorf("seek: %w", err) + { + path, doneUnsealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, false) + if err != nil { + return nil, xerrors.Errorf("acquire unsealed sector path: %w", err) } - lr := io.LimitReader(f, int64(size)) + f, err := os.OpenFile(path.Unsealed, os.O_RDONLY, 0644) + if err == nil { + if _, err := f.Seek(int64(offset), io.SeekStart); err != nil { + doneUnsealed() + return nil, xerrors.Errorf("seek: %w", err) + } - return &struct { - io.Reader - io.Closer - }{ - Reader: lr, - Closer: f, - }, nil - } - if !os.IsNotExist(err) { - return nil, err + lr := io.LimitReader(f, int64(size)) + + return &struct { + io.Reader + io.Closer + }{ + Reader: lr, + Closer: closerFunc(func() error { + doneUnsealed() + return f.Close() + }), + }, nil + } + + doneUnsealed() + + if !os.IsNotExist(err) { + return nil, err + } } - sealed, doneSealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed|stores.FTCache, 0, false) + paths, doneSealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed, false) if err != nil { return nil, xerrors.Errorf("acquire sealed/cache sector path: %w", err) } @@ -145,9 +160,9 @@ func (sb *Sealer) ReadPieceFromSealedSector(ctx context.Context, sector abi.Sect // remove last used one (or use whatever other cache policy makes sense)) err = ffi.Unseal( sb.sealProofType, - sealed.Cache, - sealed.Sealed, - path.Unsealed, + paths.Cache, + paths.Sealed, + paths.Unsealed, sector.Number, sector.Miner, ticket, @@ -157,7 +172,7 @@ func (sb *Sealer) ReadPieceFromSealedSector(ctx context.Context, sector abi.Sect return nil, xerrors.Errorf("unseal failed: %w", err) } - f, err = os.OpenFile(string(path.Unsealed), os.O_RDONLY, 0644) + f, err := os.OpenFile(paths.Unsealed, os.O_RDONLY, 0644) if err != nil { return nil, err } diff --git a/request_queue.go b/request_queue.go new file mode 100644 index 000000000..09ca7ae3f --- /dev/null +++ b/request_queue.go @@ -0,0 +1,36 @@ +package sectorstorage + +import "container/heap" + +type requestQueue []*workerRequest + +func (q requestQueue) Len() int { return len(q) } + +func (q requestQueue) Less(i, j int) bool { + return q[i].taskType.Less(q[j].taskType) +} + +func (q requestQueue) Swap(i, j int) { + q[i], q[j] = q[j], q[i] + q[i].index = i + q[j].index = j +} + +func (q *requestQueue) Push(x interface{}) { + n := len(*q) + item := x.(*workerRequest) + item.index = n + *q = append(*q, item) +} + +func (q *requestQueue) Pop() interface{} { + old := *q + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + item.index = -1 // for safety + *q = old[0 : n-1] + return item +} + +var _ heap.Interface = &requestQueue{} diff --git a/request_queue_test.go b/request_queue_test.go new file mode 100644 index 000000000..9bf231e39 --- /dev/null +++ b/request_queue_test.go @@ -0,0 +1,30 @@ +package sectorstorage + +import ( + "container/heap" + "testing" + + "github.com/filecoin-project/sector-storage/sealtasks" +) + +func TestRequestQueue(t *testing.T) { + rq := &requestQueue{} + + heap.Push(rq, &workerRequest{taskType: sealtasks.TTAddPiece}) + heap.Push(rq, &workerRequest{taskType: sealtasks.TTPreCommit1}) + heap.Push(rq, &workerRequest{taskType: sealtasks.TTPreCommit2}) + heap.Push(rq, &workerRequest{taskType: sealtasks.TTPreCommit1}) + heap.Push(rq, &workerRequest{taskType: sealtasks.TTAddPiece}) + + pt := heap.Pop(rq).(*workerRequest) + + if pt.taskType != sealtasks.TTPreCommit2 { + t.Error("expected precommit2, got", pt.taskType) + } + + pt = heap.Pop(rq).(*workerRequest) + + if pt.taskType != sealtasks.TTPreCommit1 { + t.Error("expected precommit1, got", pt.taskType) + } +} diff --git a/resources.go b/resources.go index afe4c166c..87058e80a 100644 --- a/resources.go +++ b/resources.go @@ -73,7 +73,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{ sealtasks.TTPreCommit1: { abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ MaxMemory: 64 << 30, - MinMemory: 32 << 30, + MinMemory: 48 << 30, Threads: 1, @@ -106,10 +106,11 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{ }, sealtasks.TTPreCommit2: { abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ - MaxMemory: 96 << 30, - MinMemory: 64 << 30, + MaxMemory: 32 << 30, + MinMemory: 32 << 30, Threads: -1, + CanGPU: true, BaseMinMemory: 30 << 30, }, @@ -172,9 +173,9 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{ BaseMinMemory: 8 << 20, }, }, - sealtasks.TTCommit2: { // TODO: Measure more accurately + sealtasks.TTCommit2: { abi.RegisteredProof_StackedDRG32GiBSeal: Resources{ - MaxMemory: 110 << 30, + MaxMemory: 130 << 30, MinMemory: 60 << 30, Threads: -1, diff --git a/sched.go b/sched.go index d7745da36..019febda4 100644 --- a/sched.go +++ b/sched.go @@ -1,7 +1,7 @@ package sectorstorage import ( - "container/list" + "container/heap" "context" "sort" "sync" @@ -41,7 +41,7 @@ type scheduler struct { workerFree chan WorkerID closing chan struct{} - schedQueue *list.List // List[*workerRequest] + schedQueue *requestQueue } func newScheduler(spt abi.RegisteredProof) *scheduler { @@ -60,7 +60,7 @@ func newScheduler(spt abi.RegisteredProof) *scheduler { workerFree: make(chan WorkerID), closing: make(chan struct{}), - schedQueue: list.New(), + schedQueue: &requestQueue{}, } } @@ -101,6 +101,8 @@ type workerRequest struct { prepare WorkerAction work WorkerAction + index int // The index of the item in the heap. + ret chan<- workerResponse ctx context.Context } @@ -154,7 +156,7 @@ func (sh *scheduler) runSched() { continue } - sh.schedQueue.PushBack(req) + heap.Push(sh.schedQueue, req) case wid := <-sh.workerFree: sh.onWorkerFreed(wid) case <-sh.closing: @@ -173,8 +175,8 @@ func (sh *scheduler) onWorkerFreed(wid WorkerID) { return } - for e := sh.schedQueue.Front(); e != nil; e = e.Next() { - req := e.Value.(*workerRequest) + for i := 0; i < sh.schedQueue.Len(); i++ { + req := (*sh.schedQueue)[i] ok, err := req.sel.Ok(req.ctx, req.taskType, w) if err != nil { @@ -193,15 +195,8 @@ func (sh *scheduler) onWorkerFreed(wid WorkerID) { } if scheduled { - pe := e.Prev() - sh.schedQueue.Remove(e) - if pe == nil { - pe = sh.schedQueue.Front() - } - if pe == nil { - break - } - e = pe + heap.Remove(sh.schedQueue, i) + i-- continue } } diff --git a/sealtasks/task.go b/sealtasks/task.go index 8fbe7a7b4..0a94d2c04 100644 --- a/sealtasks/task.go +++ b/sealtasks/task.go @@ -13,3 +13,17 @@ const ( TTFetch TaskType = "seal/v0/fetch" ) + +var order = map[TaskType]int{ + TTAddPiece: 7, + TTPreCommit1: 6, + TTPreCommit2: 5, + TTCommit2: 4, + TTCommit1: 3, + TTFetch: 2, + TTFinalize: 1, +} + +func (a TaskType) Less(b TaskType) bool { + return order[a] < order[b] +}