diff --git a/extern/sector-storage/piece_provider_test.go b/extern/sector-storage/piece_provider_test.go new file mode 100644 index 000000000..08aadb78e --- /dev/null +++ b/extern/sector-storage/piece_provider_test.go @@ -0,0 +1,118 @@ +package sectorstorage + +import ( + "context" + "io/ioutil" + "strings" + "testing" + "time" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-statestore" + specstorage "github.com/filecoin-project/specs-storage/storage" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" + ds_sync "github.com/ipfs/go-datastore/sync" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" + "github.com/filecoin-project/lotus/extern/sector-storage/stores" + "github.com/filecoin-project/lotus/extern/sector-storage/storiface" +) + +// TestPieceProviderReadPiece verifies that the ReadPiece method works correctly +func TestPieceProviderReadPiece(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + runTest := func(t *testing.T, alreadyUnsealed bool) { + // Set up sector storage manager + storage := newTestStorage(t) + index := stores.NewIndex() + localStore, err := stores.NewLocal(ctx, storage, index, nil) + require.NoError(t, err) + remoteStore := stores.NewRemote(localStore, index, nil, 6000) + dstore := ds_sync.MutexWrap(datastore.NewMapDatastore()) + wsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/worker/calls"))) + smsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls"))) + sealerCfg := SealerConfig{ + ParallelFetchLimit: 10, + AllowAddPiece: true, + AllowPreCommit1: true, + AllowPreCommit2: true, + AllowCommit: true, + AllowUnseal: true, + } + mgr, err := New(ctx, localStore, remoteStore, storage, index, sealerCfg, wsts, smsts) + require.NoError(t, err) + + // Set up worker + localTasks := []sealtasks.TaskType{ + sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch, + } + testWorker := newTestWorker(WorkerConfig{ + TaskTypes: localTasks, + }, localStore, mgr) + err = mgr.AddWorker(ctx, testWorker) + require.NoError(t, err) + + // Create piece provider + pp := NewPieceProvider(remoteStore, index, mgr) + + // Mock sector + sector := specstorage.SectorRef{ + ID: abi.SectorID{ + Miner: 1000, + Number: 1, + }, + ProofType: abi.RegisteredSealProof_StackedDrg8MiBV1, + } + + // Create some data that when padded will be 8MB + pieceData := strings.Repeat("testthis", 127*1024*8) + size := abi.UnpaddedPieceSize(len(pieceData)) + pieceInfo, err := mgr.AddPiece(ctx, sector, nil, size, strings.NewReader(pieceData)) + require.NoError(t, err) + + // pre-commit 1 + pieces := []abi.PieceInfo{pieceInfo} + ticket := abi.SealRandomness{9, 9, 9, 9, 9, 9, 9, 9} + preCommit1, err := mgr.SealPreCommit1(ctx, sector, ticket, pieces) + require.NoError(t, err) + + // pre-commit 2 + sectorCids, err := mgr.SealPreCommit2(ctx, sector, preCommit1) + require.NoError(t, err) + commD := sectorCids.Unsealed + + // If we want to test what happens when the data must be unsealed + // (ie there is not an unsealed copy already available) + if !alreadyUnsealed { + // Remove the unsealed copy from local storage + err = localStore.Remove(ctx, sector.ID, storiface.FTUnsealed, false) + require.NoError(t, err) + } + + // Read the piece + offset := storiface.UnpaddedByteIndex(0) + require.NoError(t, err) + reader, unsealed, err := pp.ReadPiece(ctx, sector, offset, size, ticket, commD) + require.NoError(t, err) + requiresUnseal := !alreadyUnsealed + require.Equal(t, requiresUnseal, unsealed) + + defer func() { _ = reader.Close() }() + + // Make sure the input matches the output + readData, err := ioutil.ReadAll(reader) + require.NoError(t, err) + require.Equal(t, pieceData, string(readData)) + } + + t.Run("already unsealed", func(t *testing.T) { + runTest(t, true) + }) + t.Run("requires unseal", func(t *testing.T) { + runTest(t, false) + }) +}