diff --git a/extern/sector-storage/piece_provider_test.go b/extern/sector-storage/piece_provider_test.go index 8636a11d6..88872aac2 100644 --- a/extern/sector-storage/piece_provider_test.go +++ b/extern/sector-storage/piece_provider_test.go @@ -1,40 +1,36 @@ package sectorstorage import ( + "bytes" "context" "io/ioutil" - "strings" + "math/rand" + "net" + "net/http" "testing" - "time" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-statestore" + "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" specstorage "github.com/filecoin-project/specs-storage/storage" + "github.com/gorilla/mux" + "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" ds_sync "github.com/ipfs/go-datastore/sync" + logging "github.com/ipfs/go-log/v2" "github.com/stretchr/testify/require" - "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) // TestPieceProviderReadPiece verifies that the ReadPiece method works correctly -func TestPieceProviderReadPiece(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) - defer cancel() +// only uses miner and does NOT use any remote worker. +func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) { runTest := func(t *testing.T, alreadyUnsealed bool) { // Set up sector storage manager - storage := newTestStorage(t) - index := stores.NewIndex() - localStore, err := stores.NewLocal(ctx, storage, index, nil) - require.NoError(t, err) - remoteStore := stores.NewRemote(localStore, index, nil, 6000, &stores.DefaultPartialFileHandler{}) - dstore := ds_sync.MutexWrap(datastore.NewMapDatastore()) - wsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/worker/calls"))) - smsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls"))) sealerCfg := SealerConfig{ ParallelFetchLimit: 10, AllowAddPiece: true, @@ -43,76 +39,31 @@ func TestPieceProviderReadPiece(t *testing.T) { AllowCommit: true, AllowUnseal: true, } - mgr, err := New(ctx, localStore, remoteStore, storage, index, sealerCfg, wsts, smsts) - require.NoError(t, err) - // Set up worker - localTasks := []sealtasks.TaskType{ - sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch, - } + ppt := newPieceProviderTestHarness(t, sealerCfg, abi.RegisteredSealProof_StackedDrg8MiBV1) + defer ppt.shutdown(t) - csts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls"))) - - // passing a nil executor here mirrors an actual worker setup as it - // will initialize the worker to use the rust proofs lib under the hood - worker := newLocalWorker(nil, WorkerConfig{ - TaskTypes: localTasks, - }, remoteStore, localStore, index, mgr, csts) - - err = mgr.AddWorker(ctx, worker) - require.NoError(t, err) - - // Create piece provider - pp := NewPieceProvider(remoteStore, index, mgr) - - // Mock sector - sector := specstorage.SectorRef{ - ID: abi.SectorID{ - Miner: 1000, - Number: 1, - }, - ProofType: abi.RegisteredSealProof_StackedDrg8MiBV1, - } - - // Create some data that when padded will be 8MB - pieceData := strings.Repeat("testthis", 127*1024*8) + // Create some padded data that aligns with the piece boundaries. + pieceData := generatePieceData(8 * 127 * 1024 * 8) size := abi.UnpaddedPieceSize(len(pieceData)) - pieceInfo, err := mgr.AddPiece(ctx, sector, nil, size, strings.NewReader(pieceData)) - require.NoError(t, err) + ppt.addPiece(t, pieceData) // pre-commit 1 - pieces := []abi.PieceInfo{pieceInfo} - ticket := abi.SealRandomness{9, 9, 9, 9, 9, 9, 9, 9} - preCommit1, err := mgr.SealPreCommit1(ctx, sector, ticket, pieces) - require.NoError(t, err) + preCommit1 := ppt.preCommit1(t) // pre-commit 2 - sectorCids, err := mgr.SealPreCommit2(ctx, sector, preCommit1) - require.NoError(t, err) - commD := sectorCids.Unsealed + ppt.preCommit2(t, preCommit1) // If we want to test what happens when the data must be unsealed // (ie there is not an unsealed copy already available) if !alreadyUnsealed { // Remove the unsealed copy from local storage - err = localStore.Remove(ctx, sector.ID, storiface.FTUnsealed, false) - require.NoError(t, err) + ppt.removeAllUnsealedSectorFiles(t) } // Read the piece - offset := storiface.UnpaddedByteIndex(0) - require.NoError(t, err) - reader, unsealed, err := pp.ReadPiece(ctx, sector, offset, size, ticket, commD) - require.NoError(t, err) - requiresUnseal := !alreadyUnsealed - require.Equal(t, requiresUnseal, unsealed) - - defer func() { _ = reader.Close() }() - - // Make sure the input matches the output - readData, err := ioutil.ReadAll(reader) - require.NoError(t, err) - require.Equal(t, pieceData, string(readData)) + ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size, + !alreadyUnsealed, pieceData) } t.Run("already unsealed", func(t *testing.T) { @@ -122,3 +73,252 @@ func TestPieceProviderReadPiece(t *testing.T) { runTest(t, false) }) } + +func TestReadPieceRemoteWorkers(t *testing.T) { + logging.SetAllLoggers(logging.LevelDebug) + + // miner's worker can only add pieces to an unsealed sector. + sealerCfg := SealerConfig{ + ParallelFetchLimit: 10, + AllowAddPiece: true, + AllowPreCommit1: false, + AllowPreCommit2: false, + AllowCommit: false, + AllowUnseal: false, + } + + // test harness for an 8M sector. + ppt := newPieceProviderTestHarness(t, sealerCfg, abi.RegisteredSealProof_StackedDrg8MiBV1) + defer ppt.shutdown(t) + + // worker 2 will ONLY help with the sealing by first fetching + // the unsealed file from the miner. + ppt.addRemoteWorker(t, []sealtasks.TaskType{ + sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit1, + sealtasks.TTFetch, sealtasks.TTFinalize, + }) + + // create a worker that can ONLY unseal and fetch + ppt.addRemoteWorker(t, []sealtasks.TaskType{ + sealtasks.TTUnseal, sealtasks.TTFetch, + }) + + // run the test + + // add one piece that aligns with the padding/piece boundaries. + pd1 := generatePieceData(8 * 127 * 4 * 1024) + pi1 := ppt.addPiece(t, pd1) + pd1size := pi1.Size.Unpadded() + + pd2 := generatePieceData(8 * 127 * 4 * 1024) + pi2 := ppt.addPiece(t, pd2) + pd2size := pi2.Size.Unpadded() + + // pre-commit 1 + pC1 := ppt.preCommit1(t) + + // pre-commit 2 + ppt.preCommit2(t, pC1) + + // finalize the sector so we declare to the index we have the sealed file + // so the unsealing worker can later look it up and fetch it if needed + // sending nil here will remove all unsealed files after sector is finalized. + ppt.finalizeSector(t, nil) + + // Read the piece -> have to unseal since we removed the file. + ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, + true, pd1) + + // Read the same piece again -> will NOT have to unseal. + ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, false, pd1) + + // remove the unsealed file and read again -> will have to unseal. + ppt.removeAllUnsealedSectorFiles(t) + ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, + true, pd1) + + // Read Piece 2 -> no unsealing as it got unsealed above. + ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, false, pd2) + + // remove all unseal files -> Read Piece 2 -> will have to Unseal. + ppt.removeAllUnsealedSectorFiles(t) + ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, true, pd2) +} + +type pieceProviderTestHarness struct { + ctx context.Context + index *stores.Index + pp PieceProvider + sector specstorage.SectorRef + mgr *Manager + ticket abi.SealRandomness + commD cid.Cid + localStores []*stores.Local + + servers []*http.Server + + addedPieces []abi.PieceInfo +} + +func generatePieceData(size uint64) []byte { + bz := make([]byte, size) + rand.Read(bz) + return bz +} + +func newPieceProviderTestHarness(t *testing.T, mgrConfig SealerConfig, sectorProofType abi.RegisteredSealProof) *pieceProviderTestHarness { + ctx := context.Background() + // listen on tcp socket to create an http server later + address := "0.0.0.0:0" + nl, err := net.Listen("tcp", address) + require.NoError(t, err) + + // create index, storage, local store & remote store. + index := stores.NewIndex() + storage := newTestStorage(t) + localStore, err := stores.NewLocal(ctx, storage, index, []string{"http://" + nl.Addr().String() + "/remote"}) + require.NoError(t, err) + remoteStore := stores.NewRemote(localStore, index, nil, 6000, &stores.DefaultPartialFileHandler{}) + + // data stores for state tracking. + dstore := ds_sync.MutexWrap(datastore.NewMapDatastore()) + wsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/worker/calls"))) + smsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls"))) + + mgr, err := New(ctx, localStore, remoteStore, storage, index, mgrConfig, wsts, smsts) + require.NoError(t, err) + + // start a http server on the manager to serve sector file requests. + svc := &http.Server{ + Addr: nl.Addr().String(), + Handler: mgr, + } + go func() { + _ = svc.Serve(nl) + }() + + pp := NewPieceProvider(remoteStore, index, mgr) + + sector := specstorage.SectorRef{ + ID: abi.SectorID{ + Miner: 100, + Number: 10, + }, + ProofType: sectorProofType, + } + + ticket := abi.SealRandomness{9, 9, 9, 9, 9, 9, 9, 9} + + ppt := &pieceProviderTestHarness{ + ctx: ctx, + index: index, + pp: pp, + sector: sector, + mgr: mgr, + ticket: ticket, + } + ppt.servers = append(ppt.servers, svc) + ppt.localStores = append(ppt.localStores, localStore) + return ppt +} + +func (p *pieceProviderTestHarness) addRemoteWorker(t *testing.T, tasks []sealtasks.TaskType) { + // start an http Server + address := "0.0.0.0:0" + nl, err := net.Listen("tcp", address) + require.NoError(t, err) + + localStore, err := stores.NewLocal(p.ctx, newTestStorage(t), p.index, []string{"http://" + nl.Addr().String() + "/remote"}) + require.NoError(t, err) + + fh := &stores.FetchHandler{ + Local: localStore, + PfHandler: &stores.DefaultPartialFileHandler{}, + } + + mux := mux.NewRouter() + mux.PathPrefix("/remote").HandlerFunc(fh.ServeHTTP) + svc := &http.Server{ + Addr: nl.Addr().String(), + Handler: mux, + } + + go func() { + _ = svc.Serve(nl) + }() + + remote := stores.NewRemote(localStore, p.index, nil, 1000, + &stores.DefaultPartialFileHandler{}) + + dstore := ds_sync.MutexWrap(datastore.NewMapDatastore()) + csts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls"))) + + worker := newLocalWorker(nil, WorkerConfig{ + TaskTypes: tasks, + }, remote, localStore, p.index, p.mgr, csts) + + p.servers = append(p.servers, svc) + p.localStores = append(p.localStores, localStore) + + // register self with manager + require.NoError(t, p.mgr.AddWorker(p.ctx, worker)) +} + +func (p *pieceProviderTestHarness) removeAllUnsealedSectorFiles(t *testing.T) { + for i := range p.localStores { + ls := p.localStores[i] + require.NoError(t, ls.Remove(p.ctx, p.sector.ID, storiface.FTUnsealed, false)) + } +} + +func (p *pieceProviderTestHarness) addPiece(t *testing.T, pieceData []byte) abi.PieceInfo { + var existing []abi.UnpaddedPieceSize + for _, pi := range p.addedPieces { + existing = append(existing, pi.Size.Unpadded()) + } + + size := abi.UnpaddedPieceSize(len(pieceData)) + pieceInfo, err := p.mgr.AddPiece(p.ctx, p.sector, existing, size, bytes.NewReader(pieceData)) + require.NoError(t, err) + + p.addedPieces = append(p.addedPieces, pieceInfo) + return pieceInfo +} + +func (p *pieceProviderTestHarness) preCommit1(t *testing.T) specstorage.PreCommit1Out { + preCommit1, err := p.mgr.SealPreCommit1(p.ctx, p.sector, p.ticket, p.addedPieces) + require.NoError(t, err) + return preCommit1 +} + +func (p *pieceProviderTestHarness) preCommit2(t *testing.T, pc1 specstorage.PreCommit1Out) { + sectorCids, err := p.mgr.SealPreCommit2(p.ctx, p.sector, pc1) + require.NoError(t, err) + commD := sectorCids.Unsealed + p.commD = commD +} + +func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, + expectedHadToUnseal bool, expectedBytes []byte) { + rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD) + require.NoError(t, err) + require.NotNil(t, rd) + require.Equal(t, expectedHadToUnseal, isUnsealed) + defer func() { _ = rd.Close() }() + + // Make sure the input matches the output + readData, err := ioutil.ReadAll(rd) + require.NoError(t, err) + require.Equal(t, expectedBytes, readData) +} + +func (p *pieceProviderTestHarness) finalizeSector(t *testing.T, keepUnseal []specstorage.Range) { + require.NoError(t, p.mgr.FinalizeSector(p.ctx, p.sector, keepUnseal)) +} + +func (p *pieceProviderTestHarness) shutdown(t *testing.T) { + for _, svc := range p.servers { + s := svc + require.NoError(t, s.Shutdown(p.ctx)) + } +}