diff --git a/node/impl/client/client.go b/node/impl/client/client.go index a32f13d06..e72ca7eae 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -910,7 +910,34 @@ func (a *API) ClientRetrieveWait(ctx context.Context, deal rm.DealID) error { } } +type ExportDest struct { + Writer io.Writer + Path string +} + +func (ed *ExportDest) doWrite(cb func(io.Writer) error) error { + if ed.Writer != nil { + return cb(ed.Writer) + } + + f, err := os.OpenFile(ed.Path, os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + + if err := cb(f); err != nil { + _ = f.Close() + return err + } + + return f.Close() +} + func (a *API) ClientExport(ctx context.Context, exportRef api.ExportRef, ref api.FileRef) error { + return a.ClientExportInto(ctx, exportRef, ref.IsCAR, ExportDest{Path: ref.Path}) +} + +func (a *API) ClientExportInto(ctx context.Context, exportRef api.ExportRef, car bool, dest ExportDest) error { proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievaladapter.ProxyBlockstoreAccessor) carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor) carPath := exportRef.FromLocalCAR @@ -944,29 +971,24 @@ func (a *API) ClientExport(ctx context.Context, exportRef api.ExportRef, ref api } // Are we outputting a CAR? - if ref.IsCAR { + if car { // not IPFS and we do full selection - just extract the CARv1 from the CARv2 we stored the retrieval in - if !retrieveIntoIPFS && len(exportRef.DAGs) == 0 { - return carv2.ExtractV1File(carPath, ref.Path) + if !retrieveIntoIPFS && len(exportRef.DAGs) == 0 && dest.Writer == nil { + return carv2.ExtractV1File(carPath, dest.Path) } - return a.outputCAR(ctx, roots, retrievalBs, ref) + return a.outputCAR(ctx, roots, retrievalBs, dest) } if len(roots) != 1 { return xerrors.Errorf("unixfs retrieval requires one root node, got %d", len(roots)) } - return a.outputUnixFS(ctx, roots[0].root, dserv, ref) + return a.outputUnixFS(ctx, roots[0].root, dserv, dest) } -func (a *API) outputCAR(ctx context.Context, dags []dagSpec, bs bstore.Blockstore, ref api.FileRef) error { +func (a *API) outputCAR(ctx context.Context, dags []dagSpec, bs bstore.Blockstore, dest ExportDest) error { // generating a CARv1 from the configured blockstore - f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) - if err != nil { - return err - } - carDags := make([]car.Dag, len(dags)) for i, dag := range dags { carDags[i] = car.Dag{ @@ -975,17 +997,14 @@ func (a *API) outputCAR(ctx context.Context, dags []dagSpec, bs bstore.Blockstor } } - err = car.NewSelectiveCar( - ctx, - bs, - carDags, - car.MaxTraversalLinks(config.MaxTraversalLinks), - ).Write(f) - if err != nil { - return err - } - - return f.Close() + return dest.doWrite(func(w io.Writer) error { + return car.NewSelectiveCar( + ctx, + bs, + carDags, + car.MaxTraversalLinks(config.MaxTraversalLinks), + ).Write(w) + }) } type dagSpec struct { @@ -1069,7 +1088,7 @@ func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds forma return out, nil } -func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, ds format.DAGService, ref api.FileRef) error { +func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, ds format.DAGService, dest ExportDest) error { nd, err := ds.Get(ctx, root) if err != nil { return xerrors.Errorf("ClientRetrieve: %w", err) @@ -1079,7 +1098,20 @@ func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, ds format.DAGServi return xerrors.Errorf("ClientRetrieve: %w", err) } - return files.WriteTo(file, ref.Path) + if dest.Writer == nil { + return files.WriteTo(file, dest.Path) + } + + switch f := file.(type) { + case files.File: + _, err = io.Copy(dest.Writer, f) + if err != nil { + return err + } + return nil + default: + return fmt.Errorf("file type %T is not supported", nd) + } } func (a *API) ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, error) { diff --git a/node/rpc.go b/node/rpc.go index 9bcdb7388..6a3e55115 100644 --- a/node/rpc.go +++ b/node/rpc.go @@ -27,6 +27,7 @@ import ( "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics/proxy" "github.com/filecoin-project/lotus/node/impl" + "github.com/filecoin-project/lotus/node/impl/client" ) var rpclog = logging.Logger("rpc") @@ -89,14 +90,22 @@ func FullNodeHandler(a v1api.FullNode, permissioned bool, opts ...jsonrpc.Server // Import handler handleImportFunc := handleImport(a.(*impl.FullNodeAPI)) + handleExportFunc := handleExport(a.(*impl.FullNodeAPI)) if permissioned { importAH := &auth.Handler{ Verify: a.AuthVerify, Next: handleImportFunc, } m.Handle("/rest/v0/import", importAH) + + exportAH := &auth.Handler{ + Verify: a.AuthVerify, + Next: handleExportFunc, + } + m.Handle("/rest/v0/export", exportAH) } else { m.HandleFunc("/rest/v0/import", handleImportFunc) + m.HandleFunc("/rest/v0/export", handleExportFunc) } // debugging @@ -169,6 +178,34 @@ func handleImport(a *impl.FullNodeAPI) func(w http.ResponseWriter, r *http.Reque } } +func handleExport(a *impl.FullNodeAPI) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + if r.Method != "GET" { + w.WriteHeader(404) + return + } + if !auth.HasPerm(r.Context(), nil, api.PermWrite) { + w.WriteHeader(401) + _ = json.NewEncoder(w).Encode(struct{ Error string }{"unauthorized: missing write permission"}) + return + } + + var eref api.ExportRef + if err := json.Unmarshal([]byte(r.FormValue("export")), &eref); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + car := r.FormValue("car") == "true" + + err := a.ClientExportInto(r.Context(), eref, car, client.ExportDest{Writer: w}) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + } +} + func handleFractionOpt(name string, setter func(int)) http.HandlerFunc { return func(rw http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost {