package main

import (
	"bufio"
	"encoding/csv"
	"fmt"
	"os"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/docker/go-units"
	"github.com/fatih/color"
	cbor "github.com/ipfs/go-ipld-cbor"
	"github.com/urfave/cli/v2"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-bitfield"
	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/go-state-types/big"
	"github.com/filecoin-project/go-state-types/network"

	"github.com/filecoin-project/lotus/api"
	"github.com/filecoin-project/lotus/blockstore"
	"github.com/filecoin-project/lotus/chain/actors/adt"
	"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
	"github.com/filecoin-project/lotus/chain/actors/policy"
	"github.com/filecoin-project/lotus/chain/types"
	lcli "github.com/filecoin-project/lotus/cli"
	"github.com/filecoin-project/lotus/cli/spcli"
	cliutil "github.com/filecoin-project/lotus/cli/util"
	"github.com/filecoin-project/lotus/lib/result"
	"github.com/filecoin-project/lotus/lib/strle"
	"github.com/filecoin-project/lotus/lib/tablewriter"
	sealing "github.com/filecoin-project/lotus/storage/pipeline"
)

// parallelSectorChecks is the default value of the `sectors list`
// --check-parallelism flag, i.e. how many SectorsStatus requests are kept in
// flight at once.
const parallelSectorChecks = 300

// sectorsCmd is the parent "sectors" command; it only groups subcommands.
var sectorsCmd = &cli.Command{
	Name:  "sectors",
	Usage: "interact with sector store",
	Subcommands: []*cli.Command{
		spcli.SectorsStatusCmd(LMActorOrEnvGetter, getOnDiskInfo),
		sectorsListCmd,
		sectorsRefsCmd,
		sectorsUpdateCmd,
		sectorsPledgeCmd,
		sectorsNumbersCmd,
		spcli.SectorPreCommitsCmd(LMActorOrEnvGetter),
		spcli.SectorsCheckExpireCmd(LMActorOrEnvGetter),
		sectorsExpiredCmd,
		spcli.SectorsExtendCmd(LMActorOrEnvGetter),
		sectorsTerminateCmd,
		sectorsRemoveCmd,
		sectorsSnapUpCmd,
		sectorsSnapAbortCmd,
		sectorsStartSealCmd,
		sectorsSealDelayCmd,
		sectorsCapacityCollateralCmd,
		sectorsBatching,
		sectorsRefreshPieceMatchingCmd,
		spcli.SectorsCompactPartitionsCmd(LMActorOrEnvGetter),
		sectorsUnsealCmd,
	},
}

// getOnDiskInfo opens a storage-miner API connection and returns the result
// of SectorsStatus for sector id. onChainInfo is forwarded unchanged to
// SectorsStatus. The connection is closed before returning.
func getOnDiskInfo(cctx *cli.Context, id abi.SectorNumber, onChainInfo bool) (api.SectorInfo, error) {
	minerApi, closer, err := lcli.GetStorageMinerAPI(cctx)
	if err != nil {
		return api.SectorInfo{}, err
	}
	defer closer()

	return minerApi.SectorsStatus(cctx.Context, id, onChainInfo)
}

// sectorsPledgeCmd calls PledgeSector on the miner and prints the number of
// the sector that was created.
var sectorsPledgeCmd = &cli.Command{
	Name:  "pledge",
	Usage: "store random data in a sector",
	Action: func(cctx *cli.Context) error {
		minerApi, closer, err := lcli.GetStorageMinerAPI(cctx)
		if err != nil {
			return err
		}
		defer closer()
		ctx := lcli.ReqContext(cctx)

		id, err := minerApi.PledgeSector(ctx)
		if err != nil {
			return err
		}

		fmt.Println("Created CC sector: ", id.Number)

		return nil
	},
}

// sectorsListCmd prints a table with one row per sector known to the miner.
//
// The sector list is optionally filtered by --states / --unproven, the status
// of every sector is fetched concurrently (bounded by --check-parallelism),
// and each status is rendered together with whether the sector appears in the
// miner's on-chain sector set and active sector set at the current chain head.
// Sectors whose status lookup failed are shown with their ID and the error.
var sectorsListCmd = &cli.Command{
	Name:  "list",
	Usage: "List sectors",
	Flags: []cli.Flag{
		&cli.BoolFlag{
			Name:    "show-removed",
			Usage:   "show removed sectors",
			Aliases: []string{"r"},
		},
		&cli.BoolFlag{
			Name:    "fast",
			Usage:   "don't show on-chain info for better performance",
			Aliases: []string{"f"},
		},
		&cli.BoolFlag{
			Name:    "events",
			Usage:   "display number of events the sector has received",
			Aliases: []string{"e"},
		},
		&cli.BoolFlag{
			Name:    "initial-pledge",
			Usage:   "display initial pledge",
			Aliases: []string{"p"},
		},
		&cli.BoolFlag{
			Name:    "seal-time",
			Usage:   "display how long it took for the sector to be sealed",
			Aliases: []string{"t"},
		},
		&cli.StringFlag{
			Name:  "states",
			Usage: "filter sectors by a comma-separated list of states",
		},
		&cli.BoolFlag{
			Name:    "unproven",
			Usage:   "only show sectors which aren't in the 'Proving' state",
			Aliases: []string{"u"},
		},
		&cli.Int64Flag{
			Name:  "check-parallelism",
			Usage: "number of parallel requests to make for checking sector states",
			Value: parallelSectorChecks,
		},
	},
	Subcommands: []*cli.Command{
		sectorsListUpgradeBoundsCmd,
	},
	Action: func(cctx *cli.Context) error {
		// The value sizes the throttle channel below. Zero would make the
		// channel unbuffered, so the first send would block forever (nothing
		// is receiving yet); a negative size makes `make` panic.
		parallelism := cctx.Int64("check-parallelism")
		if parallelism <= 0 {
			return xerrors.Errorf("--check-parallelism must be greater than zero, got %d", parallelism)
		}

		// http mode allows for parallel json decoding/encoding, which was a bottleneck here
		minerApi, closer, err := lcli.GetStorageMinerAPI(cctx, cliutil.StorageMinerUseHttp)
		if err != nil {
			return err
		}
		defer closer()

		fullApi, closer2, err := lcli.GetFullNodeAPI(cctx) // TODO: consider storing full node address in config
		if err != nil {
			return err
		}
		defer closer2()

		ctx := lcli.ReqContext(cctx)

		var list []abi.SectorNumber

		showRemoved := cctx.Bool("show-removed")
		var states []api.SectorState
		if cctx.IsSet("states") && cctx.IsSet("unproven") {
			return xerrors.Errorf("only one of --states or --unproven can be specified at once")
		}
		if cctx.IsSet("states") {
			// An explicit state filter may name the Removed state, so removed
			// sectors must not be hidden afterwards.
			showRemoved = true
			sList := strings.Split(cctx.String("states"), ",")
			states = make([]api.SectorState, len(sList))
			for i := range sList {
				states[i] = api.SectorState(sList[i])
			}
		}

		if cctx.Bool("unproven") {
			for state := range sealing.ExistSectorStateList {
				if state == sealing.Proving || state == sealing.Available {
					continue
				}
				states = append(states, api.SectorState(state))
			}
		}

		if len(states) == 0 {
			list, err = minerApi.SectorsList(ctx)
		} else {
			list, err = minerApi.SectorsListInStates(ctx, states)
		}
		if err != nil {
			return err
		}

		maddr, err := minerApi.ActorAddress(ctx)
		if err != nil {
			return err
		}

		head, err := fullApi.ChainHead(ctx)
		if err != nil {
			return err
		}

		activeSet, err := fullApi.StateMinerActiveSectors(ctx, maddr, head.Key())
		if err != nil {
			return err
		}
		activeIDs := make(map[abi.SectorNumber]struct{}, len(activeSet))
		for _, info := range activeSet {
			activeIDs[info.SectorNumber] = struct{}{}
		}

		sset, err := fullApi.StateMinerSectors(ctx, maddr, nil, head.Key())
		if err != nil {
			return err
		}
		commitedIDs := make(map[abi.SectorNumber]struct{}, len(sset))
		for _, info := range sset {
			commitedIDs[info.SectorNumber] = struct{}{}
		}

		sort.Slice(list, func(i, j int) bool {
			return list[i] < list[j]
		})

		tw := tablewriter.New(
			tablewriter.Col("ID"),
			tablewriter.Col("State"),
			tablewriter.Col("OnChain"),
			tablewriter.Col("Active"),
			tablewriter.Col("Expiration"),
			tablewriter.Col("SealTime"),
			tablewriter.Col("Events"),
			tablewriter.Col("Deals"),
			tablewriter.Col("DealWeight"),
			tablewriter.Col("VerifiedPower"),
			tablewriter.Col("Pledge"),
			tablewriter.NewLineCol("Error"),
			tablewriter.NewLineCol("RecoveryTimeout"))

		fast := cctx.Bool("fast")

		// throttle bounds the number of in-flight SectorsStatus calls; each
		// goroutine writes only to its own index of slist.
		throttle := make(chan struct{}, parallelism)

		slist := make([]result.Result[api.SectorInfo], len(list))
		var wg sync.WaitGroup
		for i, s := range list {
			throttle <- struct{}{}
			wg.Add(1)
			go func(i int, s abi.SectorNumber) {
				defer wg.Done()
				defer func() { <-throttle }()
				r := result.Wrap(minerApi.SectorsStatus(ctx, s, !fast))
				if r.Error != nil {
					// Keep the sector number so the error row can be labelled.
					r.Value.SectorID = s
				}
				slist[i] = r
			}(i, s)
		}
		wg.Wait()

		for _, rsn := range slist {
			if rsn.Error != nil {
				// Report the per-sector lookup error. The outer err is always
				// nil at this point, so it must not be used here.
				tw.Write(map[string]interface{}{
					"ID":    rsn.Value.SectorID,
					"Error": rsn.Error,
				})
				continue
			}

			st := rsn.Value
			s := st.SectorID

			if !showRemoved && st.State == api.SectorState(sealing.Removed) {
				continue
			}

			_, inSSet := commitedIDs[s]
			_, inASet := activeIDs[s]

			const verifiedPowerGainMul = 9

			dw, vp := 0.0, 0.0
			// Without a positive lifetime (or while in an upgrade state) the
			// weights cannot be divided by the lifetime, so the figures are
			// estimated from piece sizes and later displayed in brackets.
			estimate := (st.Expiration-st.Activation <= 0) || sealing.IsUpgradeState(sealing.SectorState(st.State))
			if !estimate {
				rdw := big.Add(st.DealWeight, st.VerifiedDealWeight)
				dw = float64(big.Div(rdw, big.NewInt(int64(st.Expiration-st.Activation))).Uint64())
				vp = float64(big.Div(big.Mul(st.VerifiedDealWeight, big.NewInt(verifiedPowerGainMul)), big.NewInt(int64(st.Expiration-st.Activation))).Uint64())
			} else {
				for _, piece := range st.Pieces {
					if piece.DealInfo != nil {
						dw += float64(piece.Piece.Size)
						if piece.DealInfo.DealProposal != nil && piece.DealInfo.DealProposal.VerifiedDeal {
							vp += float64(piece.Piece.Size) * verifiedPowerGainMul
						}
					}
				}
			}

			var deals int
			for _, deal := range st.Deals {
				if deal != 0 {
					deals++
				}
			}

			exp := st.Expiration
			if st.OnTime > 0 && st.OnTime < exp {
				exp = st.OnTime // Can be different when the sector was CC upgraded
			}

			m := map[string]interface{}{
				"ID":      s,
				"State":   color.New(spcli.StateOrder[sealing.SectorState(st.State)].Col).Sprint(st.State),
				"OnChain": yesno(inSSet),
				"Active":  yesno(inASet),
			}

			if deals > 0 {
				m["Deals"] = color.GreenString("%d", deals)
			} else {
				m["Deals"] = color.BlueString("CC")
				if st.ToUpgrade {
					m["Deals"] = color.CyanString("CC(upgrade)")
				}
			}

			if !fast {
				if !inSSet {
					m["Expiration"] = "n/a"
				} else {
					m["Expiration"] = cliutil.EpochTime(head.Height(), exp)
					if st.Early > 0 {
						m["RecoveryTimeout"] = color.YellowString(cliutil.EpochTime(head.Height(), st.Early))
					}
				}
				if inSSet && cctx.Bool("initial-pledge") {
					m["Pledge"] = types.FIL(st.InitialPledge).Short()
				}
			}

			if !fast && deals > 0 {
				// estWrap marks estimated values with square brackets.
				estWrap := func(v string) string {
					if !estimate {
						return v
					}
					return fmt.Sprintf("[%s]", v)
				}

				m["DealWeight"] = estWrap(units.BytesSize(dw))
				if vp > 0 {
					m["VerifiedPower"] = estWrap(color.GreenString(units.BytesSize(vp)))
				}
			}

			if cctx.Bool("events") {
				var events int
				for _, sectorLog := range st.Log {
					if !strings.HasPrefix(sectorLog.Kind, "event") {
						continue
					}
					if sectorLog.Kind == "event;sealing.SectorRestart" {
						continue
					}
					events++
				}

				// The colour thresholds scale with the number of deal entries.
				pieces := len(st.Deals)

				switch {
				case events < 12+pieces:
					m["Events"] = color.GreenString("%d", events)
				case events < 20+pieces:
					m["Events"] = color.YellowString("%d", events)
				default:
					m["Events"] = color.RedString("%d", events)
				}
			}

			if cctx.Bool("seal-time") && len(st.Log) > 1 {
				// Seal time is measured from the first log entry to the first
				// SectorProving event.
				start := time.Unix(int64(st.Log[0].Timestamp), 0)

				for _, sectorLog := range st.Log {
					if sectorLog.Kind == "event;sealing.SectorProving" { // todo: figure out a good way to not hardcode
						end := time.Unix(int64(sectorLog.Timestamp), 0)
						dur := end.Sub(start)

						switch {
						case dur < 12*time.Hour:
							m["SealTime"] = color.GreenString("%s", dur)
						case dur < 24*time.Hour:
							m["SealTime"] = color.YellowString("%s", dur)
						default:
							m["SealTime"] = color.RedString("%s", dur)
						}

						break
					}
				}
			}

			tw.Write(m)
		}

		return tw.Flush(os.Stdout)
	},
}

// sectorsListUpgradeBoundsCmd prints a histogram of on-chain expiration epochs
// for sectors in the Available state.
//
// The range between the smallest and largest expiration is split into
// --buckets equally sized buckets. With --deal-terms each bucket's upper bound
// is extended by policy.MarketDefaultAllocationTermBuffer. Output is a table
// with a bar chart by default, or CSV with --csv.
var sectorsListUpgradeBoundsCmd = &cli.Command{
	Name:  "upgrade-bounds",
	Usage: "Output upgrade bounds for available sectors",
	Flags: []cli.Flag{
		&cli.IntFlag{
			Name:  "buckets",
			Value: 25,
		},
		&cli.BoolFlag{
			Name:  "csv",
			Usage: "output machine-readable values",
		},
		&cli.BoolFlag{
			Name:  "deal-terms",
			Usage: "bucket by how many deal-sectors can start at a given expiration",
		},
	},
	Action: func(cctx *cli.Context) error {
		// buckets is used as a divisor and as a slice length below; zero would
		// divide by zero and a negative value would make `make` panic.
		buckets := cctx.Int("buckets")
		if buckets <= 0 {
			return xerrors.Errorf("--buckets must be greater than zero, got %d", buckets)
		}

		minerApi, closer, err := lcli.GetStorageMinerAPI(cctx, cliutil.StorageMinerUseHttp)
		if err != nil {
			return err
		}
		defer closer()

		fullApi, closer2, err := lcli.GetFullNodeAPI(cctx)
		if err != nil {
			return err
		}
		defer closer2()

		ctx := lcli.ReqContext(cctx)

		list, err := minerApi.SectorsListInStates(ctx, []api.SectorState{
			api.SectorState(sealing.Available),
		})
		if err != nil {
			return xerrors.Errorf("getting sector list: %w", err)
		}

		head, err := fullApi.ChainHead(ctx)
		if err != nil {
			return xerrors.Errorf("getting chain head: %w", err)
		}

		filter := bitfield.New()

		for _, s := range list {
			filter.Set(uint64(s))
		}

		maddr, err := minerApi.ActorAddress(ctx)
		if err != nil {
			return err
		}

		sset, err := fullApi.StateMinerSectors(ctx, maddr, &filter, head.Key())
		if err != nil {
			return err
		}

		if len(sset) == 0 {
			return nil
		}

		var minExpiration, maxExpiration abi.ChainEpoch

		for _, s := range sset {
			if s.Expiration < minExpiration || minExpiration == 0 {
				minExpiration = s.Expiration
			}
			if s.Expiration > maxExpiration {
				maxExpiration = s.Expiration
			}
		}

		bucketSize := (maxExpiration - minExpiration) / abi.ChainEpoch(buckets)
		if bucketSize < 1 {
			// The expiration range is narrower than the bucket count. A
			// zero-width bucket [min, min) can never contain a sector, which
			// would leave every count at zero.
			bucketSize = 1
		}
		bucketCounts := make([]int, buckets+1)

		for b := range bucketCounts {
			bucketMin := minExpiration + abi.ChainEpoch(b)*bucketSize
			bucketMax := minExpiration + abi.ChainEpoch(b+1)*bucketSize

			if cctx.Bool("deal-terms") {
				bucketMax = bucketMax + policy.MarketDefaultAllocationTermBuffer
			}

			for _, s := range sset {
				isInBucket := s.Expiration >= bucketMin && s.Expiration < bucketMax

				if isInBucket {
					bucketCounts[b]++
				}
			}
		}

		if cctx.Bool("csv") {
			writer := csv.NewWriter(os.Stdout)

			// The header belongs to the CSV output only; in table mode the
			// writer is never flushed, so writing it there was dead work.
			err = writer.Write([]string{"Max Expiration in Bucket", "Sector Count"})
			if err != nil {
				return xerrors.Errorf("writing csv headers: %w", err)
			}

			for i := 0; i < buckets; i++ {
				maxExp := minExpiration + abi.ChainEpoch(i+1)*bucketSize

				timeStr := strconv.FormatInt(int64(maxExp), 10)

				err = writer.Write([]string{
					timeStr,
					strconv.Itoa(bucketCounts[i]),
				})
				if err != nil {
					return xerrors.Errorf("writing csv row: %w", err)
				}
			}

			// Flush to make sure all data is written to the underlying writer
			writer.Flush()

			if err := writer.Error(); err != nil {
				return xerrors.Errorf("flushing csv writer: %w", err)
			}

			return nil
		}

		tw := tablewriter.New(
			tablewriter.Col("Bucket Expiration"),
			tablewriter.Col("Sector Count"),
			tablewriter.Col("Bar"),
		)

		var barCols = 40
		var maxCount int

		for _, c := range bucketCounts {
			if c > maxCount {
				maxCount = c
			}
		}

		for i := 0; i < buckets; i++ {
			maxExp := minExpiration + abi.ChainEpoch(i+1)*bucketSize
			timeStr := cliutil.EpochTime(head.Height(), maxExp)

			// Scale the bar to the fullest bucket; guard against an
			// all-empty histogram so the division cannot panic.
			filled := 0
			if maxCount > 0 {
				filled = bucketCounts[i] * barCols / maxCount
			}

			tw.Write(map[string]interface{}{
				"Bucket Expiration": timeStr,
				"Sector Count":      color.YellowString("%d", bucketCounts[i]),
				"Bar":               "[" + color.GreenString(strings.Repeat("|", filled)) + strings.Repeat(" ", barCols-filled) + "]",
			})
		}

		return tw.Flush(os.Stdout)
	},
}

// sectorsRefsCmd prints, for every entry returned by SectorsRefs, the entry
// name followed by the sector ID, offset and size of each reference.
var sectorsRefsCmd = &cli.Command{
	Name:  "refs",
	Usage: "List References to sectors",
	Action: func(cctx *cli.Context) error {
		nodeApi, closer, err := lcli.GetMarketsAPI(cctx)
		if err != nil {
			return err
		}
		defer closer()
		ctx := lcli.ReqContext(cctx)

		refs, err := nodeApi.SectorsRefs(ctx)
		if err != nil {
			return err
		}

		for name, sectorRefs := range refs {
			fmt.Printf("Block %s:\n", name)
			for _, ref := range sectorRefs {
				fmt.Printf("\t%d+%d %d bytes\n", ref.SectorID, ref.Offset, ref.Size)
			}
		}
		return nil
	},
}

// sectorsTerminateCmd calls SectorTerminate for the sector number given as the
// single positional argument. It refuses to run without --really-do-it.
var sectorsTerminateCmd = &cli.Command{
	Name:      "terminate",
	Usage:     "Terminate sector on-chain then remove (WARNING: This means losing power and collateral for the removed sector)",
	ArgsUsage: "",
	Flags: []cli.Flag{
		&cli.BoolFlag{
			Name:  "really-do-it",
			Usage: "pass this flag if you know what you are doing",
		},
	},
	Subcommands: []*cli.Command{
		sectorsTerminateFlushCmd,
		sectorsTerminatePendingCmd,
	},
	Action: func(cctx *cli.Context) error {
		// Validate the invocation before opening an API connection.
		if cctx.NArg() != 1 {
			return lcli.IncorrectNumArgs(cctx)
		}

		if !cctx.Bool("really-do-it") {
			return xerrors.Errorf("pass --really-do-it to confirm this action")
		}

		id, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64)
		if err != nil {
			return xerrors.Errorf("could not parse sector number: %w", err)
		}

		minerApi, closer, err := lcli.GetStorageMinerAPI(cctx)
		if err != nil {
			return err
		}
		defer closer()
		ctx := lcli.ReqContext(cctx)

		return minerApi.SectorTerminate(ctx, abi.SectorNumber(id))
	},
}

// sectorsTerminateFlushCmd calls SectorTerminateFlush and prints the returned
// message CID; a nil CID is reported as an error.
var sectorsTerminateFlushCmd = &cli.Command{
	Name:  "flush",
	Usage: "Send a terminate message if there are sectors queued for termination",
	Action: func(cctx *cli.Context) error {
		minerApi, closer, err := lcli.GetStorageMinerAPI(cctx)
		if err != nil {
			return err
		}
		defer closer()
		ctx := lcli.ReqContext(cctx)

		mcid, err := minerApi.SectorTerminateFlush(ctx)
		if err != nil {
			return err
		}

		if mcid == nil {
			return xerrors.New("no sectors were queued for termination")
		}

		fmt.Println(mcid)

		return nil
	},
}

// sectorsTerminatePendingCmd prints the sector numbers returned by
// SectorTerminatePending, one per line, and tags those whose deadline is the
// previous, current or next proving deadline with " (in proving window)".
var sectorsTerminatePendingCmd = &cli.Command{
	Name:  "pending",
	Usage: "List sector numbers of sectors pending termination",
	Action: func(cctx *cli.Context) error {
		minerAPI, closer, err := lcli.GetStorageMinerAPI(cctx)
		if err != nil {
			return err
		}
		defer closer()
		api, nCloser, err := lcli.GetFullNodeAPI(cctx)
		if err != nil {
			return err
		}
		defer nCloser()
		ctx := lcli.ReqContext(cctx)

		pending, err := minerAPI.SectorTerminatePending(ctx)
		if err != nil {
			return err
		}

		maddr, err := minerAPI.ActorAddress(ctx)
		if err != nil {
			return err
		}

		dl, err := api.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
		if err != nil {
			return xerrors.Errorf("getting proving deadline info failed: %w", err)
		}

		for _, id := range pending {
			loc, err := api.StateSectorPartition(ctx, maddr, id.Number, types.EmptyTSK)
			if err != nil {
				return xerrors.Errorf("finding sector partition: %w", err)
			}

			fmt.Print(id.Number)

			if loc.Deadline == (dl.Index+1)%miner.WPoStPeriodDeadlines || // next deadline (in case the terminate message takes a while to get on chain)
				loc.Deadline == dl.Index || // current deadline
				(loc.Deadline+1)%miner.WPoStPeriodDeadlines == dl.Index { // previous deadline
				fmt.Print(" (in proving window)")
			}
			fmt.Println()
		}

		return nil
	},
}

// sectorsRemoveCmd calls SectorRemove for the sector number given as the
// single positional argument, after confirming via SectorsStatus that the
// miner knows the sector. It refuses to run without --really-do-it.
var sectorsRemoveCmd = &cli.Command{
	Name:      "remove",
	Usage:     "Forcefully remove a sector (WARNING: This means losing power and collateral for the removed sector (use 'terminate' for lower penalty))",
	ArgsUsage: "",
	Flags: []cli.Flag{
		&cli.BoolFlag{
			Name:  "really-do-it",
			Usage: "pass this flag if you know what you are doing",
		},
	},
	Action: func(cctx *cli.Context) error {
		if !cctx.Bool("really-do-it") {
			return xerrors.Errorf("this is a command for advanced users, only use it if you are sure of what you are doing")
		}

		// Validate the invocation before opening an API connection.
		if cctx.NArg() != 1 {
			return lcli.IncorrectNumArgs(cctx)
		}

		id, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64)
		if err != nil {
			return xerrors.Errorf("could not parse sector number: %w", err)
		}

		minerAPI, closer, err := lcli.GetStorageMinerAPI(cctx)
		if err != nil {
			return err
		}
		defer closer()
		ctx := lcli.ReqContext(cctx)

		// Check if the sector exists
		_, err = minerAPI.SectorsStatus(ctx, abi.SectorNumber(id), false)
		if err != nil {
			return xerrors.Errorf("sectorID %d has not been created yet: %w", id, err)
		}

		return minerAPI.SectorRemove(ctx, abi.SectorNumber(id))
	},
}

// sectorsSnapUpCmd calls SectorMarkForUpgrade for the sector number given as
// the single positional argument. It fails if the network version reported by
// the full node is below v15.
var sectorsSnapUpCmd = &cli.Command{
	Name:      "snap-up",
	Usage:     "Mark a committed capacity sector to be filled with deals",
	ArgsUsage: "",
	Action: func(cctx *cli.Context) error {
		if cctx.NArg() != 1 {
			return lcli.IncorrectNumArgs(cctx)
		}

		minerAPI, closer, err := lcli.GetStorageMinerAPI(cctx)
		if err != nil {
			return err
		}
		defer closer()

		api, nCloser, err := lcli.GetFullNodeAPI(cctx)
		if err != nil {
			return err
		}
		defer nCloser()
		ctx := lcli.ReqContext(cctx)

		nv, err := api.StateNetworkVersion(ctx, types.EmptyTSK)
		if err != nil {
			return xerrors.Errorf("failed to get network version: %w", err)
		}
		if nv < network.Version15 {
			return xerrors.Errorf("snap deals upgrades enabled in network v15")
		}

		id, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64)
		if err != nil {
			return xerrors.Errorf("could not parse sector number: %w", err)
		}

		return minerAPI.SectorMarkForUpgrade(ctx, abi.SectorNumber(id), true)
	},
}

// sectorsSnapAbortCmd calls SectorAbortUpgrade for the sector number given as
// the single positional argument. It refuses to run without --really-do-it.
var sectorsSnapAbortCmd = &cli.Command{
	Name:      "abort-upgrade",
	Usage:     "Abort the attempted (SnapDeals) upgrade of a CC sector, reverting it to as before",
	ArgsUsage: "",
	Flags: []cli.Flag{
		&cli.BoolFlag{
			Name:  "really-do-it",
			Usage: "pass this flag if you know what you are doing",
		},
	},
	Action: func(cctx *cli.Context) error {
		if cctx.NArg() != 1 {
			return lcli.ShowHelp(cctx, xerrors.Errorf("must pass sector number"))
		}

		really := cctx.Bool("really-do-it")
		if !really {
			//nolint:golint
			return fmt.Errorf("--really-do-it must be specified for this action to have an effect; you have been warned")
		}

		minerAPI, closer, err := lcli.GetStorageMinerAPI(cctx)
		if err != nil {
			return err
		}
		defer closer()
		ctx := lcli.ReqContext(cctx)

		id, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64)
		if err != nil {
			return xerrors.Errorf("could not parse sector number: %w", err)
		}

		return minerAPI.SectorAbortUpgrade(ctx, abi.SectorNumber(id))
	},
}

// sectorsStartSealCmd calls SectorStartSealing for the sector number given as
// the single positional argument.
var sectorsStartSealCmd = &cli.Command{
	Name:      "seal",
	Usage:     "Manually start sealing a sector (filling any unused space with junk)",
	ArgsUsage: "",
	Action: func(cctx *cli.Context) error {
		// Validate the invocation before opening an API connection.
		if cctx.NArg() != 1 {
			return lcli.IncorrectNumArgs(cctx)
		}

		id, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64)
		if err != nil {
			return xerrors.Errorf("could not parse sector number: %w", err)
		}

		minerAPI, closer, err := lcli.GetStorageMinerAPI(cctx)
		if err != nil {
			return err
		}
		defer closer()
		ctx := lcli.ReqContext(cctx)

		return minerAPI.SectorStartSealing(ctx, abi.SectorNumber(id))
	},
}

var sectorsSealDelayCmd = &cli.Command{ Name: "set-seal-delay", Usage: "Set the time (in minutes) that a new sector waits for deals before sealing starts", ArgsUsage: "