lotus/lib/rpcenc/reader.go

185 lines
3.9 KiB
Go
Raw Normal View History

2020-08-14 14:06:53 +00:00
package rpcenc
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"path"
"reflect"
"strconv"
"sync"
2020-08-14 21:12:37 +00:00
"time"
2020-08-14 14:06:53 +00:00
"github.com/google/uuid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/specs-actors/actors/abi"
sealing "github.com/filecoin-project/storage-fsm"
)
2020-08-14 21:12:37 +00:00
var (
	// log is the package-level logger, registered under the "rpcenc" name.
	log = logging.Logger("rpcenc")

	// Timeout bounds how long the HTTP push handler and the RPC parameter
	// decoder wait for each other before giving up on a reader handoff.
	Timeout = 30 * time.Second
)
2020-08-14 14:06:53 +00:00
// StreamType names the way an io.Reader parameter is carried across the RPC
// boundary.
type StreamType string

// Known stream types.
const (
	// Null marks a reader that is described by a byte count only; no data is
	// transferred.
	Null = StreamType("null")
	// PushStream marks a reader whose content is pushed to the server in a
	// separate HTTP request.
	PushStream = StreamType("push")
	// TODO: Data transfer handoff to workers?
)
// ReaderStream is the value that replaces an io.Reader parameter on the wire.
type ReaderStream struct {
	// Type selects how Info is interpreted.
	Type StreamType
	// Info carries the type-specific payload: for Null it is the decimal byte
	// count of the null reader, for PushStream it is the UUID string that
	// identifies the separately pushed request body.
	Info string
}
// ReaderParamEncoder returns a jsonrpc option that encodes io.Reader
// parameters as ReaderStream descriptors.
//
// A *sealing.NullReader is encoded in-band as a Null stream whose Info is the
// reader's N value. Any other reader is assigned a fresh UUID; its content is
// POSTed in a background goroutine to addr with the UUID appended to the URL
// path, and the encoder returns a PushStream descriptor carrying that UUID.
//
// addr is the base URL of the push endpoint. If it cannot be parsed, the
// encoder returns an error instead of starting a push.
func ReaderParamEncoder(addr string) jsonrpc.Option {
	return jsonrpc.WithParamEncoder(new(io.Reader), func(value reflect.Value) (reflect.Value, error) {
		r := value.Interface().(io.Reader)

		// Null readers carry no data, so only their length needs to travel.
		if nr, ok := r.(*sealing.NullReader); ok {
			return reflect.ValueOf(ReaderStream{Type: Null, Info: fmt.Sprint(nr.N)}), nil
		}

		reqID := uuid.New()
		u, err := url.Parse(addr)
		if err != nil {
			return reflect.Value{}, xerrors.Errorf("parsing reader push address: %w", err)
		}
		u.Path = path.Join(u.Path, reqID.String())

		// The push runs concurrently with the RPC call itself; failures can
		// only be logged here because the encoder has already returned.
		go func() {
			// TODO: figure out errors here

			resp, err := http.Post(u.String(), "application/octet-stream", r)
			if err != nil {
				log.Errorf("sending reader param: %+v", err)
				return
			}
			defer resp.Body.Close()

			if resp.StatusCode != http.StatusOK {
				log.Errorf("sending reader param: non-200 status: %s", resp.Status)
				return
			}
		}()

		return reflect.ValueOf(ReaderStream{Type: PushStream, Info: reqID.String()}), nil
	})
}
// waitReadCloser wraps an io.ReadCloser and closes the wait channel once the
// stream is finished, i.e. after the first Read that returns an error
// (including io.EOF) or after Close, whichever happens first.
type waitReadCloser struct {
	io.ReadCloser
	// wait is closed exactly once to signal that the consumer is done with
	// the underlying stream.
	wait chan struct{}
	// waitOnce guards close(wait): a failing Read followed by Close, or
	// repeated failing Reads, must not close the channel twice.
	waitOnce sync.Once
}

// signalDone closes the wait channel; it is safe to call any number of times.
func (w *waitReadCloser) signalDone() {
	w.waitOnce.Do(func() { close(w.wait) })
}

// Read forwards to the wrapped reader and signals completion on any error.
func (w *waitReadCloser) Read(p []byte) (int, error) {
	n, err := w.ReadCloser.Read(p)
	if err != nil {
		w.signalDone()
	}
	return n, err
}

// Close signals completion and closes the wrapped reader, returning its error.
func (w *waitReadCloser) Close() error {
	w.signalDone()
	return w.ReadCloser.Close()
}
func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
var readersLk sync.Mutex
readers := map[uuid.UUID]chan *waitReadCloser{}
hnd := func(resp http.ResponseWriter, req *http.Request) {
strId := path.Base(req.URL.Path)
u, err := uuid.Parse(strId)
if err != nil {
http.Error(resp, fmt.Sprintf("parsing reader uuid: %s", err), 400)
}
readersLk.Lock()
ch, found := readers[u]
if !found {
ch = make(chan *waitReadCloser)
readers[u] = ch
}
readersLk.Unlock()
wr := &waitReadCloser{
ReadCloser: req.Body,
wait: make(chan struct{}),
}
2020-08-14 21:12:37 +00:00
tctx, cancel := context.WithTimeout(req.Context(), Timeout)
defer cancel()
2020-08-14 14:06:53 +00:00
select {
case ch <- wr:
2020-08-14 21:12:37 +00:00
case <-tctx.Done():
close(ch)
log.Error("context error in reader stream handler (1): %v", tctx.Err())
2020-08-14 14:06:53 +00:00
resp.WriteHeader(500)
return
}
select {
case <-wr.wait:
case <-req.Context().Done():
log.Error("context error in reader stream handler (2): %v", req.Context().Err())
resp.WriteHeader(500)
return
}
resp.WriteHeader(200)
}
dec := jsonrpc.WithParamDecoder(new(io.Reader), func(ctx context.Context, b []byte) (reflect.Value, error) {
var rs ReaderStream
if err := json.Unmarshal(b, &rs); err != nil {
return reflect.Value{}, xerrors.Errorf("unmarshaling reader id: %w", err)
}
if rs.Type == Null {
n, err := strconv.ParseInt(rs.Info, 10, 64)
if err != nil {
return reflect.Value{}, xerrors.Errorf("parsing null byte count: %w", err)
}
return reflect.ValueOf(sealing.NewNullReader(abi.UnpaddedPieceSize(n))), nil
}
u, err := uuid.Parse(rs.Info)
if err != nil {
return reflect.Value{}, xerrors.Errorf("parsing reader UUDD: %w", err)
}
readersLk.Lock()
ch, found := readers[u]
if !found {
ch = make(chan *waitReadCloser)
readers[u] = ch
}
readersLk.Unlock()
2020-08-14 21:12:37 +00:00
ctx, cancel := context.WithTimeout(ctx, Timeout)
defer cancel()
2020-08-14 14:06:53 +00:00
select {
2020-08-14 21:12:37 +00:00
case wr, ok := <-ch:
if !ok {
return reflect.Value{}, xerrors.Errorf("handler timed out")
}
2020-08-14 14:06:53 +00:00
return reflect.ValueOf(wr), nil
case <-ctx.Done():
return reflect.Value{}, ctx.Err()
}
})
return hnd, dec
}