diff --git a/api/api_storage.go b/api/api_storage.go index 6273c4881..5d7455340 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -573,12 +573,17 @@ type RemoteSectorMeta struct { // SEALING SERVICE HOOKS // URL - // todo better doc + // RemoteCommit1Endpoint is an URL of POST endpoint which lotus will call requesting Commit1 (seal_commit_phase1) + // request body will be json-serialized RemoteCommit1Params struct RemoteCommit1Endpoint string + // RemoteCommit2Endpoint is an URL of POST endpoint which lotus will call requesting Commit2 (seal_commit_phase2) + // request body will be json-serialized RemoteCommit2Params struct RemoteCommit2Endpoint string - // todo OnDone / OnStateChange + // RemoteSealingDoneEndpoint is called after the sector exists the sealing pipeline + // request body will be json-serialized RemoteSealingDoneParams struct + RemoteSealingDoneEndpoint string } type RemoteCommit1Params struct { @@ -597,3 +602,15 @@ type RemoteCommit2Params struct { // todo spec better Commit1Out storiface.Commit1Out } + +type RemoteSealingDoneParams struct { + // Successful is true if the sector has entered state considered as "successfully sealed" + Successful bool + + // State is the state the sector has entered + // For example "Proving" / "Removing" + State string + + // Optional commit message CID + CommitMessage *cid.Cid +} diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 3d42364dd..f2bd95d2f 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/documentation/en/api-v0-methods-miner.md b/documentation/en/api-v0-methods-miner.md index d106596ba..c473f1904 100644 --- a/documentation/en/api-v0-methods-miner.md +++ b/documentation/en/api-v0-methods-miner.md @@ -3300,7 +3300,8 @@ Inputs: ] }, "RemoteCommit1Endpoint": "string value", - "RemoteCommit2Endpoint": "string value" + "RemoteCommit2Endpoint": "string value", + "RemoteSealingDoneEndpoint": "string value" } ] ``` diff --git a/itests/kit/node_miner.go b/itests/kit/node_miner.go index b08ce1847..83f6178f7 100644 --- a/itests/kit/node_miner.go +++ b/itests/kit/node_miner.go @@ -109,7 +109,7 @@ func (tm *TestMiner) WaitSectorsProvingAllowFails(ctx context.Context, toCheck m st, err := tm.StorageMiner.SectorsStatus(ctx, n, false) require.NoError(tm.t, err) states[st.State]++ - if st.State == api.SectorState(sealing.Proving) || st.State == api.SectorState(sealing.Available) { + if st.State == api.SectorState(sealing.Proving) || st.State == api.SectorState(sealing.Available) || st.State == api.SectorState(sealing.Removed) { delete(toCheck, n) } if strings.Contains(string(st.State), "Fail") { diff --git a/itests/sector_import_full_test.go b/itests/sector_import_full_test.go index b26a24145..761bee3a5 100644 --- a/itests/sector_import_full_test.go +++ b/itests/sector_import_full_test.go @@ -5,10 +5,6 @@ import ( "context" "encoding/json" "fmt" - lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" - "github.com/filecoin-project/lotus/chain/types" - spaths "github.com/filecoin-project/lotus/storage/paths" - "github.com/filecoin-project/lotus/storage/sealer/tarutil" "net/http" "net/http/httptest" "os" @@ -24,27 +20,37 @@ import ( "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/lotus/api" + lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/actors/policy" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/itests/kit" + spaths "github.com/filecoin-project/lotus/storage/paths" sealing "github.com/filecoin-project/lotus/storage/pipeline" "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper/basicfs" "github.com/filecoin-project/lotus/storage/sealer/storiface" + "github.com/filecoin-project/lotus/storage/sealer/tarutil" ) func TestSectorImport(t *testing.T) { type testCase struct { - c1handler func(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request) + c1handler func(s *ffiwrapper.Sealer, m *kit.TestMiner) func(w http.ResponseWriter, r *http.Request) mutateRemoteMeta func(*api.RemoteSectorMeta) expectImportErrContains string + + expectDoneSuccess bool + expectDoneState string } makeTest := func(mut func(*testCase)) *testCase { tc := &testCase{ c1handler: testRemoteCommit1, + + expectDoneSuccess: true, + expectDoneState: "Proving", } mut(tc) return tc @@ -149,9 +155,12 @@ func TestSectorImport(t *testing.T) { //////// // start http server serving sector data + doneResp := new(*api.RemoteSealingDoneParams) + m := mux.NewRouter() m.HandleFunc("/sectors/{type}/{id}", testRemoteGetSector(sectorDir)).Methods("GET") - m.HandleFunc("/sectors/{id}/commit1", tc.c1handler(sealer)).Methods("POST") + m.HandleFunc("/sectors/{id}/commit1", tc.c1handler(sealer, miner)).Methods("POST") + m.HandleFunc("/sectors/{id}/sealed", testRemoteDone(doneResp)).Methods("POST") m.HandleFunc("/commit2", testRemoteCommit2(sealer)).Methods("POST") srv := httptest.NewServer(m) @@ -159,6 +168,8 @@ func TestSectorImport(t *testing.T) { sealedURL := fmt.Sprintf("%s/sectors/sealed/s-t0%d-%d", srv.URL, mid, snum) cacheURL := fmt.Sprintf("%s/sectors/cache/s-t0%d-%d", srv.URL, mid, snum) remoteC1URL := fmt.Sprintf("%s/sectors/s-t0%d-%d/commit1", srv.URL, mid, snum) + remoteC2URL := fmt.Sprintf("%s/commit2", srv.URL) + doneURL := fmt.Sprintf("%s/sectors/s-t0%d-%d/sealed", srv.URL, mid, snum) //////// // import the sector and continue sealing @@ -196,7 +207,9 @@ func TestSectorImport(t *testing.T) { URL: cacheURL, }, - RemoteCommit1Endpoint: remoteC1URL, + RemoteCommit1Endpoint: remoteC1URL, + RemoteCommit2Endpoint: remoteC2URL, + RemoteSealingDoneEndpoint: doneURL, } if tc.mutateRemoteMeta != nil { @@ -218,6 +231,13 @@ func TestSectorImport(t *testing.T) { require.Equal(t, snum, ng[0]) miner.WaitSectorsProvingAllowFails(ctx, map[abi.SectorNumber]struct{}{snum: {}}, map[api.SectorState]struct{}{api.SectorState(sealing.RemoteCommitFailed): {}}) + + require.NotNil(t, *doneResp) + require.Equal(t, tc.expectDoneSuccess, (*doneResp).Successful) + require.Equal(t, tc.expectDoneState, (*doneResp).State) + if tc.expectDoneSuccess { + require.NotNil(t, (*doneResp).CommitMessage) + } } } @@ -229,7 +249,7 @@ func TestSectorImport(t *testing.T) { sealing.MinRetryTime = prt }) - testCase.c1handler = func(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request) { + testCase.c1handler = func(s *ffiwrapper.Sealer, m *kit.TestMiner) func(w http.ResponseWriter, r *http.Request) { var failedOnce bool return func(w http.ResponseWriter, r *http.Request) { @@ -244,6 +264,35 @@ func TestSectorImport(t *testing.T) { } }))) + t.Run("c1-fail-remove", runTest(makeTest(func(testCase *testCase) { + prt := sealing.MinRetryTime + sealing.MinRetryTime = time.Second + t.Cleanup(func() { + sealing.MinRetryTime = prt + }) + + testCase.c1handler = func(s *ffiwrapper.Sealer, m *kit.TestMiner) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + + id, err := storiface.ParseSectorID(vars["id"]) + if err != nil { + panic(err) + } + + err = m.SectorRemove(r.Context(), id.Number) + if err != nil { + panic(err) + } + + w.WriteHeader(http.StatusBadGateway) + } + } + + testCase.expectDoneSuccess = false + testCase.expectDoneState = "Removing" + }))) + t.Run("nil-commd", runTest(makeTest(func(testCase *testCase) { testCase.mutateRemoteMeta = func(meta *api.RemoteSectorMeta) { meta.CommD = nil @@ -302,8 +351,19 @@ func TestSectorImport(t *testing.T) { // note: stuff below is almost the same as in _simple version of this file; We need // to copy it because on Circle we can't call those functions between test files, // and for the _simple test we want everything in one file to make it easy to follow +func testRemoteDone(rs **api.RemoteSealingDoneParams) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + *rs = new(api.RemoteSealingDoneParams) + if err := json.NewDecoder(r.Body).Decode(*rs); err != nil { + w.WriteHeader(500) + return + } -func testRemoteCommit1(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(200) + } +} + +func testRemoteCommit1(s *ffiwrapper.Sealer, m *kit.TestMiner) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) diff --git a/itests/sector_import_simple_test.go b/itests/sector_import_simple_test.go index f75d0d842..94c1e6810 100644 --- a/itests/sector_import_simple_test.go +++ b/itests/sector_import_simple_test.go @@ -129,10 +129,14 @@ func TestSectorImportAfterPC2(t *testing.T) { //////// // start http server serving sector data + doneResp := new(*api.RemoteSealingDoneParams) + m := mux.NewRouter() m.HandleFunc("/sectors/{type}/{id}", remoteGetSector(sectorDir)).Methods("GET") m.HandleFunc("/sectors/{id}/commit1", remoteCommit1(sealer)).Methods("POST") + m.HandleFunc("/sectors/{id}/sealed", remoteDone(doneResp)).Methods("POST") m.HandleFunc("/commit2", remoteCommit2(sealer)).Methods("POST") + srv := httptest.NewServer(m) unsealedURL := fmt.Sprintf("%s/sectors/unsealed/s-t0%d-%d", srv.URL, mid, snum) @@ -140,6 +144,7 @@ func TestSectorImportAfterPC2(t *testing.T) { cacheURL := fmt.Sprintf("%s/sectors/cache/s-t0%d-%d", srv.URL, mid, snum) remoteC1URL := fmt.Sprintf("%s/sectors/s-t0%d-%d/commit1", srv.URL, mid, snum) remoteC2URL := fmt.Sprintf("%s/commit2", srv.URL) + doneURL := fmt.Sprintf("%s/sectors/s-t0%d-%d/sealed", srv.URL, mid, snum) //////// // import the sector and continue sealing @@ -177,8 +182,9 @@ func TestSectorImportAfterPC2(t *testing.T) { URL: cacheURL, }, - RemoteCommit1Endpoint: remoteC1URL, - RemoteCommit2Endpoint: remoteC2URL, + RemoteCommit1Endpoint: remoteC1URL, + RemoteCommit2Endpoint: remoteC2URL, + RemoteSealingDoneEndpoint: doneURL, }) require.NoError(t, err) @@ -189,6 +195,11 @@ func TestSectorImportAfterPC2(t *testing.T) { require.Equal(t, snum, ng[0]) miner.WaitSectorsProving(ctx, map[abi.SectorNumber]struct{}{snum: {}}) + + require.NotNil(t, *doneResp) + require.True(t, (*doneResp).Successful) + require.Equal(t, "Proving", (*doneResp).State) + require.NotNil(t, (*doneResp).CommitMessage) } func remoteCommit1(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request) { @@ -239,6 +250,18 @@ func remoteCommit1(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Req } } +func remoteDone(rs **api.RemoteSealingDoneParams) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + *rs = new(api.RemoteSealingDoneParams) + if err := json.NewDecoder(r.Body).Decode(*rs); err != nil { + w.WriteHeader(500) + return + } + + w.WriteHeader(200) + } +} + func remoteCommit2(s *ffiwrapper.Sealer) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { var params api.RemoteCommit2Params diff --git a/storage/pipeline/cbor_gen.go b/storage/pipeline/cbor_gen.go index c595deebe..f4dbbd3ee 100644 --- a/storage/pipeline/cbor_gen.go +++ b/storage/pipeline/cbor_gen.go @@ -31,7 +31,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { cw := cbg.NewCborWriter(w) - if _, err := cw.Write([]byte{184, 38}); err != nil { + if _, err := cw.Write([]byte{184, 39}); err != nil { return err } @@ -749,6 +749,29 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { return err } + // t.RemoteSealingDoneEndpoint (string) (string) + if len("RemoteSealingDoneEndpoint") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"RemoteSealingDoneEndpoint\" was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("RemoteSealingDoneEndpoint"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("RemoteSealingDoneEndpoint")); err != nil { + return err + } + + if len(t.RemoteSealingDoneEndpoint) > cbg.MaxLength { + return xerrors.Errorf("Value in field t.RemoteSealingDoneEndpoint was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.RemoteSealingDoneEndpoint))); err != nil { + return err + } + if _, err := io.WriteString(w, string(t.RemoteSealingDoneEndpoint)); err != nil { + return err + } + // t.RemoteDataFinalized (bool) (bool) if len("RemoteDataFinalized") > cbg.MaxLength { return xerrors.Errorf("Value in field \"RemoteDataFinalized\" was too long") @@ -1547,6 +1570,17 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) (err error) { t.RemoteCommit2Endpoint = string(sval) } + // t.RemoteSealingDoneEndpoint (string) (string) + case "RemoteSealingDoneEndpoint": + + { + sval, err := cbg.ReadString(cr) + if err != nil { + return err + } + + t.RemoteSealingDoneEndpoint = string(sval) + } // t.RemoteDataFinalized (bool) (bool) case "RemoteDataFinalized": diff --git a/storage/pipeline/fsm.go b/storage/pipeline/fsm.go index fc0a93a85..264a6ca49 100644 --- a/storage/pipeline/fsm.go +++ b/storage/pipeline/fsm.go @@ -7,6 +7,7 @@ import ( "context" "encoding/json" "fmt" + "net/http" "reflect" "time" @@ -14,6 +15,8 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-statemachine" + + "github.com/filecoin-project/lotus/api" ) func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) { @@ -142,8 +145,8 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto ), FinalizeSector: planOne( - on(SectorFinalized{}, Proving), - on(SectorFinalizedAvailable{}, Available), + onWithCB(SectorFinalized{}, Proving, maybeNotifyRemoteDone(true, "Proving")), + onWithCB(SectorFinalizedAvailable{}, Available, maybeNotifyRemoteDone(true, "Available")), on(SectorFinalizeFailed{}, FinalizeFailed), ), @@ -218,7 +221,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorRetryWaitSeed{}, WaitSeed), on(SectorSealPreCommit1Failed{}, SealPreCommit1Failed), on(SectorPreCommitLanded{}, WaitSeed), - on(SectorDealsExpired{}, DealsExpired), + onWithCB(SectorDealsExpired{}, DealsExpired, maybeNotifyRemoteDone(false, "DealsExpired")), on(SectorInvalidDealIDs{}, RecoverDealIDs), ), ComputeProofFailed: planOne( @@ -241,9 +244,9 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorRetryPreCommit{}, PreCommitting), on(SectorRetryCommitWait{}, CommitWait), on(SectorRetrySubmitCommit{}, SubmitCommit), - on(SectorDealsExpired{}, DealsExpired), + onWithCB(SectorDealsExpired{}, DealsExpired, maybeNotifyRemoteDone(false, "DealsExpired")), on(SectorInvalidDealIDs{}, RecoverDealIDs), - on(SectorTicketExpired{}, Removing), + onWithCB(SectorTicketExpired{}, Removing, maybeNotifyRemoteDone(false, "Removing")), ), FinalizeFailed: planOne( on(SectorRetryFinalize{}, FinalizeSector), @@ -736,6 +739,16 @@ func on(mut mutator, next SectorState) func() (mutator, func(*SectorInfo) (bool, } } +func onWithCB(mut mutator, next SectorState, cb func(info *SectorInfo)) func() (mutator, func(*SectorInfo) (bool, error)) { + return func() (mutator, func(*SectorInfo) (bool, error)) { + return mut, func(state *SectorInfo) (bool, error) { + cb(state) + state.State = next + return false, nil + } + } +} + // like `on`, but doesn't change state func apply(mut mutator) func() (mutator, func(*SectorInfo) (bool, error)) { return func() (mutator, func(*SectorInfo) (bool, error)) { @@ -812,3 +825,41 @@ func planOneOrIgnore(ts ...func() (mut mutator, next func(*SectorInfo) (more boo return cnt, nil } } + +func maybeNotifyRemoteDone(success bool, state string) func(*SectorInfo) { + return func(sector *SectorInfo) { + if sector.RemoteSealingDoneEndpoint == "" { + return + } + + reqData := api.RemoteSealingDoneParams{ + Successful: success, + State: state, + CommitMessage: sector.CommitMessage, + } + reqBody, err := json.Marshal(&reqData) + if err != nil { + log.Errorf("marshaling remote done notification request params: %s", err) + return + } + + req, err := http.NewRequest("POST", sector.RemoteSealingDoneEndpoint, bytes.NewReader(reqBody)) + if err != nil { + log.Errorf("creating new remote done notification request: %s", err) + return + } + req.Header.Set("Content-Type", "application/json") + resp, err := http.DefaultClient.Do(req) + if err != nil { + log.Errorf("sending remote done notification: %s", err) + return + } + + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + log.Errorf("remote done notification received non-200 http response %s", resp.Status) + return + } + } +} diff --git a/storage/pipeline/fsm_events.go b/storage/pipeline/fsm_events.go index 1fbd94fd0..f92f527ad 100644 --- a/storage/pipeline/fsm_events.go +++ b/storage/pipeline/fsm_events.go @@ -524,6 +524,9 @@ func (evt SectorTerminateFailed) apply(*SectorInfo) {} type SectorRemove struct{} func (evt SectorRemove) applyGlobal(state *SectorInfo) bool { + // because this event is global we need to send the notification here instead through an fsm callback + maybeNotifyRemoteDone(false, "Removing")(state) + state.State = Removing return true } diff --git a/storage/pipeline/receive.go b/storage/pipeline/receive.go index f06848bcb..e5eec5ab9 100644 --- a/storage/pipeline/receive.go +++ b/storage/pipeline/receive.go @@ -211,6 +211,15 @@ func (m *Sealing) checkSectorMeta(ctx context.Context, meta api.RemoteSectorMeta info.Pieces = meta.Pieces info.SectorType = meta.Type + if meta.RemoteSealingDoneEndpoint != "" { + // validate the url + if _, err := url.Parse(meta.RemoteSealingDoneEndpoint); err != nil { + return SectorInfo{}, xerrors.Errorf("parsing remote sealing-done endpoint url: %w", err) + } + + info.RemoteSealingDoneEndpoint = meta.RemoteSealingDoneEndpoint + } + if err := checkPieces(ctx, m.maddr, meta.Sector.Number, meta.Pieces, m.Api, false); err != nil { return SectorInfo{}, xerrors.Errorf("checking pieces: %w", err) } diff --git a/storage/pipeline/types.go b/storage/pipeline/types.go index 81a2a2fec..cb1d84383 100644 --- a/storage/pipeline/types.go +++ b/storage/pipeline/types.go @@ -94,12 +94,13 @@ type SectorInfo struct { TerminatedAt abi.ChainEpoch // Remote import - RemoteDataUnsealed *storiface.SectorData - RemoteDataSealed *storiface.SectorData - RemoteDataCache *storiface.SectorData - RemoteCommit1Endpoint string - RemoteCommit2Endpoint string - RemoteDataFinalized bool + RemoteDataUnsealed *storiface.SectorData + RemoteDataSealed *storiface.SectorData + RemoteDataCache *storiface.SectorData + RemoteCommit1Endpoint string + RemoteCommit2Endpoint string + RemoteSealingDoneEndpoint string + RemoteDataFinalized bool // Debug LastErr string