From 01ec22974942fb7328a1e665704c6cfd75d93372 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 7 May 2024 13:03:51 +0200 Subject: [PATCH] feat: curio: Keep more sector metadata in the DB long-term (#11933) * feat: curio: Migrate lotus-miner sector metadata into Curio DB * curio seal: Transfer seal pipeline entries to long-term sector metadata table * curio: Only open db in sectors migrate cmd * curio: Add an envvar to force migration * curio: Debugging sector migration * curio: Fix typo in table name * curio: Plumb KeepUnsealed into the sealing pipeline * Don't add redundant keep_data to sdr pipeline pieces table * fix transferFinalizedSectorData where check * ui for sector fail --------- Co-authored-by: Andrew Jackson (Ajax) --- cmd/curio/guidedsetup/guidedsetup.go | 17 +- cmd/curio/guidedsetup/shared.go | 180 +++++++++++++++++- cmd/curio/migrate.go | 1 - cmd/curio/pipeline.go | 81 ++++++++ curiosrc/market/deal_ingest.go | 3 +- curiosrc/seal/task_submit_commit.go | 103 ++++++++++ .../harmonydb/sql/20240425-sector_meta.sql | 44 +++++ 7 files changed, 422 insertions(+), 7 deletions(-) delete mode 100644 cmd/curio/migrate.go create mode 100644 lib/harmony/harmonydb/sql/20240425-sector_meta.sql diff --git a/cmd/curio/guidedsetup/guidedsetup.go b/cmd/curio/guidedsetup/guidedsetup.go index 1bdb8e784..44bca3b17 100644 --- a/cmd/curio/guidedsetup/guidedsetup.go +++ b/cmd/curio/guidedsetup/guidedsetup.go @@ -279,7 +279,22 @@ func configToDB(d *MigrationData) { chainApiInfo := fmt.Sprintf("%s:%s", string(token), ainfo.Addr) - d.MinerID, err = SaveConfigToLayer(d.MinerConfigPath, chainApiInfo) + shouldErrPrompt := func() bool { + i, _, err := (&promptui.Select{ + Label: d.T("Unmigratable sectors found. 
Do you want to continue?"), + Items: []string{ + d.T("Yes, continue"), + d.T("No, abort")}, + Templates: d.selectTemplates, + }).Run() + if err != nil { + d.say(notice, "Aborting migration.", err.Error()) + os.Exit(1) + } + return i == 1 + } + + d.MinerID, err = SaveConfigToLayerMigrateSectors(d.MinerConfigPath, chainApiInfo, shouldErrPrompt) if err != nil { d.say(notice, "Error saving config to layer: %s. Aborting Migration", err.Error()) os.Exit(1) diff --git a/cmd/curio/guidedsetup/shared.go b/cmd/curio/guidedsetup/shared.go index 6e7d81c03..0636bcfb1 100644 --- a/cmd/curio/guidedsetup/shared.go +++ b/cmd/curio/guidedsetup/shared.go @@ -4,23 +4,29 @@ import ( "bytes" "context" "encoding/base64" + "encoding/json" "fmt" "os" "path" "strings" "github.com/BurntSushi/toml" + "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" "github.com/samber/lo" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-statestore" "github.com/filecoin-project/lotus/cmd/curio/deps" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/lib/must" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/modules" "github.com/filecoin-project/lotus/node/repo" + sealing "github.com/filecoin-project/lotus/storage/pipeline" ) const ( @@ -29,7 +35,7 @@ const ( const FlagMinerRepoDeprecation = "storagerepo" -func SaveConfigToLayer(minerRepoPath, chainApiInfo string) (minerAddress address.Address, err error) { +func SaveConfigToLayerMigrateSectors(minerRepoPath, chainApiInfo string, unmigSectorShouldFail func() bool) (minerAddress address.Address, err error) { _, say := SetupLanguage() ctx := context.Background() @@ -97,9 +103,6 @@ func SaveConfigToLayer(minerRepoPath, chainApiInfo string) (minerAddress address if err != nil { return minerAddress, xerrors.Errorf("opening miner metadata datastore: %w", err) } - defer func() { - // _ = 
mmeta.Close() - }() maddrBytes, err := mmeta.Get(ctx, datastore.NewKey("miner-address")) if err != nil { @@ -111,6 +114,12 @@ func SaveConfigToLayer(minerRepoPath, chainApiInfo string) (minerAddress address return minerAddress, xerrors.Errorf("parsing miner actor address: %w", err) } + if err := MigrateSectors(ctx, addr, mmeta, db, func(nSectors int) { + say(plain, "Migrating metadata for %d sectors.", nSectors) + }, unmigSectorShouldFail); err != nil { + return address.Address{}, xerrors.Errorf("migrating sectors: %w", err) + } + minerAddress = addr curioCfg.Addresses = []config.CurioAddresses{{ @@ -256,3 +265,166 @@ func ensureEmptyArrays(cfg *config.CurioConfig) { cfg.Apis.ChainApiInfo = []string{} } } + +func cidPtrToStrptr(c *cid.Cid) *string { + if c == nil { + return nil + } + s := c.String() + return &s +} + +func coalescePtrs[A any](a, b *A) *A { + if a != nil { + return a + } + return b +} + +func MigrateSectors(ctx context.Context, maddr address.Address, mmeta datastore.Batching, db *harmonydb.DB, logMig func(int), unmigSectorShouldFail func() bool) error { + mid, err := address.IDFromAddress(maddr) + if err != nil { + return xerrors.Errorf("getting miner ID: %w", err) + } + + sts := statestore.New(namespace.Wrap(mmeta, datastore.NewKey(sealing.SectorStorePrefix))) + + var sectors []sealing.SectorInfo + if err := sts.List(&sectors); err != nil { + return xerrors.Errorf("getting sector list: %w", err) + } + + logMig(len(sectors)) + + migratableState := func(state sealing.SectorState) bool { + switch state { + case sealing.Proving, sealing.Available, sealing.Removed: + return true + default: + return false + } + } + + unmigratable := map[sealing.SectorState]int{} + + for _, sector := range sectors { + if !migratableState(sector.State) { + unmigratable[sector.State]++ + continue + } + } + + if len(unmigratable) > 0 { + fmt.Println("The following sector states are not migratable:") + for state, count := range unmigratable { + fmt.Printf(" %s: %d\n", state, 
count) + } + + if unmigSectorShouldFail() { + return xerrors.Errorf("aborting migration because sectors were found that are not migratable.") + } + } + + for _, sector := range sectors { + if !migratableState(sector.State) || sector.State == sealing.Removed { + continue + } + + // Insert sector metadata + _, err := db.Exec(ctx, ` + INSERT INTO sectors_meta (sp_id, sector_num, reg_seal_proof, ticket_epoch, ticket_value, + orig_sealed_cid, orig_unsealed_cid, cur_sealed_cid, cur_unsealed_cid, + msg_cid_precommit, msg_cid_commit, msg_cid_update, seed_epoch, seed_value) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) + ON CONFLICT (sp_id, sector_num) DO UPDATE + SET reg_seal_proof = excluded.reg_seal_proof, ticket_epoch = excluded.ticket_epoch, ticket_value = excluded.ticket_value, + orig_sealed_cid = excluded.orig_sealed_cid, orig_unsealed_cid = excluded.orig_unsealed_cid, cur_sealed_cid = excluded.cur_sealed_cid, + cur_unsealed_cid = excluded.cur_unsealed_cid, msg_cid_precommit = excluded.msg_cid_precommit, msg_cid_commit = excluded.msg_cid_commit, + msg_cid_update = excluded.msg_cid_update, seed_epoch = excluded.seed_epoch, seed_value = excluded.seed_value`, + mid, + sector.SectorNumber, + sector.SectorType, + sector.TicketEpoch, + sector.TicketValue, + cidPtrToStrptr(sector.CommR), + cidPtrToStrptr(sector.CommD), + cidPtrToStrptr(coalescePtrs(sector.UpdateSealed, sector.CommR)), + cidPtrToStrptr(coalescePtrs(sector.UpdateUnsealed, sector.CommD)), + cidPtrToStrptr(sector.PreCommitMessage), + cidPtrToStrptr(sector.CommitMessage), + cidPtrToStrptr(sector.ReplicaUpdateMessage), + sector.SeedEpoch, + sector.SeedValue, + ) + if err != nil { + b, _ := json.MarshalIndent(sector, "", " ") + fmt.Println(string(b)) + + return xerrors.Errorf("inserting/updating sectors_meta for sector %d: %w", sector.SectorNumber, err) + } + + // Process each piece within the sector + for j, piece := range sector.Pieces { + dealID := int64(0) + startEpoch := int64(0) + 
endEpoch := int64(0) + var pamJSON *string + + if piece.HasDealInfo() { + dealInfo := piece.DealInfo() + if dealInfo.Impl().DealProposal != nil { + dealID = int64(dealInfo.Impl().DealID) + } + + startEpoch = int64(must.One(dealInfo.StartEpoch())) + endEpoch = int64(must.One(dealInfo.EndEpoch())) + if piece.Impl().PieceActivationManifest != nil { + pam, err := json.Marshal(piece.Impl().PieceActivationManifest) + if err != nil { + return xerrors.Errorf("error marshalling JSON for piece %d in sector %d: %w", j, sector.SectorNumber, err) + } + ps := string(pam) + pamJSON = &ps + } + } + + // Splitting the SQL statement for readability and adding new fields + _, err = db.Exec(ctx, ` + INSERT INTO sectors_meta_pieces ( + sp_id, sector_num, piece_num, piece_cid, piece_size, + requested_keep_data, raw_data_size, start_epoch, orig_end_epoch, + f05_deal_id, ddo_pam + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + ON CONFLICT (sp_id, sector_num, piece_num) DO UPDATE + SET + piece_cid = excluded.piece_cid, + piece_size = excluded.piece_size, + requested_keep_data = excluded.requested_keep_data, + raw_data_size = excluded.raw_data_size, + start_epoch = excluded.start_epoch, + orig_end_epoch = excluded.orig_end_epoch, + f05_deal_id = excluded.f05_deal_id, + ddo_pam = excluded.ddo_pam`, + mid, + sector.SectorNumber, + j, + piece.PieceCID(), + piece.Piece().Size, + piece.HasDealInfo(), + nil, // raw_data_size might be calculated based on the piece size, or retrieved if available + startEpoch, + endEpoch, + dealID, + pamJSON, + ) + if err != nil { + b, _ := json.MarshalIndent(sector, "", " ") + fmt.Println(string(b)) + + return xerrors.Errorf("inserting/updating sector_meta_pieces for sector %d, piece %d: %w", sector.SectorNumber, j, err) + } + } + } + + return nil +} diff --git a/cmd/curio/migrate.go b/cmd/curio/migrate.go deleted file mode 100644 index 06ab7d0f9..000000000 --- a/cmd/curio/migrate.go +++ /dev/null @@ -1 +0,0 @@ -package main diff --git 
a/cmd/curio/pipeline.go b/cmd/curio/pipeline.go index 1c3f5d94a..8f57b5694 100644 --- a/cmd/curio/pipeline.go +++ b/cmd/curio/pipeline.go @@ -3,6 +3,7 @@ package main import ( "fmt" + "github.com/ipfs/go-datastore" "github.com/urfave/cli/v2" "golang.org/x/xerrors" @@ -13,8 +14,10 @@ import ( "github.com/filecoin-project/lotus/chain/types" lcli "github.com/filecoin-project/lotus/cli" "github.com/filecoin-project/lotus/cmd/curio/deps" + "github.com/filecoin-project/lotus/cmd/curio/guidedsetup" "github.com/filecoin-project/lotus/curiosrc/seal" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/node/repo" ) var sealCmd = &cli.Command{ @@ -22,6 +25,7 @@ var sealCmd = &cli.Command{ Usage: "Manage the sealing pipeline", Subcommands: []*cli.Command{ sealStartCmd, + sealMigrateLMSectorsCmd, }, } @@ -133,3 +137,80 @@ var sealStartCmd = &cli.Command{ return nil }, } + +var sealMigrateLMSectorsCmd = &cli.Command{ + Name: "migrate-lm-sectors", + Usage: "(debug tool) Copy LM sector metadata into Curio DB", + Hidden: true, // only needed in advanced cases where manual repair is needed + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "miner-repo", + Usage: "Path to miner repo", + Value: "~/.lotusminer", + }, + &cli.BoolFlag{ + Name: "seal-ignore", + Usage: "Ignore sectors that cannot be migrated", + Value: false, + EnvVars: []string{"CURUO_MIGRATE_SEAL_IGNORE"}, + }, + }, + Action: func(cctx *cli.Context) error { + ctx := lcli.ReqContext(cctx) + db, err := deps.MakeDB(cctx) + if err != nil { + return err + } + + r, err := repo.NewFS(cctx.String("miner-repo")) + if err != nil { + return err + } + + ok, err := r.Exists() + if err != nil { + return err + } + + if !ok { + return fmt.Errorf("repo not initialized at: %s", cctx.String("miner-repo")) + } + + lr, err := r.LockRO(repo.StorageMiner) + if err != nil { + return fmt.Errorf("locking repo: %w", err) + } + defer func() { + err = lr.Close() + if err != nil { + fmt.Println("error closing 
repo: ", err) + } + }() + + mmeta, err := lr.Datastore(ctx, "/metadata") + if err != nil { + return xerrors.Errorf("opening miner metadata datastore: %w", err) + } + + maddrBytes, err := mmeta.Get(ctx, datastore.NewKey("miner-address")) + if err != nil { + return xerrors.Errorf("getting miner address datastore entry: %w", err) + } + + addr, err := address.NewFromBytes(maddrBytes) + if err != nil { + return xerrors.Errorf("parsing miner actor address: %w", err) + } + + unmigSectorShouldFail := func() bool { return !cctx.Bool("seal-ignore") } + + err = guidedsetup.MigrateSectors(ctx, addr, mmeta, db, func(n int) { + fmt.Printf("Migrating %d sectors\n", n) + }, unmigSectorShouldFail) + if err != nil { + return xerrors.Errorf("migrating sectors: %w", err) + } + + return nil + }, +} diff --git a/curiosrc/market/deal_ingest.go b/curiosrc/market/deal_ingest.go index ea382717a..f3125887d 100644 --- a/curiosrc/market/deal_ingest.go +++ b/curiosrc/market/deal_ingest.go @@ -113,7 +113,8 @@ func (p *PieceIngester) AllocatePieceToSector(ctx context.Context, maddr address mid, n, 0, piece.DealProposal.PieceCID, piece.DealProposal.PieceSize, source.String(), dataHdrJson, rawSize, !piece.KeepUnsealed, - piece.PublishCid, piece.DealID, dealProposalJson, piece.DealSchedule.StartEpoch, piece.DealSchedule.EndEpoch) + piece.PublishCid, piece.DealID, dealProposalJson, + piece.DealSchedule.StartEpoch, piece.DealSchedule.EndEpoch) if err != nil { return false, xerrors.Errorf("inserting into sectors_sdr_initial_pieces: %w", err) } diff --git a/curiosrc/seal/task_submit_commit.go b/curiosrc/seal/task_submit_commit.go index d7f133db7..73d452e0e 100644 --- a/curiosrc/seal/task_submit_commit.go +++ b/curiosrc/seal/task_submit_commit.go @@ -3,6 +3,7 @@ package seal import ( "bytes" "context" + "fmt" "golang.org/x/xerrors" @@ -150,9 +151,111 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) return false, xerrors.Errorf("inserting into message_waits: %w", err) } 
+ if err := s.transferFinalizedSectorData(ctx, sectorParams.SpID, sectorParams.SectorNumber); err != nil { + return false, xerrors.Errorf("transferring finalized sector data: %w", err) + } + return true, nil } +func (s *SubmitCommitTask) transferFinalizedSectorData(ctx context.Context, spID, sectorNum int64) error { + if _, err := s.db.Exec(ctx, ` + INSERT INTO sectors_meta ( + sp_id, + sector_num, + reg_seal_proof, + ticket_epoch, + ticket_value, + orig_sealed_cid, + orig_unsealed_cid, + cur_sealed_cid, + cur_unsealed_cid, + msg_cid_precommit, + msg_cid_commit, + seed_epoch, + seed_value + ) + SELECT + sp_id, + sector_number as sector_num, + reg_seal_proof, + ticket_epoch, + ticket_value, + tree_r_cid as orig_sealed_cid, + tree_d_cid as orig_unsealed_cid, + tree_r_cid as cur_sealed_cid, + tree_d_cid as cur_unsealed_cid, + precommit_msg_cid, + commit_msg_cid, + seed_epoch, + seed_value + FROM + sectors_sdr_pipeline + WHERE + sp_id = $1 AND + sector_number = $2 + ON CONFLICT (sp_id, sector_num) DO UPDATE SET + reg_seal_proof = excluded.reg_seal_proof, + ticket_epoch = excluded.ticket_epoch, + ticket_value = excluded.ticket_value, + orig_sealed_cid = excluded.orig_sealed_cid, + cur_sealed_cid = excluded.cur_sealed_cid, + msg_cid_precommit = excluded.msg_cid_precommit, + msg_cid_commit = excluded.msg_cid_commit, + seed_epoch = excluded.seed_epoch, + seed_value = excluded.seed_value; + `, spID, sectorNum); err != nil { + return fmt.Errorf("failed to insert/update sectors_meta: %w", err) + } + + // Execute the query for piece metadata + if _, err := s.db.Exec(ctx, ` + INSERT INTO sectors_meta_pieces ( + sp_id, + sector_num, + piece_num, + piece_cid, + piece_size, + requested_keep_data, + raw_data_size, + start_epoch, + orig_end_epoch, + f05_deal_id, + ddo_pam + ) + SELECT + sp_id, + sector_number AS sector_num, + piece_index AS piece_num, + piece_cid, + piece_size, + not data_delete_on_finalize as requested_keep_data, + data_raw_size, + COALESCE(f05_deal_start_epoch, 
direct_start_epoch) as start_epoch, + COALESCE(f05_deal_end_epoch, direct_end_epoch) as orig_end_epoch, + f05_deal_id, + direct_piece_activation_manifest as ddo_pam + FROM + sectors_sdr_initial_pieces + WHERE + sp_id = $1 AND + sector_number = $2 + ON CONFLICT (sp_id, sector_num, piece_num) DO UPDATE SET + piece_cid = excluded.piece_cid, + piece_size = excluded.piece_size, + requested_keep_data = excluded.requested_keep_data, + raw_data_size = excluded.raw_data_size, + start_epoch = excluded.start_epoch, + orig_end_epoch = excluded.orig_end_epoch, + f05_deal_id = excluded.f05_deal_id, + ddo_pam = excluded.ddo_pam; + `, spID, sectorNum); err != nil { + return fmt.Errorf("failed to insert/update sector_meta_pieces: %w", err) + } + + return nil +} + func (s *SubmitCommitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { id := ids[0] return &id, nil diff --git a/lib/harmony/harmonydb/sql/20240425-sector_meta.sql b/lib/harmony/harmonydb/sql/20240425-sector_meta.sql new file mode 100644 index 000000000..4a83e0f95 --- /dev/null +++ b/lib/harmony/harmonydb/sql/20240425-sector_meta.sql @@ -0,0 +1,44 @@ +CREATE TABLE sectors_meta ( + sp_id BIGINT NOT NULL, + sector_num BIGINT NOT NULL, + + reg_seal_proof INT NOT NULL, + ticket_epoch BIGINT NOT NULL, + ticket_value BYTEA NOT NULL, + + orig_sealed_cid TEXT NOT NULL, + orig_unsealed_cid TEXT NOT NULL, + + cur_sealed_cid TEXT NOT NULL, + cur_unsealed_cid TEXT NOT NULL, + + msg_cid_precommit TEXT, + msg_cid_commit TEXT, + msg_cid_update TEXT, -- snapdeal update + + seed_epoch BIGINT NOT NULL, + seed_value BYTEA NOT NULL, + + PRIMARY KEY (sp_id, sector_num) +); + +CREATE TABLE sectors_meta_pieces ( + sp_id BIGINT NOT NULL, + sector_num BIGINT NOT NULL, + piece_num BIGINT NOT NULL, + + piece_cid TEXT NOT NULL, + piece_size BIGINT NOT NULL, -- padded size + + requested_keep_data BOOLEAN NOT NULL, + raw_data_size BIGINT, -- null = piece_size.unpadded() + + start_epoch BIGINT, + 
orig_end_epoch BIGINT, + + f05_deal_id BIGINT, + ddo_pam jsonb, + + PRIMARY KEY (sp_id, sector_num, piece_num), + FOREIGN KEY (sp_id, sector_num) REFERENCES sectors_meta(sp_id, sector_num) ON DELETE CASCADE +);