From fad79f321813e6ed77722ecc6f0804a5212b4596 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Wed, 3 Aug 2022 19:37:02 +0530 Subject: [PATCH] respond to caller. Add itest --- itests/worker_test.go | 75 +++++++++++++++++++++++++++++++++++++++++ storage/sealer/sched.go | 6 ++-- 2 files changed, 77 insertions(+), 4 deletions(-) diff --git a/itests/worker_test.go b/itests/worker_test.go index 7328bd50f..8ef8017da 100644 --- a/itests/worker_test.go +++ b/itests/worker_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/google/uuid" logging "github.com/ipfs/go-log/v2" "github.com/stretchr/testify/require" "golang.org/x/xerrors" @@ -14,6 +15,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/itests/kit" @@ -21,6 +23,7 @@ import ( "github.com/filecoin-project/lotus/node/impl" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage/paths" + sealing "github.com/filecoin-project/lotus/storage/pipeline" "github.com/filecoin-project/lotus/storage/sealer/sealtasks" "github.com/filecoin-project/lotus/storage/sealer/storiface" "github.com/filecoin-project/lotus/storage/wdpost" @@ -401,3 +404,75 @@ func TestWindowPostWorkerManualPoSt(t *testing.T) { require.NoError(t, err) require.Len(t, lastPending, 0) } + +func TestSchedulerRemoveRequest(t *testing.T) { + ctx := context.Background() + _, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true), + kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTDataCid, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit2, sealtasks.TTUnseal})) // no mock proofs + + ens.InterconnectAll().BeginMining(50 * time.Millisecond) + + e, err := worker.Enabled(ctx) + require.NoError(t, err) + require.True(t, e) + + type SchedDiagRequestInfo struct { + Sector abi.SectorID + TaskType sealtasks.TaskType + Priority int + SchedId uuid.UUID + } + type SchedDiagInfo struct { + Requests []SchedDiagRequestInfo + OpenWindows []string + } + type i struct { + SchedInfo interface{} + ReturnedWork []string + Waiting []string + CallToWork map[string]string + EarlyRet []string + } + + go miner.PledgeSectors(ctx, 1, 0, nil) + // Keep checking till sector state is PC2, the request should get stuck as worker cannot process PC2 + for { + st, err := miner.SectorsStatus(ctx, 1, false) + require.NoError(t, err) + if st.State == api.SectorState(sealing.PreCommit2) { + break + } + } + + // Dump current scheduler info + schedb, err := miner.SealingSchedDiag(ctx, false) + if err != nil { + t.Log("Failed to dump scheduler state before: %w", err) + t.FailNow() + } + + var schedidb uuid.UUID + + // cast scheduler info and get the request UUID. Call the SealingRemoveRequest() + if schedb.(i).SchedInfo.(SchedDiagInfo).Requests[0].TaskType == "seal/v0/precommit/2" { + schedidb = schedb.(i).SchedInfo.(SchedDiagInfo).Requests[0].SchedId + err = miner.SealingRemoveRequest(ctx, schedidb) + if err != nil { + t.Log("Failed to dump scheduler state before: %w", err) + t.FailNow() + } + } + + // Dump the schduler again and compare the UUID if a request is present + // If no request present then pass the test + scheda, err := miner.SealingSchedDiag(ctx, false) + if err != nil { + t.Log("Failed to dump scheduler state before: %w", err) + t.FailNow() + } + + if len(scheda.(i).SchedInfo.(SchedDiagInfo).Requests) > 0 && scheda.(i).SchedInfo.(SchedDiagInfo).Requests[0].TaskType == "seal/v0/precommit/2" { + schedida := scheda.(i).SchedInfo.(SchedDiagInfo).Requests[0].SchedId + require.NotEqual(t, schedida, schedidb) + } +} diff --git a/storage/sealer/sched.go b/storage/sealer/sched.go index 5d97c61b6..335bb1249 100644 --- a/storage/sealer/sched.go +++ b/storage/sealer/sched.go @@ -406,6 +406,7 @@ func (sh *Scheduler) removeRequest(rmrequest *rmRequest) { if r.SchedId == rmrequest.id { queue.Remove(i) rmrequest.res <- nil + go r.respond(xerrors.Errorf("scheduling request removed")) return } } @@ -428,10 +429,7 @@ func (sh *Scheduler) RemoveRequest(ctx context.Context, schedId uuid.UUID) error select { case resp := <-ret: - if resp != nil { - return resp - } - return nil + return resp case <-sh.closing: return xerrors.New("closing") case <-ctx.Done():