diff --git a/go.mod b/go.mod index 80fa7f46c..d9a396d07 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/filecoin-project/go-paramfetch v0.0.1 github.com/filecoin-project/specs-actors v0.3.0 github.com/filecoin-project/specs-storage v0.0.0-20200417134612-61b2d91a6102 + github.com/google/uuid v1.1.1 github.com/gorilla/mux v1.7.4 github.com/hashicorp/go-multierror v1.0.0 github.com/ipfs/go-cid v0.0.5 @@ -17,6 +18,7 @@ require ( github.com/ipfs/go-log/v2 v2.0.3 github.com/mattn/go-isatty v0.0.9 // indirect github.com/mitchellh/go-homedir v1.1.0 + github.com/stretchr/testify v1.4.0 go.opencensus.io v0.22.3 go.uber.org/atomic v1.5.1 // indirect go.uber.org/zap v1.13.0 // indirect diff --git a/go.sum b/go.sum index de87462da..25dfea911 100644 --- a/go.sum +++ b/go.sum @@ -57,6 +57,8 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f h1:KMlcu9X58lhTA/KrfX8Bi1LQSO4pzoVjTiL3h4Jk+Zk= diff --git a/manager_test.go b/manager_test.go new file mode 100644 index 000000000..f89989989 --- /dev/null +++ b/manager_test.go @@ -0,0 +1,139 @@ +package sectorstorage + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "github.com/filecoin-project/sector-storage/sealtasks" + logging "github.com/ipfs/go-log" + "io/ioutil" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/specs-actors/actors/abi" + + "github.com/filecoin-project/sector-storage/ffiwrapper" + "github.com/filecoin-project/sector-storage/stores" +) + +type testStorage stores.StorageConfig + +func newTestStorage(t *testing.T) *testStorage { + tp, err := ioutil.TempDir(os.TempDir(), "sector-storage-test-") + require.NoError(t, err) + + { + b, err := json.MarshalIndent(&stores.LocalStorageMeta{ + ID: stores.ID(uuid.New().String()), + Weight: 1, + CanSeal: true, + CanStore: true, + }, "", " ") + require.NoError(t, err) + + err = ioutil.WriteFile(filepath.Join(tp, "sectorstore.json"), b, 0644) + require.NoError(t, err) + } + + return &testStorage{ + StoragePaths: []stores.LocalPath{ + {Path: tp}, + }, + } +} + +func (t testStorage) cleanup() { + for _, path := range t.StoragePaths { + if err := os.RemoveAll(path.Path); err != nil { + fmt.Println("Cleanup error:", err) + } + } +} + +func (t testStorage) GetStorage() (stores.StorageConfig, error) { + return stores.StorageConfig(t), nil +} + +func (t *testStorage) SetStorage(f func(*stores.StorageConfig)) error { + f((*stores.StorageConfig)(t)) + return nil +} + +var _ stores.LocalStorage = &testStorage{} + +func newTestMgr(ctx context.Context, t *testing.T) (*Manager, *stores.Local, *stores.Remote, *stores.Index) { + st := newTestStorage(t) + defer st.cleanup() + + si := stores.NewIndex() + cfg := &ffiwrapper.Config{ + SealProofType: abi.RegisteredProof_StackedDRG2KiBSeal, + } + + lstor, err := stores.NewLocal(ctx, st, si, nil) + require.NoError(t, err) + + prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor}, cfg) + require.NoError(t, err) + + stor := stores.NewRemote(lstor, si, nil) + + m := &Manager{ + scfg: cfg, + + ls: st, + storage: stor, + localStore: lstor, + remoteHnd: &stores.FetchHandler{Local: lstor}, + index: si, + + sched: newScheduler(cfg.SealProofType), + + Prover: prover, + } + + go m.sched.runSched() + + return m, lstor, stor, si +} + +func TestSimple(t *testing.T) { + logging.SetAllLoggers(logging.LevelDebug) + + ctx := context.Background() + m, lstor, _, _ := newTestMgr(ctx, t) + + localTasks := []sealtasks.TaskType{ + sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch, + } + + err := m.AddWorker(ctx, newTestWorker(WorkerConfig{ + SealProof: abi.RegisteredProof_StackedDRG2KiBSeal, + TaskTypes: localTasks, + }, lstor)) + require.NoError(t, err) + + sid := abi.SectorID{Miner: 1000, Number: 1} + + pi, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127))) + require.NoError(t, err) + require.Equal(t, abi.PaddedPieceSize(1024), pi.Size) + + piz, err := m.AddPiece(ctx, sid, nil, 1016, bytes.NewReader(make([]byte, 1016)[:])) + require.NoError(t, err) + require.Equal(t, abi.PaddedPieceSize(1024), piz.Size) + + pieces := []abi.PieceInfo{pi, piz} + + ticket := abi.SealRandomness{9, 9, 9, 9, 9, 9, 9, 9} + + _, err = m.SealPreCommit1(ctx, sid, ticket, pieces) + require.NoError(t, err) + +} diff --git a/mock/mock.go b/mock/mock.go index 854d89870..1e3985be0 100644 --- a/mock/mock.go +++ b/mock/mock.go @@ -16,7 +16,6 @@ import ( logging "github.com/ipfs/go-log" "golang.org/x/xerrors" - "github.com/filecoin-project/sector-storage" "github.com/filecoin-project/sector-storage/ffiwrapper" ) @@ -26,7 +25,6 @@ type SectorMgr struct { sectors map[abi.SectorID]*sectorState sectorSize abi.SectorSize nextSectorID abi.SectorNumber - rateLimit chan struct{} proofType abi.RegisteredProof lk sync.Mutex @@ -34,7 +32,7 @@ type SectorMgr struct { type mockVerif struct{} -func NewMockSectorMgr(threads int, ssize abi.SectorSize) *SectorMgr { +func NewMockSectorMgr(ssize abi.SectorSize) *SectorMgr { rt, err := ffiwrapper.SealProofTypeFromSectorSize(ssize) if err != nil { panic(err) @@ -44,7 +42,6 @@ func NewMockSectorMgr(threads int, ssize abi.SectorSize) *SectorMgr { sectors: make(map[abi.SectorID]*sectorState), sectorSize: ssize, nextSectorID: 5, - rateLimit: make(chan struct{}, threads), proofType: rt, } } @@ -64,15 +61,6 @@ type sectorState struct { lk sync.Mutex } -func (mgr *SectorMgr) RateLimit() func() { - mgr.rateLimit <- struct{}{} - - // TODO: probably want to copy over rate limit code - return func() { - <-mgr.rateLimit - } -} - func (mgr *SectorMgr) NewSector(ctx context.Context, sector abi.SectorID) error { return nil } @@ -333,4 +321,3 @@ func (m mockVerif) GenerateWinningPoStSectorChallenge(ctx context.Context, proof var MockVerifier = mockVerif{} var _ ffiwrapper.Verifier = MockVerifier -var _ sectorstorage.SectorManager = &SectorMgr{} diff --git a/mock/mock_test.go b/mock/mock_test.go index 524e8d615..5f4b9c428 100644 --- a/mock/mock_test.go +++ b/mock/mock_test.go @@ -9,7 +9,7 @@ import ( ) func TestOpFinish(t *testing.T) { - sb := NewMockSectorMgr(1, 2048) + sb := NewMockSectorMgr(2048) sid, pieces, err := sb.StageFakeData(123) if err != nil { diff --git a/testworker_test.go b/testworker_test.go new file mode 100644 index 000000000..99fa4abec --- /dev/null +++ b/testworker_test.go @@ -0,0 +1,104 @@ +package sectorstorage + +import ( + "context" + + "github.com/filecoin-project/specs-actors/actors/abi" + "github.com/filecoin-project/specs-storage/storage" + + "github.com/filecoin-project/sector-storage/mock" + "github.com/filecoin-project/sector-storage/sealtasks" + "github.com/filecoin-project/sector-storage/stores" + "github.com/filecoin-project/sector-storage/storiface" +) + +type testWorker struct { + acceptTasks map[sealtasks.TaskType]struct{} + lstor *stores.Local + + mockSeal *mock.SectorMgr +} + +func newTestWorker(wcfg WorkerConfig, lstor *stores.Local) *testWorker { + ssize, err := wcfg.SealProof.SectorSize() + if err != nil { + panic(err) + } + + acceptTasks := map[sealtasks.TaskType]struct{}{} + for _, taskType := range wcfg.TaskTypes { + acceptTasks[taskType] = struct{}{} + } + + return &testWorker{ + acceptTasks: acceptTasks, + lstor: lstor, + + mockSeal: mock.NewMockSectorMgr(ssize), + } +} + +func (t *testWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) { + return t.mockSeal.SealPreCommit1(ctx, sector, ticket, pieces) +} + +func (t *testWorker) NewSector(ctx context.Context, sector abi.SectorID) error { + panic("implement me") +} + +func (t *testWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) { + return t.mockSeal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData) +} + +func (t *testWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storage.SectorCids, error) { + panic("implement me") +} + +func (t *testWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) { + panic("implement me") +} + +func (t *testWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storage.Proof, error) { + panic("implement me") +} + +func (t *testWorker) FinalizeSector(ctx context.Context, sector abi.SectorID) error { + panic("implement me") +} + +func (t *testWorker) Fetch(ctx context.Context, id abi.SectorID, fileType stores.SectorFileType, b bool) error { + return nil +} + +func (t *testWorker) TaskTypes(ctx context.Context) (map[sealtasks.TaskType]struct{}, error) { + return t.acceptTasks, nil +} + +func (t *testWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) { + return t.lstor.Local(ctx) +} + +func (t *testWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) { + res := ResourceTable[sealtasks.TTPreCommit2][abi.RegisteredProof_StackedDRG2KiBSeal] + + return storiface.WorkerInfo{ + Hostname: "testworkerer", + Resources: storiface.WorkerResources{ + MemPhysical: res.MinMemory * 3, + MemSwap: 0, + MemReserved: res.MinMemory, + CPUs: 32, + GPUs: nil, + }, + }, nil +} + +func (t *testWorker) Closing(ctx context.Context) (<-chan struct{}, error) { + return ctx.Done(), nil +} + +func (t *testWorker) Close() error { + panic("implement me") +} + +var _ Worker = &testWorker{}