finish integration tests
This commit is contained in:
parent
c17300dc1f
commit
78a0458ada
338
extern/sector-storage/piece_provider_test.go
vendored
338
extern/sector-storage/piece_provider_test.go
vendored
@ -1,40 +1,36 @@
|
|||||||
package sectorstorage
|
package sectorstorage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"strings"
|
"math/rand"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
"github.com/filecoin-project/go-statestore"
|
"github.com/filecoin-project/go-statestore"
|
||||||
|
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
|
||||||
specstorage "github.com/filecoin-project/specs-storage/storage"
|
specstorage "github.com/filecoin-project/specs-storage/storage"
|
||||||
|
"github.com/gorilla/mux"
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
"github.com/ipfs/go-datastore"
|
"github.com/ipfs/go-datastore"
|
||||||
"github.com/ipfs/go-datastore/namespace"
|
"github.com/ipfs/go-datastore/namespace"
|
||||||
ds_sync "github.com/ipfs/go-datastore/sync"
|
ds_sync "github.com/ipfs/go-datastore/sync"
|
||||||
|
logging "github.com/ipfs/go-log/v2"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
|
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||||
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestPieceProviderReadPiece verifies that the ReadPiece method works correctly
|
// TestPieceProviderReadPiece verifies that the ReadPiece method works correctly
|
||||||
func TestPieceProviderReadPiece(t *testing.T) {
|
// only uses miner and does NOT use any remote worker.
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
func TestPieceProviderSimpleNoRemoteWorker(t *testing.T) {
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
runTest := func(t *testing.T, alreadyUnsealed bool) {
|
runTest := func(t *testing.T, alreadyUnsealed bool) {
|
||||||
// Set up sector storage manager
|
// Set up sector storage manager
|
||||||
storage := newTestStorage(t)
|
|
||||||
index := stores.NewIndex()
|
|
||||||
localStore, err := stores.NewLocal(ctx, storage, index, nil)
|
|
||||||
require.NoError(t, err)
|
|
||||||
remoteStore := stores.NewRemote(localStore, index, nil, 6000, &stores.DefaultPartialFileHandler{})
|
|
||||||
dstore := ds_sync.MutexWrap(datastore.NewMapDatastore())
|
|
||||||
wsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/worker/calls")))
|
|
||||||
smsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls")))
|
|
||||||
sealerCfg := SealerConfig{
|
sealerCfg := SealerConfig{
|
||||||
ParallelFetchLimit: 10,
|
ParallelFetchLimit: 10,
|
||||||
AllowAddPiece: true,
|
AllowAddPiece: true,
|
||||||
@ -43,76 +39,31 @@ func TestPieceProviderReadPiece(t *testing.T) {
|
|||||||
AllowCommit: true,
|
AllowCommit: true,
|
||||||
AllowUnseal: true,
|
AllowUnseal: true,
|
||||||
}
|
}
|
||||||
mgr, err := New(ctx, localStore, remoteStore, storage, index, sealerCfg, wsts, smsts)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Set up worker
|
ppt := newPieceProviderTestHarness(t, sealerCfg, abi.RegisteredSealProof_StackedDrg8MiBV1)
|
||||||
localTasks := []sealtasks.TaskType{
|
defer ppt.shutdown(t)
|
||||||
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
|
|
||||||
}
|
|
||||||
|
|
||||||
csts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls")))
|
// Create some padded data that aligns with the piece boundaries.
|
||||||
|
pieceData := generatePieceData(8 * 127 * 1024 * 8)
|
||||||
// passing a nil executor here mirrors an actual worker setup as it
|
|
||||||
// will initialize the worker to use the rust proofs lib under the hood
|
|
||||||
worker := newLocalWorker(nil, WorkerConfig{
|
|
||||||
TaskTypes: localTasks,
|
|
||||||
}, remoteStore, localStore, index, mgr, csts)
|
|
||||||
|
|
||||||
err = mgr.AddWorker(ctx, worker)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Create piece provider
|
|
||||||
pp := NewPieceProvider(remoteStore, index, mgr)
|
|
||||||
|
|
||||||
// Mock sector
|
|
||||||
sector := specstorage.SectorRef{
|
|
||||||
ID: abi.SectorID{
|
|
||||||
Miner: 1000,
|
|
||||||
Number: 1,
|
|
||||||
},
|
|
||||||
ProofType: abi.RegisteredSealProof_StackedDrg8MiBV1,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create some data that when padded will be 8MB
|
|
||||||
pieceData := strings.Repeat("testthis", 127*1024*8)
|
|
||||||
size := abi.UnpaddedPieceSize(len(pieceData))
|
size := abi.UnpaddedPieceSize(len(pieceData))
|
||||||
pieceInfo, err := mgr.AddPiece(ctx, sector, nil, size, strings.NewReader(pieceData))
|
ppt.addPiece(t, pieceData)
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// pre-commit 1
|
// pre-commit 1
|
||||||
pieces := []abi.PieceInfo{pieceInfo}
|
preCommit1 := ppt.preCommit1(t)
|
||||||
ticket := abi.SealRandomness{9, 9, 9, 9, 9, 9, 9, 9}
|
|
||||||
preCommit1, err := mgr.SealPreCommit1(ctx, sector, ticket, pieces)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// pre-commit 2
|
// pre-commit 2
|
||||||
sectorCids, err := mgr.SealPreCommit2(ctx, sector, preCommit1)
|
ppt.preCommit2(t, preCommit1)
|
||||||
require.NoError(t, err)
|
|
||||||
commD := sectorCids.Unsealed
|
|
||||||
|
|
||||||
// If we want to test what happens when the data must be unsealed
|
// If we want to test what happens when the data must be unsealed
|
||||||
// (ie there is not an unsealed copy already available)
|
// (ie there is not an unsealed copy already available)
|
||||||
if !alreadyUnsealed {
|
if !alreadyUnsealed {
|
||||||
// Remove the unsealed copy from local storage
|
// Remove the unsealed copy from local storage
|
||||||
err = localStore.Remove(ctx, sector.ID, storiface.FTUnsealed, false)
|
ppt.removeAllUnsealedSectorFiles(t)
|
||||||
require.NoError(t, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the piece
|
// Read the piece
|
||||||
offset := storiface.UnpaddedByteIndex(0)
|
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), size,
|
||||||
require.NoError(t, err)
|
!alreadyUnsealed, pieceData)
|
||||||
reader, unsealed, err := pp.ReadPiece(ctx, sector, offset, size, ticket, commD)
|
|
||||||
require.NoError(t, err)
|
|
||||||
requiresUnseal := !alreadyUnsealed
|
|
||||||
require.Equal(t, requiresUnseal, unsealed)
|
|
||||||
|
|
||||||
defer func() { _ = reader.Close() }()
|
|
||||||
|
|
||||||
// Make sure the input matches the output
|
|
||||||
readData, err := ioutil.ReadAll(reader)
|
|
||||||
require.NoError(t, err)
|
|
||||||
require.Equal(t, pieceData, string(readData))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("already unsealed", func(t *testing.T) {
|
t.Run("already unsealed", func(t *testing.T) {
|
||||||
@ -122,3 +73,252 @@ func TestPieceProviderReadPiece(t *testing.T) {
|
|||||||
runTest(t, false)
|
runTest(t, false)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReadPieceRemoteWorkers(t *testing.T) {
|
||||||
|
logging.SetAllLoggers(logging.LevelDebug)
|
||||||
|
|
||||||
|
// miner's worker can only add pieces to an unsealed sector.
|
||||||
|
sealerCfg := SealerConfig{
|
||||||
|
ParallelFetchLimit: 10,
|
||||||
|
AllowAddPiece: true,
|
||||||
|
AllowPreCommit1: false,
|
||||||
|
AllowPreCommit2: false,
|
||||||
|
AllowCommit: false,
|
||||||
|
AllowUnseal: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
// test harness for an 8M sector.
|
||||||
|
ppt := newPieceProviderTestHarness(t, sealerCfg, abi.RegisteredSealProof_StackedDrg8MiBV1)
|
||||||
|
defer ppt.shutdown(t)
|
||||||
|
|
||||||
|
// worker 2 will ONLY help with the sealing by first fetching
|
||||||
|
// the unsealed file from the miner.
|
||||||
|
ppt.addRemoteWorker(t, []sealtasks.TaskType{
|
||||||
|
sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit1,
|
||||||
|
sealtasks.TTFetch, sealtasks.TTFinalize,
|
||||||
|
})
|
||||||
|
|
||||||
|
// create a worker that can ONLY unseal and fetch
|
||||||
|
ppt.addRemoteWorker(t, []sealtasks.TaskType{
|
||||||
|
sealtasks.TTUnseal, sealtasks.TTFetch,
|
||||||
|
})
|
||||||
|
|
||||||
|
// run the test
|
||||||
|
|
||||||
|
// add one piece that aligns with the padding/piece boundaries.
|
||||||
|
pd1 := generatePieceData(8 * 127 * 4 * 1024)
|
||||||
|
pi1 := ppt.addPiece(t, pd1)
|
||||||
|
pd1size := pi1.Size.Unpadded()
|
||||||
|
|
||||||
|
pd2 := generatePieceData(8 * 127 * 4 * 1024)
|
||||||
|
pi2 := ppt.addPiece(t, pd2)
|
||||||
|
pd2size := pi2.Size.Unpadded()
|
||||||
|
|
||||||
|
// pre-commit 1
|
||||||
|
pC1 := ppt.preCommit1(t)
|
||||||
|
|
||||||
|
// pre-commit 2
|
||||||
|
ppt.preCommit2(t, pC1)
|
||||||
|
|
||||||
|
// finalize the sector so we declare to the index we have the sealed file
|
||||||
|
// so the unsealing worker can later look it up and fetch it if needed
|
||||||
|
// sending nil here will remove all unsealed files after sector is finalized.
|
||||||
|
ppt.finalizeSector(t, nil)
|
||||||
|
|
||||||
|
// Read the piece -> have to unseal since we removed the file.
|
||||||
|
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
|
||||||
|
true, pd1)
|
||||||
|
|
||||||
|
// Read the same piece again -> will NOT have to unseal.
|
||||||
|
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size, false, pd1)
|
||||||
|
|
||||||
|
// remove the unsealed file and read again -> will have to unseal.
|
||||||
|
ppt.removeAllUnsealedSectorFiles(t)
|
||||||
|
ppt.readPiece(t, storiface.UnpaddedByteIndex(0), pd1size,
|
||||||
|
true, pd1)
|
||||||
|
|
||||||
|
// Read Piece 2 -> no unsealing as it got unsealed above.
|
||||||
|
ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, false, pd2)
|
||||||
|
|
||||||
|
// remove all unseal files -> Read Piece 2 -> will have to Unseal.
|
||||||
|
ppt.removeAllUnsealedSectorFiles(t)
|
||||||
|
ppt.readPiece(t, storiface.UnpaddedByteIndex(pd1size), pd2size, true, pd2)
|
||||||
|
}
|
||||||
|
|
||||||
|
type pieceProviderTestHarness struct {
|
||||||
|
ctx context.Context
|
||||||
|
index *stores.Index
|
||||||
|
pp PieceProvider
|
||||||
|
sector specstorage.SectorRef
|
||||||
|
mgr *Manager
|
||||||
|
ticket abi.SealRandomness
|
||||||
|
commD cid.Cid
|
||||||
|
localStores []*stores.Local
|
||||||
|
|
||||||
|
servers []*http.Server
|
||||||
|
|
||||||
|
addedPieces []abi.PieceInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func generatePieceData(size uint64) []byte {
|
||||||
|
bz := make([]byte, size)
|
||||||
|
rand.Read(bz)
|
||||||
|
return bz
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPieceProviderTestHarness(t *testing.T, mgrConfig SealerConfig, sectorProofType abi.RegisteredSealProof) *pieceProviderTestHarness {
|
||||||
|
ctx := context.Background()
|
||||||
|
// listen on tcp socket to create an http server later
|
||||||
|
address := "0.0.0.0:0"
|
||||||
|
nl, err := net.Listen("tcp", address)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// create index, storage, local store & remote store.
|
||||||
|
index := stores.NewIndex()
|
||||||
|
storage := newTestStorage(t)
|
||||||
|
localStore, err := stores.NewLocal(ctx, storage, index, []string{"http://" + nl.Addr().String() + "/remote"})
|
||||||
|
require.NoError(t, err)
|
||||||
|
remoteStore := stores.NewRemote(localStore, index, nil, 6000, &stores.DefaultPartialFileHandler{})
|
||||||
|
|
||||||
|
// data stores for state tracking.
|
||||||
|
dstore := ds_sync.MutexWrap(datastore.NewMapDatastore())
|
||||||
|
wsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/worker/calls")))
|
||||||
|
smsts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls")))
|
||||||
|
|
||||||
|
mgr, err := New(ctx, localStore, remoteStore, storage, index, mgrConfig, wsts, smsts)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// start a http server on the manager to serve sector file requests.
|
||||||
|
svc := &http.Server{
|
||||||
|
Addr: nl.Addr().String(),
|
||||||
|
Handler: mgr,
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
_ = svc.Serve(nl)
|
||||||
|
}()
|
||||||
|
|
||||||
|
pp := NewPieceProvider(remoteStore, index, mgr)
|
||||||
|
|
||||||
|
sector := specstorage.SectorRef{
|
||||||
|
ID: abi.SectorID{
|
||||||
|
Miner: 100,
|
||||||
|
Number: 10,
|
||||||
|
},
|
||||||
|
ProofType: sectorProofType,
|
||||||
|
}
|
||||||
|
|
||||||
|
ticket := abi.SealRandomness{9, 9, 9, 9, 9, 9, 9, 9}
|
||||||
|
|
||||||
|
ppt := &pieceProviderTestHarness{
|
||||||
|
ctx: ctx,
|
||||||
|
index: index,
|
||||||
|
pp: pp,
|
||||||
|
sector: sector,
|
||||||
|
mgr: mgr,
|
||||||
|
ticket: ticket,
|
||||||
|
}
|
||||||
|
ppt.servers = append(ppt.servers, svc)
|
||||||
|
ppt.localStores = append(ppt.localStores, localStore)
|
||||||
|
return ppt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pieceProviderTestHarness) addRemoteWorker(t *testing.T, tasks []sealtasks.TaskType) {
|
||||||
|
// start an http Server
|
||||||
|
address := "0.0.0.0:0"
|
||||||
|
nl, err := net.Listen("tcp", address)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
localStore, err := stores.NewLocal(p.ctx, newTestStorage(t), p.index, []string{"http://" + nl.Addr().String() + "/remote"})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
fh := &stores.FetchHandler{
|
||||||
|
Local: localStore,
|
||||||
|
PfHandler: &stores.DefaultPartialFileHandler{},
|
||||||
|
}
|
||||||
|
|
||||||
|
mux := mux.NewRouter()
|
||||||
|
mux.PathPrefix("/remote").HandlerFunc(fh.ServeHTTP)
|
||||||
|
svc := &http.Server{
|
||||||
|
Addr: nl.Addr().String(),
|
||||||
|
Handler: mux,
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
_ = svc.Serve(nl)
|
||||||
|
}()
|
||||||
|
|
||||||
|
remote := stores.NewRemote(localStore, p.index, nil, 1000,
|
||||||
|
&stores.DefaultPartialFileHandler{})
|
||||||
|
|
||||||
|
dstore := ds_sync.MutexWrap(datastore.NewMapDatastore())
|
||||||
|
csts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls")))
|
||||||
|
|
||||||
|
worker := newLocalWorker(nil, WorkerConfig{
|
||||||
|
TaskTypes: tasks,
|
||||||
|
}, remote, localStore, p.index, p.mgr, csts)
|
||||||
|
|
||||||
|
p.servers = append(p.servers, svc)
|
||||||
|
p.localStores = append(p.localStores, localStore)
|
||||||
|
|
||||||
|
// register self with manager
|
||||||
|
require.NoError(t, p.mgr.AddWorker(p.ctx, worker))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pieceProviderTestHarness) removeAllUnsealedSectorFiles(t *testing.T) {
|
||||||
|
for i := range p.localStores {
|
||||||
|
ls := p.localStores[i]
|
||||||
|
require.NoError(t, ls.Remove(p.ctx, p.sector.ID, storiface.FTUnsealed, false))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pieceProviderTestHarness) addPiece(t *testing.T, pieceData []byte) abi.PieceInfo {
|
||||||
|
var existing []abi.UnpaddedPieceSize
|
||||||
|
for _, pi := range p.addedPieces {
|
||||||
|
existing = append(existing, pi.Size.Unpadded())
|
||||||
|
}
|
||||||
|
|
||||||
|
size := abi.UnpaddedPieceSize(len(pieceData))
|
||||||
|
pieceInfo, err := p.mgr.AddPiece(p.ctx, p.sector, existing, size, bytes.NewReader(pieceData))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
p.addedPieces = append(p.addedPieces, pieceInfo)
|
||||||
|
return pieceInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pieceProviderTestHarness) preCommit1(t *testing.T) specstorage.PreCommit1Out {
|
||||||
|
preCommit1, err := p.mgr.SealPreCommit1(p.ctx, p.sector, p.ticket, p.addedPieces)
|
||||||
|
require.NoError(t, err)
|
||||||
|
return preCommit1
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pieceProviderTestHarness) preCommit2(t *testing.T, pc1 specstorage.PreCommit1Out) {
|
||||||
|
sectorCids, err := p.mgr.SealPreCommit2(p.ctx, p.sector, pc1)
|
||||||
|
require.NoError(t, err)
|
||||||
|
commD := sectorCids.Unsealed
|
||||||
|
p.commD = commD
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pieceProviderTestHarness) readPiece(t *testing.T, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize,
|
||||||
|
expectedHadToUnseal bool, expectedBytes []byte) {
|
||||||
|
rd, isUnsealed, err := p.pp.ReadPiece(p.ctx, p.sector, offset, size, p.ticket, p.commD)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, rd)
|
||||||
|
require.Equal(t, expectedHadToUnseal, isUnsealed)
|
||||||
|
defer func() { _ = rd.Close() }()
|
||||||
|
|
||||||
|
// Make sure the input matches the output
|
||||||
|
readData, err := ioutil.ReadAll(rd)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, expectedBytes, readData)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pieceProviderTestHarness) finalizeSector(t *testing.T, keepUnseal []specstorage.Range) {
|
||||||
|
require.NoError(t, p.mgr.FinalizeSector(p.ctx, p.sector, keepUnseal))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *pieceProviderTestHarness) shutdown(t *testing.T) {
|
||||||
|
for _, svc := range p.servers {
|
||||||
|
s := svc
|
||||||
|
require.NoError(t, s.Shutdown(p.ctx))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user