lotus/storage/sealer/testworker_test.go

158 lines
4.6 KiB
Go
Raw Normal View History

package sealer
2020-05-08 11:36:08 +00:00
import (
"context"
"sync"
2020-05-19 16:09:36 +00:00
"github.com/google/uuid"
"github.com/ipfs/go-cid"
2020-05-08 11:36:08 +00:00
2022-06-14 15:00:51 +00:00
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/mock"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
2020-05-08 11:36:08 +00:00
)
type testWorker struct {
acceptTasks map[sealtasks.TaskType]struct{}
lstor *paths.Local
2020-09-07 14:35:54 +00:00
ret storiface.WorkerReturn
2020-05-08 11:36:08 +00:00
mockSeal *mock.SectorMgr
2020-09-16 22:35:30 +00:00
pc1s int
pc1lk sync.Mutex
pc1wait *sync.WaitGroup
session uuid.UUID
Worker
2020-05-08 11:36:08 +00:00
}
func newTestWorker(wcfg WorkerConfig, lstor *paths.Local, ret storiface.WorkerReturn) *testWorker {
2020-05-08 11:36:08 +00:00
acceptTasks := map[sealtasks.TaskType]struct{}{}
for _, taskType := range wcfg.TaskTypes {
acceptTasks[taskType] = struct{}{}
}
return &testWorker{
acceptTasks: acceptTasks,
lstor: lstor,
2020-09-07 14:35:54 +00:00
ret: ret,
2020-05-08 11:36:08 +00:00
2020-11-05 06:34:24 +00:00
mockSeal: mock.NewMockSectorMgr(nil),
session: uuid.New(),
2020-05-08 11:36:08 +00:00
}
}
2020-11-05 06:34:24 +00:00
func (t *testWorker) asyncCall(sector storage.SectorRef, work func(ci storiface.CallID)) (storiface.CallID, error) {
2020-09-07 14:35:54 +00:00
ci := storiface.CallID{
2020-11-05 06:34:24 +00:00
Sector: sector.ID,
2020-09-07 14:35:54 +00:00
ID: uuid.New(),
}
go work(ci)
return ci, nil
}
2020-11-05 06:34:24 +00:00
func (t *testWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
2020-09-07 14:35:54 +00:00
return t.asyncCall(sector, func(ci storiface.CallID) {
p, err := t.mockSeal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
2020-11-17 15:17:45 +00:00
if err := t.ret.ReturnAddPiece(ctx, ci, p, toCallError(err)); err != nil {
2020-09-07 14:35:54 +00:00
log.Error(err)
}
})
2020-05-08 11:36:08 +00:00
}
func (t *testWorker) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) {
out, err := t.mockSeal.ReplicaUpdate(ctx, sector, pieces)
if err := t.ret.ReturnReplicaUpdate(ctx, ci, out, toCallError(err)); err != nil {
log.Error(err)
}
})
}
func (t *testWorker) ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) {
vanillaProofs, err := t.mockSeal.ProveReplicaUpdate1(ctx, sector, sectorKey, newSealed, newUnsealed)
if err := t.ret.ReturnProveReplicaUpdate1(ctx, ci, vanillaProofs, toCallError(err)); err != nil {
log.Error(err)
}
})
}
func (t *testWorker) ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) {
proof, err := t.mockSeal.ProveReplicaUpdate2(ctx, sector, sectorKey, newSealed, newUnsealed, vanillaProofs)
if err := t.ret.ReturnProveReplicaUpdate2(ctx, ci, proof, toCallError(err)); err != nil {
log.Error(err)
}
})
}
2020-11-05 06:34:24 +00:00
func (t *testWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) {
t.pc1s++
if t.pc1wait != nil {
t.pc1wait.Done()
}
t.pc1lk.Lock()
defer t.pc1lk.Unlock()
p1o, err := t.mockSeal.SealPreCommit1(ctx, sector, ticket, pieces)
2020-11-17 15:17:45 +00:00
if err := t.ret.ReturnSealPreCommit1(ctx, ci, p1o, toCallError(err)); err != nil {
log.Error(err)
}
})
}
2020-11-05 06:34:24 +00:00
func (t *testWorker) Fetch(ctx context.Context, sector storage.SectorRef, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
2020-09-07 14:35:54 +00:00
return t.asyncCall(sector, func(ci storiface.CallID) {
2020-11-17 15:17:45 +00:00
if err := t.ret.ReturnFetch(ctx, ci, nil); err != nil {
2020-09-07 14:35:54 +00:00
log.Error(err)
}
})
2020-05-08 11:36:08 +00:00
}
func (t *testWorker) TaskTypes(ctx context.Context) (map[sealtasks.TaskType]struct{}, error) {
return t.acceptTasks, nil
}
2022-01-18 10:57:04 +00:00
func (t *testWorker) Paths(ctx context.Context) ([]storiface.StoragePath, error) {
2020-05-08 11:36:08 +00:00
return t.lstor.Local(ctx)
}
func (t *testWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) {
2021-11-29 14:14:57 +00:00
res := storiface.ResourceTable[sealtasks.TTPreCommit2][abi.RegisteredSealProof_StackedDrg2KiBV1]
2020-05-08 11:36:08 +00:00
return storiface.WorkerInfo{
Hostname: "testworkerer",
Resources: storiface.WorkerResources{
MemPhysical: res.MinMemory * 3,
MemUsed: res.MinMemory,
MemSwapUsed: 0,
2020-05-08 11:36:08 +00:00
MemSwap: 0,
CPUs: 32,
GPUs: nil,
},
}, nil
}
func (t *testWorker) Session(context.Context) (uuid.UUID, error) {
return t.session, nil
2020-05-08 11:36:08 +00:00
}
func (t *testWorker) Close() error {
panic("implement me")
}
var _ Worker = &testWorker{}