feat: curio: Keep more sector metadata in the DB long-term (#11933)

* feat: curio: Migrate lotus-miner sector metadata into Curio DB

* curio seal: Transfer seal pipeline entries to long-term sector metadata table

* curio: Only open db in sectors migrate cmd

* curio: Add an envvar to force migration

* curio: Debugging sector migration

* curio: Fix typo in table name

* curio: Plumb KeepUnsealed into the sealing pipeline

* Don't add redundant keep_data to sdr pipeline pieces table

* fix transferFinalizedSectorData where check

* ui for sector fail

---------

Co-authored-by: Andrew Jackson (Ajax) <snadrus@gmail.com>
This commit is contained in:
Łukasz Magiera 2024-05-07 13:03:51 +02:00 committed by GitHub
parent 2d57bfe273
commit 01ec229749
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 422 additions and 7 deletions

View File

@ -279,7 +279,22 @@ func configToDB(d *MigrationData) {
chainApiInfo := fmt.Sprintf("%s:%s", string(token), ainfo.Addr)
d.MinerID, err = SaveConfigToLayer(d.MinerConfigPath, chainApiInfo)
shouldErrPrompt := func() bool {
i, _, err := (&promptui.Select{
Label: d.T("Unmigratable sectors found. Do you want to continue?"),
Items: []string{
d.T("Yes, continue"),
d.T("No, abort")},
Templates: d.selectTemplates,
}).Run()
if err != nil {
d.say(notice, "Aborting migration.", err.Error())
os.Exit(1)
}
return i == 1
}
d.MinerID, err = SaveConfigToLayerMigrateSectors(d.MinerConfigPath, chainApiInfo, shouldErrPrompt)
if err != nil {
d.say(notice, "Error saving config to layer: %s. Aborting Migration", err.Error())
os.Exit(1)

View File

@ -4,23 +4,29 @@ import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"os"
"path"
"strings"
"github.com/BurntSushi/toml"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/samber/lo"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/lotus/cmd/curio/deps"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/lib/must"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/repo"
sealing "github.com/filecoin-project/lotus/storage/pipeline"
)
const (
@ -29,7 +35,7 @@ const (
const FlagMinerRepoDeprecation = "storagerepo"
func SaveConfigToLayer(minerRepoPath, chainApiInfo string) (minerAddress address.Address, err error) {
func SaveConfigToLayerMigrateSectors(minerRepoPath, chainApiInfo string, unmigSectorShouldFail func() bool) (minerAddress address.Address, err error) {
_, say := SetupLanguage()
ctx := context.Background()
@ -97,9 +103,6 @@ func SaveConfigToLayer(minerRepoPath, chainApiInfo string) (minerAddress address
if err != nil {
return minerAddress, xerrors.Errorf("opening miner metadata datastore: %w", err)
}
defer func() {
// _ = mmeta.Close()
}()
maddrBytes, err := mmeta.Get(ctx, datastore.NewKey("miner-address"))
if err != nil {
@ -111,6 +114,12 @@ func SaveConfigToLayer(minerRepoPath, chainApiInfo string) (minerAddress address
return minerAddress, xerrors.Errorf("parsing miner actor address: %w", err)
}
if err := MigrateSectors(ctx, addr, mmeta, db, func(nSectors int) {
say(plain, "Migrating metadata for %d sectors.", nSectors)
}, unmigSectorShouldFail); err != nil {
return address.Address{}, xerrors.Errorf("migrating sectors: %w", err)
}
minerAddress = addr
curioCfg.Addresses = []config.CurioAddresses{{
@ -256,3 +265,166 @@ func ensureEmptyArrays(cfg *config.CurioConfig) {
cfg.Apis.ChainApiInfo = []string{}
}
}
func cidPtrToStrptr(c *cid.Cid) *string {
if c == nil {
return nil
}
s := c.String()
return &s
}
func coalescePtrs[A any](a, b *A) *A {
if a != nil {
return a
}
return b
}
func MigrateSectors(ctx context.Context, maddr address.Address, mmeta datastore.Batching, db *harmonydb.DB, logMig func(int), unmigSectorShouldFail func() bool) error {
mid, err := address.IDFromAddress(maddr)
if err != nil {
return xerrors.Errorf("getting miner ID: %w", err)
}
sts := statestore.New(namespace.Wrap(mmeta, datastore.NewKey(sealing.SectorStorePrefix)))
var sectors []sealing.SectorInfo
if err := sts.List(&sectors); err != nil {
return xerrors.Errorf("getting sector list: %w", err)
}
logMig(len(sectors))
migratableState := func(state sealing.SectorState) bool {
switch state {
case sealing.Proving, sealing.Available, sealing.Removed:
return true
default:
return false
}
}
unmigratable := map[sealing.SectorState]int{}
for _, sector := range sectors {
if !migratableState(sector.State) {
unmigratable[sector.State]++
continue
}
}
if len(unmigratable) > 0 {
fmt.Println("The following sector states are not migratable:")
for state, count := range unmigratable {
fmt.Printf(" %s: %d\n", state, count)
}
if unmigSectorShouldFail() {
return xerrors.Errorf("aborting migration because sectors were found that are not migratable.")
}
}
for _, sector := range sectors {
if !migratableState(sector.State) || sector.State == sealing.Removed {
continue
}
// Insert sector metadata
_, err := db.Exec(ctx, `
INSERT INTO sectors_meta (sp_id, sector_num, reg_seal_proof, ticket_epoch, ticket_value,
orig_sealed_cid, orig_unsealed_cid, cur_sealed_cid, cur_unsealed_cid,
msg_cid_precommit, msg_cid_commit, msg_cid_update, seed_epoch, seed_value)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
ON CONFLICT (sp_id, sector_num) DO UPDATE
SET reg_seal_proof = excluded.reg_seal_proof, ticket_epoch = excluded.ticket_epoch, ticket_value = excluded.ticket_value,
orig_sealed_cid = excluded.orig_sealed_cid, orig_unsealed_cid = excluded.orig_unsealed_cid, cur_sealed_cid = excluded.cur_sealed_cid,
cur_unsealed_cid = excluded.cur_unsealed_cid, msg_cid_precommit = excluded.msg_cid_precommit, msg_cid_commit = excluded.msg_cid_commit,
msg_cid_update = excluded.msg_cid_update, seed_epoch = excluded.seed_epoch, seed_value = excluded.seed_value`,
mid,
sector.SectorNumber,
sector.SectorType,
sector.TicketEpoch,
sector.TicketValue,
cidPtrToStrptr(sector.CommR),
cidPtrToStrptr(sector.CommD),
cidPtrToStrptr(coalescePtrs(sector.UpdateSealed, sector.CommR)),
cidPtrToStrptr(coalescePtrs(sector.UpdateUnsealed, sector.CommD)),
cidPtrToStrptr(sector.PreCommitMessage),
cidPtrToStrptr(sector.CommitMessage),
cidPtrToStrptr(sector.ReplicaUpdateMessage),
sector.SeedEpoch,
sector.SeedValue,
)
if err != nil {
b, _ := json.MarshalIndent(sector, "", " ")
fmt.Println(string(b))
return xerrors.Errorf("inserting/updating sectors_meta for sector %d: %w", sector.SectorNumber, err)
}
// Process each piece within the sector
for j, piece := range sector.Pieces {
dealID := int64(0)
startEpoch := int64(0)
endEpoch := int64(0)
var pamJSON *string
if piece.HasDealInfo() {
dealInfo := piece.DealInfo()
if dealInfo.Impl().DealProposal != nil {
dealID = int64(dealInfo.Impl().DealID)
}
startEpoch = int64(must.One(dealInfo.StartEpoch()))
endEpoch = int64(must.One(dealInfo.EndEpoch()))
if piece.Impl().PieceActivationManifest != nil {
pam, err := json.Marshal(piece.Impl().PieceActivationManifest)
if err != nil {
return xerrors.Errorf("error marshalling JSON for piece %d in sector %d: %w", j, sector.SectorNumber, err)
}
ps := string(pam)
pamJSON = &ps
}
}
// Splitting the SQL statement for readability and adding new fields
_, err = db.Exec(ctx, `
INSERT INTO sectors_meta_pieces (
sp_id, sector_num, piece_num, piece_cid, piece_size,
requested_keep_data, raw_data_size, start_epoch, orig_end_epoch,
f05_deal_id, ddo_pam
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (sp_id, sector_num, piece_num) DO UPDATE
SET
piece_cid = excluded.piece_cid,
piece_size = excluded.piece_size,
requested_keep_data = excluded.requested_keep_data,
raw_data_size = excluded.raw_data_size,
start_epoch = excluded.start_epoch,
orig_end_epoch = excluded.orig_end_epoch,
f05_deal_id = excluded.f05_deal_id,
ddo_pam = excluded.ddo_pam`,
mid,
sector.SectorNumber,
j,
piece.PieceCID(),
piece.Piece().Size,
piece.HasDealInfo(),
nil, // raw_data_size might be calculated based on the piece size, or retrieved if available
startEpoch,
endEpoch,
dealID,
pamJSON,
)
if err != nil {
b, _ := json.MarshalIndent(sector, "", " ")
fmt.Println(string(b))
return xerrors.Errorf("inserting/updating sector_meta_pieces for sector %d, piece %d: %w", sector.SectorNumber, j, err)
}
}
}
return nil
}

View File

@ -1 +0,0 @@
package main

View File

@ -3,6 +3,7 @@ package main
import (
"fmt"
"github.com/ipfs/go-datastore"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
@ -13,8 +14,10 @@ import (
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/cmd/curio/deps"
"github.com/filecoin-project/lotus/cmd/curio/guidedsetup"
"github.com/filecoin-project/lotus/curiosrc/seal"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/node/repo"
)
var sealCmd = &cli.Command{
@ -22,6 +25,7 @@ var sealCmd = &cli.Command{
Usage: "Manage the sealing pipeline",
Subcommands: []*cli.Command{
sealStartCmd,
sealMigrateLMSectorsCmd,
},
}
@ -133,3 +137,80 @@ var sealStartCmd = &cli.Command{
return nil
},
}
var sealMigrateLMSectorsCmd = &cli.Command{
Name: "migrate-lm-sectors",
Usage: "(debug tool) Copy LM sector metadata into Curio DB",
Hidden: true, // only needed in advanced cases where manual repair is needed
Flags: []cli.Flag{
&cli.StringFlag{
Name: "miner-repo",
Usage: "Path to miner repo",
Value: "~/.lotusminer",
},
&cli.BoolFlag{
Name: "seal-ignore",
Usage: "Ignore sectors that cannot be migrated",
Value: false,
EnvVars: []string{"CURUO_MIGRATE_SEAL_IGNORE"},
},
},
Action: func(cctx *cli.Context) error {
ctx := lcli.ReqContext(cctx)
db, err := deps.MakeDB(cctx)
if err != nil {
return err
}
r, err := repo.NewFS(cctx.String("miner-repo"))
if err != nil {
return err
}
ok, err := r.Exists()
if err != nil {
return err
}
if !ok {
return fmt.Errorf("repo not initialized at: %s", cctx.String("miner-repo"))
}
lr, err := r.LockRO(repo.StorageMiner)
if err != nil {
return fmt.Errorf("locking repo: %w", err)
}
defer func() {
err = lr.Close()
if err != nil {
fmt.Println("error closing repo: ", err)
}
}()
mmeta, err := lr.Datastore(ctx, "/metadata")
if err != nil {
return xerrors.Errorf("opening miner metadata datastore: %w", err)
}
maddrBytes, err := mmeta.Get(ctx, datastore.NewKey("miner-address"))
if err != nil {
return xerrors.Errorf("getting miner address datastore entry: %w", err)
}
addr, err := address.NewFromBytes(maddrBytes)
if err != nil {
return xerrors.Errorf("parsing miner actor address: %w", err)
}
unmigSectorShouldFail := func() bool { return !cctx.Bool("seal-ignore") }
err = guidedsetup.MigrateSectors(ctx, addr, mmeta, db, func(n int) {
fmt.Printf("Migrating %d sectors\n", n)
}, unmigSectorShouldFail)
if err != nil {
return xerrors.Errorf("migrating sectors: %w", err)
}
return nil
},
}

View File

@ -113,7 +113,8 @@ func (p *PieceIngester) AllocatePieceToSector(ctx context.Context, maddr address
mid, n, 0,
piece.DealProposal.PieceCID, piece.DealProposal.PieceSize,
source.String(), dataHdrJson, rawSize, !piece.KeepUnsealed,
piece.PublishCid, piece.DealID, dealProposalJson, piece.DealSchedule.StartEpoch, piece.DealSchedule.EndEpoch)
piece.PublishCid, piece.DealID, dealProposalJson,
piece.DealSchedule.StartEpoch, piece.DealSchedule.EndEpoch)
if err != nil {
return false, xerrors.Errorf("inserting into sectors_sdr_initial_pieces: %w", err)
}

View File

@ -3,6 +3,7 @@ package seal
import (
"bytes"
"context"
"fmt"
"golang.org/x/xerrors"
@ -150,9 +151,111 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
return false, xerrors.Errorf("inserting into message_waits: %w", err)
}
if err := s.transferFinalizedSectorData(ctx, sectorParams.SpID, sectorParams.SectorNumber); err != nil {
return false, xerrors.Errorf("transferring finalized sector data: %w", err)
}
return true, nil
}
func (s *SubmitCommitTask) transferFinalizedSectorData(ctx context.Context, spID, sectorNum int64) error {
if _, err := s.db.Exec(ctx, `
INSERT INTO sectors_meta (
sp_id,
sector_num,
reg_seal_proof,
ticket_epoch,
ticket_value,
orig_sealed_cid,
orig_unsealed_cid,
cur_sealed_cid,
cur_unsealed_cid,
msg_cid_precommit,
msg_cid_commit,
seed_epoch,
seed_value
)
SELECT
sp_id,
sector_number as sector_num,
reg_seal_proof,
ticket_epoch,
ticket_value,
tree_r_cid as orig_sealed_cid,
tree_d_cid as orig_unsealed_cid,
tree_r_cid as cur_sealed_cid,
tree_d_cid as cur_unsealed_cid,
precommit_msg_cid,
commit_msg_cid,
seed_epoch,
seed_value
FROM
sectors_sdr_pipeline
WHERE
sp_id = $1 AND
sector_number = $2
ON CONFLICT (sp_id, sector_num) DO UPDATE SET
reg_seal_proof = excluded.reg_seal_proof,
ticket_epoch = excluded.ticket_epoch,
ticket_value = excluded.ticket_value,
orig_sealed_cid = excluded.orig_sealed_cid,
cur_sealed_cid = excluded.cur_sealed_cid,
msg_cid_precommit = excluded.msg_cid_precommit,
msg_cid_commit = excluded.msg_cid_commit,
seed_epoch = excluded.seed_epoch,
seed_value = excluded.seed_value;
`, spID, sectorNum); err != nil {
return fmt.Errorf("failed to insert/update sectors_meta: %w", err)
}
// Execute the query for piece metadata
if _, err := s.db.Exec(ctx, `
INSERT INTO sectors_meta_pieces (
sp_id,
sector_num,
piece_num,
piece_cid,
piece_size,
requested_keep_data,
raw_data_size,
start_epoch,
orig_end_epoch,
f05_deal_id,
ddo_pam
)
SELECT
sp_id,
sector_number AS sector_num,
piece_index AS piece_num,
piece_cid,
piece_size,
not data_delete_on_finalize as requested_keep_data,
data_raw_size,
COALESCE(f05_deal_start_epoch, direct_start_epoch) as start_epoch,
COALESCE(f05_deal_end_epoch, direct_end_epoch) as orig_end_epoch,
f05_deal_id,
direct_piece_activation_manifest as ddo_pam
FROM
sectors_sdr_initial_pieces
WHERE
sp_id = $1 AND
sector_number = $2
ON CONFLICT (sp_id, sector_num, piece_num) DO UPDATE SET
piece_cid = excluded.piece_cid,
piece_size = excluded.piece_size,
requested_keep_data = excluded.requested_keep_data,
raw_data_size = excluded.raw_data_size,
start_epoch = excluded.start_epoch,
orig_end_epoch = excluded.orig_end_epoch,
f05_deal_id = excluded.f05_deal_id,
ddo_pam = excluded.ddo_pam;
`, spID, sectorNum); err != nil {
return fmt.Errorf("failed to insert/update sector_meta_pieces: %w", err)
}
return nil
}
func (s *SubmitCommitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
id := ids[0]
return &id, nil

View File

@ -0,0 +1,44 @@
CREATE TABLE sectors_meta (
sp_id BIGINT NOT NULL,
sector_num BIGINT NOT NULL,
reg_seal_proof INT NOT NULL,
ticket_epoch BIGINT NOT NULL,
ticket_value BYTEA NOT NULL,
orig_sealed_cid TEXT NOT NULL,
orig_unsealed_cid TEXT NOT NULL,
cur_sealed_cid TEXT NOT NULL,
cur_unsealed_cid TEXT NOT NULL,
msg_cid_precommit TEXT,
msg_cid_commit TEXT,
msg_cid_update TEXT, -- snapdeal update
seed_epoch BIGINT NOT NULL,
seed_value BYTEA NOT NULL,
PRIMARY KEY (sp_id, sector_num)
);
CREATE TABLE sectors_meta_pieces (
sp_id BIGINT NOT NULL,
sector_num BIGINT NOT NULL,
piece_num BIGINT NOT NULL,
piece_cid TEXT NOT NULL,
piece_size BIGINT NOT NULL, -- padded size
requested_keep_data BOOLEAN NOT NULL,
raw_data_size BIGINT, -- null = piece_size.unpadded()
start_epoch BIGINT,
orig_end_epoch BIGINT,
f05_deal_id BIGINT,
ddo_pam jsonb,
PRIMARY KEY (sp_id, sector_num, piece_num),
FOREIGN KEY (sp_id, sector_num) REFERENCES sectors_meta(sp_id, sector_num) ON DELETE CASCADE
);