diff --git a/api/api_full.go b/api/api_full.go index 320a20687..d4ed14cc6 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "github.com/google/uuid" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/core/peer" @@ -1012,8 +1013,12 @@ type RetrievalOrder struct { Client address.Address Miner address.Address MinerPeer *retrievalmarket.RetrievalPeer + + RemoteStore *RemoteStoreID `json:"RemoteStore,omitempty"` } +type RemoteStoreID = uuid.UUID + type InvocResult struct { MsgCid cid.Cid Msg *types.Message diff --git a/api/docgen/docgen.go b/api/docgen/docgen.go index 5bddef2ec..28c4e8b85 100644 --- a/api/docgen/docgen.go +++ b/api/docgen/docgen.go @@ -361,6 +361,7 @@ func init() { Headers: nil, }, }) + addExample(&uuid.UUID{}) } func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) { diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 978fbb730..bd7b14d30 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/cli/client_retr.go b/cli/client_retr.go index 23acf1f08..9aa893f3c 100644 --- a/cli/client_retr.go +++ b/cli/client_retr.go @@ -337,23 +337,16 @@ Examples: }, } -func ClientExportStream(apiAddr string, apiAuth http.Header, eref lapi.ExportRef, car bool) (io.ReadCloser, error) { - rj, err := json.Marshal(eref) - if err != nil { - return nil, xerrors.Errorf("marshaling export ref: %w", err) - } - +func ApiAddrToUrl(apiAddr string) (*url.URL, error) { ma, err := multiaddr.NewMultiaddr(apiAddr) if err == nil { _, addr, err := manet.DialArgs(ma) if err != nil { return nil, err } - // todo: make cliutil helpers for this apiAddr = "http://" + addr } - aa, err := url.Parse(apiAddr) if err != nil { return nil, xerrors.Errorf("parsing api address: %w", err) @@ -365,6 +358,20 @@ func ClientExportStream(apiAddr string, apiAuth http.Header, eref lapi.ExportRef aa.Scheme = "https" } + return aa, nil +} + +func ClientExportStream(apiAddr string, apiAuth http.Header, eref lapi.ExportRef, car bool) (io.ReadCloser, error) { + rj, err := json.Marshal(eref) + if err != nil { + return nil, xerrors.Errorf("marshaling export ref: %w", err) + } + + aa, err := ApiAddrToUrl(apiAddr) + if err != nil { + return nil, err + } + aa.Path = path.Join(aa.Path, "rest/v0/export") req, err := http.NewRequest("GET", fmt.Sprintf("%s?car=%t&export=%s", aa, car, url.QueryEscape(string(rj))), nil) if err != nil { @@ -583,6 +590,7 @@ var clientRetrieveLsCmd = &cli.Command{ dserv, roots[0], sel, + nil, func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error { if r == traversal.VisitReason_SelectionMatch { fmt.Println(p.Path) diff --git a/documentation/en/api-v1-unstable-methods.md b/documentation/en/api-v1-unstable-methods.md index 5f96eddd4..6d7c43149 100644 --- a/documentation/en/api-v1-unstable-methods.md +++ b/documentation/en/api-v1-unstable-methods.md @@ -1992,7 +1992,8 @@ Inputs: "Address": "f01234", "ID": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", "PieceCID": null - } + }, + "RemoteStore": "00000000-0000-0000-0000-000000000000" } ] ``` diff --git a/markets/retrievaladapter/client_blockstore.go b/markets/retrievaladapter/client_blockstore.go index 84c75fdbd..35cfa387b 100644 --- a/markets/retrievaladapter/client_blockstore.go +++ b/markets/retrievaladapter/client_blockstore.go @@ -8,8 +8,12 @@ import ( "github.com/ipfs/go-cid" bstore "github.com/ipfs/go-ipfs-blockstore" "github.com/ipld/go-car/v2/blockstore" + "golang.org/x/xerrors" "github.com/filecoin-project/go-fil-markets/retrievalmarket" + + "github.com/filecoin-project/lotus/api" + lbstore "github.com/filecoin-project/lotus/blockstore" ) // ProxyBlockstoreAccessor is an accessor that returns a fixed blockstore. @@ -32,6 +36,68 @@ func (p *ProxyBlockstoreAccessor) Done(_ retrievalmarket.DealID) error { return nil } +func NewAPIBlockstoreAdapter(sub retrievalmarket.BlockstoreAccessor) *APIBlockstoreAccessor { + return &APIBlockstoreAccessor{ + sub: sub, + retrStores: map[retrievalmarket.DealID]api.RemoteStoreID{}, + remoteStores: map[api.RemoteStoreID]bstore.Blockstore{}, + } +} + +// APIBlockstoreAccessor adds support to API-specified remote blockstores +type APIBlockstoreAccessor struct { + sub retrievalmarket.BlockstoreAccessor + + retrStores map[retrievalmarket.DealID]api.RemoteStoreID + remoteStores map[api.RemoteStoreID]bstore.Blockstore +} + +func (a *APIBlockstoreAccessor) Get(id retrievalmarket.DealID, payloadCID retrievalmarket.PayloadCID) (bstore.Blockstore, error) { + as, has := a.retrStores[id] + if !has { + return a.sub.Get(id, payloadCID) + } + + return a.remoteStores[as], nil +} + +func (a *APIBlockstoreAccessor) Done(id retrievalmarket.DealID) error { + if _, has := a.retrStores[id]; has { + delete(a.retrStores, id) + return nil + } + return a.sub.Done(id) +} + +func (a *APIBlockstoreAccessor) UseRetrievalStore(id retrievalmarket.DealID, sid api.RemoteStoreID) error { + if _, has := a.retrStores[id]; has { + return xerrors.Errorf("apistore for deal %d already registered", id) + } + if _, has := a.remoteStores[sid]; !has { + return xerrors.Errorf("remote store not found") + } + + a.retrStores[id] = sid + return nil +} + +func (a *APIBlockstoreAccessor) RegisterApiStore(sid api.RemoteStoreID, st *lbstore.NetworkStore) error { + if _, has := a.remoteStores[sid]; has { + return xerrors.Errorf("remote store already registered with this uuid") + } + + a.remoteStores[sid] = st + + st.OnClose(func() { + if _, has := a.remoteStores[sid]; has { + delete(a.remoteStores, sid) + } + }) + return nil +} + +var _ retrievalmarket.BlockstoreAccessor = &APIBlockstoreAccessor{} + type CARBlockstoreAccessor struct { rootdir string lk sync.Mutex diff --git a/markets/utils/selectors.go b/markets/utils/selectors.go index e1009d1ff..1b8a62401 100644 --- a/markets/utils/selectors.go +++ b/markets/utils/selectors.go @@ -26,6 +26,7 @@ func TraverseDag( ds mdagipld.DAGService, startFrom cid.Cid, optionalSelector ipld.Node, + onOpen func(node mdagipld.Node) error, visitCallback traversal.AdvVisitFn, ) error { @@ -61,6 +62,12 @@ func TraverseDag( return nil, err } + if onOpen != nil { + if err := onOpen(node); err != nil { + return nil, err + } + } + return bytes.NewBuffer(node.RawData()), nil } unixfsnode.AddUnixFSReificationToLinkSystem(&linkSystem) diff --git a/node/builder_chain.go b/node/builder_chain.go index 7a96e163c..6dbd542d6 100644 --- a/node/builder_chain.go +++ b/node/builder_chain.go @@ -29,6 +29,7 @@ import ( ledgerwallet "github.com/filecoin-project/lotus/chain/wallet/ledger" "github.com/filecoin-project/lotus/chain/wallet/remotewallet" "github.com/filecoin-project/lotus/lib/peermgr" + "github.com/filecoin-project/lotus/markets/retrievaladapter" "github.com/filecoin-project/lotus/markets/storageadapter" "github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/hello" @@ -129,6 +130,7 @@ var ChainNode = Options( Override(new(*market.FundManager), market.NewFundManager), Override(new(dtypes.ClientDatastore), modules.NewClientDatastore), Override(new(storagemarket.BlockstoreAccessor), modules.StorageBlockstoreAccessor), + Override(new(*retrievaladapter.APIBlockstoreAccessor), retrievaladapter.NewAPIBlockstoreAdapter), Override(new(storagemarket.StorageClient), modules.StorageClient), Override(new(storagemarket.StorageClientNode), storageadapter.NewClientNodeAdapter), Override(HandleMigrateClientFundsKey, modules.HandleMigrateClientFunds), diff --git a/node/impl/client/client.go b/node/impl/client/client.go index f69691e41..7bfa41496 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -10,6 +10,7 @@ import ( "os" "sort" "strings" + "sync" "time" "github.com/ipfs/go-blockservice" @@ -97,6 +98,7 @@ type API struct { Imports dtypes.ClientImportMgr StorageBlockstoreAccessor storagemarket.BlockstoreAccessor RtvlBlockstoreAccessor rm.BlockstoreAccessor + ApiBlockstoreAccessor *retrievaladapter.APIBlockstoreAccessor DataTransfer dtypes.ClientDataTransfer Host host.Host @@ -845,6 +847,13 @@ func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel dat } id := a.Retrieval.NextID() + + if order.RemoteStore != nil { + if err := a.ApiBlockstoreAccessor.UseRetrievalStore(id, *order.RemoteStore); err != nil { + return 0, xerrors.Errorf("registering api store: %w", err) + } + } + id, err = a.Retrieval.Retrieve( ctx, id, @@ -999,6 +1008,8 @@ func (a *API) outputCAR(ctx context.Context, ds format.DAGService, bs bstore.Blo roots[i] = dag.root } + var lk sync.Mutex + return dest.doWrite(func(w io.Writer) error { if err := car.WriteHeader(&car.CarHeader{ @@ -1016,8 +1027,21 @@ func (a *API) outputCAR(ctx context.Context, ds format.DAGService, bs bstore.Blo ds, root, dagSpec.selector, + func(node format.Node) error { + if dagSpec.exportAll { + lk.Lock() + defer lk.Unlock() + if cs.Visit(node.Cid()) { + err := util.LdWrite(w, node.Cid().Bytes(), node.RawData()) + if err != nil { + return xerrors.Errorf("writing block data: %w", err) + } + } + } + return nil + }, func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error { - if r == traversal.VisitReason_SelectionMatch { + if !dagSpec.exportAll && r == traversal.VisitReason_SelectionMatch { var c cid.Cid if p.LastBlock.Link == nil { c = root @@ -1082,8 +1106,9 @@ func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, ds format.DAGServi } type dagSpec struct { - root cid.Cid - selector ipld.Node + root cid.Cid + selector ipld.Node + exportAll bool } func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds format.DAGService, car bool) ([]dagSpec, error) { @@ -1098,6 +1123,7 @@ func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds forma out := make([]dagSpec, len(dsp)) for i, spec := range dsp { + out[i].exportAll = spec.ExportMerkleProof if spec.DataSelector == nil { return nil, xerrors.Errorf("invalid DagSpec at position %d: `DataSelector` can not be nil", i) @@ -1131,6 +1157,7 @@ func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds forma ds, root, rsn, + nil, func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error { if r == traversal.VisitReason_SelectionMatch { if !car && p.LastBlock.Path.String() != p.Path.String() { diff --git a/node/modules/client.go b/node/modules/client.go index 22fcbb00d..69f8db559 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -202,9 +202,9 @@ func StorageClient(lc fx.Lifecycle, h host.Host, dataTransfer dtypes.ClientDataT // RetrievalClient creates a new retrieval client attached to the client blockstore func RetrievalClient(forceOffChain bool) func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, - ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor retrievalmarket.BlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) { + ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor *retrievaladapter.APIBlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) { return func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, - ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor retrievalmarket.BlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) { + ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor *retrievaladapter.APIBlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) { adapter := retrievaladapter.NewRetrievalClientNode(forceOffChain, payAPI, chainAPI, stateAPI) network := rmnet.NewFromLibp2pHost(h) ds = namespace.Wrap(ds, datastore.NewKey("/retrievals/client")) diff --git a/node/rpc.go b/node/rpc.go index 2c85c71be..96a81a383 100644 --- a/node/rpc.go +++ b/node/rpc.go @@ -3,13 +3,16 @@ package node import ( "context" "encoding/json" + "fmt" "net" "net/http" _ "net/http/pprof" "runtime" "strconv" + "github.com/google/uuid" "github.com/gorilla/mux" + "github.com/gorilla/websocket" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/multiformats/go-multiaddr" @@ -23,6 +26,7 @@ import ( "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api/v0api" "github.com/filecoin-project/lotus/api/v1api" + bstore "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/lib/rpcenc" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics/proxy" @@ -92,6 +96,7 @@ func FullNodeHandler(a v1api.FullNode, permissioned bool, opts ...jsonrpc.Server // Import handler handleImportFunc := handleImport(a.(*impl.FullNodeAPI)) handleExportFunc := handleExport(a.(*impl.FullNodeAPI)) + handleRemoteStoreFunc := handleRemoteStore(a.(*impl.FullNodeAPI)) if permissioned { importAH := &auth.Handler{ Verify: a.AuthVerify, @@ -104,9 +109,16 @@ func FullNodeHandler(a v1api.FullNode, permissioned bool, opts ...jsonrpc.Server Next: handleExportFunc, } m.Handle("/rest/v0/export", exportAH) + + storeAH := &auth.Handler{ + Verify: a.AuthVerify, + Next: handleRemoteStoreFunc, + } + m.Handle("/rest/v0/store/{uuid}", storeAH) } else { m.HandleFunc("/rest/v0/import", handleImportFunc) m.HandleFunc("/rest/v0/export", handleExportFunc) + m.HandleFunc("/rest/v0/store/{uuid}", handleRemoteStoreFunc) } // debugging @@ -256,3 +268,34 @@ func handleFractionOpt(name string, setter func(int)) http.HandlerFunc { setter(fr) } } + +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, +} + +func handleRemoteStore(a *impl.FullNodeAPI) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + id, err := uuid.Parse(vars["uuid"]) + if err != nil { + http.Error(w, fmt.Sprintf("parse uuid: %s", err), http.StatusBadRequest) + return + } + + c, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Error(err) + w.WriteHeader(500) + return + } + + nstore := bstore.NewNetworkStoreWS(c) + if err := a.ApiBlockstoreAccessor.RegisterApiStore(id, nstore); err != nil { + log.Errorw("registering api bstore", "error", err) + _ = c.Close() + return + } + } +}