diff --git a/lib/httpreader/httpreader.go b/lib/httpreader/httpreader.go new file mode 100644 index 000000000..62338e76e --- /dev/null +++ b/lib/httpreader/httpreader.go @@ -0,0 +1,47 @@ +package httpreader + +import ( + "io" + "net/http" + + "golang.org/x/xerrors" +) + +// HttpReader is a reader which will read a http resource with a simple get request. +// Before first Read it will be passed over JsonRPC as a URL. +type HttpReader struct { + URL string + + reader io.ReadCloser +} + +func (h *HttpReader) Close() error { + h.URL = "" + if h.reader != nil { + return h.reader.Close() + } + return nil +} + +func (h *HttpReader) Read(p []byte) (n int, err error) { + if h.reader == nil { + res, err := http.Get(h.URL) + if err != nil { + return 0, err + } + if res.StatusCode != http.StatusOK { + return 0, xerrors.Errorf("unexpected http status %d", res.StatusCode) + } + + // mark the reader as reading + h.URL = "" + h.reader = res.Body + } + if h.reader == nil { + return 0, xerrors.Errorf("http reader closed") + } + + return h.reader.Read(p) +} + +var _ io.ReadCloser = &HttpReader{} diff --git a/lib/rpcenc/reader.go b/lib/rpcenc/reader.go index 4e3ebb8c2..0f268aadf 100644 --- a/lib/rpcenc/reader.go +++ b/lib/rpcenc/reader.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/filecoin-project/lotus/lib/httpreader" "io" "io/ioutil" "net/http" @@ -34,6 +35,7 @@ type StreamType string const ( Null StreamType = "null" PushStream StreamType = "push" + HttpUrl StreamType = "http" // TODO: Data transfer handoff to workers? ) @@ -105,6 +107,9 @@ func ReaderParamEncoder(addr string) jsonrpc.Option { if r, ok := r.(*nullreader.NullReader); ok { return reflect.ValueOf(ReaderStream{Type: Null, Info: fmt.Sprint(r.N)}), nil } + if r, ok := r.(*httpreader.HttpReader); ok && r.URL != "" { + return reflect.ValueOf(ReaderStream{Type: HttpUrl, Info: r.URL}), nil + } reqID := uuid.New() u, err := url.Parse(addr) @@ -413,13 +418,16 @@ func ReaderParamDecoder() (http.HandlerFunc, jsonrpc.ServerOption) { return reflect.Value{}, xerrors.Errorf("unmarshaling reader id: %w", err) } - if rs.Type == Null { + switch rs.Type { + case Null: n, err := strconv.ParseInt(rs.Info, 10, 64) if err != nil { return reflect.Value{}, xerrors.Errorf("parsing null byte count: %w", err) } return reflect.ValueOf(nullreader.NewNullReader(abi.UnpaddedPieceSize(n))), nil + case HttpUrl: + return reflect.ValueOf(&httpreader.HttpReader{URL: rs.Info}), nil } u, err := uuid.Parse(rs.Info)