retrieval: REST export endpoint

This commit is contained in:
Łukasz Magiera 2021-11-11 17:17:11 +01:00
parent b83a9b902a
commit 450d0687da
2 changed files with 93 additions and 24 deletions

View File

@ -910,7 +910,34 @@ func (a *API) ClientRetrieveWait(ctx context.Context, deal rm.DealID) error {
} }
} }
type ExportDest struct {
Writer io.Writer
Path string
}
func (ed *ExportDest) doWrite(cb func(io.Writer) error) error {
if ed.Writer != nil {
return cb(ed.Writer)
}
f, err := os.OpenFile(ed.Path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
if err := cb(f); err != nil {
_ = f.Close()
return err
}
return f.Close()
}
func (a *API) ClientExport(ctx context.Context, exportRef api.ExportRef, ref api.FileRef) error { func (a *API) ClientExport(ctx context.Context, exportRef api.ExportRef, ref api.FileRef) error {
return a.ClientExportInto(ctx, exportRef, ref.IsCAR, ExportDest{Path: ref.Path})
}
func (a *API) ClientExportInto(ctx context.Context, exportRef api.ExportRef, car bool, dest ExportDest) error {
proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievaladapter.ProxyBlockstoreAccessor) proxyBss, retrieveIntoIPFS := a.RtvlBlockstoreAccessor.(*retrievaladapter.ProxyBlockstoreAccessor)
carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor) carBss, retrieveIntoCAR := a.RtvlBlockstoreAccessor.(*retrievaladapter.CARBlockstoreAccessor)
carPath := exportRef.FromLocalCAR carPath := exportRef.FromLocalCAR
@ -944,29 +971,24 @@ func (a *API) ClientExport(ctx context.Context, exportRef api.ExportRef, ref api
} }
// Are we outputting a CAR? // Are we outputting a CAR?
if ref.IsCAR { if car {
// not IPFS and we do full selection - just extract the CARv1 from the CARv2 we stored the retrieval in // not IPFS and we do full selection - just extract the CARv1 from the CARv2 we stored the retrieval in
if !retrieveIntoIPFS && len(exportRef.DAGs) == 0 { if !retrieveIntoIPFS && len(exportRef.DAGs) == 0 && dest.Writer == nil {
return carv2.ExtractV1File(carPath, ref.Path) return carv2.ExtractV1File(carPath, dest.Path)
} }
return a.outputCAR(ctx, roots, retrievalBs, ref) return a.outputCAR(ctx, roots, retrievalBs, dest)
} }
if len(roots) != 1 { if len(roots) != 1 {
return xerrors.Errorf("unixfs retrieval requires one root node, got %d", len(roots)) return xerrors.Errorf("unixfs retrieval requires one root node, got %d", len(roots))
} }
return a.outputUnixFS(ctx, roots[0].root, dserv, ref) return a.outputUnixFS(ctx, roots[0].root, dserv, dest)
} }
func (a *API) outputCAR(ctx context.Context, dags []dagSpec, bs bstore.Blockstore, ref api.FileRef) error { func (a *API) outputCAR(ctx context.Context, dags []dagSpec, bs bstore.Blockstore, dest ExportDest) error {
// generating a CARv1 from the configured blockstore // generating a CARv1 from the configured blockstore
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
carDags := make([]car.Dag, len(dags)) carDags := make([]car.Dag, len(dags))
for i, dag := range dags { for i, dag := range dags {
carDags[i] = car.Dag{ carDags[i] = car.Dag{
@ -975,17 +997,14 @@ func (a *API) outputCAR(ctx context.Context, dags []dagSpec, bs bstore.Blockstor
} }
} }
err = car.NewSelectiveCar( return dest.doWrite(func(w io.Writer) error {
ctx, return car.NewSelectiveCar(
bs, ctx,
carDags, bs,
car.MaxTraversalLinks(config.MaxTraversalLinks), carDags,
).Write(f) car.MaxTraversalLinks(config.MaxTraversalLinks),
if err != nil { ).Write(w)
return err })
}
return f.Close()
} }
type dagSpec struct { type dagSpec struct {
@ -1069,7 +1088,7 @@ func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds forma
return out, nil return out, nil
} }
func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, ds format.DAGService, ref api.FileRef) error { func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, ds format.DAGService, dest ExportDest) error {
nd, err := ds.Get(ctx, root) nd, err := ds.Get(ctx, root)
if err != nil { if err != nil {
return xerrors.Errorf("ClientRetrieve: %w", err) return xerrors.Errorf("ClientRetrieve: %w", err)
@ -1079,7 +1098,20 @@ func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, ds format.DAGServi
return xerrors.Errorf("ClientRetrieve: %w", err) return xerrors.Errorf("ClientRetrieve: %w", err)
} }
return files.WriteTo(file, ref.Path) if dest.Writer == nil {
return files.WriteTo(file, dest.Path)
}
switch f := file.(type) {
case files.File:
_, err = io.Copy(dest.Writer, f)
if err != nil {
return err
}
return nil
default:
return fmt.Errorf("file type %T is not supported", nd)
}
} }
func (a *API) ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, error) { func (a *API) ClientListRetrievals(ctx context.Context) ([]api.RetrievalInfo, error) {

View File

@ -27,6 +27,7 @@ import (
"github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/metrics/proxy" "github.com/filecoin-project/lotus/metrics/proxy"
"github.com/filecoin-project/lotus/node/impl" "github.com/filecoin-project/lotus/node/impl"
"github.com/filecoin-project/lotus/node/impl/client"
) )
var rpclog = logging.Logger("rpc") var rpclog = logging.Logger("rpc")
@ -89,14 +90,22 @@ func FullNodeHandler(a v1api.FullNode, permissioned bool, opts ...jsonrpc.Server
// Import handler // Import handler
handleImportFunc := handleImport(a.(*impl.FullNodeAPI)) handleImportFunc := handleImport(a.(*impl.FullNodeAPI))
handleExportFunc := handleExport(a.(*impl.FullNodeAPI))
if permissioned { if permissioned {
importAH := &auth.Handler{ importAH := &auth.Handler{
Verify: a.AuthVerify, Verify: a.AuthVerify,
Next: handleImportFunc, Next: handleImportFunc,
} }
m.Handle("/rest/v0/import", importAH) m.Handle("/rest/v0/import", importAH)
exportAH := &auth.Handler{
Verify: a.AuthVerify,
Next: handleExportFunc,
}
m.Handle("/rest/v0/export", exportAH)
} else { } else {
m.HandleFunc("/rest/v0/import", handleImportFunc) m.HandleFunc("/rest/v0/import", handleImportFunc)
m.HandleFunc("/rest/v0/export", handleExportFunc)
} }
// debugging // debugging
@ -169,6 +178,34 @@ func handleImport(a *impl.FullNodeAPI) func(w http.ResponseWriter, r *http.Reque
} }
} }
func handleExport(a *impl.FullNodeAPI) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
w.WriteHeader(404)
return
}
if !auth.HasPerm(r.Context(), nil, api.PermWrite) {
w.WriteHeader(401)
_ = json.NewEncoder(w).Encode(struct{ Error string }{"unauthorized: missing write permission"})
return
}
var eref api.ExportRef
if err := json.Unmarshal([]byte(r.FormValue("export")), &eref); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
car := r.FormValue("car") == "true"
err := a.ClientExportInto(r.Context(), eref, car, client.ExportDest{Writer: w})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func handleFractionOpt(name string, setter func(int)) http.HandlerFunc { func handleFractionOpt(name string, setter func(int)) http.HandlerFunc {
return func(rw http.ResponseWriter, r *http.Request) { return func(rw http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost { if r.Method != http.MethodPost {