From b2dfaae68c7688dbbfd02142edd76ce18162688a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 9 Sep 2022 11:11:47 +0200 Subject: [PATCH] sector import: Test remote commit1 retry --- itests/kit/node_miner.go | 8 +- itests/sector_import_full_test.go | 221 ++++++++++++++++++ ...t_test.go => sector_import_simple_test.go} | 47 ++-- storage/pipeline/receive.go | 1 - storage/pipeline/states_failed.go | 4 +- 5 files changed, 260 insertions(+), 21 deletions(-) create mode 100644 itests/sector_import_full_test.go rename itests/{sector_import_test.go => sector_import_simple_test.go} (85%) diff --git a/itests/kit/node_miner.go b/itests/kit/node_miner.go index 8805ac36c..b08ce1847 100644 --- a/itests/kit/node_miner.go +++ b/itests/kit/node_miner.go @@ -97,6 +97,10 @@ func (tm *TestMiner) PledgeSectors(ctx context.Context, n, existing int, blockNo } func (tm *TestMiner) WaitSectorsProving(ctx context.Context, toCheck map[abi.SectorNumber]struct{}) { + tm.WaitSectorsProvingAllowFails(ctx, toCheck, map[api.SectorState]struct{}{}) +} + +func (tm *TestMiner) WaitSectorsProvingAllowFails(ctx context.Context, toCheck map[abi.SectorNumber]struct{}, okFails map[api.SectorState]struct{}) { for len(toCheck) > 0 { tm.FlushSealingBatches(ctx) @@ -109,7 +113,9 @@ func (tm *TestMiner) WaitSectorsProving(ctx context.Context, toCheck map[abi.Sec delete(toCheck, n) } if strings.Contains(string(st.State), "Fail") { - tm.t.Fatal("sector in a failed state", st.State) + if _, ok := okFails[st.State]; !ok { + tm.t.Fatal("sector in a failed state", st.State) + } } } diff --git a/itests/sector_import_full_test.go b/itests/sector_import_full_test.go new file mode 100644 index 000000000..3be72e8eb --- /dev/null +++ b/itests/sector_import_full_test.go @@ -0,0 +1,221 @@ +package itests + +import ( + "bytes" + "context" + "fmt" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" + "time" + + "github.com/gorilla/mux" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/crypto" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors/policy" + "github.com/filecoin-project/lotus/itests/kit" + sealing "github.com/filecoin-project/lotus/storage/pipeline" + "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" + "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper/basicfs" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +func TestSectorImport(t *testing.T) { + + type testCase struct { + c1handler func(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request) + } + + makeTest := func(mut func(*testCase)) *testCase { + tc := &testCase{ + c1handler: remoteCommit1, + } + mut(tc) + return tc + } + + runTest := func(tc *testCase) func(t *testing.T) { + return func(t *testing.T) { + kit.QuietMiningLogs() + + var blockTime = 50 * time.Millisecond + + //////// + // Start a miner node + + client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC()) + ens.InterconnectAll().BeginMining(blockTime) + + ctx := context.Background() + + //////// + // Reserve some sector numbers on the miner node; We'll use one of those when creating the sector "remotely" + snums, err := miner.SectorNumReserveCount(ctx, "test-reservation-0001", 16) + require.NoError(t, err) + + sectorDir := t.TempDir() + + maddr, err := miner.ActorAddress(ctx) + require.NoError(t, err) + + mid, err := address.IDFromAddress(maddr) + require.NoError(t, err) + + spt, err := currentSealProof(ctx, client, maddr) + require.NoError(t, err) + + ssize, err := spt.SectorSize() + require.NoError(t, err) + + pieceSize := abi.PaddedPieceSize(ssize) + + //////// + // Create/Seal a sector up to pc2 outside of the pipeline + + // get one sector number from the reservation done on the miner above + sn, err := snums.First() + require.NoError(t, err) + + // create all the sector identifiers + snum := abi.SectorNumber(sn) + sid := abi.SectorID{Miner: abi.ActorID(mid), Number: snum} + sref := storiface.SectorRef{ID: sid, ProofType: spt} + + // create a low-level sealer instance + sealer, err := ffiwrapper.New(&basicfs.Provider{ + Root: sectorDir, + }) + require.NoError(t, err) + + // CRETE THE UNSEALED FILE + + // create a reader for all-zero (CC) data + dataReader := bytes.NewReader(bytes.Repeat([]byte{0}, int(pieceSize.Unpadded()))) + + // create the unsealed CC sector file + pieceInfo, err := sealer.AddPiece(ctx, sref, nil, pieceSize.Unpadded(), dataReader) + require.NoError(t, err) + + // GENERATE THE TICKET + + // get most recent valid ticket epoch + ts, err := client.ChainHead(ctx) + require.NoError(t, err) + ticketEpoch := ts.Height() - policy.SealRandomnessLookback + + // ticket entropy is cbor-seriasized miner addr + buf := new(bytes.Buffer) + require.NoError(t, maddr.MarshalCBOR(buf)) + + // generate ticket randomness + rand, err := client.StateGetRandomnessFromTickets(ctx, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes(), ts.Key()) + require.NoError(t, err) + + // EXECUTE PRECOMMIT 1 / 2 + + // run PC1 + pc1out, err := sealer.SealPreCommit1(ctx, sref, abi.SealRandomness(rand), []abi.PieceInfo{pieceInfo}) + require.NoError(t, err) + + // run pc2 + scids, err := sealer.SealPreCommit2(ctx, sref, pc1out) + require.NoError(t, err) + + // make finalized cache, put it in [sectorDir]/fin-cache while keeping the large cache for remote C1 + finDst := filepath.Join(sectorDir, "fin-cache", fmt.Sprintf("s-t01000-%d", snum)) + require.NoError(t, os.MkdirAll(finDst, 0777)) + require.NoError(t, sealer.FinalizeSectorInto(ctx, sref, finDst)) + + //////// + // start http server serving sector data + + m := mux.NewRouter() + m.HandleFunc("/sectors/{type}/{id}", remoteGetSector(sectorDir)).Methods("GET") + m.HandleFunc("/sectors/{id}/commit1", tc.c1handler(sealer)).Methods("POST") + srv := httptest.NewServer(m) + + unsealedURL := fmt.Sprintf("%s/sectors/unsealed/s-t0%d-%d", srv.URL, mid, snum) + sealedURL := fmt.Sprintf("%s/sectors/sealed/s-t0%d-%d", srv.URL, mid, snum) + cacheURL := fmt.Sprintf("%s/sectors/cache/s-t0%d-%d", srv.URL, mid, snum) + remoteC1URL := fmt.Sprintf("%s/sectors/s-t0%d-%d/commit1", srv.URL, mid, snum) + + //////// + // import the sector and continue sealing + + err = miner.SectorReceive(ctx, api.RemoteSectorMeta{ + State: "PreCommitting", + Sector: sid, + Type: spt, + + Pieces: []api.SectorPiece{ + { + Piece: pieceInfo, + DealInfo: nil, + }, + }, + + TicketValue: abi.SealRandomness(rand), + TicketEpoch: ticketEpoch, + + PreCommit1Out: pc1out, + + CommD: &scids.Unsealed, + CommR: &scids.Sealed, + + DataUnsealed: &storiface.SectorData{ + Local: false, + URL: unsealedURL, + }, + DataSealed: &storiface.SectorData{ + Local: false, + URL: sealedURL, + }, + DataCache: &storiface.SectorData{ + Local: false, + URL: cacheURL, + }, + + RemoteCommit1Endpoint: remoteC1URL, + }) + require.NoError(t, err) + + // check that we see the imported sector + ng, err := miner.SectorsListNonGenesis(ctx) + require.NoError(t, err) + require.Len(t, ng, 1) + require.Equal(t, snum, ng[0]) + + miner.WaitSectorsProvingAllowFails(ctx, map[abi.SectorNumber]struct{}{snum: {}}, map[api.SectorState]struct{}{api.SectorState(sealing.RemoteCommit1Failed): {}}) + } + } + + // fail first remote c1, verifies that c1 retry works + t.Run("c1-retry", runTest(makeTest(func(testCase *testCase) { + prt := sealing.MinRetryTime + sealing.MinRetryTime = time.Second + t.Cleanup(func() { + sealing.MinRetryTime = prt + }) + + testCase.c1handler = func(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request) { + var failedOnce bool + + return func(w http.ResponseWriter, r *http.Request) { + if !failedOnce { + failedOnce = true + w.WriteHeader(http.StatusBadGateway) + return + } + + remoteCommit1(s)(w, r) + } + } + }))) +} diff --git a/itests/sector_import_test.go b/itests/sector_import_simple_test.go similarity index 85% rename from itests/sector_import_test.go rename to itests/sector_import_simple_test.go index 7ff4fa48f..8b1b72fd7 100644 --- a/itests/sector_import_test.go +++ b/itests/sector_import_simple_test.go @@ -36,12 +36,16 @@ func TestSectorImportAfterPC2(t *testing.T) { var blockTime = 50 * time.Millisecond + //////// + // Start a miner node + client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC()) ens.InterconnectAll().BeginMining(blockTime) ctx := context.Background() - // get some sector numbers + //////// + // Reserve some sector numbers on the miner node; We'll use one of those when creating the sector "remotely" snums, err := miner.SectorNumReserveCount(ctx, "test-reservation-0001", 16) require.NoError(t, err) @@ -62,32 +66,34 @@ func TestSectorImportAfterPC2(t *testing.T) { pieceSize := abi.PaddedPieceSize(ssize) //////// - // seal a sector up to pc2 outside of the pipeline + // Create/Seal a sector up to pc2 outside of the pipeline + // get one sector number from the reservation done on the miner above sn, err := snums.First() require.NoError(t, err) + + // create all the sector identifiers snum := abi.SectorNumber(sn) - sid := abi.SectorID{ - Miner: abi.ActorID(mid), - Number: snum, - } - - sref := storiface.SectorRef{ - ID: sid, - ProofType: spt, - } + sid := abi.SectorID{Miner: abi.ActorID(mid), Number: snum} + sref := storiface.SectorRef{ID: sid, ProofType: spt} + // create a low-level sealer instance sealer, err := ffiwrapper.New(&basicfs.Provider{ Root: sectorDir, }) require.NoError(t, err) + // CRETE THE UNSEALED FILE + + // create a reader for all-zero (CC) data dataReader := bytes.NewReader(bytes.Repeat([]byte{0}, int(pieceSize.Unpadded()))) - // create the unsealed sector file + // create the unsealed CC sector file pieceInfo, err := sealer.AddPiece(ctx, sref, nil, pieceSize.Unpadded(), dataReader) require.NoError(t, err) + // GENERATE THE TICKET + // get most recent valid ticket epoch ts, err := client.ChainHead(ctx) require.NoError(t, err) @@ -101,6 +107,8 @@ func TestSectorImportAfterPC2(t *testing.T) { rand, err := client.StateGetRandomnessFromTickets(ctx, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes(), ts.Key()) require.NoError(t, err) + // EXECUTE PRECOMMIT 1 / 2 + // run PC1 pc1out, err := sealer.SealPreCommit1(ctx, sref, abi.SealRandomness(rand), []abi.PieceInfo{pieceInfo}) require.NoError(t, err) @@ -109,7 +117,7 @@ func TestSectorImportAfterPC2(t *testing.T) { scids, err := sealer.SealPreCommit2(ctx, sref, pc1out) require.NoError(t, err) - // make finalized cache, put it in [sectorDir]/fin-cache + // make finalized cache, put it in [sectorDir]/fin-cache while keeping the large cache for remote C1 finDst := filepath.Join(sectorDir, "fin-cache", fmt.Sprintf("s-t01000-%d", snum)) require.NoError(t, os.MkdirAll(finDst, 0777)) require.NoError(t, sealer.FinalizeSectorInto(ctx, sref, finDst)) @@ -122,6 +130,11 @@ func TestSectorImportAfterPC2(t *testing.T) { m.HandleFunc("/sectors/{id}/commit1", remoteCommit1(sealer)).Methods("POST") srv := httptest.NewServer(m) + unsealedURL := fmt.Sprintf("%s/sectors/unsealed/s-t0%d-%d", srv.URL, mid, snum) + sealedURL := fmt.Sprintf("%s/sectors/sealed/s-t0%d-%d", srv.URL, mid, snum) + cacheURL := fmt.Sprintf("%s/sectors/cache/s-t0%d-%d", srv.URL, mid, snum) + remoteC1URL := fmt.Sprintf("%s/sectors/s-t0%d-%d/commit1", srv.URL, mid, snum) + //////// // import the sector and continue sealing @@ -147,18 +160,18 @@ func TestSectorImportAfterPC2(t *testing.T) { DataUnsealed: &storiface.SectorData{ Local: false, - URL: fmt.Sprintf("%s/sectors/unsealed/s-t0%d-%d", srv.URL, mid, snum), + URL: unsealedURL, }, DataSealed: &storiface.SectorData{ Local: false, - URL: fmt.Sprintf("%s/sectors/sealed/s-t0%d-%d", srv.URL, mid, snum), + URL: sealedURL, }, DataCache: &storiface.SectorData{ Local: false, - URL: fmt.Sprintf("%s/sectors/cache/s-t0%d-%d", srv.URL, mid, snum), + URL: cacheURL, }, - RemoteCommit1Endpoint: fmt.Sprintf("%s/sectors/s-t0%d-%d/commit1", srv.URL, mid, snum), + RemoteCommit1Endpoint: remoteC1URL, }) require.NoError(t, err) diff --git a/storage/pipeline/receive.go b/storage/pipeline/receive.go index fd4f64a66..13ee0ff02 100644 --- a/storage/pipeline/receive.go +++ b/storage/pipeline/receive.go @@ -126,7 +126,6 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta case GetTicket: fallthrough case Packing: - // todo check num free info.Return = ReturnState(meta.State) // todo dedupe states info.State = ReceiveSector diff --git a/storage/pipeline/states_failed.go b/storage/pipeline/states_failed.go index fb7145f50..7292961d2 100644 --- a/storage/pipeline/states_failed.go +++ b/storage/pipeline/states_failed.go @@ -19,12 +19,12 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -const minRetryTime = 1 * time.Minute +var MinRetryTime = 1 * time.Minute func failedCooldown(ctx statemachine.Context, sector SectorInfo) error { // TODO: Exponential backoff when we see consecutive failures - retryStart := time.Unix(int64(sector.Log[len(sector.Log)-1].Timestamp), 0).Add(minRetryTime) + retryStart := time.Unix(int64(sector.Log[len(sector.Log)-1].Timestamp), 0).Add(MinRetryTime) if len(sector.Log) > 0 && !time.Now().After(retryStart) { log.Infof("%s(%d), waiting %s before retrying", sector.State, sector.SectorNumber, time.Until(retryStart)) select {