Merge pull request #1843 from jsign/jsign/2stackedprs

Allow remote IPFS node support & IPFS for data retrieval & automatic env wiring
This commit is contained in:
Łukasz Magiera 2020-05-27 14:59:18 +02:00 committed by GitHub
commit af1d54501f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 68 additions and 18 deletions

View File

@ -117,7 +117,7 @@ type FullNode interface {
ClientListDeals(ctx context.Context) ([]DealInfo, error)
ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error)
ClientFindData(ctx context.Context, root cid.Cid) ([]QueryOffer, error)
ClientRetrieve(ctx context.Context, order RetrievalOrder, ref FileRef) error
ClientRetrieve(ctx context.Context, order RetrievalOrder, ref *FileRef) error
ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error)
ClientCalcCommP(ctx context.Context, inpath string, miner address.Address) (*CommPRet, error)
ClientGenCar(ctx context.Context, ref FileRef, outpath string) error

View File

@ -110,7 +110,7 @@ type FullNodeStruct struct {
ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"`
ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"`
ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"`
ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref api.FileRef) error `perm:"admin"`
ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error `perm:"admin"`
ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
ClientCalcCommP func(ctx context.Context, inpath string, miner address.Address) (*api.CommPRet, error) `perm:"read"`
ClientGenCar func(ctx context.Context, ref api.FileRef, outpath string) error `perm:"write"`
@ -321,7 +321,7 @@ func (c *FullNodeStruct) ClientListDeals(ctx context.Context) ([]api.DealInfo, e
return c.Internal.ClientListDeals(ctx)
}
func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref api.FileRef) error {
func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
return c.Internal.ClientRetrieve(ctx, order, ref)
}

View File

@ -200,7 +200,7 @@ func testRetrieval(t *testing.T, ctx context.Context, err error, client *impl.Fu
t.Fatal(err)
}
ref := api.FileRef{
ref := &api.FileRef{
Path: filepath.Join(rpath, "ret"),
IsCAR: carExport,
}

View File

@ -384,7 +384,7 @@ var clientRetrieveCmd = &cli.Command{
return nil
}
ref := lapi.FileRef{
ref := &lapi.FileRef{
Path: cctx.Args().Get(1),
IsCAR: cctx.Bool("car"),
}

1
go.mod
View File

@ -68,6 +68,7 @@ require (
github.com/ipfs/interface-go-ipfs-core v0.2.3
github.com/ipld/go-car v0.1.1-0.20200430185908-8ff2e52a4c88
github.com/ipld/go-ipld-prime v0.0.2-0.20200428162820-8b59dc292b8e
github.com/kelseyhightower/envconfig v1.4.0
github.com/lib/pq v1.2.0
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p v0.9.2

2
go.sum
View File

@ -482,6 +482,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/kabukky/httpscerts v0.0.0-20150320125433-617593d7dcb3 h1:Iy7Ifq2ysilWU4QlCx/97OoI4xT1IV7i8byT/EyIT/M=
github.com/kabukky/httpscerts v0.0.0-20150320125433-617593d7dcb3/go.mod h1:BYpt4ufZiIGv2nXn4gMxnfKV306n3mWXgNu/d2TqdTU=
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d/go.mod h1:P2viExyCEfeWGU259JnaQ34Inuec4R38JCyBx2edgD0=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=

View File

@ -7,6 +7,7 @@ import (
"golang.org/x/xerrors"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
blocks "github.com/ipfs/go-block-format"
@ -35,6 +36,18 @@ func NewIpfsBstore(ctx context.Context) (*IpfsBstore, error) {
}, nil
}
func NewRemoteIpfsBstore(ctx context.Context, maddr multiaddr.Multiaddr) (*IpfsBstore, error) {
api, err := httpapi.NewApi(maddr)
if err != nil {
return nil, xerrors.Errorf("getting remote ipfs api: %w", err)
}
return &IpfsBstore{
ctx: ctx,
api: api,
}, nil
}
func (i *IpfsBstore) DeleteBlock(cid cid.Cid) error {
return xerrors.Errorf("not supported")
}

View File

@ -378,12 +378,13 @@ func ConfigFullNode(c interface{}) Option {
return Error(xerrors.Errorf("invalid config from repo, got: %T", c))
}
ipfsMaddr := cfg.Client.IpfsMAddr
useForRetrieval := cfg.Client.IpfsUseForRetrieval
return Options(
ConfigCommon(&cfg.Common),
If(cfg.Client.UseIpfs,
Override(new(dtypes.ClientBlockstore), modules.IpfsClientBlockstore),
Override(new(dtypes.ClientBlockstore), modules.IpfsClientBlockstore(ipfsMaddr, useForRetrieval)),
),
If(cfg.Metrics.HeadNotifs,
Override(HeadMetricsKey, metrics.SendHeadNotifs(cfg.Metrics.Nickname)),
),

View File

@ -62,7 +62,9 @@ type Metrics struct {
}
type Client struct {
UseIpfs bool
UseIpfs bool
IpfsMAddr string
IpfsUseForRetrieval bool
}
func defCommon() Common {

View File

@ -2,10 +2,12 @@ package config
import (
"bytes"
"fmt"
"io"
"os"
"github.com/BurntSushi/toml"
"github.com/kelseyhightower/envconfig"
"golang.org/x/xerrors"
)
@ -32,6 +34,11 @@ func FromReader(reader io.Reader, def interface{}) (interface{}, error) {
return nil, err
}
err = envconfig.Process("LOTUS", cfg)
if err != nil {
return nil, fmt.Errorf("processing env vars overrides: %s", err)
}
return cfg, nil
}

View File

@ -302,7 +302,7 @@ func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) {
}
}
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref api.FileRef) error {
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
if order.MinerPeerID == "" {
mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK)
if err != nil {
@ -353,6 +353,11 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
unsubscribe()
// If ref is nil, it only fetches the data into the configured blockstore.
if ref == nil {
return nil
}
if ref.IsCAR {
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {

View File

@ -6,6 +6,7 @@ import (
"github.com/ipfs/go-filestore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/multiformats/go-multiaddr"
"github.com/filecoin-project/lotus/lib/bufbstore"
"github.com/filecoin-project/lotus/lib/ipfsbstore"
@ -13,14 +14,32 @@ import (
"github.com/filecoin-project/lotus/node/modules/helpers"
)
func IpfsClientBlockstore(mctx helpers.MetricsCtx, lc fx.Lifecycle, fstore dtypes.ClientFilestore) (dtypes.ClientBlockstore, error) {
ipfsbs, err := ipfsbstore.NewIpfsBstore(helpers.LifecycleCtx(mctx, lc))
if err != nil {
return nil, xerrors.Errorf("constructing ipfs blockstore: %w", err)
// IpfsClientBlockstore returns a ClientBlockstore implementation backed by an IPFS node.
// If ipfsMaddr is empty, a local IPFS node is assumed considering IPFS_PATH configuration.
// If ipfsMaddr is not empty, it will connect to the remote IPFS node with the provided multiaddress.
// The flag useForRetrieval indicates if the IPFS node will also be used for storing retrieving deals.
func IpfsClientBlockstore(ipfsMaddr string, useForRetrieval bool) func(helpers.MetricsCtx, fx.Lifecycle, dtypes.ClientFilestore) (dtypes.ClientBlockstore, error) {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, fstore dtypes.ClientFilestore) (dtypes.ClientBlockstore, error) {
var err error
var ipfsbs *ipfsbstore.IpfsBstore
if ipfsMaddr != "" {
var ma multiaddr.Multiaddr
ma, err = multiaddr.NewMultiaddr(ipfsMaddr)
if err != nil {
return nil, xerrors.Errorf("parsing ipfs multiaddr: %w", err)
}
ipfsbs, err = ipfsbstore.NewRemoteIpfsBstore(helpers.LifecycleCtx(mctx, lc), ma)
} else {
ipfsbs, err = ipfsbstore.NewIpfsBstore(helpers.LifecycleCtx(mctx, lc))
}
if err != nil {
return nil, xerrors.Errorf("constructing ipfs blockstore: %w", err)
}
var ws blockstore.Blockstore
ws = ipfsbs
if !useForRetrieval {
ws = blockstore.NewIdStore((*filestore.Filestore)(fstore))
}
return bufbstore.NewTieredBstore(ipfsbs, ws), nil
}
return bufbstore.NewTieredBstore(
ipfsbs,
blockstore.NewIdStore((*filestore.Filestore)(fstore)),
), nil
}