Merge pull request #10169 from filecoin-project/feat/shed-read-sector
feat: shed: Add a tool to read data from sectors
This commit is contained in:
commit
8f1c23296e
@ -18,6 +18,7 @@ import (
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/urfave/cli/v2"
|
||||
"golang.org/x/xerrors"
|
||||
"gopkg.in/cheggaaa/pb.v1"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-bitfield"
|
||||
@ -31,6 +32,11 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
"github.com/filecoin-project/lotus/lib/parmap"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/fr32"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
var sectorsCmd = &cli.Command{
|
||||
@ -42,6 +48,8 @@ var sectorsCmd = &cli.Command{
|
||||
terminateSectorPenaltyEstimationCmd,
|
||||
visAllocatedSectorsCmd,
|
||||
dumpRLESectorCmd,
|
||||
sectorReadCmd,
|
||||
sectorDeleteCmd,
|
||||
},
|
||||
}
|
||||
|
||||
@ -563,3 +571,219 @@ func rleToPng(rleBytes []byte) ([]byte, error) {
|
||||
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
|
||||
var sectorReadCmd = &cli.Command{
|
||||
Name: "read",
|
||||
Usage: "read data from a sector into stdout",
|
||||
ArgsUsage: "[sector num] [padded length] [padded offset]",
|
||||
Description: `Read data from a sector.
|
||||
|
||||
TIP: to get sectornum/len/offset for a piece you can use 'boostd pieces piece-info [cid]'
|
||||
|
||||
fr32 padding is removed from the output.`,
|
||||
Action: func(cctx *cli.Context) error {
|
||||
if cctx.NArg() != 3 {
|
||||
return xerrors.Errorf("must pass sectornum/len/offset")
|
||||
}
|
||||
|
||||
sectorNum, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("parsing sector number: %w", err)
|
||||
}
|
||||
|
||||
length, err := strconv.ParseUint(cctx.Args().Get(1), 10, 64)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("parsing length: %w", err)
|
||||
}
|
||||
|
||||
offset, err := strconv.ParseUint(cctx.Args().Get(2), 10, 64)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("parsing offset: %w", err)
|
||||
}
|
||||
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
api, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
maddr, err := api.ActorAddress(ctx)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting miner actor address: %w", err)
|
||||
}
|
||||
|
||||
mid, err := address.IDFromAddress(maddr)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting miner id: %w", err)
|
||||
}
|
||||
|
||||
sid := abi.SectorID{
|
||||
Miner: abi.ActorID(mid),
|
||||
Number: abi.SectorNumber(sectorNum),
|
||||
}
|
||||
|
||||
si, err := api.SectorsStatus(ctx, sid.Number, false)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting sector status: %w", err)
|
||||
}
|
||||
|
||||
sref := storiface.SectorRef{
|
||||
ID: sid,
|
||||
ProofType: si.SealProof,
|
||||
}
|
||||
|
||||
defer closer()
|
||||
|
||||
// Setup remote sector store
|
||||
sminfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("could not get api info: %w", err)
|
||||
}
|
||||
|
||||
localStore, err := paths.NewLocal(ctx, &emptyLocalStorage{}, api, []string{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
remote := paths.NewRemote(localStore, api, sminfo.AuthHeader(), 10,
|
||||
&paths.DefaultPartialFileHandler{})
|
||||
|
||||
readStarter, err := remote.Reader(ctx, sref, abi.PaddedPieceSize(offset), abi.PaddedPieceSize(length))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting reader: %w", err)
|
||||
}
|
||||
|
||||
rd, err := readStarter(0)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("starting reader: %w", err)
|
||||
}
|
||||
|
||||
upr, err := fr32.NewUnpadReaderBuf(rd, abi.PaddedPieceSize(length), make([]byte, 1<<20))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("creating unpadded reader: %w", err)
|
||||
}
|
||||
|
||||
l := int64(abi.PaddedPieceSize(length).Unpadded())
|
||||
|
||||
bar := pb.New64(l)
|
||||
br := bar.NewProxyReader(upr)
|
||||
bar.ShowTimeLeft = true
|
||||
bar.ShowPercent = true
|
||||
bar.ShowSpeed = true
|
||||
bar.Units = pb.U_BYTES
|
||||
bar.Output = os.Stderr
|
||||
bar.Start()
|
||||
|
||||
_, err = io.CopyN(os.Stdout, br, l)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("reading data: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var sectorDeleteCmd = &cli.Command{
|
||||
Name: "delete",
|
||||
Usage: "delete a sector file from sector storage",
|
||||
Flags: []cli.Flag{
|
||||
&cli.BoolFlag{
|
||||
Name: "really-do-it",
|
||||
},
|
||||
},
|
||||
ArgsUsage: "[sector num] [file type]",
|
||||
Action: func(cctx *cli.Context) error {
|
||||
if cctx.NArg() != 2 {
|
||||
return xerrors.Errorf("must pass sectornum/filetype")
|
||||
}
|
||||
|
||||
sectorNum, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("parsing sector number: %w", err)
|
||||
}
|
||||
|
||||
var ft storiface.SectorFileType
|
||||
switch cctx.Args().Get(1) {
|
||||
case "cache":
|
||||
ft = storiface.FTCache
|
||||
case "sealed":
|
||||
ft = storiface.FTSealed
|
||||
case "unsealed":
|
||||
ft = storiface.FTUnsealed
|
||||
case "update-cache":
|
||||
ft = storiface.FTUpdateCache
|
||||
case "update":
|
||||
ft = storiface.FTUpdate
|
||||
default:
|
||||
return xerrors.Errorf("invalid file type")
|
||||
}
|
||||
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
api, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
|
||||
maddr, err := api.ActorAddress(ctx)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting miner actor address: %w", err)
|
||||
}
|
||||
|
||||
mid, err := address.IDFromAddress(maddr)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting miner id: %w", err)
|
||||
}
|
||||
|
||||
sid := abi.SectorID{
|
||||
Miner: abi.ActorID(mid),
|
||||
Number: abi.SectorNumber(sectorNum),
|
||||
}
|
||||
|
||||
// get remote store
|
||||
sminfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("could not get api info: %w", err)
|
||||
}
|
||||
|
||||
localStore, err := paths.NewLocal(ctx, &emptyLocalStorage{}, api, []string{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !cctx.Bool("really-do-it") {
|
||||
return xerrors.Errorf("pass --really-do-it to actually perform the deletion")
|
||||
}
|
||||
|
||||
remote := paths.NewRemote(localStore, api, sminfo.AuthHeader(), 10,
|
||||
&paths.DefaultPartialFileHandler{})
|
||||
|
||||
err = remote.Remove(ctx, sid, ft, true, nil)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("removing sector: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
type emptyLocalStorage struct {
|
||||
}
|
||||
|
||||
func (e *emptyLocalStorage) GetStorage() (storiface.StorageConfig, error) {
|
||||
return storiface.StorageConfig{}, nil
|
||||
}
|
||||
|
||||
func (e *emptyLocalStorage) SetStorage(f func(*storiface.StorageConfig)) error {
|
||||
panic("don't call")
|
||||
}
|
||||
|
||||
func (e *emptyLocalStorage) Stat(path string) (fsutil.FsStat, error) {
|
||||
panic("don't call")
|
||||
}
|
||||
|
||||
func (e *emptyLocalStorage) DiskUsage(path string) (int64, error) {
|
||||
panic("don't call")
|
||||
}
|
||||
|
||||
var _ paths.LocalStorage = &emptyLocalStorage{}
|
||||
|
Loading…
Reference in New Issue
Block a user