Merge pull request #3053 from filecoin-project/feat/worker-addpiece2
Support AddPiece on workers
This commit is contained in:
commit
45476a208e
@ -23,6 +23,8 @@ type WorkerAPI interface {
|
||||
Paths(context.Context) ([]stores.StoragePath, error)
|
||||
Info(context.Context) (storiface.WorkerInfo, error)
|
||||
|
||||
AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error)
|
||||
|
||||
storage.Sealer
|
||||
|
||||
MoveStorage(ctx context.Context, sector abi.SectorID) error
|
||||
|
@ -300,6 +300,7 @@ type WorkerStruct struct {
|
||||
Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"`
|
||||
Info func(context.Context) (storiface.WorkerInfo, error) `perm:"admin"`
|
||||
|
||||
AddPiece func(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) `perm:"admin"`
|
||||
SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) `perm:"admin"`
|
||||
SealPreCommit2 func(context.Context, abi.SectorID, storage.PreCommit1Out) (cids storage.SectorCids, err error) `perm:"admin"`
|
||||
SealCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) `perm:"admin"`
|
||||
@ -1147,6 +1148,10 @@ func (w *WorkerStruct) Info(ctx context.Context) (storiface.WorkerInfo, error) {
|
||||
return w.Internal.Info(ctx)
|
||||
}
|
||||
|
||||
func (w *WorkerStruct) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
|
||||
return w.Internal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
|
||||
}
|
||||
|
||||
func (w *WorkerStruct) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
|
||||
return w.Internal.SealPreCommit1(ctx, sector, ticket, pieces)
|
||||
}
|
||||
|
@ -2,11 +2,15 @@ package client
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/go-jsonrpc"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/api/apistruct"
|
||||
"github.com/filecoin-project/lotus/lib/rpcenc"
|
||||
)
|
||||
|
||||
// NewCommonRPC creates a new http jsonrpc client.
|
||||
@ -49,12 +53,29 @@ func NewStorageMinerRPC(addr string, requestHeader http.Header) (api.StorageMine
|
||||
}
|
||||
|
||||
func NewWorkerRPC(addr string, requestHeader http.Header) (api.WorkerAPI, jsonrpc.ClientCloser, error) {
|
||||
u, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
switch u.Scheme {
|
||||
case "ws":
|
||||
u.Scheme = "http"
|
||||
case "wss":
|
||||
u.Scheme = "https"
|
||||
}
|
||||
///rpc/v0 -> /rpc/streams/v0/push
|
||||
|
||||
u.Path = path.Join(u.Path, "../streams/v0/push")
|
||||
|
||||
var res apistruct.WorkerStruct
|
||||
closer, err := jsonrpc.NewMergeClient(addr, "Filecoin",
|
||||
[]interface{}{
|
||||
&res.Internal,
|
||||
},
|
||||
requestHeader,
|
||||
rpcenc.ReaderParamEncoder(u.String()),
|
||||
jsonrpc.WithNoReconnect(),
|
||||
jsonrpc.WithWriteTimeout(30*time.Second),
|
||||
)
|
||||
|
||||
return &res, closer, err
|
||||
|
@ -25,7 +25,7 @@ func buildType() string {
|
||||
}
|
||||
|
||||
// BuildVersion is the local build version, set by build system
|
||||
const BuildVersion = "0.4.4"
|
||||
const BuildVersion = "0.4.5"
|
||||
|
||||
func UserVersion() string {
|
||||
return BuildVersion + buildType() + CurrentCommit
|
||||
@ -53,7 +53,7 @@ func (ve Version) EqMajorMinor(v2 Version) bool {
|
||||
}
|
||||
|
||||
// APIVersion is a semver version of the rpc api exposed
|
||||
var APIVersion Version = newVer(0, 10, 0)
|
||||
var APIVersion Version = newVer(0, 11, 0)
|
||||
|
||||
//nolint:varcheck,deadcode
|
||||
const (
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
"github.com/filecoin-project/lotus/lib/lotuslog"
|
||||
"github.com/filecoin-project/lotus/lib/rpcenc"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
sectorstorage "github.com/filecoin-project/sector-storage"
|
||||
"github.com/filecoin-project/sector-storage/sealtasks"
|
||||
@ -105,6 +106,11 @@ var runCmd = &cli.Command{
|
||||
Name: "no-local-storage",
|
||||
Usage: "don't use storageminer repo for sector storage",
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "addpiece",
|
||||
Usage: "enable addpiece",
|
||||
Value: true,
|
||||
},
|
||||
&cli.BoolFlag{
|
||||
Name: "precommit1",
|
||||
Usage: "enable precommit1 (32G sectors: 1 core, 128GiB Memory)",
|
||||
@ -204,6 +210,9 @@ var runCmd = &cli.Command{
|
||||
|
||||
taskTypes = append(taskTypes, sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize)
|
||||
|
||||
if cctx.Bool("addpiece") {
|
||||
taskTypes = append(taskTypes, sealtasks.TTAddPiece)
|
||||
}
|
||||
if cctx.Bool("precommit1") {
|
||||
taskTypes = append(taskTypes, sealtasks.TTPreCommit1)
|
||||
}
|
||||
@ -337,10 +346,12 @@ var runCmd = &cli.Command{
|
||||
|
||||
log.Info("Setting up control endpoint at " + address)
|
||||
|
||||
rpcServer := jsonrpc.NewServer()
|
||||
readerHandler, readerServerOpt := rpcenc.ReaderParamDecoder()
|
||||
rpcServer := jsonrpc.NewServer(readerServerOpt)
|
||||
rpcServer.Register("Filecoin", apistruct.PermissionedWorkerAPI(workerApi))
|
||||
|
||||
mux.Handle("/rpc/v0", rpcServer)
|
||||
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
|
||||
mux.PathPrefix("/remote").HandlerFunc((&stores.FetchHandler{Local: localStore}).ServeHTTP)
|
||||
mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
|
||||
|
||||
|
9
extern/storage-fsm/garbage.go
vendored
9
extern/storage-fsm/garbage.go
vendored
@ -2,19 +2,12 @@ package sealing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
|
||||
nr "github.com/filecoin-project/storage-fsm/lib/nullreader"
|
||||
)
|
||||
|
||||
func (m *Sealing) pledgeReader(size abi.UnpaddedPieceSize) io.Reader {
|
||||
return io.LimitReader(&nr.Reader{}, int64(size))
|
||||
}
|
||||
|
||||
func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorID, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) {
|
||||
if len(sizes) == 0 {
|
||||
return nil, nil
|
||||
@ -24,7 +17,7 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorID, exist
|
||||
|
||||
out := make([]abi.PieceInfo, len(sizes))
|
||||
for i, size := range sizes {
|
||||
ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, m.pledgeReader(size))
|
||||
ppi, err := m.sealer.AddPiece(ctx, sectorID, existingPieceSizes, size, NewNullReader(size))
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("add piece: %w", err)
|
||||
}
|
||||
|
20
extern/storage-fsm/nullreader.go
vendored
Normal file
20
extern/storage-fsm/nullreader.go
vendored
Normal file
@ -0,0 +1,20 @@
|
||||
package sealing
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
nr "github.com/filecoin-project/storage-fsm/lib/nullreader"
|
||||
)
|
||||
|
||||
type NullReader struct {
|
||||
*io.LimitedReader
|
||||
}
|
||||
|
||||
func NewNullReader(size abi.UnpaddedPieceSize) io.Reader {
|
||||
return &NullReader{(io.LimitReader(&nr.Reader{}, int64(size))).(*io.LimitedReader)}
|
||||
}
|
||||
|
||||
func (m NullReader) NullBytes() int64 {
|
||||
return m.N
|
||||
}
|
2
extern/storage-fsm/sealing.go
vendored
2
extern/storage-fsm/sealing.go
vendored
@ -146,7 +146,7 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
|
||||
}
|
||||
|
||||
for _, p := range pads {
|
||||
err = m.addPiece(ctx, sid, p.Unpadded(), m.pledgeReader(p.Unpadded()), nil)
|
||||
err = m.addPiece(ctx, sid, p.Unpadded(), NewNullReader(p.Unpadded()), nil)
|
||||
if err != nil {
|
||||
m.unsealedInfoMap.mux.Unlock()
|
||||
return 0, 0, xerrors.Errorf("writing pads: %w", err)
|
||||
|
2
go.mod
2
go.mod
@ -28,7 +28,7 @@ require (
|
||||
github.com/filecoin-project/go-data-transfer v0.6.1
|
||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f
|
||||
github.com/filecoin-project/go-fil-markets v0.5.6
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200814204552-b33a1ad44a4c
|
||||
github.com/filecoin-project/go-multistore v0.0.3
|
||||
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6
|
||||
github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261
|
||||
|
5
go.sum
5
go.sum
@ -242,8 +242,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f h1
|
||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
|
||||
github.com/filecoin-project/go-fil-markets v0.5.6 h1:WmBbV0qBU4NvLJ64xROpzrKUbkZxZqszZiEiCGmCEIY=
|
||||
github.com/filecoin-project/go-fil-markets v0.5.6/go.mod h1:SJApXAKr5jyGpbzDEOhvemui0pih7hhT8r2MXJxCP1E=
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24 h1:Jc7vkplmZYVuaEcSXGHDwefvZIdoyyaoGDLqSr8Svms=
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.1-0.20200602181149-522144ab4e24/go.mod h1:j6zV//WXIIY5kky873Q3iIKt/ViOE8rcijovmpxrXzM=
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200814204552-b33a1ad44a4c h1:FB8d8m2qbmdOovxEu40m4yvpLuHDDpLJKQyck9IPFSg=
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.2-0.20200814204552-b33a1ad44a4c/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4=
|
||||
github.com/filecoin-project/go-multistore v0.0.3 h1:vaRBY4YiA2UZFPK57RNuewypB8u0DzzQwqsL0XarpnI=
|
||||
github.com/filecoin-project/go-multistore v0.0.3/go.mod h1:kaNqCC4IhU4B1uyr7YWFHd23TL4KM32aChS0jNkyUvQ=
|
||||
github.com/filecoin-project/go-padreader v0.0.0-20200210211231-548257017ca6 h1:92PET+sx1Hb4W/8CgFwGuxaKbttwY+UNspYZTvXY0vs=
|
||||
@ -385,6 +385,7 @@ github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c/go.mod h1:wJfORR
|
||||
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f h1:KMlcu9X58lhTA/KrfX8Bi1LQSO4pzoVjTiL3h4Jk+Zk=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
|
||||
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
|
||||
github.com/gorilla/handlers v1.4.2 h1:0QniY0USkHQ1RGCLfKxeNHK9bkDHGRYGNDFBCS+YARg=
|
||||
github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/OAirnOIQ=
|
||||
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
|
||||
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
|
||||
|
186
lib/rpcenc/reader.go
Normal file
186
lib/rpcenc/reader.go
Normal file
@ -0,0 +1,186 @@
|
||||
package rpcenc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-jsonrpc"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
sealing "github.com/filecoin-project/storage-fsm"
|
||||
)
|
||||
|
||||
var log = logging.Logger("rpcenc")
|
||||
|
||||
var Timeout = 30 * time.Second
|
||||
|
||||
type StreamType string
|
||||
|
||||
const (
|
||||
Null StreamType = "null"
|
||||
PushStream StreamType = "push"
|
||||
// TODO: Data transfer handoff to workers?
|
||||
)
|
||||
|
||||
type ReaderStream struct {
|
||||
Type StreamType
|
||||
Info string
|
||||
}
|
||||
|
||||
func ReaderParamEncoder(addr string) jsonrpc.Option {
|
||||
return jsonrpc.WithParamEncoder(new(io.Reader), func(value reflect.Value) (reflect.Value, error) {
|
||||
r := value.Interface().(io.Reader)
|
||||
|
||||
if r, ok := r.(*sealing.NullReader); ok {
|
||||
return reflect.ValueOf(ReaderStream{Type: Null, Info: fmt.Sprint(r.N)}), nil
|
||||
}
|
||||
|
||||
reqID := uuid.New()
|
||||
u, _ := url.Parse(addr)
|
||||
u.Path = path.Join(u.Path, reqID.String())
|
||||
|
||||
go func() {
|
||||
// TODO: figure out errors here
|
||||
|
||||
resp, err := http.Post(u.String(), "application/octet-stream", r)
|
||||
if err != nil {
|
||||
log.Errorf("sending reader param: %+v", err)
|
||||
return
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
log.Errorf("sending reader param: non-200 status: ", resp.Status)
|
||||
return
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
return reflect.ValueOf(ReaderStream{Type: PushStream, Info: reqID.String()}), nil
|
||||
})
|
||||
}
|
||||
|
||||
type waitReadCloser struct {
|
||||
io.ReadCloser
|
||||
wait chan struct{}
|
||||
}
|
||||
|
||||
func (w *waitReadCloser) Read(p []byte) (int, error) {
|
||||
n, err := w.ReadCloser.Read(p)
|
||||
if err != nil {
|
||||
close(w.wait)
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (w *waitReadCloser) Close() error {
|
||||
close(w.wait)
|
||||
return w.ReadCloser.Close()
|
||||
}
|
||||
|
||||
func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
|
||||
var readersLk sync.Mutex
|
||||
readers := map[uuid.UUID]chan *waitReadCloser{}
|
||||
|
||||
hnd := func(resp http.ResponseWriter, req *http.Request) {
|
||||
strId := path.Base(req.URL.Path)
|
||||
u, err := uuid.Parse(strId)
|
||||
if err != nil {
|
||||
http.Error(resp, fmt.Sprintf("parsing reader uuid: %s", err), 400)
|
||||
return
|
||||
}
|
||||
|
||||
readersLk.Lock()
|
||||
ch, found := readers[u]
|
||||
if !found {
|
||||
ch = make(chan *waitReadCloser)
|
||||
readers[u] = ch
|
||||
}
|
||||
readersLk.Unlock()
|
||||
|
||||
wr := &waitReadCloser{
|
||||
ReadCloser: req.Body,
|
||||
wait: make(chan struct{}),
|
||||
}
|
||||
|
||||
tctx, cancel := context.WithTimeout(req.Context(), Timeout)
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case ch <- wr:
|
||||
case <-tctx.Done():
|
||||
close(ch)
|
||||
log.Error("context error in reader stream handler (1): %v", tctx.Err())
|
||||
resp.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-wr.wait:
|
||||
case <-req.Context().Done():
|
||||
log.Error("context error in reader stream handler (2): %v", req.Context().Err())
|
||||
resp.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
|
||||
resp.WriteHeader(200)
|
||||
}
|
||||
|
||||
dec := jsonrpc.WithParamDecoder(new(io.Reader), func(ctx context.Context, b []byte) (reflect.Value, error) {
|
||||
var rs ReaderStream
|
||||
if err := json.Unmarshal(b, &rs); err != nil {
|
||||
return reflect.Value{}, xerrors.Errorf("unmarshaling reader id: %w", err)
|
||||
}
|
||||
|
||||
if rs.Type == Null {
|
||||
n, err := strconv.ParseInt(rs.Info, 10, 64)
|
||||
if err != nil {
|
||||
return reflect.Value{}, xerrors.Errorf("parsing null byte count: %w", err)
|
||||
}
|
||||
|
||||
return reflect.ValueOf(sealing.NewNullReader(abi.UnpaddedPieceSize(n))), nil
|
||||
}
|
||||
|
||||
u, err := uuid.Parse(rs.Info)
|
||||
if err != nil {
|
||||
return reflect.Value{}, xerrors.Errorf("parsing reader UUDD: %w", err)
|
||||
}
|
||||
|
||||
readersLk.Lock()
|
||||
ch, found := readers[u]
|
||||
if !found {
|
||||
ch = make(chan *waitReadCloser)
|
||||
readers[u] = ch
|
||||
}
|
||||
readersLk.Unlock()
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, Timeout)
|
||||
defer cancel()
|
||||
|
||||
select {
|
||||
case wr, ok := <-ch:
|
||||
if !ok {
|
||||
return reflect.Value{}, xerrors.Errorf("handler timed out")
|
||||
}
|
||||
|
||||
return reflect.ValueOf(wr), nil
|
||||
case <-ctx.Done():
|
||||
return reflect.Value{}, ctx.Err()
|
||||
}
|
||||
})
|
||||
|
||||
return hnd, dec
|
||||
}
|
90
lib/rpcenc/reader_test.go
Normal file
90
lib/rpcenc/reader_test.go
Normal file
@ -0,0 +1,90 @@
|
||||
package rpcenc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/go-jsonrpc"
|
||||
sealing "github.com/filecoin-project/storage-fsm"
|
||||
)
|
||||
|
||||
type ReaderHandler struct {
|
||||
}
|
||||
|
||||
func (h *ReaderHandler) ReadAll(ctx context.Context, r io.Reader) ([]byte, error) {
|
||||
return ioutil.ReadAll(r)
|
||||
}
|
||||
|
||||
func (h *ReaderHandler) ReadNullLen(ctx context.Context, r io.Reader) (int64, error) {
|
||||
return r.(*sealing.NullReader).N, nil
|
||||
}
|
||||
|
||||
func (h *ReaderHandler) ReadUrl(ctx context.Context, u string) (string, error) {
|
||||
return u, nil
|
||||
}
|
||||
|
||||
func TestReaderProxy(t *testing.T) {
|
||||
var client struct {
|
||||
ReadAll func(ctx context.Context, r io.Reader) ([]byte, error)
|
||||
}
|
||||
|
||||
serverHandler := &ReaderHandler{}
|
||||
|
||||
readerHandler, readerServerOpt := ReaderParamDecoder()
|
||||
rpcServer := jsonrpc.NewServer(readerServerOpt)
|
||||
rpcServer.Register("ReaderHandler", serverHandler)
|
||||
|
||||
mux := mux.NewRouter()
|
||||
mux.Handle("/rpc/v0", rpcServer)
|
||||
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
|
||||
|
||||
testServ := httptest.NewServer(mux)
|
||||
defer testServ.Close()
|
||||
|
||||
re := ReaderParamEncoder("http://" + testServ.Listener.Addr().String() + "/rpc/streams/v0/push")
|
||||
closer, err := jsonrpc.NewMergeClient("ws://"+testServ.Listener.Addr().String()+"/rpc/v0", "ReaderHandler", []interface{}{&client}, nil, re)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer closer()
|
||||
|
||||
read, err := client.ReadAll(context.TODO(), strings.NewReader("pooooootato"))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "pooooootato", string(read), "potatos weren't equal")
|
||||
}
|
||||
|
||||
func TestNullReaderProxy(t *testing.T) {
|
||||
var client struct {
|
||||
ReadAll func(ctx context.Context, r io.Reader) ([]byte, error)
|
||||
ReadNullLen func(ctx context.Context, r io.Reader) (int64, error)
|
||||
}
|
||||
|
||||
serverHandler := &ReaderHandler{}
|
||||
|
||||
readerHandler, readerServerOpt := ReaderParamDecoder()
|
||||
rpcServer := jsonrpc.NewServer(readerServerOpt)
|
||||
rpcServer.Register("ReaderHandler", serverHandler)
|
||||
|
||||
mux := mux.NewRouter()
|
||||
mux.Handle("/rpc/v0", rpcServer)
|
||||
mux.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
|
||||
|
||||
testServ := httptest.NewServer(mux)
|
||||
defer testServ.Close()
|
||||
|
||||
re := ReaderParamEncoder("http://" + testServ.Listener.Addr().String() + "/rpc/streams/v0/push")
|
||||
closer, err := jsonrpc.NewMergeClient("ws://"+testServ.Listener.Addr().String()+"/rpc/v0", "ReaderHandler", []interface{}{&client}, nil, re)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer closer()
|
||||
|
||||
n, err := client.ReadNullLen(context.TODO(), sealing.NewNullReader(1016))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, int64(1016), n)
|
||||
}
|
@ -9,7 +9,6 @@ import (
|
||||
"github.com/filecoin-project/go-jsonrpc"
|
||||
"github.com/filecoin-project/go-jsonrpc/auth"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
storage2 "github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/api/client"
|
||||
@ -25,10 +24,6 @@ func (r *remoteWorker) NewSector(ctx context.Context, sector abi.SectorID) error
|
||||
return xerrors.New("unsupported")
|
||||
}
|
||||
|
||||
func (r *remoteWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage2.Data) (abi.PieceInfo, error) {
|
||||
return abi.PieceInfo{}, xerrors.New("unsupported")
|
||||
}
|
||||
|
||||
func connectRemoteWorker(ctx context.Context, fa api.Common, url string) (*remoteWorker, error) {
|
||||
token, err := fa.AuthNew(ctx, []auth.Permission{"admin"})
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user