Merge pull request #27 from filecoin-project/fix/unseal

Fix unseal; some improvements
This commit is contained in:
Łukasz Magiera 2020-05-08 02:05:08 +02:00 committed by GitHub
commit 8c47b13d9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 136 additions and 45 deletions

View File

@ -108,33 +108,48 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
}, werr()
}
// closerFunc adapts a plain func() error into an io.Closer, so a
// cleanup callback can be handed to callers expecting a Closer.
type closerFunc func() error

// Close invokes the wrapped cleanup function.
func (f closerFunc) Close() error {
	return f()
}
func (sb *Sealer) ReadPieceFromSealedSector(ctx context.Context, sector abi.SectorID, offset UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealedCID cid.Cid) (io.ReadCloser, error) {
path, doneUnsealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTUnsealed, false)
if err != nil {
return nil, xerrors.Errorf("acquire unsealed sector path: %w", err)
}
defer doneUnsealed()
f, err := os.OpenFile(path.Unsealed, os.O_RDONLY, 0644)
if err == nil {
if _, err := f.Seek(int64(offset), io.SeekStart); err != nil {
return nil, xerrors.Errorf("seek: %w", err)
{
path, doneUnsealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, false)
if err != nil {
return nil, xerrors.Errorf("acquire unsealed sector path: %w", err)
}
lr := io.LimitReader(f, int64(size))
f, err := os.OpenFile(path.Unsealed, os.O_RDONLY, 0644)
if err == nil {
if _, err := f.Seek(int64(offset), io.SeekStart); err != nil {
doneUnsealed()
return nil, xerrors.Errorf("seek: %w", err)
}
return &struct {
io.Reader
io.Closer
}{
Reader: lr,
Closer: f,
}, nil
}
if !os.IsNotExist(err) {
return nil, err
lr := io.LimitReader(f, int64(size))
return &struct {
io.Reader
io.Closer
}{
Reader: lr,
Closer: closerFunc(func() error {
doneUnsealed()
return f.Close()
}),
}, nil
}
doneUnsealed()
if !os.IsNotExist(err) {
return nil, err
}
}
sealed, doneSealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed|stores.FTCache, 0, false)
paths, doneSealed, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed, false)
if err != nil {
return nil, xerrors.Errorf("acquire sealed/cache sector path: %w", err)
}
@ -145,9 +160,9 @@ func (sb *Sealer) ReadPieceFromSealedSector(ctx context.Context, sector abi.Sect
// remove last used one (or use whatever other cache policy makes sense))
err = ffi.Unseal(
sb.sealProofType,
sealed.Cache,
sealed.Sealed,
path.Unsealed,
paths.Cache,
paths.Sealed,
paths.Unsealed,
sector.Number,
sector.Miner,
ticket,
@ -157,7 +172,7 @@ func (sb *Sealer) ReadPieceFromSealedSector(ctx context.Context, sector abi.Sect
return nil, xerrors.Errorf("unseal failed: %w", err)
}
f, err = os.OpenFile(string(path.Unsealed), os.O_RDONLY, 0644)
f, err := os.OpenFile(paths.Unsealed, os.O_RDONLY, 0644)
if err != nil {
return nil, err
}

36
request_queue.go Normal file
View File

@ -0,0 +1,36 @@
package sectorstorage
import "container/heap"
// requestQueue is a priority queue of pending worker requests, used
// with container/heap; ordering is by task type (see TaskType.Less).
type requestQueue []*workerRequest
// Len reports the number of queued requests (heap.Interface).
func (q requestQueue) Len() int {
	return len(q)
}
// Less orders requests by task type priority (heap.Interface).
func (q requestQueue) Less(i, j int) bool {
	a, b := q[i].taskType, q[j].taskType
	return a.Less(b)
}
// Swap exchanges two requests and keeps their heap indices in
// sync (heap.Interface).
func (q requestQueue) Swap(i, j int) {
	q[i], q[j] = q[j], q[i]
	q[i].index, q[j].index = i, j
}
// Push appends a request to the queue, recording its heap index
// (heap.Interface; use heap.Push, not this method, directly).
func (q *requestQueue) Push(x interface{}) {
	req := x.(*workerRequest)
	req.index = len(*q)
	*q = append(*q, req)
}
// Pop removes and returns the last request (heap.Interface; use
// heap.Pop, not this method, directly).
func (q *requestQueue) Pop() interface{} {
	old := *q
	last := len(old) - 1
	req := old[last]
	old[last] = nil // drop the reference so the GC can reclaim it
	req.index = -1  // mark as no longer in the heap
	*q = old[:last]
	return req
}
// Compile-time check that requestQueue implements heap.Interface.
var _ heap.Interface = &requestQueue{}

30
request_queue_test.go Normal file
View File

@ -0,0 +1,30 @@
package sectorstorage
import (
"container/heap"
"testing"
"github.com/filecoin-project/sector-storage/sealtasks"
)
// TestRequestQueue verifies that requests are popped in task-type
// priority order regardless of insertion order.
func TestRequestQueue(t *testing.T) {
	q := &requestQueue{}

	for _, tt := range []sealtasks.TaskType{
		sealtasks.TTAddPiece,
		sealtasks.TTPreCommit1,
		sealtasks.TTPreCommit2,
		sealtasks.TTPreCommit1,
		sealtasks.TTAddPiece,
	} {
		heap.Push(q, &workerRequest{taskType: tt})
	}

	got := heap.Pop(q).(*workerRequest)
	if got.taskType != sealtasks.TTPreCommit2 {
		t.Error("expected precommit2, got", got.taskType)
	}

	got = heap.Pop(q).(*workerRequest)
	if got.taskType != sealtasks.TTPreCommit1 {
		t.Error("expected precommit1, got", got.taskType)
	}
}

View File

@ -73,7 +73,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
sealtasks.TTPreCommit1: {
abi.RegisteredProof_StackedDRG32GiBSeal: Resources{
MaxMemory: 64 << 30,
MinMemory: 32 << 30,
MinMemory: 48 << 30,
Threads: 1,
@ -106,10 +106,11 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
},
sealtasks.TTPreCommit2: {
abi.RegisteredProof_StackedDRG32GiBSeal: Resources{
MaxMemory: 96 << 30,
MinMemory: 64 << 30,
MaxMemory: 32 << 30,
MinMemory: 32 << 30,
Threads: -1,
CanGPU: true,
BaseMinMemory: 30 << 30,
},
@ -172,9 +173,9 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
BaseMinMemory: 8 << 20,
},
},
sealtasks.TTCommit2: { // TODO: Measure more accurately
sealtasks.TTCommit2: {
abi.RegisteredProof_StackedDRG32GiBSeal: Resources{
MaxMemory: 110 << 30,
MaxMemory: 130 << 30,
MinMemory: 60 << 30,
Threads: -1,

View File

@ -1,7 +1,7 @@
package sectorstorage
import (
"container/list"
"container/heap"
"context"
"sort"
"sync"
@ -41,7 +41,7 @@ type scheduler struct {
workerFree chan WorkerID
closing chan struct{}
schedQueue *list.List // List[*workerRequest]
schedQueue *requestQueue
}
func newScheduler(spt abi.RegisteredProof) *scheduler {
@ -60,7 +60,7 @@ func newScheduler(spt abi.RegisteredProof) *scheduler {
workerFree: make(chan WorkerID),
closing: make(chan struct{}),
schedQueue: list.New(),
schedQueue: &requestQueue{},
}
}
@ -101,6 +101,8 @@ type workerRequest struct {
prepare WorkerAction
work WorkerAction
index int // The index of the item in the heap.
ret chan<- workerResponse
ctx context.Context
}
@ -154,7 +156,7 @@ func (sh *scheduler) runSched() {
continue
}
sh.schedQueue.PushBack(req)
heap.Push(sh.schedQueue, req)
case wid := <-sh.workerFree:
sh.onWorkerFreed(wid)
case <-sh.closing:
@ -173,8 +175,8 @@ func (sh *scheduler) onWorkerFreed(wid WorkerID) {
return
}
for e := sh.schedQueue.Front(); e != nil; e = e.Next() {
req := e.Value.(*workerRequest)
for i := 0; i < sh.schedQueue.Len(); i++ {
req := (*sh.schedQueue)[i]
ok, err := req.sel.Ok(req.ctx, req.taskType, w)
if err != nil {
@ -193,15 +195,8 @@ func (sh *scheduler) onWorkerFreed(wid WorkerID) {
}
if scheduled {
pe := e.Prev()
sh.schedQueue.Remove(e)
if pe == nil {
pe = sh.schedQueue.Front()
}
if pe == nil {
break
}
e = pe
heap.Remove(sh.schedQueue, i)
i--
continue
}
}

View File

@ -13,3 +13,17 @@ const (
TTFetch TaskType = "seal/v0/fetch"
)
// order assigns a scheduling priority to each task type. Lower values
// sort first in the request queue's heap ordering, so they are popped
// (scheduled) before higher values — e.g. TTFinalize before TTAddPiece.
// NOTE(review): task types absent from this map default to 0 and would
// therefore sort ahead of everything listed here — confirm intended.
var order = map[TaskType]int{
	TTAddPiece:   7,
	TTPreCommit1: 6,
	TTPreCommit2: 5,
	TTCommit2:    4,
	TTCommit1:    3,
	TTFetch:      2,
	TTFinalize:   1,
}

// Less reports whether a sorts before b in the scheduler's request
// queue, i.e. whether a has a lower order value.
func (a TaskType) Less(b TaskType) bool {
	return order[a] < order[b]
}