lib: support http readers passing over jsonrpc

This commit is contained in:
Łukasz Magiera 2022-05-24 15:47:23 +02:00
parent bacaccc378
commit 6095aba4dd
2 changed files with 56 additions and 1 deletions

View File

@ -0,0 +1,47 @@
package httpreader
import (
"io"
"net/http"
"golang.org/x/xerrors"
)
// HttpReader is a reader which will read a http resource with a simple get request.
// Before first Read it will be passed over JsonRPC as a URL.
type HttpReader struct {
URL string
reader io.ReadCloser
}
func (h *HttpReader) Close() error {
h.URL = ""
if h.reader != nil {
return h.reader.Close()
}
return nil
}
func (h *HttpReader) Read(p []byte) (n int, err error) {
if h.reader == nil {
res, err := http.Get(h.URL)
if err != nil {
return 0, err
}
if res.StatusCode != http.StatusOK {
return 0, xerrors.Errorf("unexpected http status %d", res.StatusCode)
}
// mark the reader as reading
h.URL = ""
h.reader = res.Body
}
if h.reader == nil {
return 0, xerrors.Errorf("http reader closed")
}
return h.reader.Read(p)
}
var _ io.ReadCloser = &HttpReader{}

View File

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/filecoin-project/lotus/lib/httpreader"
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
@ -34,6 +35,7 @@ type StreamType string
const ( const (
Null StreamType = "null" Null StreamType = "null"
PushStream StreamType = "push" PushStream StreamType = "push"
HttpUrl StreamType = "http"
// TODO: Data transfer handoff to workers? // TODO: Data transfer handoff to workers?
) )
@ -105,6 +107,9 @@ func ReaderParamEncoder(addr string) jsonrpc.Option {
if r, ok := r.(*nullreader.NullReader); ok { if r, ok := r.(*nullreader.NullReader); ok {
return reflect.ValueOf(ReaderStream{Type: Null, Info: fmt.Sprint(r.N)}), nil return reflect.ValueOf(ReaderStream{Type: Null, Info: fmt.Sprint(r.N)}), nil
} }
if r, ok := r.(*httpreader.HttpReader); ok && r.URL != "" {
return reflect.ValueOf(ReaderStream{Type: HttpUrl, Info: r.URL}), nil
}
reqID := uuid.New() reqID := uuid.New()
u, err := url.Parse(addr) u, err := url.Parse(addr)
@ -413,13 +418,16 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) {
return reflect.Value{}, xerrors.Errorf("unmarshaling reader id: %w", err) return reflect.Value{}, xerrors.Errorf("unmarshaling reader id: %w", err)
} }
if rs.Type == Null { switch rs.Type {
case Null:
n, err := strconv.ParseInt(rs.Info, 10, 64) n, err := strconv.ParseInt(rs.Info, 10, 64)
if err != nil { if err != nil {
return reflect.Value{}, xerrors.Errorf("parsing null byte count: %w", err) return reflect.Value{}, xerrors.Errorf("parsing null byte count: %w", err)
} }
return reflect.ValueOf(nullreader.NewNullReader(abi.UnpaddedPieceSize(n))), nil return reflect.ValueOf(nullreader.NewNullReader(abi.UnpaddedPieceSize(n))), nil
case HttpUrl:
return reflect.ValueOf(&httpreader.HttpReader{URL: rs.Info}), nil
} }
u, err := uuid.Parse(rs.Info) u, err := uuid.Parse(rs.Info)