Support AddPiece on workers

This commit is contained in:
Łukasz Magiera 2020-08-14 16:06:53 +02:00
parent 4d204d2c25
commit 0c75dd3865
12 changed files with 325 additions and 19 deletions

View File

@ -23,6 +23,8 @@ type WorkerAPI interface {
Paths(context.Context) ([]stores.StoragePath, error)
Info(context.Context) (storiface.WorkerInfo, error)
AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error)
storage.Sealer
MoveStorage(ctx context.Context, sector abi.SectorID) error

View File

@ -300,6 +300,7 @@ type WorkerStruct struct {
Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"`
Info func(context.Context) (storiface.WorkerInfo, error) `perm:"admin"`
AddPiece func(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) `perm:"admin"`
SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"`
SealPreCommit2 func(context.Context, abi.SectorID, storage.PreCommit1Out) (cids storage.SectorCids, err error) `perm:"admin"`
SealCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) `perm:"admin"`
@ -1147,6 +1148,10 @@ func (w *WorkerStruct) Info(ctx context.Context) (storiface.WorkerInfo, error) {
return w.Internal.Info(ctx)
}
func (w *WorkerStruct) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
return w.Internal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
}
func (w *WorkerStruct) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
return w.Internal.SealPreCommit1(ctx, sector, ticket, pieces)
}

View File

@ -2,11 +2,14 @@ package client
import (
"net/http"
"net/url"
"path"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/apistruct"
"github.com/filecoin-project/lotus/lib/rpcenc"
)
// NewCommonRPC creates a new http jsonrpc client.
@ -49,12 +52,28 @@ func NewStorageMinerRPC(addr string, requestHeader http.Header) (api.StorageMine
}
func NewWorkerRPC(addr string, requestHeader http.Header) (api.WorkerAPI, jsonrpc.ClientCloser, error) {
u, err := url.Parse(addr)
if err != nil {
return nil, nil, err
}
switch u.Scheme {
case "ws":
u.Scheme = "http"
case "wss":
u.Scheme = "https"
}
///rpc/v0 -> /rpc/streams/v0/push
u.Path = path.Join(u.Path, "../streams/v0/push")
readerEncoder := rpcenc.ReaderParamEncoder(u.String())
var res apistruct.WorkerStruct
closer, err := jsonrpc.NewMergeClient(addr, "Filecoin",
[]interface{}{
&res.Internal,
},
requestHeader,
requestHeader, readerEncoder,
)
return &res, closer, err

View File

@ -29,6 +29,7 @@ import (
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/lotuslog"
"github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/node/repo"
sectorstorage "github.com/filecoin-project/sector-storage"
"github.com/filecoin-project/sector-storage/sealtasks"
@ -105,6 +106,11 @@ var runCmd = &cli.Command{
Name: "no-local-storage",
Usage: "don't use storageminer repo for sector storage",
},
&cli.BoolFlag{
Name: "addpiece",
Usage: "enable addpiece",
Value: true,
},
&cli.BoolFlag{
Name: "precommit1",
Usage: "enable precommit1 (32G sectors: 1 core, 128GiB Memory)",
@ -204,6 +210,9 @@ var runCmd = &cli.Command{
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize)
if cctx.Bool("addpiece") {
taskTypes = append(taskTypes, sealtasks.TTAddPiece)
}
if cctx.Bool("precommit1") {
taskTypes = append(taskTypes, sealtasks.TTPreCommit1)
}
@ -337,10 +346,12 @@ var runCmd = &cli.Command{
log.Info("Setting up control endpoint at " + address)
rpcServer := jsonrpc.NewServer()
readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(readerServerOpt)
rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(workerApi))
mux.Handle("/rpc/v0", rpcServer)
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
mux.PathPrefix("/remote").HandlerFunc((&stores.FetchHandler{Local: localStore}).ServeHTTP)
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof

View File

@ -2,19 +2,12 @@ package sealing
import (
"context"
"io"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi"
nr "github.com/filecoin-project/storage-fsm/lib/nullreader"
)
func (m *Sealing) pledgeReader(size abi.UnpaddedPieceSize) io.Reader {
return io.LimitReader(&nr.Reader{}, int64(size))
}
func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) {
if len(sizes) == 0 {
return nil, nil
@ -24,7 +17,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorID, exist
out := make([]abi.PieceInfo, len(sizes))
for i, size := range sizes {
ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, m.pledgeReader(size))
ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, NewNullReader(size))
if err != nil {
return nil, xerrors.Errorf("add piece: %w", err)
}

20
extern/storage-fsm/nullreader.go vendored Normal file
View File

@ -0,0 +1,20 @@
package sealing
import (
"io"
"github.com/filecoin-project/specs-actors/actors/abi"
nr "github.com/filecoin-project/storage-fsm/lib/nullreader"
)
type NullReader struct {
*io.LimitedReader
}
func NewNullReader(size abi.UnpaddedPieceSize) io.Reader {
return &NullReader{(io.LimitReader(&nr.Reader{}, int64(size))).(*io.LimitedReader)}
}
func (m NullReader) NullBytes() int64 {
return m.N
}

View File

@ -146,7 +146,7 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
}
for _, p := range pads {
err = m.addPiece(ctx, sid, p.Unpadded(), m.pledgeReader(p.Unpadded()), nil)
err = m.addPiece(ctx, sid, p.Unpadded(), NewNullReader(p.Unpadded()), nil)
if err != nil {
m.unsealedInfoMap.mux.Unlock()
return 0, 0, xerrors.Errorf("writing pads: %w", err)

2
go.mod
View File

@ -28,7 +28,7 @@ require (
github.com/filecoin-project/go-data-transfer v0.6.1
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f
github.com/filecoin-project/go-fil-markets v0.5.6-0.20200814021159-7be996ed8ccb
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200727193017-75111571155b
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261

5
go.sum
View File

@ -242,8 +242,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-markets v0.5.6-0.20200814021159-7be996ed8ccb h1:eCLqJb1tmhMCWUFAfJuSyyv/qLrqiAhICLjhUcbi4x8=
github.com/filecoin-project/go-fil-markets v0.5.6-0.20200814021159-7be996ed8ccb/go.mod h1:SJApXAKr5jyGpbzDEOhvemui0pih7hhT8r2MXJxCP1E=
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 h1:Jc7vkplmZYVuaEcSXGHDwefvZIdoyyaoGDLqSr8Svms=
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24/go.mod h1:j6zV//WXIIY5kky873Q3iIKt/ViOE8rcijovmpxrXzM=
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200727193017-75111571155b h1:t0PXF+dR91A+yXJ16cX5E5ByQwaMcnVjPMcpY2qXDYQ=
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200727193017-75111571155b/go.mod h1:qABZ0IE+F15++XdVZgKCE919XWw54DKrbe4Wdx3jWrE=
github.com/filecoin-project/go-multistore v0.0.3 h1:vaRBY4YiA2UZFPK57RNuewypB8u0DzzQwqsL0XarpnI=
github.com/filecoin-project/go-multistore v0.0.3/go.mod h1:kaNqCC4IhU4B1uyr7YWFHd23TL4KM32aChS0jNkyUvQ=
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs=
@ -385,6 +385,7 @@ github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c/go.mod h1:wJfORR
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f h1:KMlcu9X58lhTA/KrfX8Bi1LQSO4pzoVjTiL3h4Jk+Zk=
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/handlers v1.4.2 h1:0QniY0USkHQ1RGCLfKxeNHK9bkDHGRYGNDFBCS+YARg=
github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=

170
lib/rpcenc/reader.go Normal file
View File

@ -0,0 +1,170 @@
package rpcenc
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"path"
"reflect"
"strconv"
"sync"
"github.com/google/uuid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/specs-actors/actors/abi"
sealing "github.com/filecoin-project/storage-fsm"
)
var log = logging.Logger("rpc")
type StreamType string
const (
Null StreamType = "null"
PushStream StreamType = "push"
// TODO: Data transfer handoff to workers?
)
type ReaderStream struct {
Type StreamType
Info string
}
func ReaderParamEncoder(addr string) jsonrpc.Option {
return jsonrpc.WithParamEncoder(new(io.Reader), func(value reflect.Value) (reflect.Value, error) {
r := value.Interface().(io.Reader)
if r, ok := r.(*sealing.NullReader); ok {
return reflect.ValueOf(ReaderStream{Type: Null, Info: fmt.Sprint(r.N)}), nil
}
reqID := uuid.New()
u, _ := url.Parse(addr)
u.Path = path.Join(u.Path, reqID.String())
go func() {
// TODO: figure out errors here
resp, err := http.Post(u.String(), "application/octet-stream", r)
if err != nil {
log.Errorf("sending reader param: %+v", err)
return
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
log.Errorf("sending reader param: non-200 status: ", resp.Status)
return
}
}()
return reflect.ValueOf(ReaderStream{Type: PushStream, Info: reqID.String()}), nil
})
}
type waitReadCloser struct {
io.ReadCloser
wait chan struct{}
}
func (w *waitReadCloser) Read(p []byte) (int, error) {
n, err := w.ReadCloser.Read(p)
if err != nil {
close(w.wait)
}
return n, err
}
func (w *waitReadCloser) Close() error {
close(w.wait)
return w.ReadCloser.Close()
}
func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
var readersLk sync.Mutex
readers := map[uuid.UUID]chan *waitReadCloser{}
hnd := func(resp http.ResponseWriter, req *http.Request) {
strId := path.Base(req.URL.Path)
u, err := uuid.Parse(strId)
if err != nil {
http.Error(resp, fmt.Sprintf("parsing reader uuid: %s", err), 400)
}
readersLk.Lock()
ch, found := readers[u]
if !found {
ch = make(chan *waitReadCloser)
readers[u] = ch
}
readersLk.Unlock()
wr := &waitReadCloser{
ReadCloser: req.Body,
wait: make(chan struct{}),
}
select {
case ch <- wr:
case <-req.Context().Done():
log.Error("context error in reader stream handler (1): %v", req.Context().Err())
resp.WriteHeader(500)
return
}
select {
case <-wr.wait:
case <-req.Context().Done():
log.Error("context error in reader stream handler (2): %v", req.Context().Err())
resp.WriteHeader(500)
return
}
resp.WriteHeader(200)
}
dec := jsonrpc.WithParamDecoder(new(io.Reader), func(ctx context.Context, b []byte) (reflect.Value, error) {
var rs ReaderStream
if err := json.Unmarshal(b, &rs); err != nil {
return reflect.Value{}, xerrors.Errorf("unmarshaling reader id: %w", err)
}
if rs.Type == Null {
n, err := strconv.ParseInt(rs.Info, 10, 64)
if err != nil {
return reflect.Value{}, xerrors.Errorf("parsing null byte count: %w", err)
}
return reflect.ValueOf(sealing.NewNullReader(abi.UnpaddedPieceSize(n))), nil
}
u, err := uuid.Parse(rs.Info)
if err != nil {
return reflect.Value{}, xerrors.Errorf("parsing reader UUDD: %w", err)
}
readersLk.Lock()
ch, found := readers[u]
if !found {
ch = make(chan *waitReadCloser)
readers[u] = ch
}
readersLk.Unlock()
select {
case wr := <-ch:
return reflect.ValueOf(wr), nil
case <-ctx.Done():
return reflect.Value{}, ctx.Err()
}
})
return hnd, dec
}

90
lib/rpcenc/reader_test.go Normal file
View File

@ -0,0 +1,90 @@
package rpcenc
import (
"context"
"io"
"io/ioutil"
"net/http/httptest"
"strings"
"testing"
"github.com/gorilla/mux"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-jsonrpc"
sealing "github.com/filecoin-project/storage-fsm"
)
type ReaderHandler struct {
}
func (h *ReaderHandler) ReadAll(ctx context.Context, r io.Reader) ([]byte, error) {
return ioutil.ReadAll(r)
}
func (h *ReaderHandler) ReadNullLen(ctx context.Context, r io.Reader) (int64, error) {
return r.(*sealing.NullReader).N, nil
}
func (h *ReaderHandler) ReadUrl(ctx context.Context, u string) (string, error) {
return u, nil
}
func TestReaderProxy(t *testing.T) {
var client struct {
ReadAll func(ctx context.Context, r io.Reader) ([]byte, error)
}
serverHandler := &ReaderHandler{}
readerHandler, readerServerOpt := ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(readerServerOpt)
rpcServer.Register("ReaderHandler", serverHandler)
mux := mux.NewRouter()
mux.Handle("/rpc/v0", rpcServer)
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
testServ := httptest.NewServer(mux)
defer testServ.Close()
re := ReaderParamEncoder("http://"+testServ.Listener.Addr().String()+"/rpc/streams/v0/push")
closer, err := jsonrpc.NewMergeClient("ws://"+testServ.Listener.Addr().String()+"/rpc/v0", "ReaderHandler", []interface{}{&client}, nil, re)
require.NoError(t, err)
defer closer()
read, err := client.ReadAll(context.TODO(), strings.NewReader("pooooootato"))
require.NoError(t, err)
require.Equal(t, "pooooootato", string(read), "potatos weren't equal")
}
func TestNullReaderProxy(t *testing.T) {
var client struct {
ReadAll func(ctx context.Context, r io.Reader) ([]byte, error)
ReadNullLen func(ctx context.Context, r io.Reader) (int64, error)
}
serverHandler := &ReaderHandler{}
readerHandler, readerServerOpt := ReaderParamDecoder()
rpcServer := jsonrpc.NewServer(readerServerOpt)
rpcServer.Register("ReaderHandler", serverHandler)
mux := mux.NewRouter()
mux.Handle("/rpc/v0", rpcServer)
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
testServ := httptest.NewServer(mux)
defer testServ.Close()
re := ReaderParamEncoder("http://"+testServ.Listener.Addr().String()+"/rpc/streams/v0/push")
closer, err := jsonrpc.NewMergeClient("ws://"+testServ.Listener.Addr().String()+"/rpc/v0", "ReaderHandler", []interface{}{&client}, nil, re)
require.NoError(t, err)
defer closer()
n, err := client.ReadNullLen(context.TODO(), sealing.NewNullReader(1016))
require.NoError(t, err)
require.Equal(t, int64(1016), n)
}

View File

@ -9,7 +9,6 @@ import (
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/specs-actors/actors/abi"
storage2 "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/client"
@ -25,10 +24,6 @@ func (r *remoteWorker) NewSector(ctx context.Context, sector abi.SectorID) error
return xerrors.New("unsupported")
}
func (r *remoteWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage2.Data) (abi.PieceInfo, error) {
return abi.PieceInfo{}, xerrors.New("unsupported")
}
func connectRemoteWorker(ctx context.Context, fa api.Common, url string) (*remoteWorker, error) {
token, err := fa.AuthNew(ctx, []auth.Permission{"admin"})
if err != nil {