package itests

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/gorilla/mux"
	"github.com/stretchr/testify/require"

	"github.com/filecoin-project/go-address"
	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/go-state-types/crypto"

	"github.com/filecoin-project/lotus/api"
	lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
	"github.com/filecoin-project/lotus/chain/actors/policy"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/itests/kit"
	spaths "github.com/filecoin-project/lotus/storage/paths"
	sealing "github.com/filecoin-project/lotus/storage/pipeline"
	"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
	"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper/basicfs"
	"github.com/filecoin-project/lotus/storage/sealer/storiface"
	"github.com/filecoin-project/lotus/storage/sealer/tarutil"
)

// TestSectorImport seals a sector up to PreCommit2 outside of the miner's
// sealing pipeline, serves the resulting sector files plus remote commit1 /
// commit2 / "done" endpoints from a local HTTP server, and then hands the
// sector to the miner through SectorReceive. Each subtest tweaks either the
// commit1 handler or the RemoteSectorMeta and checks the resulting outcome.
func TestSectorImport(t *testing.T) {
	type testCase struct {
		// c1handler builds the handler mounted on POST /sectors/{id}/commit1.
		c1handler func(s *ffiwrapper.Sealer, m *kit.TestMiner) func(w http.ResponseWriter, r *http.Request)

		// mutateRemoteMeta, when non-nil, is applied to the sector metadata
		// right before it is passed to SectorReceive.
		mutateRemoteMeta func(*api.RemoteSectorMeta)

		// expectImportErrContains, when non-empty, requires SectorReceive to
		// fail with an error containing this text; the test stops there.
		expectImportErrContains string

		// expectDoneSuccess / expectDoneState are compared against the
		// Successful and State fields received on the "sealed" endpoint.
		expectDoneSuccess bool
		expectDoneState   string
	}

	// makeTest returns the default (successful import) test case after
	// letting mut adjust it.
	makeTest := func(mut func(*testCase)) *testCase {
		tc := &testCase{
			c1handler: testRemoteCommit1,

			expectDoneSuccess: true,
			expectDoneState:   "Proving",
		}
		mut(tc)
		return tc
	}

	runTest := func(tc *testCase) func(t *testing.T) {
		return func(t *testing.T) {
			kit.QuietMiningLogs()

			var blockTime = 50 * time.Millisecond

			////////
			// Start a miner node

			client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC())
			ens.InterconnectAll().BeginMining(blockTime)

			ctx := context.Background()

			////////
			// Reserve some sector numbers on the miner node; We'll use one of those when creating the sector "remotely"
			snums, err := miner.SectorNumReserveCount(ctx, "test-reservation-0001", 16)
			require.NoError(t, err)

			sectorDir := t.TempDir()

			maddr, err := miner.ActorAddress(ctx)
			require.NoError(t, err)

			mid, err := address.IDFromAddress(maddr)
			require.NoError(t, err)

			mi, err := client.StateMinerInfo(ctx, maddr, types.EmptyTSK)
			require.NoError(t, err)

			ver, err := client.StateNetworkVersion(ctx, types.EmptyTSK)
			require.NoError(t, err)

			spt, err := lminer.PreferredSealProofTypeFromWindowPoStType(ver, mi.WindowPoStProofType)
			require.NoError(t, err)

			ssize, err := spt.SectorSize()
			require.NoError(t, err)

			pieceSize := abi.PaddedPieceSize(ssize)

			////////
			// Create/Seal a sector up to pc2 outside of the pipeline

			// get one sector number from the reservation done on the miner above
			sn, err := snums.First()
			require.NoError(t, err)

			// create all the sector identifiers
			snum := abi.SectorNumber(sn)
			sid := abi.SectorID{Miner: abi.ActorID(mid), Number: snum}
			sref := storiface.SectorRef{ID: sid, ProofType: spt}

			// create a low-level sealer instance
			sealer, err := ffiwrapper.New(&basicfs.Provider{
				Root: sectorDir,
			})
			require.NoError(t, err)

			// CREATE THE UNSEALED FILE

			// create a reader for all-zero (CC) data
			dataReader := bytes.NewReader(bytes.Repeat([]byte{0}, int(pieceSize.Unpadded())))

			// create the unsealed CC sector file
			pieceInfo, err := sealer.AddPiece(ctx, sref, nil, pieceSize.Unpadded(), dataReader)
			require.NoError(t, err)

			// GENERATE THE TICKET

			// get most recent valid ticket epoch
			ts, err := client.ChainHead(ctx)
			require.NoError(t, err)
			ticketEpoch := ts.Height() - policy.SealRandomnessLookback

			// ticket entropy is cbor-serialized miner addr
			buf := new(bytes.Buffer)
			require.NoError(t, maddr.MarshalCBOR(buf))

			// generate ticket randomness
			rand, err := client.StateGetRandomnessFromTickets(ctx, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes(), ts.Key())
			require.NoError(t, err)

			// EXECUTE PRECOMMIT 1 / 2

			// run PC1
			pc1out, err := sealer.SealPreCommit1(ctx, sref, abi.SealRandomness(rand), []abi.PieceInfo{pieceInfo})
			require.NoError(t, err)

			// run pc2
			scids, err := sealer.SealPreCommit2(ctx, sref, pc1out)
			require.NoError(t, err)

			// make finalized cache, put it in [sectorDir]/fin-cache while keeping the large cache for remote C1.
			// The directory name is derived from mid/snum exactly like the URLs below, because
			// testRemoteGetSector looks the directory up by the id taken from the request URL.
			finDst := filepath.Join(sectorDir, "fin-cache", fmt.Sprintf("s-t0%d-%d", mid, snum))
			require.NoError(t, os.MkdirAll(finDst, 0777))
			require.NoError(t, sealer.FinalizeSectorInto(ctx, sref, finDst))

			////////
			// start http server serving sector data

			doneResp := new(*api.RemoteSealingDoneParams)

			m := mux.NewRouter()
			m.HandleFunc("/sectors/{type}/{id}", testRemoteGetSector(sectorDir)).Methods("GET")
			m.HandleFunc("/sectors/{id}/commit1", tc.c1handler(sealer, miner)).Methods("POST")
			m.HandleFunc("/sectors/{id}/sealed", testRemoteDone(doneResp)).Methods("POST")
			m.HandleFunc("/commit2", testRemoteCommit2(sealer)).Methods("POST")

			srv := httptest.NewServer(m)
			// shut the server down with the subtest so its listener and
			// goroutines don't outlive it
			t.Cleanup(srv.Close)

			unsealedURL := fmt.Sprintf("%s/sectors/unsealed/s-t0%d-%d", srv.URL, mid, snum)
			sealedURL := fmt.Sprintf("%s/sectors/sealed/s-t0%d-%d", srv.URL, mid, snum)
			cacheURL := fmt.Sprintf("%s/sectors/cache/s-t0%d-%d", srv.URL, mid, snum)
			remoteC1URL := fmt.Sprintf("%s/sectors/s-t0%d-%d/commit1", srv.URL, mid, snum)
			remoteC2URL := fmt.Sprintf("%s/commit2", srv.URL)
			doneURL := fmt.Sprintf("%s/sectors/s-t0%d-%d/sealed", srv.URL, mid, snum)

			////////
			// import the sector and continue sealing

			rmeta := api.RemoteSectorMeta{
				State:  "PreCommitting",
				Sector: sid,
				Type:   spt,

				Pieces: []api.SectorPiece{
					{
						Piece:    pieceInfo,
						DealInfo: nil,
					},
				},

				TicketValue: abi.SealRandomness(rand),
				TicketEpoch: ticketEpoch,

				PreCommit1Out: pc1out,

				CommD: &scids.Unsealed,
				CommR: &scids.Sealed,

				DataUnsealed: &storiface.SectorLocation{
					Local: false,
					URL:   unsealedURL,
				},
				DataSealed: &storiface.SectorLocation{
					Local: false,
					URL:   sealedURL,
				},
				DataCache: &storiface.SectorLocation{
					Local: false,
					URL:   cacheURL,
				},

				RemoteCommit1Endpoint:     remoteC1URL,
				RemoteCommit2Endpoint:     remoteC2URL,
				RemoteSealingDoneEndpoint: doneURL,
			}

			if tc.mutateRemoteMeta != nil {
				tc.mutateRemoteMeta(&rmeta)
			}

			err = miner.SectorReceive(ctx, rmeta)
			if tc.expectImportErrContains != "" {
				// import is expected to be rejected; nothing else to check
				require.ErrorContains(t, err, tc.expectImportErrContains)
				return
			}

			require.NoError(t, err)

			// check that we see the imported sector
			ng, err := miner.SectorsListNonGenesis(ctx)
			require.NoError(t, err)
			require.Len(t, ng, 1)
			require.Equal(t, snum, ng[0])

			miner.WaitSectorsProvingAllowFails(ctx, map[abi.SectorNumber]struct{}{snum: {}}, map[api.SectorState]struct{}{api.SectorState(sealing.RemoteCommitFailed): {}})

			// the "sealed" endpoint must have been called with the expected outcome
			require.NotNil(t, *doneResp)
			require.Equal(t, tc.expectDoneSuccess, (*doneResp).Successful)
			require.Equal(t, tc.expectDoneState, (*doneResp).State)
			if tc.expectDoneSuccess {
				require.NotNil(t, (*doneResp).CommitMessage)
			}
		}
	}

	// NOTE: in the two cases below `t` is the parent test, so the
	// MinRetryTime override is installed when the test case is built and is
	// only restored once TestSectorImport itself finishes.

	// fail first remote c1, verifies that c1 retry works
	t.Run("c1-retry", runTest(makeTest(func(testCase *testCase) {
		prt := sealing.MinRetryTime
		sealing.MinRetryTime = time.Second
		t.Cleanup(func() {
			sealing.MinRetryTime = prt
		})

		testCase.c1handler = func(s *ffiwrapper.Sealer, m *kit.TestMiner) func(w http.ResponseWriter, r *http.Request) {
			var failedOnce bool

			return func(w http.ResponseWriter, r *http.Request) {
				if !failedOnce {
					failedOnce = true
					w.WriteHeader(http.StatusBadGateway)
					return
				}

				testRemoteCommit1(s, m)(w, r)
			}
		}
	})))

	// remove the sector from within the c1 handler and fail the request;
	// the done callback is then expected to report the "Removing" state
	t.Run("c1-fail-remove", runTest(makeTest(func(testCase *testCase) {
		prt := sealing.MinRetryTime
		sealing.MinRetryTime = time.Second
		t.Cleanup(func() {
			sealing.MinRetryTime = prt
		})

		testCase.c1handler = func(s *ffiwrapper.Sealer, m *kit.TestMiner) func(w http.ResponseWriter, r *http.Request) {
			return func(w http.ResponseWriter, r *http.Request) {
				vars := mux.Vars(r)

				id, err := storiface.ParseSectorID(vars["id"])
				if err != nil {
					panic(err)
				}

				err = m.SectorRemove(r.Context(), id.Number)
				if err != nil {
					panic(err)
				}

				w.WriteHeader(http.StatusBadGateway)
			}
		}
		testCase.expectDoneSuccess = false
		testCase.expectDoneState = "Removing"
	})))

	t.Run("nil-commd", runTest(makeTest(func(testCase *testCase) {
		testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) {
			meta.CommD = nil
		}
		testCase.expectImportErrContains = "both CommR/CommD cids need to be set for sectors in PreCommitting and later states"
	})))
	t.Run("nil-commr", runTest(makeTest(func(testCase *testCase) {
		testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) {
			meta.CommR = nil
		}
		testCase.expectImportErrContains = "both CommR/CommD cids need to be set for sectors in PreCommitting and later states"
	})))

	t.Run("nil-uns", runTest(makeTest(func(testCase *testCase) {
		testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) {
			meta.DataUnsealed = nil
		}
		testCase.expectImportErrContains = "expected DataUnsealed to be set"
	})))
	t.Run("nil-sealed", runTest(makeTest(func(testCase *testCase) {
		testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) {
			meta.DataSealed = nil
		}
		testCase.expectImportErrContains = "expected DataSealed to be set"
	})))
	t.Run("nil-cache", runTest(makeTest(func(testCase *testCase) {
		testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) {
			meta.DataCache = nil
		}
		testCase.expectImportErrContains = "expected DataCache to be set"
	})))

	t.Run("bad-commd", runTest(makeTest(func(testCase *testCase) {
		testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) {
			meta.CommD = meta.CommR
		}
		testCase.expectImportErrContains = "CommD cid has wrong prefix"
	})))
	t.Run("bad-commr", runTest(makeTest(func(testCase *testCase) {
		testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) {
			meta.CommR = meta.CommD
		}
		testCase.expectImportErrContains = "CommR cid has wrong prefix"
	})))

	t.Run("bad-ticket", runTest(makeTest(func(testCase *testCase) {
		testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) {
			// flip one bit
			meta.TicketValue[23] ^= 4
		}
		testCase.expectImportErrContains = "tickets differ"
	})))
}

// note: stuff below is almost the same as in _simple version of this file; We need
// to copy it because on Circle we can't call those functions between test files,
// and for the _simple test we want everything in one file to make it easy to follow

// testRemoteDone returns a handler for the "sealing done" callback. It
// decodes the JSON request body into a RemoteSealingDoneParams and stores
// the result in *rs; a body that fails to decode yields a 500 and leaves
// *rs untouched.
func testRemoteDone(rs **api.RemoteSealingDoneParams) func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		// decode into a local first so a failed/partial decode is never
		// published to the test
		params := new(api.RemoteSealingDoneParams)
		if err := json.NewDecoder(r.Body).Decode(params); err != nil {
			w.WriteHeader(500)
			return
		}
		*rs = params

		w.WriteHeader(200)
	}
}

// testRemoteCommit1 returns a handler which runs SealCommit1 on s for the
// sector named by the {id} route variable, using the parameters from the
// JSON request body, and writes the commit1 output as the response body.
// Any failure is reported as a bare 500. The miner argument is unused here;
// it is part of the signature shared with the testCase.c1handler hooks.
func testRemoteCommit1(s *ffiwrapper.Sealer, m *kit.TestMiner) func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		vars := mux.Vars(r)

		// validate sector id
		id, err := storiface.ParseSectorID(vars["id"])
		if err != nil {
			w.WriteHeader(500)
			return
		}

		var params api.RemoteCommit1Params
		if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
			w.WriteHeader(500)
			return
		}

		sref := storiface.SectorRef{
			ID:        id,
			ProofType: params.ProofType,
		}

		ssize, err := params.ProofType.SectorSize()
		if err != nil {
			w.WriteHeader(500)
			return
		}

		// a single piece spanning the whole sector, identified by the unsealed CID
		p, err := s.SealCommit1(r.Context(), sref, params.Ticket, params.Seed, []abi.PieceInfo{
			{
				Size:     abi.PaddedPieceSize(ssize),
				PieceCID: params.Unsealed,
			},
		}, storiface.SectorCids{
			Unsealed: params.Unsealed,
			Sealed:   params.Sealed,
		})
		if err != nil {
			w.WriteHeader(500)
			return
		}

		if _, err := w.Write(p); err != nil {
			fmt.Println("c1 write error")
		}
	}
}

// testRemoteCommit2 returns a handler which runs SealCommit2 on s with the
// sector, proof type and commit1 output taken from the JSON request body,
// and writes the resulting proof as the response body. Failures are
// reported as a 500.
func testRemoteCommit2(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		var params api.RemoteCommit2Params
		if err := json.NewDecoder(r.Body).Decode(&params); err != nil {
			w.WriteHeader(500)
			return
		}

		sref := storiface.SectorRef{
			ID:        params.Sector,
			ProofType: params.ProofType,
		}

		p, err := s.SealCommit2(r.Context(), sref, params.Commit1Out)
		if err != nil {
			fmt.Println("c2 error: ", err)
			w.WriteHeader(500)
			return
		}

		if _, err := w.Write(p); err != nil {
			fmt.Println("c2 write error")
		}
	}
}

// testRemoteGetSector returns a handler serving sector data found under
// sectorRoot/{type}/{id}. Requests for the "cache" type are served from the
// "fin-cache" directory instead. Directories are streamed as a tar archive
// (ranged requests for directories are rejected with a 500); regular files
// are served through http.ServeFile.
func testRemoteGetSector(sectorRoot string) func(w http.ResponseWriter, r *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		vars := mux.Vars(r)

		// validate sector id
		id, err := storiface.ParseSectorID(vars["id"])
		if err != nil {
			w.WriteHeader(500)
			return
		}

		// validate type
		_, err = spaths.FileTypeFromString(vars["type"])
		if err != nil {
			w.WriteHeader(500)
			return
		}

		typ := vars["type"]
		if typ == "cache" {
			// if cache is requested, send the finalized cache we've created above
			typ = "fin-cache"
		}

		path := filepath.Join(sectorRoot, typ, vars["id"])

		stat, err := os.Stat(path)
		if err != nil {
			w.WriteHeader(500)
			return
		}

		if stat.IsDir() {
			if _, has := r.Header["Range"]; has {
				w.WriteHeader(500)
				return
			}

			w.Header().Set("Content-Type", "application/x-tar")
			w.WriteHeader(200)

			err := tarutil.TarDirectory(path, w, make([]byte, 1<<20))
			if err != nil {
				// the 200 header is already sent, so all we can do is log
				fmt.Println("tar directory error: ", err)
				return
			}
		} else {
			w.Header().Set("Content-Type", "application/octet-stream")
			// will do a ranged read over the file at the given path if the caller has asked for a ranged read in the request headers.
			http.ServeFile(w, r, path)
		}

		fmt.Printf("served sector file/dir, sectorID=%+v, fileType=%s, path=%s\n", id, vars["type"], path)
	}
}