lotus/cmd/lotus-shed/market.go
Łukasz Magiera 6a0f16b084 feat: sealing: Support nv22 DDO features in the sealing pipeline (#11226)
* Initial work supporting DDO pieces in lotus-miner

* sealing: Update pipeline input to operate on UniversalPiece

* sealing: Update pipeline checks/sealing states to operate on UniversalPiece

* sealing: Make pipeline build with UniversalPiece

* move PieceDealInfo out of api

* make gen

* make sealing pipeline unit tests pass

* fix itest ensemble build

* don't panic in SectorsStatus with deals

* stop linter from complaining about checkPieces

* fix sector import tests

* mod tidy

* sealing: Add logic for (pre)committing DDO sectors

* sealing: state-types with method defs

* DDO non-snap pipeline works(?), DDO Itests

* DDO support in snapdeals pipeline

* make gen

* update actor bundles

* update the gst market fix

* fix: chain: use PreCommitSectorsBatch2 when setting up genesis

* some bug fixes

* integration working changes

* update actor bundles

* Make TestOnboardRawPieceSnap pass

* Appease the linter

* Make deadlines test pass with v12 actors

* Update go-state-types, abstract market DealState

* make gen

* mod tidy, lint fixes

* Fix some more tests

* Bump version in master

Bump version in master

* Make gen

Make gen

* fix sender

* fix: lotus-provider: Fix winning PoSt

* fix: sql Scan cannot write to an object

* Actually show miner-addrs in info-log

Actually show miner-addrs in lotus-provider info-log

* [WIP] feat: Add nv22 skeleton

Addition of Network Version 22 skeleton

* update FFI

* ddo is now nv22

* make gen

* temp actor bundle with ddo

* use working go-state-types

* gst with v13 market migration

* update bundle, builtin.MethodsMiner.ProveCommitSectors2 -> 3

* actually working v13 migration, v13 migration itest

* Address review

* sealing: Correct DDO snap pledge math

* itests: Mixed ddo itest

* pipeline: Fix sectorWeight

* sealing: convert market deals into PAMs in mixed sectors

* sealing: make market to ddo conversion work

* fix lint

* update gst

* Update actors and GST to latest integ branch

* commit batcher: Update ProveCommitSectors3Params builder logic

* make gen

* use builtin-actors master

* ddo: address review

* itests: Add commd assertions to ddo tests

* make gen

* gst with fixed types

* config knobs for RequireActivationSuccess

* storage: Drop obsolete flaky tests

---------

Co-authored-by: Jennifer Wang <jiayingw703@gmail.com>
Co-authored-by: Aayush <arajasek94@gmail.com>
Co-authored-by: Shrenuj Bansal <shrenuj.bansal@protocol.ai>
Co-authored-by: Phi <orjan.roren@gmail.com>
Co-authored-by: Andrew Jackson (Ajax) <snadrus@gmail.com>
Co-authored-by: TippyFlits <james.bluett@protocol.ai>
2024-03-22 07:00:28 +01:00

429 lines
11 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"os"
"path"
"github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
levelds "github.com/ipfs/go-ds-leveldb"
ipldcbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2"
ldbopts "github.com/syndtr/goleveldb/leveldb/opt"
"github.com/urfave/cli/v2"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/builtin"
market11 "github.com/filecoin-project/go-state-types/builtin/v11/market"
"github.com/filecoin-project/go-state-types/builtin/v11/util/adt"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/backupds"
"github.com/filecoin-project/lotus/node/repo"
)
// marketCmd is the parent "market" command. It declares no flags of its own
// and only groups the market-related subcommands listed in Subcommands.
var marketCmd = &cli.Command{
Name: "market",
Usage: "Interact with the market actor",
Flags: []cli.Flag{},
Subcommands: []*cli.Command{
marketDealFeesCmd,
marketExportDatastoreCmd,
marketImportDatastoreCmd,
marketDealsTotalStorageCmd,
marketCronStateCmd,
},
}
// marketCronStateCmd prints one JSON object to stdout that maps every epoch
// key found in the market actor's DealOpsByEpoch structure to the list of
// deal IDs recorded under that epoch. The market actor head is decoded as a
// v11 market State at the tipset selected by the optional --tipset flag.
var marketCronStateCmd = &cli.Command{
	Name:  "cron-state",
	Usage: "Display summary of all deal operation state scheduled for cron processing",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:  "tipset",
			Usage: "specify tipset to call method on (pass comma separated array of cids)",
		},
	},
	Action: func(cctx *cli.Context) error {
		ctx := lcli.ReqContext(cctx)

		node, closeNode, err := lcli.GetFullNodeAPI(cctx)
		if err != nil {
			return err
		}
		defer closeNode()

		tipset, err := lcli.LoadTipSet(ctx, cctx, node)
		if err != nil {
			return err
		}

		// State objects are fetched lazily from the node through a
		// read-only blockstore.
		blocks := ReadOnlyAPIBlockstore{node}
		store := adt.WrapStore(ctx, ipldcbor.NewCborStore(&blocks))

		marketActor, err := node.StateGetActor(ctx, builtin.StorageMarketActorAddr, tipset.Key())
		if err != nil {
			return err
		}

		var state market11.State
		if err := store.Get(ctx, marketActor.Head, &state); err != nil {
			return err
		}

		// Pass 1: walk the outer map only to learn which epochs have an
		// entry; every epoch starts out with an empty (non-nil) ID list.
		epochMap, err := adt.AsMap(store, state.DealOpsByEpoch, builtin.DefaultHamtBitwidth)
		if err != nil {
			return err
		}

		opsByEpoch := make(map[uint64][]abi.DealID)
		collectEpoch := func(rawKey string) error {
			epoch, err := abi.ParseUIntKey(rawKey)
			if err != nil {
				return err
			}
			opsByEpoch[epoch] = make([]abi.DealID, 0)
			return nil
		}
		if err := epochMap.ForEach(&cbg.Deferred{}, collectEpoch); err != nil {
			return err
		}

		// Pass 2: reopen the same root as a set-multimap and gather the
		// deal IDs stored under each epoch discovered above.
		multimap, err := market11.AsSetMultimap(store, state.DealOpsByEpoch, builtin.DefaultHamtBitwidth, builtin.DefaultHamtBitwidth)
		if err != nil {
			return err
		}

		for epoch := range opsByEpoch {
			ids := opsByEpoch[epoch]
			if err := multimap.ForEach(abi.ChainEpoch(epoch), func(id abi.DealID) error {
				ids = append(ids, id)
				return nil
			}); err != nil {
				return err
			}
			opsByEpoch[epoch] = ids
		}

		encoded, err := json.Marshal(opsByEpoch)
		if err != nil {
			return err
		}

		fmt.Println(string(encoded))
		return nil
	},
}
// marketDealFeesCmd reports the earned and pending storage fees, as computed
// by market.GetDealFees at the height of the loaded tipset, either summed
// over every deal of one provider (--provider) or for one deal (--dealId).
//
// When both flags are given, --provider takes precedence. When neither flag
// is given the command returns an error.
var marketDealFeesCmd = &cli.Command{
	Name:  "get-deal-fees",
	Usage: "View the storage fees associated with a particular deal or storage provider",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:  "provider",
			Usage: "provider whose outstanding fees you'd like to calculate",
		},
		&cli.IntFlag{
			Name:  "dealId",
			Usage: "deal whose outstanding fees you'd like to calculate",
		},
	},
	Action: func(cctx *cli.Context) error {
		api, closer, err := lcli.GetFullNodeAPI(cctx)
		if err != nil {
			return err
		}
		defer closer()

		ctx := lcli.ReqContext(cctx)

		ts, err := lcli.LoadTipSet(ctx, cctx, api)
		if err != nil {
			return err
		}

		ht := ts.Height()

		if cctx.IsSet("provider") {
			provider, err := address.NewFromString(cctx.String("provider"))
			if err != nil {
				return fmt.Errorf("failed to parse provider: %w", err)
			}

			deals, err := api.StateMarketDeals(ctx, ts.Key())
			if err != nil {
				return err
			}

			earned := big.Zero()
			pending := big.Zero()
			count := 0

			for _, deal := range deals {
				if deal.Proposal.Provider != provider {
					continue
				}
				dealEarned, dealPending := market.GetDealFees(deal.Proposal, ht)
				earned = big.Add(earned, dealEarned)
				pending = big.Add(pending, dealPending)
				count++
			}

			fmt.Println("Total deals: ", count)
			fmt.Println("Total earned fees: ", earned)
			fmt.Println("Total pending fees: ", pending)
			fmt.Println("Total fees: ", big.Add(earned, pending))

			return nil
		}

		// Use IsSet rather than comparing against the flag's zero value:
		// 0 is a legitimate deal ID and must remain queryable.
		if cctx.IsSet("dealId") {
			dealID := cctx.Int("dealId")
			if dealID < 0 {
				// A negative int would silently wrap to a huge unsigned DealID.
				return xerrors.Errorf("dealId must not be negative: %d", dealID)
			}

			deal, err := api.StateMarketStorageDeal(ctx, abi.DealID(dealID), ts.Key())
			if err != nil {
				return err
			}

			ef, pf := market.GetDealFees(deal.Proposal, ht)

			fmt.Println("Earned fees: ", ef)
			fmt.Println("Pending fees: ", pf)
			fmt.Println("Total fees: ", big.Add(ef, pf))

			return nil
		}

		return xerrors.New("must provide either --provider or --dealId flag")
	},
}
// mktsMetadataNamespace is the repo datastore namespace opened by the
// export-datastore and import-datastore commands.
const mktsMetadataNamespace = "metadata"
// marketExportDatastoreCmd copies the markets-related key/values from the
// repo's metadata datastore into a backup file.
//
// The keys under the exported prefixes are first written to a temporary
// leveldb datastore at <backup-dir>/markets-backup-datastore, which is then
// serialised with backupds to <backup-dir>/markets.datastore.backup. The
// temporary datastore directory is removed when the command returns. If
// --backup-dir is not given, the OS temp dir is used.
var marketExportDatastoreCmd = &cli.Command{
	Name:        "export-datastore",
	Description: "export markets datastore key/values to a file",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:  "repo",
			Usage: "path to the repo",
		},
		&cli.StringFlag{
			Name:  "backup-dir",
			Usage: "path to the backup directory",
		},
	},
	Action: func(cctx *cli.Context) error {
		logging.SetLogLevel("badger", "ERROR") // nolint:errcheck

		// If the backup dir is not specified, just use the OS temp dir
		backupDir := cctx.String("backup-dir")
		if backupDir == "" {
			backupDir = os.TempDir()
		}

		// Open the repo at the repo path
		repoPath := cctx.String("repo")
		lr, err := openLockedRepo(repoPath)
		if err != nil {
			return err
		}
		defer lr.Close() //nolint:errcheck

		// Open the metadata datastore on the repo
		ds, err := lr.Datastore(cctx.Context, datastore.NewKey(mktsMetadataNamespace).String())
		if err != nil {
			return xerrors.Errorf("opening datastore %s on repo %s: %w", mktsMetadataNamespace, repoPath, err)
		}

		// Create a tmp datastore that we'll add the exported key / values to
		// and then backup
		backupDsDir := path.Join(backupDir, "markets-backup-datastore")
		if err := os.MkdirAll(backupDsDir, 0775); err != nil { //nolint:gosec
			return xerrors.Errorf("creating tmp datastore directory: %w", err)
		}
		defer os.RemoveAll(backupDsDir) //nolint:errcheck

		backupDs, err := levelds.NewDatastore(backupDsDir, &levelds.Options{
			Compression: ldbopts.NoCompression,
			NoSync:      false,
			Strict:      ldbopts.StrictAll,
			ReadOnly:    false,
		})
		if err != nil {
			// Report the directory that was actually opened.
			return xerrors.Errorf("opening backup datastore at %s: %w", backupDsDir, err)
		}
		// Deferred after RemoveAll, so it runs first: the temporary datastore
		// is closed before its directory is deleted.
		defer backupDs.Close() //nolint:errcheck

		// Export the key / values
		prefixes := []string{
			"/deals/provider",
			"/retrievals/provider",
			"/storagemarket",
		}
		for _, prefix := range prefixes {
			if err := exportPrefix(prefix, ds, backupDs); err != nil {
				return err
			}
		}

		// Wrap the datastore in a backup datastore
		bds, err := backupds.Wrap(backupDs, "")
		if err != nil {
			return xerrors.Errorf("opening backupds: %w", err)
		}

		// Create a file for the backup. O_TRUNC ensures that a previous,
		// larger backup at the same path cannot leave stale trailing bytes
		// behind the newly written data.
		fpath := path.Join(backupDir, "markets.datastore.backup")
		out, err := os.OpenFile(fpath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
		if err != nil {
			return xerrors.Errorf("opening backup file %s: %w", fpath, err)
		}

		// Write the backup to the file
		if err := bds.Backup(context.Background(), out); err != nil {
			if cerr := out.Close(); cerr != nil {
				log.Errorw("error closing backup file while handling backup error", "closeErr", cerr, "backupErr", err)
			}
			return xerrors.Errorf("backup error: %w", err)
		}
		if err := out.Close(); err != nil {
			return xerrors.Errorf("closing backup file: %w", err)
		}

		fmt.Println("Wrote backup file to " + fpath)

		return nil
	},
}
// exportPrefix copies every key/value under prefix from ds into backupDs,
// printing each exported key to stdout.
//
// It returns an error if the query cannot be started, if the query reports an
// error while iterating, or if a Put into backupDs fails.
func exportPrefix(prefix string, ds datastore.Batching, backupDs datastore.Batching) error {
	q, err := ds.Query(context.Background(), dsq.Query{
		Prefix: prefix,
	})
	if err != nil {
		return xerrors.Errorf("datastore query: %w", err)
	}
	defer q.Close() //nolint:errcheck

	for res := range q.Next() {
		// Iteration failures are delivered in-band on the result; without
		// this check they would be exported as an empty key/value pair.
		if res.Error != nil {
			return xerrors.Errorf("reading query results for prefix %s: %w", prefix, res.Error)
		}

		fmt.Println("Exporting key " + res.Key)
		err := backupDs.Put(context.Background(), datastore.NewKey(res.Key), res.Value)
		if err != nil {
			return xerrors.Errorf("putting %s to backup datastore: %w", res.Key, err)
		}
	}

	return nil
}
// marketImportDatastoreCmd restores a backup file (--backup-path, required)
// into the metadata datastore of the repo given by --repo, using
// backupds.RestoreInto.
var marketImportDatastoreCmd = &cli.Command{
	Name:        "import-datastore",
	Description: "import markets datastore key/values from a backup file",
	Flags: []cli.Flag{
		&cli.StringFlag{
			Name:  "repo",
			Usage: "path to the repo",
		},
		&cli.StringFlag{
			Name:     "backup-path",
			Usage:    "path to the backup file",
			Required: true,
		},
	},
	Action: func(cctx *cli.Context) error {
		logging.SetLogLevel("badger", "ERROR") // nolint:errcheck

		backupPath := cctx.String("backup-path")

		// Open the repo at the repo path
		lr, err := openLockedRepo(cctx.String("repo"))
		if err != nil {
			return err
		}
		defer lr.Close() //nolint:errcheck

		// Open the metadata datastore on the repo
		repoDs, err := lr.Datastore(cctx.Context, datastore.NewKey(mktsMetadataNamespace).String())
		if err != nil {
			return err
		}

		r, err := os.Open(backupPath)
		if err != nil {
			return xerrors.Errorf("opening backup path %s: %w", backupPath, err)
		}
		// The file is only read from, so a close error carries no data-loss
		// risk and is deliberately ignored.
		defer r.Close() //nolint:errcheck

		fmt.Println("Importing from backup file " + backupPath)
		err = backupds.RestoreInto(r, repoDs)
		if err != nil {
			return xerrors.Errorf("restoring backup from path %s: %w", backupPath, err)
		}

		fmt.Println("Completed importing from backup file " + backupPath)

		return nil
	},
}
// marketDealsTotalStorageCmd fetches all market deals at the empty tipset key
// and prints how many of them market.IsDealActive reports as active, together
// with the sum of their proposals' PieceSize values.
var marketDealsTotalStorageCmd = &cli.Command{
	Name:  "get-deals-total-storage",
	Usage: "View the total storage available in all active market deals",
	Action: func(cctx *cli.Context) error {
		node, closeNode, err := lcli.GetFullNodeAPI(cctx)
		if err != nil {
			return err
		}
		defer closeNode()

		ctx := lcli.ReqContext(cctx)

		allDeals, err := node.StateMarketDeals(ctx, types.EmptyTSK)
		if err != nil {
			return err
		}

		var activeDeals int
		totalSize := big.Zero()

		for _, d := range allDeals {
			// Inactive deals contribute nothing to either figure.
			if !market.IsDealActive(d.State.Iface()) {
				continue
			}
			activeDeals++
			pieceSize := big.NewIntUnsigned(uint64(d.Proposal.PieceSize))
			totalSize = big.Add(totalSize, pieceSize)
		}

		fmt.Println("Total deals: ", activeDeals)
		fmt.Println("Total storage: ", totalSize)

		return nil
	},
}
// openLockedRepo opens the filesystem repo at path, verifies that it exists,
// and returns it locked as a repo.StorageMiner repo. Each failure is wrapped
// with the repo path; a missing repo is reported as an error.
func openLockedRepo(path string) (repo.LockedRepo, error) {
	fsRepo, err := repo.NewFS(path)
	if err != nil {
		return nil, xerrors.Errorf("could not open repo %s: %w", path, err)
	}

	// Refuse to continue unless the repo is confirmed to be present.
	switch found, err := fsRepo.Exists(); {
	case err != nil:
		return nil, xerrors.Errorf("checking repo %s exists: %w", path, err)
	case !found:
		return nil, xerrors.Errorf("repo does not exist: %s", path)
	}

	locked, err := fsRepo.Lock(repo.StorageMiner)
	if err != nil {
		return nil, xerrors.Errorf("locking repo %s: %w", path, err)
	}

	return locked, nil
}