retrieval: Support retrievals into remote stores

This commit is contained in:
Łukasz Magiera 2022-10-28 16:56:22 +01:00
parent 1577740bc4
commit 2c89b3240f
11 changed files with 174 additions and 14 deletions

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/google/uuid"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
@ -1012,8 +1013,12 @@ type RetrievalOrder struct {
Client address.Address Client address.Address
Miner address.Address Miner address.Address
MinerPeer *retrievalmarket.RetrievalPeer MinerPeer *retrievalmarket.RetrievalPeer
RemoteStore *RemoteStoreID `json:"RemoteStore,omitempty"`
} }
type RemoteStoreID = uuid.UUID
type InvocResult struct { type InvocResult struct {
MsgCid cid.Cid MsgCid cid.Cid
Msg *types.Message Msg *types.Message

View File

@ -361,6 +361,7 @@ func init() {
Headers: nil, Headers: nil,
}, },
}) })
addExample(&uuid.UUID{})
} }
func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) { func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) {

Binary file not shown.

View File

@ -337,23 +337,16 @@ Examples:
}, },
} }
func ClientExportStream(apiAddr string, apiAuth http.Header, eref lapi.ExportRef, car bool) (io.ReadCloser, error) { func ApiAddrToUrl(apiAddr string) (*url.URL, error) {
rj, err := json.Marshal(eref)
if err != nil {
return nil, xerrors.Errorf("marshaling export ref: %w", err)
}
ma, err := multiaddr.NewMultiaddr(apiAddr) ma, err := multiaddr.NewMultiaddr(apiAddr)
if err == nil { if err == nil {
_, addr, err := manet.DialArgs(ma) _, addr, err := manet.DialArgs(ma)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// todo: make cliutil helpers for this // todo: make cliutil helpers for this
apiAddr = "http://" + addr apiAddr = "http://" + addr
} }
aa, err := url.Parse(apiAddr) aa, err := url.Parse(apiAddr)
if err != nil { if err != nil {
return nil, xerrors.Errorf("parsing api address: %w", err) return nil, xerrors.Errorf("parsing api address: %w", err)
@ -365,6 +358,20 @@ func ClientExportStream(apiAddr string, apiAuth http.Header, eref lapi.ExportRef
aa.Scheme = "https" aa.Scheme = "https"
} }
return aa, nil
}
func ClientExportStream(apiAddr string, apiAuth http.Header, eref lapi.ExportRef, car bool) (io.ReadCloser, error) {
rj, err := json.Marshal(eref)
if err != nil {
return nil, xerrors.Errorf("marshaling export ref: %w", err)
}
aa, err := ApiAddrToUrl(apiAddr)
if err != nil {
return nil, err
}
aa.Path = path.Join(aa.Path, "rest/v0/export") aa.Path = path.Join(aa.Path, "rest/v0/export")
req, err := http.NewRequest("GET", fmt.Sprintf("%s?car=%t&export=%s", aa, car, url.QueryEscape(string(rj))), nil) req, err := http.NewRequest("GET", fmt.Sprintf("%s?car=%t&export=%s", aa, car, url.QueryEscape(string(rj))), nil)
if err != nil { if err != nil {
@ -583,6 +590,7 @@ var clientRetrieveLsCmd = &cli.Command{
dserv, dserv,
roots[0], roots[0],
sel, sel,
nil,
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error { func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
if r == traversal.VisitReason_SelectionMatch { if r == traversal.VisitReason_SelectionMatch {
fmt.Println(p.Path) fmt.Println(p.Path)

View File

@ -1992,7 +1992,8 @@ Inputs:
"Address": "f01234", "Address": "f01234",
"ID": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf", "ID": "12D3KooWGzxzKZYveHXtpG6AsrUJBcWxHBFS2HsEoGTxrMLvKXtf",
"PieceCID": null "PieceCID": null
} },
"RemoteStore": "00000000-0000-0000-0000-000000000000"
} }
] ]
``` ```

View File

@ -8,8 +8,12 @@ import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore" bstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipld/go-car/v2/blockstore" "github.com/ipld/go-car/v2/blockstore"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-fil-markets/retrievalmarket" "github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/lotus/api"
lbstore "github.com/filecoin-project/lotus/blockstore"
) )
// ProxyBlockstoreAccessor is an accessor that returns a fixed blockstore. // ProxyBlockstoreAccessor is an accessor that returns a fixed blockstore.
@ -32,6 +36,68 @@ func (p *ProxyBlockstoreAccessor) Done(_ retrievalmarket.DealID) error {
return nil return nil
} }
func NewAPIBlockstoreAdapter(sub retrievalmarket.BlockstoreAccessor) *APIBlockstoreAccessor {
return &APIBlockstoreAccessor{
sub: sub,
retrStores: map[retrievalmarket.DealID]api.RemoteStoreID{},
remoteStores: map[api.RemoteStoreID]bstore.Blockstore{},
}
}
// APIBlockstoreAccessor adds support to API-specified remote blockstores
type APIBlockstoreAccessor struct {
sub retrievalmarket.BlockstoreAccessor
retrStores map[retrievalmarket.DealID]api.RemoteStoreID
remoteStores map[api.RemoteStoreID]bstore.Blockstore
}
func (a *APIBlockstoreAccessor) Get(id retrievalmarket.DealID, payloadCID retrievalmarket.PayloadCID) (bstore.Blockstore, error) {
as, has := a.retrStores[id]
if !has {
return a.sub.Get(id, payloadCID)
}
return a.remoteStores[as], nil
}
func (a *APIBlockstoreAccessor) Done(id retrievalmarket.DealID) error {
if _, has := a.retrStores[id]; has {
delete(a.retrStores, id)
return nil
}
return a.sub.Done(id)
}
func (a *APIBlockstoreAccessor) UseRetrievalStore(id retrievalmarket.DealID, sid api.RemoteStoreID) error {
if _, has := a.retrStores[id]; has {
return xerrors.Errorf("apistore for deal %d already registered", id)
}
if _, has := a.remoteStores[sid]; !has {
return xerrors.Errorf("remote store not found")
}
a.retrStores[id] = sid
return nil
}
func (a *APIBlockstoreAccessor) RegisterApiStore(sid api.RemoteStoreID, st *lbstore.NetworkStore) error {
if _, has := a.remoteStores[sid]; has {
return xerrors.Errorf("remote store already registered with this uuid")
}
a.remoteStores[sid] = st
st.OnClose(func() {
if _, has := a.remoteStores[sid]; has {
delete(a.remoteStores, sid)
}
})
return nil
}
var _ retrievalmarket.BlockstoreAccessor = &APIBlockstoreAccessor{}
type CARBlockstoreAccessor struct { type CARBlockstoreAccessor struct {
rootdir string rootdir string
lk sync.Mutex lk sync.Mutex

View File

@ -26,6 +26,7 @@ func TraverseDag(
ds mdagipld.DAGService, ds mdagipld.DAGService,
startFrom cid.Cid, startFrom cid.Cid,
optionalSelector ipld.Node, optionalSelector ipld.Node,
onOpen func(node mdagipld.Node) error,
visitCallback traversal.AdvVisitFn, visitCallback traversal.AdvVisitFn,
) error { ) error {
@ -61,6 +62,12 @@ func TraverseDag(
return nil, err return nil, err
} }
if onOpen != nil {
if err := onOpen(node); err != nil {
return nil, err
}
}
return bytes.NewBuffer(node.RawData()), nil return bytes.NewBuffer(node.RawData()), nil
} }
unixfsnode.AddUnixFSReificationToLinkSystem(&linkSystem) unixfsnode.AddUnixFSReificationToLinkSystem(&linkSystem)

View File

@ -29,6 +29,7 @@ import (
ledgerwallet "github.com/filecoin-project/lotus/chain/wallet/ledger" ledgerwallet "github.com/filecoin-project/lotus/chain/wallet/ledger"
"github.com/filecoin-project/lotus/chain/wallet/remotewallet" "github.com/filecoin-project/lotus/chain/wallet/remotewallet"
"github.com/filecoin-project/lotus/lib/peermgr" "github.com/filecoin-project/lotus/lib/peermgr"
"github.com/filecoin-project/lotus/markets/retrievaladapter"
"github.com/filecoin-project/lotus/markets/storageadapter" "github.com/filecoin-project/lotus/markets/storageadapter"
"github.com/filecoin-project/lotus/node/config" "github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/hello" "github.com/filecoin-project/lotus/node/hello"
@ -129,6 +130,7 @@ var ChainNode = Options(
Override(new(*market.FundManager), market.NewFundManager), Override(new(*market.FundManager), market.NewFundManager),
Override(new(dtypes.ClientDatastore), modules.NewClientDatastore), Override(new(dtypes.ClientDatastore), modules.NewClientDatastore),
Override(new(storagemarket.BlockstoreAccessor), modules.StorageBlockstoreAccessor), Override(new(storagemarket.BlockstoreAccessor), modules.StorageBlockstoreAccessor),
Override(new(*retrievaladapter.APIBlockstoreAccessor), retrievaladapter.NewAPIBlockstoreAdapter),
Override(new(storagemarket.StorageClient), modules.StorageClient), Override(new(storagemarket.StorageClient), modules.StorageClient),
Override(new(storagemarket.StorageClientNode), storageadapter.NewClientNodeAdapter), Override(new(storagemarket.StorageClientNode), storageadapter.NewClientNodeAdapter),
Override(HandleMigrateClientFundsKey, modules.HandleMigrateClientFunds), Override(HandleMigrateClientFundsKey, modules.HandleMigrateClientFunds),

View File

@ -10,6 +10,7 @@ import (
"os" "os"
"sort" "sort"
"strings" "strings"
"sync"
"time" "time"
"github.com/ipfs/go-blockservice" "github.com/ipfs/go-blockservice"
@ -97,6 +98,7 @@ type API struct {
Imports dtypes.ClientImportMgr Imports dtypes.ClientImportMgr
StorageBlockstoreAccessor storagemarket.BlockstoreAccessor StorageBlockstoreAccessor storagemarket.BlockstoreAccessor
RtvlBlockstoreAccessor rm.BlockstoreAccessor RtvlBlockstoreAccessor rm.BlockstoreAccessor
ApiBlockstoreAccessor *retrievaladapter.APIBlockstoreAccessor
DataTransfer dtypes.ClientDataTransfer DataTransfer dtypes.ClientDataTransfer
Host host.Host Host host.Host
@ -845,6 +847,13 @@ func (a *API) doRetrieval(ctx context.Context, order api.RetrievalOrder, sel dat
} }
id := a.Retrieval.NextID() id := a.Retrieval.NextID()
if order.RemoteStore != nil {
if err := a.ApiBlockstoreAccessor.UseRetrievalStore(id, *order.RemoteStore); err != nil {
return 0, xerrors.Errorf("registering api store: %w", err)
}
}
id, err = a.Retrieval.Retrieve( id, err = a.Retrieval.Retrieve(
ctx, ctx,
id, id,
@ -999,6 +1008,8 @@ func (a *API) outputCAR(ctx context.Context, ds format.DAGService, bs bstore.Blo
roots[i] = dag.root roots[i] = dag.root
} }
var lk sync.Mutex
return dest.doWrite(func(w io.Writer) error { return dest.doWrite(func(w io.Writer) error {
if err := car.WriteHeader(&car.CarHeader{ if err := car.WriteHeader(&car.CarHeader{
@ -1016,8 +1027,21 @@ func (a *API) outputCAR(ctx context.Context, ds format.DAGService, bs bstore.Blo
ds, ds,
root, root,
dagSpec.selector, dagSpec.selector,
func(node format.Node) error {
if dagSpec.exportAll {
lk.Lock()
defer lk.Unlock()
if cs.Visit(node.Cid()) {
err := util.LdWrite(w, node.Cid().Bytes(), node.RawData())
if err != nil {
return xerrors.Errorf("writing block data: %w", err)
}
}
}
return nil
},
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error { func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
if r == traversal.VisitReason_SelectionMatch { if !dagSpec.exportAll && r == traversal.VisitReason_SelectionMatch {
var c cid.Cid var c cid.Cid
if p.LastBlock.Link == nil { if p.LastBlock.Link == nil {
c = root c = root
@ -1084,6 +1108,7 @@ func (a *API) outputUnixFS(ctx context.Context, root cid.Cid, ds format.DAGServi
type dagSpec struct { type dagSpec struct {
root cid.Cid root cid.Cid
selector ipld.Node selector ipld.Node
exportAll bool
} }
func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds format.DAGService, car bool) ([]dagSpec, error) { func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds format.DAGService, car bool) ([]dagSpec, error) {
@ -1098,6 +1123,7 @@ func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds forma
out := make([]dagSpec, len(dsp)) out := make([]dagSpec, len(dsp))
for i, spec := range dsp { for i, spec := range dsp {
out[i].exportAll = spec.ExportMerkleProof
if spec.DataSelector == nil { if spec.DataSelector == nil {
return nil, xerrors.Errorf("invalid DagSpec at position %d: `DataSelector` can not be nil", i) return nil, xerrors.Errorf("invalid DagSpec at position %d: `DataSelector` can not be nil", i)
@ -1131,6 +1157,7 @@ func parseDagSpec(ctx context.Context, root cid.Cid, dsp []api.DagSpec, ds forma
ds, ds,
root, root,
rsn, rsn,
nil,
func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error { func(p traversal.Progress, n ipld.Node, r traversal.VisitReason) error {
if r == traversal.VisitReason_SelectionMatch { if r == traversal.VisitReason_SelectionMatch {
if !car && p.LastBlock.Path.String() != p.Path.String() { if !car && p.LastBlock.Path.String() != p.Path.String() {

View File

@ -202,9 +202,9 @@ func StorageClient(lc fx.Lifecycle, h host.Host, dataTransfer dtypes.ClientDataT
// RetrievalClient creates a new retrieval client attached to the client blockstore // RetrievalClient creates a new retrieval client attached to the client blockstore
func RetrievalClient(forceOffChain bool) func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, func RetrievalClient(forceOffChain bool) func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver,
ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor retrievalmarket.BlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) { ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor *retrievaladapter.APIBlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
return func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, return func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver,
ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor retrievalmarket.BlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) { ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, accessor *retrievaladapter.APIBlockstoreAccessor, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
adapter := retrievaladapter.NewRetrievalClientNode(forceOffChain, payAPI, chainAPI, stateAPI) adapter := retrievaladapter.NewRetrievalClientNode(forceOffChain, payAPI, chainAPI, stateAPI)
network := rmnet.NewFromLibp2pHost(h) network := rmnet.NewFromLibp2pHost(h)
ds = namespace.Wrap(ds, datastore.NewKey("/retrievals/client")) ds = namespace.Wrap(ds, datastore.NewKey("/retrievals/client"))

View File

@ -3,13 +3,16 @@ package node
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"net" "net"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"runtime" "runtime"
"strconv" "strconv"
"github.com/google/uuid"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
@ -23,6 +26,7 @@ import (
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/api/v0api" "github.com/filecoin-project/lotus/api/v0api"
"github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/api/v1api"
bstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/lib/rpcenc" "github.com/filecoin-project/lotus/lib/rpcenc"
"github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/metrics/proxy" "github.com/filecoin-project/lotus/metrics/proxy"
@ -92,6 +96,7 @@ func FullNodeHandler(a v1api.FullNode, permissioned bool, opts ...jsonrpc.Server
// Import handler // Import handler
handleImportFunc := handleImport(a.(*impl.FullNodeAPI)) handleImportFunc := handleImport(a.(*impl.FullNodeAPI))
handleExportFunc := handleExport(a.(*impl.FullNodeAPI)) handleExportFunc := handleExport(a.(*impl.FullNodeAPI))
handleRemoteStoreFunc := handleRemoteStore(a.(*impl.FullNodeAPI))
if permissioned { if permissioned {
importAH := &auth.Handler{ importAH := &auth.Handler{
Verify: a.AuthVerify, Verify: a.AuthVerify,
@ -104,9 +109,16 @@ func FullNodeHandler(a v1api.FullNode, permissioned bool, opts ...jsonrpc.Server
Next: handleExportFunc, Next: handleExportFunc,
} }
m.Handle("/rest/v0/export", exportAH) m.Handle("/rest/v0/export", exportAH)
storeAH := &auth.Handler{
Verify: a.AuthVerify,
Next: handleRemoteStoreFunc,
}
m.Handle("/rest/v0/store/{uuid}", storeAH)
} else { } else {
m.HandleFunc("/rest/v0/import", handleImportFunc) m.HandleFunc("/rest/v0/import", handleImportFunc)
m.HandleFunc("/rest/v0/export", handleExportFunc) m.HandleFunc("/rest/v0/export", handleExportFunc)
m.HandleFunc("/rest/v0/store/{uuid}", handleRemoteStoreFunc)
} }
// debugging // debugging
@ -256,3 +268,34 @@ func handleFractionOpt(name string, setter func(int)) http.HandlerFunc {
setter(fr) setter(fr)
} }
} }
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func handleRemoteStore(a *impl.FullNodeAPI) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id, err := uuid.Parse(vars["uuid"])
if err != nil {
http.Error(w, fmt.Sprintf("parse uuid: %s", err), http.StatusBadRequest)
return
}
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Error(err)
w.WriteHeader(500)
return
}
nstore := bstore.NewNetworkStoreWS(c)
if err := a.ApiBlockstoreAccessor.RegisterApiStore(id, nstore); err != nil {
log.Errorw("registering api bstore", "error", err)
_ = c.Close()
return
}
}
}