192 lines
4.1 KiB
Go
192 lines
4.1 KiB
Go
package rpcenc
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"net/url"
|
|
"path"
|
|
"reflect"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
logging "github.com/ipfs/go-log/v2"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-jsonrpc"
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
|
)
|
|
|
|
var log = logging.Logger("rpcenc")
|
|
|
|
var Timeout = 30 * time.Second
|
|
|
|
type StreamType string
|
|
|
|
const (
|
|
Null StreamType = "null"
|
|
PushStream StreamType = "push"
|
|
// TODO: Data transfer handoff to workers?
|
|
)
|
|
|
|
type ReaderStream struct {
|
|
Type StreamType
|
|
Info string
|
|
}
|
|
|
|
func ReaderParamEncoder(addr string) jsonrpc.Option {
|
|
return jsonrpc.WithParamEncoder(new(io.Reader), func(value reflect.Value) (reflect.Value, error) {
|
|
r := value.Interface().(io.Reader)
|
|
|
|
if r, ok := r.(*sealing.NullReader); ok {
|
|
return reflect.ValueOf(ReaderStream{Type: Null, Info: fmt.Sprint(r.N)}), nil
|
|
}
|
|
|
|
reqID := uuid.New()
|
|
u, err := url.Parse(addr)
|
|
if err != nil {
|
|
return reflect.Value{}, xerrors.Errorf("parsing push address: %w", err)
|
|
}
|
|
u.Path = path.Join(u.Path, reqID.String())
|
|
|
|
go func() {
|
|
// TODO: figure out errors here
|
|
|
|
resp, err := http.Post(u.String(), "application/octet-stream", r)
|
|
if err != nil {
|
|
log.Errorf("sending reader param: %+v", err)
|
|
return
|
|
}
|
|
|
|
defer resp.Body.Close() //nolint:errcheck
|
|
|
|
if resp.StatusCode != 200 {
|
|
b, _ := ioutil.ReadAll(resp.Body)
|
|
log.Errorf("sending reader param (%s): non-200 status: %s, msg: '%s'", u.String(), resp.Status, string(b))
|
|
return
|
|
}
|
|
|
|
}()
|
|
|
|
return reflect.ValueOf(ReaderStream{Type: PushStream, Info: reqID.String()}), nil
|
|
})
|
|
}
|
|
|
|
type waitReadCloser struct {
|
|
io.ReadCloser
|
|
wait chan struct{}
|
|
}
|
|
|
|
func (w *waitReadCloser) Read(p []byte) (int, error) {
|
|
n, err := w.ReadCloser.Read(p)
|
|
if err != nil {
|
|
close(w.wait)
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
func (w *waitReadCloser) Close() error {
|
|
close(w.wait)
|
|
return w.ReadCloser.Close()
|
|
}
|
|
|
|
func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
|
|
var readersLk sync.Mutex
|
|
readers := map[uuid.UUID]chan *waitReadCloser{}
|
|
|
|
hnd := func(resp http.ResponseWriter, req *http.Request) {
|
|
strId := path.Base(req.URL.Path)
|
|
u, err := uuid.Parse(strId)
|
|
if err != nil {
|
|
http.Error(resp, fmt.Sprintf("parsing reader uuid: %s", err), 400)
|
|
return
|
|
}
|
|
|
|
readersLk.Lock()
|
|
ch, found := readers[u]
|
|
if !found {
|
|
ch = make(chan *waitReadCloser)
|
|
readers[u] = ch
|
|
}
|
|
readersLk.Unlock()
|
|
|
|
wr := &waitReadCloser{
|
|
ReadCloser: req.Body,
|
|
wait: make(chan struct{}),
|
|
}
|
|
|
|
tctx, cancel := context.WithTimeout(req.Context(), Timeout)
|
|
defer cancel()
|
|
|
|
select {
|
|
case ch <- wr:
|
|
case <-tctx.Done():
|
|
close(ch)
|
|
log.Errorf("context error in reader stream handler (1): %v", tctx.Err())
|
|
resp.WriteHeader(500)
|
|
return
|
|
}
|
|
|
|
select {
|
|
case <-wr.wait:
|
|
case <-req.Context().Done():
|
|
log.Errorf("context error in reader stream handler (2): %v", req.Context().Err())
|
|
resp.WriteHeader(500)
|
|
return
|
|
}
|
|
|
|
resp.WriteHeader(200)
|
|
}
|
|
|
|
dec := jsonrpc.WithParamDecoder(new(io.Reader), func(ctx context.Context, b []byte) (reflect.Value, error) {
|
|
var rs ReaderStream
|
|
if err := json.Unmarshal(b, &rs); err != nil {
|
|
return reflect.Value{}, xerrors.Errorf("unmarshaling reader id: %w", err)
|
|
}
|
|
|
|
if rs.Type == Null {
|
|
n, err := strconv.ParseInt(rs.Info, 10, 64)
|
|
if err != nil {
|
|
return reflect.Value{}, xerrors.Errorf("parsing null byte count: %w", err)
|
|
}
|
|
|
|
return reflect.ValueOf(sealing.NewNullReader(abi.UnpaddedPieceSize(n))), nil
|
|
}
|
|
|
|
u, err := uuid.Parse(rs.Info)
|
|
if err != nil {
|
|
return reflect.Value{}, xerrors.Errorf("parsing reader UUDD: %w", err)
|
|
}
|
|
|
|
readersLk.Lock()
|
|
ch, found := readers[u]
|
|
if !found {
|
|
ch = make(chan *waitReadCloser)
|
|
readers[u] = ch
|
|
}
|
|
readersLk.Unlock()
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, Timeout)
|
|
defer cancel()
|
|
|
|
select {
|
|
case wr, ok := <-ch:
|
|
if !ok {
|
|
return reflect.Value{}, xerrors.Errorf("handler timed out")
|
|
}
|
|
|
|
return reflect.ValueOf(wr), nil
|
|
case <-ctx.Done():
|
|
return reflect.Value{}, ctx.Err()
|
|
}
|
|
})
|
|
|
|
return hnd, dec
|
|
}
|