lotus/curiosrc/market/deal_ingest.go
Andrew Jackson (Ajax) 81ba6ab6f0
feat: Curio - Easy Migration (#11617)
* feat: lp mig - first few steps

* lp mig: default tasks

* code comments

* docs

* lp-mig-progress

* shared

* comments and todos

* fix: curio: rename lotus-provider to curio (#11645)

* rename provider to curio

* install gotext

* fix lint errors, mod tidy

* fix typo

* fix API_INFO and add gotext to circleCI

* add back gotext

* add gotext after remerge

* lp: channels doc

* finish easy-migration TODOs

* out generate

* merging and more renames

* avoid make-all

* minor doc stuff

* cu: make gen

* make gen fix

* make gen

* tryfix

* go mod tidy

* minor ez migration fixes

* ez setup - ui cleanups

* better error message

* guided setup colors

* better path to saveconfigtolayer

* loadconfigwithupgrades fix

* readMiner oops

* guided - homedir

* err if miner is running

* prompt error should exit

* process already running, miner_id sectors in migration

* dont prompt for language a second time

* check miner stopped

* unlock repo

* render and sql oops

* curio easyMig - some fixes

* easyMigration runs successfully

* lint

* review fixes

* fix backup path

* fixes1

* fixes2

* fixes 3

---------

Co-authored-by: LexLuthr <88259624+LexLuthr@users.noreply.github.com>
Co-authored-by: LexLuthr <lexluthr@protocol.ai>
2024-03-15 16:38:13 -05:00

138 lines
5.0 KiB
Go

package market
import (
"context"
"encoding/json"
"net/http"
"net/url"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/curiosrc/seal"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
)
type Ingester interface {
AllocatePieceToSector(ctx context.Context, maddr address.Address, piece api.PieceDealInfo, rawSize int64, source url.URL, header http.Header) (api.SectorOffset, error)
}
type PieceIngesterApi interface {
StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error)
StateMinerAllocated(ctx context.Context, a address.Address, key types.TipSetKey) (*bitfield.BitField, error)
StateNetworkVersion(ctx context.Context, key types.TipSetKey) (network.Version, error)
}
type PieceIngester struct {
db *harmonydb.DB
api PieceIngesterApi
}
func NewPieceIngester(db *harmonydb.DB, api PieceIngesterApi) *PieceIngester {
return &PieceIngester{db: db, api: api}
}
func (p *PieceIngester) AllocatePieceToSector(ctx context.Context, maddr address.Address, piece api.PieceDealInfo, rawSize int64, source url.URL, header http.Header) (api.SectorOffset, error) {
mi, err := p.api.StateMinerInfo(ctx, maddr, types.EmptyTSK)
if err != nil {
return api.SectorOffset{}, err
}
if piece.DealProposal.PieceSize != abi.PaddedPieceSize(mi.SectorSize) {
return api.SectorOffset{}, xerrors.Errorf("only full sector pieces supported for now")
}
// check raw size
if piece.DealProposal.PieceSize != padreader.PaddedSize(uint64(rawSize)).Padded() {
return api.SectorOffset{}, xerrors.Errorf("raw size doesn't match padded piece size")
}
// add initial piece + to a sector
nv, err := p.api.StateNetworkVersion(ctx, types.EmptyTSK)
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("getting network version: %w", err)
}
synth := false // todo synthetic porep config
spt, err := miner.PreferredSealProofTypeFromWindowPoStType(nv, mi.WindowPoStProofType, synth)
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("getting seal proof type: %w", err)
}
mid, err := address.IDFromAddress(maddr)
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("getting miner ID: %w", err)
}
num, err := seal.AllocateSectorNumbers(ctx, p.api, p.db, maddr, 1, func(tx *harmonydb.Tx, numbers []abi.SectorNumber) (bool, error) {
if len(numbers) != 1 {
return false, xerrors.Errorf("expected one sector number")
}
n := numbers[0]
_, err := tx.Exec("INSERT INTO sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof) VALUES ($1, $2, $3)", mid, n, spt)
if err != nil {
return false, xerrors.Errorf("inserting into sectors_sdr_pipeline: %w", err)
}
dataHdrJson, err := json.Marshal(header)
if err != nil {
return false, xerrors.Errorf("json.Marshal(header): %w", err)
}
dealProposalJson, err := json.Marshal(piece.DealProposal)
if err != nil {
return false, xerrors.Errorf("json.Marshal(piece.DealProposal): %w", err)
}
_, err = tx.Exec(`INSERT INTO sectors_sdr_initial_pieces (sp_id,
sector_number,
piece_index,
piece_cid,
piece_size,
data_url,
data_headers,
data_raw_size,
data_delete_on_finalize,
f05_publish_cid,
f05_deal_id,
f05_deal_proposal,
f05_deal_start_epoch,
f05_deal_end_epoch) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)`,
mid, n, 0,
piece.DealProposal.PieceCID, piece.DealProposal.PieceSize,
source.String(), dataHdrJson, rawSize, !piece.KeepUnsealed,
piece.PublishCid, piece.DealID, dealProposalJson, piece.DealSchedule.StartEpoch, piece.DealSchedule.EndEpoch)
if err != nil {
return false, xerrors.Errorf("inserting into sectors_sdr_initial_pieces: %w", err)
}
return true, nil
})
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("allocating sector numbers: %w", err)
}
if len(num) != 1 {
return api.SectorOffset{}, xerrors.Errorf("expected one sector number")
}
// After we insert the piece/sector_pipeline entries, the lpseal/poller will take it from here
return api.SectorOffset{
Sector: num[0],
Offset: 0,
}, nil
}