Merge pull request #8715 from filecoin-project/feat/miner-commp-cmd

feat: miner cli: sealing data-cid command
Łukasz Magiera 2022-05-24 18:09:26 +02:00 committed by GitHub
commit 6f2c8d6f5f
6 changed files with 190 additions and 8 deletions

View File

@@ -4,20 +4,27 @@ import (
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"math"
	"net/http"
	"os"
	"sort"
	"strings"
	"text/tabwriter"
	"time"

	"github.com/dustin/go-humanize"
	"github.com/fatih/color"
	"github.com/google/uuid"
	"github.com/mitchellh/go-homedir"
	"github.com/urfave/cli/v2"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/go-padreader"
	"github.com/filecoin-project/go-state-types/abi"

	"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
	"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
	"github.com/filecoin-project/lotus/lib/httpreader"
	"github.com/filecoin-project/lotus/chain/types"
	lcli "github.com/filecoin-project/lotus/cli"
@@ -31,6 +38,7 @@ var sealingCmd = &cli.Command{
		workersCmd(true),
		sealingSchedDiagCmd,
		sealingAbortCmd,
		sealingDataCidCmd,
	},
}

@@ -349,3 +357,94 @@ var sealingAbortCmd = &cli.Command{
		return nodeApi.SealingAbort(ctx, job.ID)
	},
}
var sealingDataCidCmd = &cli.Command{
	Name:      "data-cid",
	Usage:     "Compute data CID using workers",
	ArgsUsage: "[file/url] <padded piece size>",
	Flags: []cli.Flag{
		&cli.Uint64Flag{
			Name:  "file-size",
			Usage: "real file size",
		},
	},
	Action: func(cctx *cli.Context) error {
		if cctx.Args().Len() < 1 || cctx.Args().Len() > 2 {
			return xerrors.Errorf("expected 1 or 2 arguments")
		}

		nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
		if err != nil {
			return err
		}
		defer closer()

		ctx := lcli.ReqContext(cctx)

		var r io.Reader
		sz := cctx.Uint64("file-size")

		if strings.HasPrefix(cctx.Args().First(), "http://") || strings.HasPrefix(cctx.Args().First(), "https://") {
			r = &httpreader.HttpReader{
				URL: cctx.Args().First(),
			}

			if !cctx.IsSet("file-size") {
				resp, err := http.Head(cctx.Args().First())
				if err != nil {
					return xerrors.Errorf("http head: %w", err)
				}

				if resp.ContentLength < 0 {
					return xerrors.Errorf("head response didn't contain content length; specify --file-size")
				}
				sz = uint64(resp.ContentLength)
			}
		} else {
			p, err := homedir.Expand(cctx.Args().First())
			if err != nil {
				return xerrors.Errorf("expanding path: %w", err)
			}

			f, err := os.OpenFile(p, os.O_RDONLY, 0)
			if err != nil {
				return xerrors.Errorf("opening source file: %w", err)
			}

			if !cctx.IsSet("file-size") {
				st, err := f.Stat()
				if err != nil {
					return xerrors.Errorf("stat: %w", err)
				}
				sz = uint64(st.Size())
			}

			r = f
		}

		var psize abi.PaddedPieceSize
		if cctx.Args().Len() == 2 {
			rps, err := humanize.ParseBytes(cctx.Args().Get(1))
			if err != nil {
				return xerrors.Errorf("parsing piece size: %w", err)
			}
			psize = abi.PaddedPieceSize(rps)
			if err := psize.Validate(); err != nil {
				return xerrors.Errorf("checking piece size: %w", err)
			}
			if sz > uint64(psize.Unpadded()) {
				return xerrors.Errorf("file larger than the piece")
			}
		} else {
			psize = padreader.PaddedSize(sz).Padded()
		}

		pc, err := nodeApi.ComputeDataCid(ctx, psize.Unpadded(), r)
		if err != nil {
			return xerrors.Errorf("computing data CID: %w", err)
		}

		fmt.Println(pc.PieceCID, " ", pc.Size)

		return nil
	},
}
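
When only the file or URL is given, the piece size is derived from the real file size via go-padreader, as in the `else` branch above. A minimal sketch of that mapping, using a hypothetical 1000-byte payload chosen purely for illustration:

```go
package main

import (
	"fmt"

	"github.com/filecoin-project/go-padreader"
)

func main() {
	// Hypothetical payload size; any real file size maps the same way.
	fileSize := uint64(1000)

	// PaddedSize picks the smallest piece whose unpadded capacity fits the payload:
	// 1000 bytes need a 1016-byte unpadded piece, which pads up to 1024 bytes.
	unpadded := padreader.PaddedSize(fileSize)
	padded := unpadded.Padded()

	fmt.Println(unpadded, padded) // 1016 1024
}
```

When an explicit padded piece size is passed instead, the command only validates it and checks that the file fits into that piece's unpadded capacity.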

View File

@@ -2331,6 +2331,7 @@ COMMANDS:
   workers     list workers
   sched-diag  Dump internal scheduler state
   abort       Abort a running job
   data-cid    Compute data CID using workers
   help, h     Shows a list of commands or help for one command

OPTIONS:

@@ -2393,3 +2394,17 @@ OPTIONS:
   --help, -h  show help (default: false)
```
### lotus-miner sealing data-cid
```
NAME:
   lotus-miner sealing data-cid - Compute data CID using workers

USAGE:
   lotus-miner sealing data-cid [command options] [file/url] <padded piece size>

OPTIONS:
   --file-size value  real file size (default: 0)
   --help, -h         show help (default: false)
```
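
For illustration, two hypothetical invocations (the local path, URL, and sizes below are placeholders, not part of the PR):

```
# local file, piece size inferred from the file's size
lotus-miner sealing data-cid ./deal-data.bin

# remote resource, with the real size given explicitly and a 1KiB padded piece
lotus-miner sealing data-cid --file-size=1016 https://example.com/deal-data.bin 1KiB
```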

View File

@@ -32,6 +32,7 @@ import (
	"github.com/filecoin-project/lotus/extern/sector-storage/partialfile"
	"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
	nr "github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader"
	"github.com/filecoin-project/lotus/lib/nullreader"
)

var _ Storage = &Sealer{}

@@ -53,6 +54,11 @@ func (sb *Sealer) NewSector(ctx context.Context, sector storage.SectorRef) error
}

func (sb *Sealer) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
	pieceData = io.LimitReader(io.MultiReader(
		pieceData,
		nullreader.Reader{},
	), int64(pieceSize))

	// TODO: allow tuning those:
	chunk := abi.PaddedPieceSize(4 << 20)
	parallel := runtime.NumCPU()

@@ -73,6 +79,7 @@ func (sb *Sealer) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize,
	for {
		var read int
		for rbuf := buf; len(rbuf) > 0; {
			n, err := pieceData.Read(rbuf)
			if err != nil && err != io.EOF {
				return abi.PieceInfo{}, xerrors.Errorf("pr read error: %w", err)
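
The `DataCid` change above zero-pads short inputs out to the full unpadded piece size by chaining the caller's reader with an endless zero reader and capping the result with `io.LimitReader`. A standalone sketch of the same pattern, where `zeroReader` is a stand-in for lotus' `nullreader.Reader`:

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// zeroReader stands in for nullreader.Reader: an endless stream of zero bytes.
type zeroReader struct{}

func (zeroReader) Read(p []byte) (int, error) {
	for i := range p {
		p[i] = 0
	}
	return len(p), nil
}

func main() {
	pieceSize := int64(16) // unpadded piece size, tiny here for demonstration
	data := strings.NewReader("hello")

	// Concatenate the real data with zeros, then cap the stream at pieceSize,
	// so readers shorter than the piece are transparently zero-padded.
	padded := io.LimitReader(io.MultiReader(data, zeroReader{}), pieceSize)

	buf, _ := io.ReadAll(padded)
	fmt.Printf("%d bytes: %q\n", len(buf), buf)
}
```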

View File

@@ -49,17 +49,23 @@ func TestWorkerDataCid(t *testing.T) {
	e, err := worker.Enabled(ctx)
	require.NoError(t, err)
	require.True(t, e)

	/*
		pi, err := miner.ComputeDataCid(ctx, 1016, strings.NewReader(strings.Repeat("a", 1016)))
		require.NoError(t, err)
		require.Equal(t, abi.PaddedPieceSize(1024), pi.Size)
		require.Equal(t, "baga6ea4seaqlhznlutptgfwhffupyer6txswamerq5fc2jlwf2lys2mm5jtiaeq", pi.PieceCID.String())
	*/

	bigPiece := abi.PaddedPieceSize(16 << 20).Unpadded()
	pi, err = miner.ComputeDataCid(ctx, bigPiece, strings.NewReader(strings.Repeat("a", int(bigPiece))))
	require.NoError(t, err)
	require.Equal(t, bigPiece.Padded(), pi.Size)
	require.Equal(t, "baga6ea4seaqmhoxl2ybw5m2wyd3pt3h4zmp7j52yumzu2rar26twns3uocq7yfa", pi.PieceCID.String())

	nonFullPiece := abi.PaddedPieceSize(10 << 20).Unpadded()
	pi, err = miner.ComputeDataCid(ctx, bigPiece, strings.NewReader(strings.Repeat("a", int(nonFullPiece))))
	require.NoError(t, err)
	require.Equal(t, bigPiece.Padded(), pi.Size)
	require.Equal(t, "baga6ea4seaqbxib4pdxs5cqdn3fmtj4rcxk6rx6ztiqmrx7fcpo3ymuxbp2rodi", pi.PieceCID.String())
}

func TestWinningPostWorker(t *testing.T) {

View File

@@ -0,0 +1,47 @@
package httpreader

import (
	"io"
	"net/http"

	"golang.org/x/xerrors"
)

// HttpReader is a reader which will read a http resource with a simple get request.
// Before first Read it will be passed over JsonRPC as a URL.
type HttpReader struct {
	URL string

	reader io.ReadCloser
}

func (h *HttpReader) Close() error {
	h.URL = ""
	if h.reader != nil {
		return h.reader.Close()
	}
	return nil
}

func (h *HttpReader) Read(p []byte) (n int, err error) {
	if h.reader == nil {
		res, err := http.Get(h.URL)
		if err != nil {
			return 0, err
		}
		if res.StatusCode != http.StatusOK {
			return 0, xerrors.Errorf("unexpected http status %d", res.StatusCode)
		}

		// mark the reader as reading
		h.URL = ""
		h.reader = res.Body
	}
	if h.reader == nil {
		return 0, xerrors.Errorf("http reader closed")
	}

	return h.reader.Read(p)
}

var _ io.ReadCloser = &HttpReader{}
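
A quick sketch of how this reader behaves on its own, with net/http/httptest standing in for the remote data source; the URL, handler, and payload are placeholders. Note that no request is issued until the first Read, which is what allows the URL to be shipped over JSON-RPC first:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"

	"github.com/filecoin-project/lotus/lib/httpreader"
)

func main() {
	// Throwaway server standing in for wherever the deal data actually lives.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "deal payload")
	}))
	defer srv.Close()

	// Constructing the reader performs no I/O; the GET happens lazily on first Read.
	r := &httpreader.HttpReader{URL: srv.URL}
	defer r.Close()

	body, err := io.ReadAll(r)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", body)
}
```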

View File

@@ -23,6 +23,7 @@ import (
	"github.com/filecoin-project/go-state-types/abi"

	"github.com/filecoin-project/lotus/extern/storage-sealing/lib/nullreader"
	"github.com/filecoin-project/lotus/lib/httpreader"
)

var log = logging.Logger("rpcenc")

@@ -34,6 +35,7 @@ type StreamType string

const (
	Null       StreamType = "null"
	PushStream StreamType = "push"
	HTTP       StreamType = "http"
	// TODO: Data transfer handoff to workers?
)

@@ -105,6 +107,9 @@ func ReaderParamEncoder(addr string) jsonrpc.Option {
		if r, ok := r.(*nullreader.NullReader); ok {
			return reflect.ValueOf(ReaderStream{Type: Null, Info: fmt.Sprint(r.N)}), nil
		}
		if r, ok := r.(*httpreader.HttpReader); ok && r.URL != "" {
			return reflect.ValueOf(ReaderStream{Type: HTTP, Info: r.URL}), nil
		}

		reqID := uuid.New()
		u, err := url.Parse(addr)

@@ -413,13 +418,16 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
			return reflect.Value{}, xerrors.Errorf("unmarshaling reader id: %w", err)
		}

		switch rs.Type {
		case Null:
			n, err := strconv.ParseInt(rs.Info, 10, 64)
			if err != nil {
				return reflect.Value{}, xerrors.Errorf("parsing null byte count: %w", err)
			}

			return reflect.ValueOf(nullreader.NewNullReader(abi.UnpaddedPieceSize(n))), nil
		case HTTP:
			return reflect.ValueOf(&httpreader.HttpReader{URL: rs.Info}), nil
		}

		u, err := uuid.Parse(rs.Info)
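
Taken together, the encoder and decoder changes let an unread HttpReader cross the RPC boundary as a small {type, url} reference instead of a pushed byte stream. A self-contained sketch of that round trip, using a local streamRef struct that only mirrors the shape of rpcenc's ReaderStream (the URL is a placeholder):

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/filecoin-project/lotus/lib/httpreader"
)

// streamRef mirrors the shape of rpcenc's ReaderStream for illustration only;
// the real encoding lives in lotus' rpcenc package.
type streamRef struct {
	Type string
	Info string
}

func main() {
	// Miner side: an HttpReader that has not been read yet still carries its URL,
	// so it can be serialized as a reference rather than streamed.
	src := &httpreader.HttpReader{URL: "https://example.com/deal-data.bin"}

	wire, _ := json.Marshal(streamRef{Type: "http", Info: src.URL})
	fmt.Println(string(wire))

	// Worker side: rebuild a reader that will lazily GET the URL on first Read,
	// then hand it to DataCid / AddPiece.
	var ref streamRef
	if err := json.Unmarshal(wire, &ref); err == nil && ref.Type == "http" {
		r := &httpreader.HttpReader{URL: ref.Info}
		defer r.Close()
		_ = r // use r as the piece data source
	}
}
```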