Merge pull request #1317 from filecoin-project/feat/car-import

Add ability to import CAR files and retrieve to a CAR file
This commit is contained in:
Whyrusleeping 2020-03-06 13:27:17 -08:00 committed by GitHub
commit 8a6b3cff57
8 changed files with 119 additions and 32 deletions

View File

@ -89,13 +89,13 @@ type FullNode interface {
// Other
// ClientImport imports file under the specified path into filestore
ClientImport(ctx context.Context, path string) (cid.Cid, error)
ClientImport(ctx context.Context, ref FileRef) (cid.Cid, error)
ClientStartDeal(ctx context.Context, params *StartDealParams) (*cid.Cid, error)
ClientGetDealInfo(context.Context, cid.Cid) (*DealInfo, error)
ClientListDeals(ctx context.Context) ([]DealInfo, error)
ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error)
ClientFindData(ctx context.Context, root cid.Cid) ([]QueryOffer, error)
ClientRetrieve(ctx context.Context, order RetrievalOrder, path string) error
ClientRetrieve(ctx context.Context, order RetrievalOrder, ref FileRef) error
ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error)
// ClientUnimport removes references to the specified file from filestore
@ -155,6 +155,11 @@ type FullNode interface {
PaychVoucherSubmit(context.Context, address.Address, *paych.SignedVoucher) (cid.Cid, error)
}
type FileRef struct {
Path string
IsCAR bool
}
type MinerSectors struct {
Pset uint64
Sset uint64

View File

@ -93,14 +93,14 @@ type FullNodeStruct struct {
WalletExport func(context.Context, address.Address) (*types.KeyInfo, error) `perm:"admin"`
WalletImport func(context.Context, *types.KeyInfo) (address.Address, error) `perm:"admin"`
ClientImport func(ctx context.Context, path string) (cid.Cid, error) `perm:"admin"`
ClientImport func(ctx context.Context, ref api.FileRef) (cid.Cid, error) `perm:"admin"`
ClientListImports func(ctx context.Context) ([]api.Import, error) `perm:"write"`
ClientHasLocal func(ctx context.Context, root cid.Cid) (bool, error) `perm:"write"`
ClientFindData func(ctx context.Context, root cid.Cid) ([]api.QueryOffer, error) `perm:"read"`
ClientStartDeal func(ctx context.Context, params *api.StartDealParams) (*cid.Cid, error) `perm:"admin"`
ClientGetDealInfo func(context.Context, cid.Cid) (*api.DealInfo, error) `perm:"read"`
ClientListDeals func(ctx context.Context) ([]api.DealInfo, error) `perm:"write"`
ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, path string) error `perm:"admin"`
ClientRetrieve func(ctx context.Context, order api.RetrievalOrder, ref api.FileRef) error `perm:"admin"`
ClientQueryAsk func(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) `perm:"read"`
StateMinerSectors func(context.Context, address.Address, types.TipSetKey) ([]*api.ChainSectorInfo, error) `perm:"read"`
@ -241,8 +241,8 @@ func (c *FullNodeStruct) ClientListImports(ctx context.Context) ([]api.Import, e
return c.Internal.ClientListImports(ctx)
}
func (c *FullNodeStruct) ClientImport(ctx context.Context, path string) (cid.Cid, error) {
return c.Internal.ClientImport(ctx, path)
func (c *FullNodeStruct) ClientImport(ctx context.Context, ref api.FileRef) (cid.Cid, error) {
return c.Internal.ClientImport(ctx, ref)
}
func (c *FullNodeStruct) ClientHasLocal(ctx context.Context, root cid.Cid) (bool, error) {
@ -264,8 +264,8 @@ func (c *FullNodeStruct) ClientListDeals(ctx context.Context) ([]api.DealInfo, e
return c.Internal.ClientListDeals(ctx)
}
func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path string) error {
return c.Internal.ClientRetrieve(ctx, order, path)
func (c *FullNodeStruct) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref api.FileRef) error {
return c.Internal.ClientRetrieve(ctx, order, ref)
}
func (c *FullNodeStruct) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) {

View File

@ -131,7 +131,11 @@ loop:
t.Fatal(err)
}
err = client.ClientRetrieve(ctx, offers[0].Order(caddr), filepath.Join(rpath, "ret"))
ref := api.FileRef{
Path: filepath.Join(rpath, "ret"),
IsCAR: false,
}
err = client.ClientRetrieve(ctx, offers[0].Order(caddr), ref)
if err != nil {
t.Fatalf("%+v", err)
}

View File

@ -38,6 +38,12 @@ var clientImportCmd = &cli.Command{
Name: "import",
Usage: "Import data",
ArgsUsage: "[inputPath]",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "car",
Usage: "export to a car file instead of a regular file",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
@ -50,7 +56,11 @@ var clientImportCmd = &cli.Command{
return err
}
c, err := api.ClientImport(ctx, absPath)
ref := lapi.FileRef{
Path: absPath,
IsCAR: cctx.Bool("car"),
}
c, err := api.ClientImport(ctx, ref)
if err != nil {
return err
}
@ -251,6 +261,10 @@ var clientRetrieveCmd = &cli.Command{
Name: "address",
Usage: "address to use for transactions",
},
&cli.BoolFlag{
Name: "car",
Usage: "export to a car file instead of a regular file",
},
},
Action: func(cctx *cli.Context) error {
if cctx.NArg() != 2 {
@ -304,7 +318,11 @@ var clientRetrieveCmd = &cli.Command{
return nil
}
if err := api.ClientRetrieve(ctx, offers[0].Order(payer), cctx.Args().Get(1)); err != nil {
ref := lapi.FileRef{
Path: cctx.Args().Get(1),
IsCAR: cctx.Bool("car"),
}
if err := api.ClientRetrieve(ctx, offers[0].Order(payer), ref); err != nil {
return xerrors.Errorf("Retrieval Failed: %w", err)
}

6
go.mod
View File

@ -37,7 +37,7 @@ require (
github.com/ipfs/go-bitswap v0.1.8
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c
github.com/ipfs/go-car v0.0.3-0.20200221191037-3762780fa84e
github.com/ipfs/go-car v0.0.3-0.20200304012825-b6769248bfef
github.com/ipfs/go-cid v0.0.5
github.com/ipfs/go-datastore v0.4.2
github.com/ipfs/go-ds-badger2 v0.0.0-20200211201106-609c9d2a39c7
@ -45,9 +45,9 @@ require (
github.com/ipfs/go-fs-lock v0.0.1
github.com/ipfs/go-graphsync v0.0.4
github.com/ipfs/go-hamt-ipld v0.0.15-0.20200204200533-99b8553ef242
github.com/ipfs/go-ipfs-blockstore v0.1.3
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-chunker v0.0.1
github.com/ipfs/go-ipfs-ds-help v0.0.1
github.com/ipfs/go-ipfs-ds-help v0.1.1
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.4

9
go.sum
View File

@ -229,8 +229,8 @@ github.com/ipfs/go-blockservice v0.1.0/go.mod h1:hzmMScl1kXHg3M2BjTymbVPjv627N7s
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c h1:lN5IQA07VtLiTLAp/Scezp1ljFhXErC6yq4O1cu+yJ0=
github.com/ipfs/go-blockservice v0.1.3-0.20190908200855-f22eea50656c/go.mod h1:t+411r7psEUhLueM8C7aPA7cxCclv4O3VsUVxt9kz2I=
github.com/ipfs/go-car v0.0.3-0.20200131220434-3f68f6ebd093/go.mod h1:rEkw0S1sHd5kHL3rUSGEhwNanYqTwwNhjtpp0rwjrr4=
github.com/ipfs/go-car v0.0.3-0.20200221191037-3762780fa84e h1:+wPt/tQHMOl00ekuf+5+ekWpyGQ6JOnF9vzEXjW7LtA=
github.com/ipfs/go-car v0.0.3-0.20200221191037-3762780fa84e/go.mod h1:rEkw0S1sHd5kHL3rUSGEhwNanYqTwwNhjtpp0rwjrr4=
github.com/ipfs/go-car v0.0.3-0.20200304012825-b6769248bfef h1:Zn2PZSkX8Go+SZpQmjVKNrkcgbNuIxUC/3MOQRDTIVw=
github.com/ipfs/go-car v0.0.3-0.20200304012825-b6769248bfef/go.mod h1:7BMxYRi5cbR/GJ1A8mYSHvMLXLkHgYdrJ6VlNGobd0o=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
@ -266,8 +266,9 @@ github.com/ipfs/go-hamt-ipld v0.0.15-0.20200204200533-99b8553ef242 h1:OYVGeYkGSR
github.com/ipfs/go-hamt-ipld v0.0.15-0.20200204200533-99b8553ef242/go.mod h1:kq3Pi+UP3oHhAdKexE+kHHYRKMoFNuGero0R7q3hWGg=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.3 h1:Nc/Uwc8KZgo5PsMUZT/Zt7zc0u/s+b/3c64ChQNttHc=
github.com/ipfs/go-ipfs-blockstore v0.1.3/go.mod h1:iNWVBoSQ7eMcaGo8+L3pKZABGTdWcqj1/hpoUu5bDps=
github.com/ipfs/go-ipfs-blockstore v0.1.4 h1:2SGI6U1B44aODevza8Rde3+dY30Pb+lbcObe1LETxOQ=
github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ=
github.com/ipfs/go-ipfs-blocksutil v0.0.1 h1:Eh/H4pc1hsvhzsQoMEP3Bke/aW5P5rVM1IWFJMcGIPQ=
github.com/ipfs/go-ipfs-blocksutil v0.0.1/go.mod h1:Yq4M86uIOmxmGPUHv/uI7uKqZNtLb449gwKqXjIsnRk=
github.com/ipfs/go-ipfs-chunker v0.0.1 h1:cHUUxKFQ99pozdahi+uSC/3Y6HeRpi9oTeUHbE27SEw=
@ -277,6 +278,8 @@ github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1I
github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw=
github.com/ipfs/go-ipfs-ds-help v0.0.1 h1:QBg+Ts2zgeemK/dB0saiF/ykzRGgfoFMT90Rzo0OnVU=
github.com/ipfs/go-ipfs-ds-help v0.0.1/go.mod h1:gtP9xRaZXqIQRh1HRpp595KbBEdgqWFxefeVKOV8sxo=
github.com/ipfs/go-ipfs-ds-help v0.1.1 h1:IW/bXGeaAZV2VH0Kuok+Ohva/zHkHmeLFBxC1k7mNPc=
github.com/ipfs/go-ipfs-ds-help v0.1.1/go.mod h1:SbBafGJuGsPI/QL3j9Fc5YPLeAu+SzOkI0gFwAg+mOs=
github.com/ipfs/go-ipfs-exchange-interface v0.0.1 h1:LJXIo9W7CAmugqI+uofioIpRb6rY30GUu7G6LUfpMvM=
github.com/ipfs/go-ipfs-exchange-interface v0.0.1/go.mod h1:c8MwfHjtQjPoDyiy9cFquVtVHkO9b9Ob3FG91qJnWCM=
github.com/ipfs/go-ipfs-exchange-offline v0.0.1 h1:P56jYKZF7lDDOLx5SotVh5KFxoY6C81I1NSHW1FxGew=

View File

@ -71,7 +71,12 @@ class Client extends React.Component {
let file = await this.props.pondClient.call('Pond.CreateRandomFile', [
this.state.kbs * 1000
]) // 1024 won't fit in 1k blocks :(
let cid = await this.props.client.call('Filecoin.ClientImport', [file])
let cid = await this.props.client.call('Filecoin.ClientImport', [
{
Path: file,
IsCar: false
}
])
let dealcid = await this.props.client.call('Filecoin.ClientStartDeal', [
cid,
this.state.miner,
@ -100,7 +105,10 @@ class Client extends React.Component {
await this.props.client.call('Filecoin.ClientRetrieve', [
order,
'/dev/null'
{
Path: '/dev/null',
IsCAR: false
}
])
}

View File

@ -10,6 +10,7 @@ import (
"golang.org/x/xerrors"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-car"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-filestore"
chunker "github.com/ipfs/go-ipfs-chunker"
@ -194,20 +195,39 @@ func (a *API) ClientFindData(ctx context.Context, root cid.Cid) ([]api.QueryOffe
return out, nil
}
func (a *API) ClientImport(ctx context.Context, path string) (cid.Cid, error) {
f, err := os.Open(path)
func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (cid.Cid, error) {
f, err := os.Open(ref.Path)
if err != nil {
return cid.Undef, err
}
stat, err := f.Stat()
if err != nil {
return cid.Undef, err
}
file, err := files.NewReaderPathFile(path, f, stat)
file, err := files.NewReaderPathFile(ref.Path, f, stat)
if err != nil {
return cid.Undef, err
}
if ref.IsCAR {
var store car.Store
if a.Filestore == nil {
store = a.Blockstore
} else {
store = (*filestore.Filestore)(a.Filestore)
}
result, err := car.LoadCar(store, file)
if err != nil {
return cid.Undef, err
}
if len(result.Roots) != 1 {
return cid.Undef, xerrors.New("cannot import car with more than one root")
}
return result.Roots[0], nil
}
bufferedDS := ipld.NewBufferedDAG(ctx, a.LocalDAG)
@ -271,24 +291,41 @@ func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) {
// TODO: make this less very bad by tracking root cids instead of using ListAll
out := make([]api.Import, 0)
lowest := make([]uint64, 0)
for {
r := next()
if r == nil {
return out, nil
}
if r.Offset != 0 {
continue
matched := false
for i := range out {
if out[i].FilePath == r.FilePath {
matched = true
if lowest[i] > r.Offset {
lowest[i] = r.Offset
out[i] = api.Import{
Status: r.Status,
Key: r.Key,
FilePath: r.FilePath,
Size: r.Size,
}
}
break
}
}
if !matched {
out = append(out, api.Import{
Status: r.Status,
Key: r.Key,
FilePath: r.FilePath,
Size: r.Size,
})
lowest = append(lowest, r.Offset)
}
out = append(out, api.Import{
Status: r.Status,
Key: r.Key,
FilePath: r.FilePath,
Size: r.Size,
})
}
}
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path string) error {
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref api.FileRef) error {
if order.MinerPeerID == "" {
pid, err := a.StateMinerPeerID(ctx, order.Miner, types.EmptyTSK)
if err != nil {
@ -336,6 +373,18 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path
unsubscribe()
if ref.IsCAR {
f, err := os.Open(ref.Path)
if err != nil {
return err
}
err = car.WriteCar(ctx, a.LocalDAG, []cid.Cid{order.Root}, f)
if err != nil {
return err
}
return f.Close()
}
nd, err := a.LocalDAG.Get(ctx, order.Root)
if err != nil {
return xerrors.Errorf("ClientRetrieve: %w", err)
@ -344,7 +393,7 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, path
if err != nil {
return xerrors.Errorf("ClientRetrieve: %w", err)
}
return files.WriteTo(file, path)
return files.WriteTo(file, ref.Path)
}
func (a *API) ClientQueryAsk(ctx context.Context, p peer.ID, miner address.Address) (*storagemarket.SignedStorageAsk, error) {