lpseal: Sector number allocator, pipeline start cli
This commit is contained in:
parent
67fb3bd27f
commit
e89fff2308
@ -292,3 +292,32 @@ func GetConfig(cctx *cli.Context, db *harmonydb.DB) (*config.LotusProviderConfig
|
||||
// validate the config. Because of layering, we must validate @ startup.
|
||||
return lp, nil
|
||||
}
|
||||
|
||||
func GetDepsCLI(ctx context.Context, cctx *cli.Context) (*Deps, error) {
|
||||
db, err := MakeDB(cctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg, err := GetConfig(cctx, db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
full, fullCloser, err := cliutil.GetFullNodeAPIV1LotusProvider(cctx, cfg.Apis.ChainApiInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
fullCloser()
|
||||
}
|
||||
}()
|
||||
|
||||
return &Deps{
|
||||
Cfg: cfg,
|
||||
DB: db,
|
||||
Full: full,
|
||||
}, nil
|
||||
}
|
||||
|
@ -47,6 +47,7 @@ func main() {
|
||||
configCmd,
|
||||
testCmd,
|
||||
webCmd,
|
||||
pipelineCmd,
|
||||
//backupCmd,
|
||||
//lcli.WithCategory("chain", actorCmd),
|
||||
//lcli.WithCategory("storage", sectorsCmd),
|
||||
|
144
cmd/lotus-provider/pipeline.go
Normal file
144
cmd/lotus-provider/pipeline.go
Normal file
@ -0,0 +1,144 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-commp-utils/zerocomm"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
"github.com/filecoin-project/lotus/cmd/lotus-provider/deps"
|
||||
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
|
||||
"github.com/filecoin-project/lotus/provider/lpseal"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/urfave/cli/v2"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
var pipelineCmd = &cli.Command{
|
||||
Name: "pipeline",
|
||||
Usage: "Manage the sealing pipeline",
|
||||
Subcommands: []*cli.Command{
|
||||
pipelineStartCmd,
|
||||
},
|
||||
}
|
||||
|
||||
var pipelineStartCmd = &cli.Command{
|
||||
Name: "start",
|
||||
Usage: "Start new sealing operations manually",
|
||||
Flags: []cli.Flag{
|
||||
&cli.StringFlag{
|
||||
Name: "actor",
|
||||
Usage: "Specify actor address to start sealing sectors for",
|
||||
Required: true,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "now",
|
||||
Usage: "Start sealing sectors for all actors now (not on schedule)",
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "cc",
|
||||
Usage: "Start sealing new CC sectors",
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "count",
|
||||
Usage: "Number of sectors to start",
|
||||
Value: 1,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "synthetic",
|
||||
Usage: "Use synthetic PoRep",
|
||||
Value: true,
|
||||
},
|
||||
&cli.StringSliceFlag{ // todo consider moving layers top level
|
||||
Name: "layers",
|
||||
Usage: "list of layers to be interpreted (atop defaults). Default: base",
|
||||
Value: cli.NewStringSlice("base"),
|
||||
},
|
||||
},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
if !cctx.Bool("now") {
|
||||
return xerrors.Errorf("schedule not implemented, use --now")
|
||||
}
|
||||
if !cctx.IsSet("actor") {
|
||||
return cli.ShowCommandHelp(cctx, "start")
|
||||
}
|
||||
|
||||
act, err := address.NewFromString(cctx.String("actor"))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("parsing --actor: %w", err)
|
||||
}
|
||||
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
dep, err := deps.GetDepsCLI(ctx, cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
/*
|
||||
create table sectors_sdr_pipeline (
|
||||
sp_id bigint not null,
|
||||
sector_number bigint not null,
|
||||
|
||||
-- at request time
|
||||
create_time timestamp not null,
|
||||
reg_seal_proof int not null,
|
||||
comm_d_cid text not null,
|
||||
|
||||
[... other not relevant fields]
|
||||
*/
|
||||
|
||||
mid, err := address.IDFromAddress(act)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting miner id: %w", err)
|
||||
}
|
||||
|
||||
mi, err := dep.Full.StateMinerInfo(ctx, act, types.EmptyTSK)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting miner info: %w", err)
|
||||
}
|
||||
|
||||
nv, err := dep.Full.StateNetworkVersion(ctx, types.EmptyTSK)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting network version: %w", err)
|
||||
}
|
||||
|
||||
wpt := mi.WindowPoStProofType
|
||||
spt, err := miner.PreferredSealProofTypeFromWindowPoStType(nv, wpt, cctx.Bool("synthetic"))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting seal proof type: %w", err)
|
||||
}
|
||||
|
||||
ssize, err := spt.SectorSize()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting sector size: %w", err)
|
||||
}
|
||||
|
||||
var commd cid.Cid
|
||||
if cctx.Bool("cc") {
|
||||
commd = zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded())
|
||||
} else {
|
||||
return xerrors.Errorf("only CC sectors supported for now")
|
||||
}
|
||||
|
||||
num, err := lpseal.AllocateSectorNumbers(ctx, dep.Full, dep.DB, act, cctx.Int("count"), func(tx *harmonydb.Tx, numbers []abi.SectorNumber) (bool, error) {
|
||||
for _, n := range numbers {
|
||||
_, err := tx.Exec("insert into sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof, comm_d_cid) values ($1, $2, $3, $4)", mid, n, spt, commd.String())
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("inserting into sectors_sdr_pipeline: %w", err)
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("allocating sector numbers: %w", err)
|
||||
}
|
||||
|
||||
for _, number := range num {
|
||||
fmt.Println(number)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
@ -6,8 +6,9 @@ create table sectors_sdr_pipeline (
|
||||
sector_number bigint not null,
|
||||
|
||||
-- at request time
|
||||
create_time timestamp not null,
|
||||
create_time timestamp not null default current_timestamp,
|
||||
reg_seal_proof int not null,
|
||||
comm_d_cid text not null,
|
||||
|
||||
-- sdr
|
||||
ticket_epoch bigint,
|
||||
@ -17,7 +18,7 @@ create table sectors_sdr_pipeline (
|
||||
after_sdr bool not null default false,
|
||||
|
||||
-- tree D
|
||||
tree_d_cid text, -- commd from treeD compute
|
||||
tree_d_cid text, -- commd from treeD compute, should match comm_d_cid
|
||||
|
||||
task_id_tree_d bigint,
|
||||
after_tree_d bool not null default false,
|
||||
@ -84,3 +85,8 @@ create table sectors_sdr_initial_pieces (
|
||||
|
||||
primary key (sp_id, sector_number, piece_index)
|
||||
);
|
||||
|
||||
create table sectors_allocated_numbers (
|
||||
sp_id bigint not null primary key,
|
||||
allocated jsonb not null
|
||||
);
|
119
provider/lpseal/sector_num_alloc.go
Normal file
119
provider/lpseal/sector_num_alloc.go
Normal file
@ -0,0 +1,119 @@
|
||||
package lpseal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-bitfield"
|
||||
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
type AllocAPI interface {
|
||||
StateMinerAllocated(context.Context, address.Address, types.TipSetKey) (*bitfield.BitField, error)
|
||||
}
|
||||
|
||||
func AllocateSectorNumbers(ctx context.Context, a AllocAPI, db *harmonydb.DB, maddr address.Address, count int, txcb ...func(*harmonydb.Tx, []abi.SectorNumber) (bool, error)) ([]abi.SectorNumber, error) {
|
||||
chainAlloc, err := a.StateMinerAllocated(ctx, maddr, types.EmptyTSK)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting on-chain allocated sector numbers: %w", err)
|
||||
}
|
||||
|
||||
mid, err := address.IDFromAddress(maddr)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting miner id: %w", err)
|
||||
}
|
||||
|
||||
var res []abi.SectorNumber
|
||||
|
||||
comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
|
||||
// query from db, if exists unmarsal to bitfield
|
||||
var dbAllocated bitfield.BitField
|
||||
var rawJson []byte
|
||||
|
||||
err = tx.QueryRow("select COALESCE(allocated, '[0]') from sectors_allocated_numbers sa FULL OUTER JOIN (SELECT 1) AS d ON true where sp_id = $1 or sp_id is null", mid).Scan(&rawJson)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("querying allocated sector numbers: %w", err)
|
||||
}
|
||||
|
||||
if rawJson != nil {
|
||||
err = dbAllocated.UnmarshalJSON(rawJson)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("unmarshaling allocated sector numbers: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := dbAllocated.UnmarshalJSON(rawJson); err != nil {
|
||||
return false, xerrors.Errorf("unmarshaling allocated sector numbers: %w", err)
|
||||
}
|
||||
|
||||
merged, err := bitfield.MergeBitFields(*chainAlloc, dbAllocated)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("merging allocated sector numbers: %w", err)
|
||||
}
|
||||
|
||||
allAssignable, err := bitfield.NewFromIter(&rlepluslazy.RunSliceIterator{Runs: []rlepluslazy.Run{
|
||||
{
|
||||
Val: true,
|
||||
Len: abi.MaxSectorNumber,
|
||||
},
|
||||
}})
|
||||
|
||||
inverted, err := bitfield.SubtractBitField(allAssignable, merged)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("subtracting allocated sector numbers: %w", err)
|
||||
}
|
||||
|
||||
toAlloc, err := inverted.Slice(0, uint64(count))
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("getting slice of allocated sector numbers: %w", err)
|
||||
}
|
||||
|
||||
err = toAlloc.ForEach(func(u uint64) error {
|
||||
res = append(res, abi.SectorNumber(u))
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("iterating allocated sector numbers: %w", err)
|
||||
}
|
||||
|
||||
toPersist, err := bitfield.MergeBitFields(merged, toAlloc)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("merging allocated sector numbers: %w", err)
|
||||
}
|
||||
|
||||
rawJson, err = toPersist.MarshalJSON()
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("marshaling allocated sector numbers: %w", err)
|
||||
}
|
||||
|
||||
_, err = tx.Exec("insert into sectors_allocated_numbers(sp_id, allocated) values($1, $2) on conflict(sp_id) do update set allocated = $2", mid, rawJson)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("persisting allocated sector numbers: %w", err)
|
||||
}
|
||||
|
||||
for i, f := range txcb {
|
||||
commit, err = f(tx, res)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("executing tx callback %d: %w", i, err)
|
||||
}
|
||||
|
||||
if !commit {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("allocating sector numbers: %w", err)
|
||||
}
|
||||
if !comm {
|
||||
return nil, xerrors.Errorf("allocating sector numbers: commit failed")
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user