diff --git a/api/api_full.go b/api/api_full.go index 3dc503f46..f8013a304 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -719,9 +719,10 @@ type Import struct { Key multistore.StoreID Err string - Root *cid.Cid - Source string - FilePath string + Root *cid.Cid + Source string + FilePath string + CARv2FilePath string } type DealInfo struct { @@ -904,7 +905,7 @@ type RetrievalOrder struct { Piece *cid.Cid Size uint64 - LocalStore *multistore.StoreID // if specified, get data from local store + LocalCARV2FilePath string // if specified, get data from a local CARv2 file. // TODO: support offset Total types.BigInt UnsealPrice types.BigInt diff --git a/cli/client.go b/cli/client.go index dc925b72f..a8846eeae 100644 --- a/cli/client.go +++ b/cli/client.go @@ -1104,8 +1104,8 @@ var clientRetrieveCmd = &cli.Command{ for _, i := range imports { if i.Root != nil && i.Root.Equals(file) { order = &lapi.RetrievalOrder{ - Root: file, - LocalStore: &i.Key, + Root: file, + LocalCARV2FilePath: i.CARv2FilePath, Total: big.Zero(), UnsealPrice: big.Zero(), diff --git a/go.mod b/go.mod index 0d2664f09..a43d1c8e3 100644 --- a/go.mod +++ b/go.mod @@ -28,14 +28,13 @@ require ( github.com/fatih/color v1.9.0 github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f github.com/filecoin-project/go-address v0.0.5 - github.com/filecoin-project/go-amt-ipld/v2 v2.1.1-0.20201006184820-924ee87a1349 // indirect github.com/filecoin-project/go-bitfield v0.2.4 github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 github.com/filecoin-project/go-commp-utils v0.1.1-0.20210427191551-70bf140d31c7 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v1.7.0 github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a - github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210701085128-940bb3f8c536 + github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210702071641-2b0175bfe1ed github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20 @@ -134,7 +133,6 @@ require ( github.com/raulk/clock v1.1.0 github.com/raulk/go-watchdog v1.0.1 github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25 - github.com/stretchr/objx v0.2.0 // indirect github.com/stretchr/testify v1.7.0 github.com/syndtr/goleveldb v1.0.0 github.com/urfave/cli/v2 v2.2.0 diff --git a/go.sum b/go.sum index 872360c87..7973174d6 100644 --- a/go.sum +++ b/go.sum @@ -256,6 +256,8 @@ github.com/fatih/color v1.8.0/go.mod h1:3l45GVGkyrnYNl9HoIjnp2NnNWvh6hLAqD8yTfGj github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E= +github.com/filecoin-project/dagstore v0.0.0-20210624112130-607a5a2a5bd4 h1:AcL0EUgq37GqpsP085Ib+mJGBhNqAydeS/C1u59Prbg= +github.com/filecoin-project/dagstore v0.0.0-20210624112130-607a5a2a5bd4/go.mod h1:b8ZvtSY5ggpvS2AkeF+Hbdcu5/Xm1P6RIR5NKJj7OhE= github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM= github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= @@ -271,7 +273,7 @@ github.com/filecoin-project/go-bitfield v0.2.4 h1:uZ7MeE+XfM5lqrHJZ93OnhQKc/rveW github.com/filecoin-project/go-bitfield v0.2.4/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg= -github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434/go.mod h1:6s95K91mCyHY51RPWECZieD3SGWTqIFLf1mPOes9l5U= +github.com/filecoin-project/go-commp-utils v0.1.0/go.mod h1:6s95K91mCyHY51RPWECZieD3SGWTqIFLf1mPOes9l5U= github.com/filecoin-project/go-commp-utils v0.1.1-0.20210427191551-70bf140d31c7 h1:U9Z+76pHCKBmtdxFV7JFZJj7OVm12I6dEKwtMVbq5p0= github.com/filecoin-project/go-commp-utils v0.1.1-0.20210427191551-70bf140d31c7/go.mod h1:6s95K91mCyHY51RPWECZieD3SGWTqIFLf1mPOes9l5U= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= @@ -285,8 +287,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg= github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= -github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210701085128-940bb3f8c536 h1:jp/LnSNTrxAeVIMAIM06U1qN3m0SpPxigX6/SIAXnTQ= -github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210701085128-940bb3f8c536/go.mod h1:Arrf7syON79ni4amsGtzV3Pl3qtUrCV5dMX6U21Gxzw= +github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210702071641-2b0175bfe1ed h1:nX5MS1+yE0gmkFXdPaPYCVKjYrKCDmMc3BINC2QZRWY= +github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210702071641-2b0175bfe1ed/go.mod h1:bxcU7Y+88x0yToWbNGtiH6JrWMklCMIiF/LluKua1eo= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM= @@ -322,7 +324,6 @@ github.com/filecoin-project/specs-actors v0.9.13/go.mod h1:TS1AW/7LbG+615j4NsjMK github.com/filecoin-project/specs-actors v0.9.14 h1:68PVstg2UB3ZsMLF+DKFTAs/YKsqhKWynkr0IqmVRQY= github.com/filecoin-project/specs-actors v0.9.14/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao= github.com/filecoin-project/specs-actors/v2 v2.0.1/go.mod h1:v2NZVYinNIKA9acEMBm5wWXxqv5+frFEbekBFemYghY= -github.com/filecoin-project/specs-actors/v2 v2.3.2/go.mod h1:UuJQLoTx/HPvvWeqlIFmC/ywlOLHNe8SNQ3OunFbu2Y= github.com/filecoin-project/specs-actors/v2 v2.3.5-0.20210114162132-5b58b773f4fb/go.mod h1:LljnY2Mn2homxZsmokJZCpRuhOPxfXhvcek5gWkmqAc= github.com/filecoin-project/specs-actors/v2 v2.3.5 h1:PbT4tPlSXZ8sRgajhb4D8AOEmiaaZ+jg6tc6BBv8VQc= github.com/filecoin-project/specs-actors/v2 v2.3.5/go.mod h1:LljnY2Mn2homxZsmokJZCpRuhOPxfXhvcek5gWkmqAc= @@ -996,6 +997,7 @@ github.com/libp2p/go-libp2p-netutil v0.0.1/go.mod h1:GdusFvujWZI9Vt0X5BKqwWWmZFx github.com/libp2p/go-libp2p-netutil v0.1.0 h1:zscYDNVEcGxyUpMd0JReUZTrpMfia8PmLKcKF72EAMQ= github.com/libp2p/go-libp2p-netutil v0.1.0/go.mod h1:3Qv/aDqtMLTUyQeundkKsA+YCThNdbQD54k3TqjpbFU= github.com/libp2p/go-libp2p-noise v0.1.1/go.mod h1:QDFLdKX7nluB7DEnlVPbz7xlLHdwHFA9HiohJRr3vwM= +github.com/libp2p/go-libp2p-noise v0.1.2/go.mod h1:9B10b7ueo7TIxZHHcjcDCo5Hd6kfKT2m77by82SFRfE= github.com/libp2p/go-libp2p-noise v0.2.0 h1:wmk5nhB9a2w2RxMOyvsoKjizgJOEaJdfAakr0jN8gds= github.com/libp2p/go-libp2p-noise v0.2.0/go.mod h1:IEbYhBBzGyvdLBoxxULL/SGbJARhUeqlO8lVSREYu2Q= github.com/libp2p/go-libp2p-peer v0.0.1/go.mod h1:nXQvOBbwVqoP+T5Y5nCjeH4sP9IX/J0AMzcDUVruVoo= @@ -1317,6 +1319,7 @@ github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OS github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= github.com/ngdinhtoan/glide-cleanup v0.2.0/go.mod h1:UQzsmiDOb8YV3nOsCxK/c9zPpCZVNoHScRE3EO9pVMM= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nikkolasg/hexjson v0.0.0-20181101101858-78e39397e00c h1:5bFTChQxSKNwy8ALwOebjekYExl9HTT9urdawqC95tA= github.com/nikkolasg/hexjson v0.0.0-20181101101858-78e39397e00c/go.mod h1:7qN3Y0BvzRUf4LofcoJplQL10lsFDb4PYlePTVwrP28= github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229 h1:E2B8qYyeSgv5MXpmzZXRNp8IAQ4vjxIjhpAf5hv/tAg= @@ -1522,7 +1525,6 @@ github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25 h1:7z3LSn867ex6 github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25/go.mod h1:lbP8tGiBjZ5YWIc2fzuRpTaz0b/53vT6PEs3QuAWzuU= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -1656,6 +1658,7 @@ go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= @@ -1835,6 +1838,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180202135801-37707fdb30a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -2057,6 +2061,7 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= diff --git a/node/impl/client/client.go b/node/impl/client/client.go index 8aca85131..cc34ef34b 100644 --- a/node/impl/client/client.go +++ b/node/impl/client/client.go @@ -5,12 +5,13 @@ import ( "context" "fmt" "io" - "io/ioutil" "os" "sort" "time" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + carv2 "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/blockstore" "golang.org/x/xerrors" @@ -60,7 +61,6 @@ import ( "github.com/filecoin-project/lotus/node/impl/paych" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo/importmgr" - "github.com/filecoin-project/lotus/node/repo/retrievalstoremgr" ) var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31) @@ -128,7 +128,11 @@ func (a *API) dealStarter(ctx context.Context, params *api.StartDealParams, isSt return nil, xerrors.New("stateless storage deals can only be initiated with storage price of 0") } } else if params.Data.TransferType == storagemarket.TTGraphsync { - importIDs := a.imgr().List() + importIDs, err := a.imgr().List() + if err != nil { + return nil, xerrors.Errorf("failed to fetch import IDs: %w", err) + } + for _, importID := range importIDs { info, err := a.imgr().Info(importID) if err != nil { @@ -491,10 +495,10 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (res *api.Impor // we don't need the store any more after we return from here. clean it up completely. // we only need to retain the metadata related to this import which is identified by the storeID/importID. defer func() { - //_ = ((*multistore.MultiStore)(a.Mds)).Delete(id) + _ = ((*multistore.MultiStore)(a.Mds)).Delete(id) }() - carV2File, err := a.newTempFilePath(id) + carV2File, err := a.imgr().NewTempFile(id) if err != nil { return nil, xerrors.Errorf("failed to create temp CARv2 file: %w", err) } @@ -527,7 +531,6 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (res *api.Impor if err := a.imgr().AddLabel(id, importmgr.LCARv2FilePath, carV2File); err != nil { return nil, err } - if err := a.imgr().AddLabel(id, importmgr.LRootCid, root.String()); err != nil { return nil, err } @@ -538,19 +541,6 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (res *api.Impor }, nil } -func (a *API) newTempFilePath(id multistore.StoreID) (string, error) { - // TODO Get the repo path here. - file, err := ioutil.TempFile("", fmt.Sprintf("%d", id)) - if err != nil { - return "nil", xerrors.Errorf("failed to create temp CARv2 file: %w", err) - } - if err := file.Close(); err != nil { - return "", xerrors.Errorf("failed to close CARv2 file") - } - - return file.Name(), nil -} - func (a *API) ClientRemoveImport(ctx context.Context, importID multistore.StoreID) error { info, err := a.imgr().Info(importID) if err != nil { @@ -577,7 +567,7 @@ func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (c cid.Cid, fi _ = ((*multistore.MultiStore)(a.Mds)).Delete(id) }() - carV2File, err := a.newTempFilePath(id) + carV2File, err := a.imgr().NewTempFile(id) if err != nil { return cid.Undef, xerrors.Errorf("failed to create temp CARv2 file: %w", err) } @@ -608,7 +598,10 @@ func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (c cid.Cid, fi } func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) { - importIDs := a.imgr().List() + importIDs, err := a.imgr().List() + if err != nil { + return nil, xerrors.Errorf("failed to fetch imports: %w", err) + } out := make([]api.Import, len(importIDs)) for i, id := range importIDs { @@ -622,9 +615,10 @@ func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) { } ai := api.Import{ - Key: id, - Source: info.Labels[importmgr.LSource], - FilePath: info.Labels[importmgr.LFileName], + Key: id, + Source: info.Labels[importmgr.LSource], + FilePath: info.Labels[importmgr.LFileName], + CARv2FilePath: info.Labels[importmgr.LCARv2FilePath], } if info.Labels[importmgr.LRootCid] != "" { @@ -666,6 +660,10 @@ func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID retrievalmar } func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error { + if ref == nil || ref.Path == "" { + return xerrors.New("must pass output file path for the retrieval deal") + } + events := make(chan marketevents.RetrievalEvent) go a.clientRetrieve(ctx, order, ref, events) @@ -686,6 +684,10 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref } func (a *API) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) { + if ref == nil || ref.Path == "" { + return nil, xerrors.New("must pass output file path for the retrieval deal") + } + events := make(chan marketevents.RetrievalEvent) go a.clientRetrieve(ctx, order, ref, events) return events, nil @@ -745,9 +747,8 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref } } - var store retrievalstoremgr.RetrievalStore - - if order.LocalStore == nil { + var carV2FilePath string + if order.LocalCARV2FilePath == "" { if order.MinerPeer == nil || order.MinerPeer.ID == "" { mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK) if err != nil { @@ -771,14 +772,6 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return } - /*id, st, err := a.imgr().NewStore() - if err != nil { - return err - } - if err := a.imgr().AddLabel(id, "source", "retrieval"); err != nil { - return err - }*/ - ppb := types.BigDiv(order.Total, types.NewInt(order.Size)) params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice) @@ -787,16 +780,6 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return } - store, err = a.RetrievalStoreMgr.NewStore() - if err != nil { - finish(xerrors.Errorf("Error setting up new store: %w", err)) - return - } - - defer func() { - _ = a.RetrievalStoreMgr.ReleaseStore(store) - }() - // Subscribe to events before retrieving to avoid losing events. subscribeEvents := make(chan retrievalSubscribeEvent, 1) subscribeCtx, cancel := context.WithCancel(ctx) @@ -811,15 +794,14 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref } }) - dealID, err := a.Retrieval.Retrieve( + resp, err := a.Retrieval.Retrieve( ctx, order.Root, params, order.Total, *order.MinerPeer, order.Client, - order.Miner, - store.StoreID()) + order.Miner) if err != nil { unsubscribe() @@ -827,56 +809,59 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref return } - err = readSubscribeEvents(ctx, dealID, subscribeEvents, events) + err = readSubscribeEvents(ctx, resp.DealID, subscribeEvents, events) unsubscribe() if err != nil { finish(xerrors.Errorf("Retrieve: %w", err)) return } + + carV2FilePath = resp.CarFilePath + // remove the temp CARv2 fil when retrieval is complete + defer os.Remove(carV2FilePath) } else { - // local retrieval - st, err := ((*multistore.MultiStore)(a.Mds)).Get(*order.LocalStore) - if err != nil { - finish(xerrors.Errorf("Retrieve: %w", err)) - return - } - - store = &multiStoreRetrievalStore{ - storeID: *order.LocalStore, - store: st, - } + carV2FilePath = order.LocalCARV2FilePath } - // If ref is nil, it only fetches the data into the configured blockstore. - if ref == nil { - finish(nil) - return - } - - rdag := store.DAGService() - if ref.IsCAR { + // user wants a CAR file, transform the CARv2 to a CARv1 and write it out. f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644) if err != nil { finish(err) return } - err = car.WriteCar(ctx, rdag, []cid.Cid{order.Root}, f) + + carv2Reader, err := carv2.NewReaderMmap(carV2FilePath) if err != nil { finish(err) return } + defer carv2Reader.Close() + if _, err := io.Copy(f, carv2Reader.CarV1Reader()); err != nil { + finish(err) + return + } + finish(f.Close()) return } - nd, err := rdag.Get(ctx, order.Root) + rw, err := blockstore.OpenReadOnly(carV2FilePath, false) + if err != nil { + finish(err) + return + } + defer rw.Close() + bsvc := blockservice.New(rw, offline.Exchange(rw)) + dag := merkledag.NewDAGService(bsvc) + + nd, err := dag.Get(ctx, order.Root) if err != nil { finish(xerrors.Errorf("ClientRetrieve: %w", err)) return } - file, err := unixfile.NewUnixfsFile(ctx, rdag, nd) + file, err := unixfile.NewUnixfsFile(ctx, dag, nd) if err != nil { finish(xerrors.Errorf("ClientRetrieve: %w", err)) return @@ -1107,7 +1092,7 @@ func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath stri return err } if err = sc.Write(f); err != nil { - return err + return xerrors.Errorf("failed to write CAR to output file: %w", err) } return f.Close() diff --git a/node/impl/client/import.go b/node/impl/client/import.go index f49297011..d34aa1830 100644 --- a/node/impl/client/import.go +++ b/node/impl/client/import.go @@ -47,10 +47,8 @@ func importNormalFileToCARv2(ctx context.Context, st *multistore.Store, inputFil // and releases the file handle it acquires. defer func() { err := rw.Finalize() - if finalErr != nil { - finalErr = xerrors.Errorf("failed to import file to CARv2, err=%w, also failed to finalize rw CARv2 blockstore, err=%w", finalErr, - err) + finalErr = xerrors.Errorf("failed to import file to CARv2, err=%w", finalErr) } else { finalErr = err } diff --git a/node/modules/client.go b/node/modules/client.go index 79b20a8ce..fe6560120 100644 --- a/node/modules/client.go +++ b/node/modules/client.go @@ -99,8 +99,8 @@ func ClientMultiDatastore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.Locke return mds, nil } -func ClientImportMgr(mds dtypes.ClientMultiDstore, ds dtypes.MetadataDS) dtypes.ClientImportMgr { - return importmgr.New(mds, namespace.Wrap(ds, datastore.NewKey("/client"))) +func ClientImportMgr(mds dtypes.ClientMultiDstore, ds dtypes.MetadataDS, r repo.LockedRepo) dtypes.ClientImportMgr { + return importmgr.New(mds, namespace.Wrap(ds, datastore.NewKey("/client")), r.Path()) } func ClientBlockstore(imgr dtypes.ClientImportMgr) dtypes.ClientBlockstore { @@ -173,7 +173,7 @@ func NewClientDatastore(ds dtypes.MetadataDS) dtypes.ClientDatastore { return namespace.Wrap(ds, datastore.NewKey("/deals/client")) } -func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, mds dtypes.ClientMultiDstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discoveryimpl.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode, j journal.Journal) (storagemarket.StorageClient, error) { +func StorageClient(lc fx.Lifecycle, h host.Host, dataTransfer dtypes.ClientDataTransfer, discovery *discoveryimpl.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode, j journal.Journal) (storagemarket.StorageClient, error) { // go-fil-markets protocol retries: // 1s, 5s, 25s, 2m5s, 5m x 11 ~= 1 hour marketsRetryParams := smnet.RetryParameters(time.Second, 5*time.Minute, 15, 5) @@ -200,11 +200,22 @@ func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, md return c, nil } +type CARStore struct { + dir string +} + +func (c *CARStore) Path(key string) string { + return filepath.Join(c.dir, key) +} + // RetrievalClient creates a new retrieval client attached to the client blockstore -func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, j journal.Journal) (retrievalmarket.RetrievalClient, error) { +func RetrievalClient(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, + ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, j journal.Journal) (retrievalmarket.RetrievalClient, error) { + adapter := retrievaladapter.NewRetrievalClientNode(payAPI, chainAPI, stateAPI) network := rmnet.NewFromLibp2pHost(h) - client, err := retrievalimpl.NewClient(network, mds, dt, adapter, resolver, namespace.Wrap(ds, datastore.NewKey("/retrievals/client"))) + client, err := retrievalimpl.NewClient(network, + &CARStore{r.Path()}, dt, adapter, resolver, namespace.Wrap(ds, datastore.NewKey("/retrievals/client"))) if err != nil { return nil, err } diff --git a/node/repo/importmgr/mgr.go b/node/repo/importmgr/mgr.go index b37536e18..7ae84c4f0 100644 --- a/node/repo/importmgr/mgr.go +++ b/node/repo/importmgr/mgr.go @@ -3,7 +3,10 @@ package importmgr import ( "encoding/json" "fmt" + "io/ioutil" + "strconv" + "github.com/ipfs/go-datastore/query" "golang.org/x/xerrors" "github.com/filecoin-project/go-multistore" @@ -13,8 +16,9 @@ import ( ) type Mgr struct { - mds *multistore.MultiStore - ds datastore.Batching + mds *multistore.MultiStore + ds datastore.Batching + repoPath string Blockstore blockstore.BasicBlockstore } @@ -25,14 +29,14 @@ const ( LSource = "source" // Function which created the import LRootCid = "root" // Root CID LFileName = "filename" // Local file path - LMTime = "mtime" // File modification timestamp LCARv2FilePath = "CARv2Path" // path of the CARv2 file. ) -func New(mds *multistore.MultiStore, ds datastore.Batching) *Mgr { +func New(mds *multistore.MultiStore, ds datastore.Batching, repoPath string) *Mgr { return &Mgr{ mds: mds, Blockstore: blockstore.Adapt(mds.MultiReadBlockstore()), + repoPath: repoPath, ds: datastore.NewLogDatastore(namespace.Wrap(ds, datastore.NewKey("/stores")), "storess"), } @@ -81,8 +85,29 @@ func (m *Mgr) AddLabel(id multistore.StoreID, key, value string) error { // sour return m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta) } -func (m *Mgr) List() []multistore.StoreID { - return m.mds.List() +func (m *Mgr) List() ([]multistore.StoreID, error) { + var keys []multistore.StoreID + + qres, err := m.ds.Query(query.Query{KeysOnly: true}) + if err != nil { + return nil, xerrors.Errorf("query error: %w", err) + } + defer qres.Close() //nolint:errcheck + + for r := range qres.Next() { + k := r.Key + if string(k[0]) == "/" { + k = k[1:] + } + + id, err := strconv.ParseUint(k, 10, 64) + if err != nil { + return nil, xerrors.Errorf("failed to parse key %s to uint64, err=%w", r.Key, err) + } + keys = append(keys, multistore.StoreID(id)) + } + + return keys, nil } func (m *Mgr) Info(id multistore.StoreID) (*StoreMeta, error) { @@ -110,3 +135,17 @@ func (m *Mgr) Remove(id multistore.StoreID) error { return nil } + +func (a *Mgr) NewTempFile(id multistore.StoreID) (string, error) { + file, err := ioutil.TempFile(a.repoPath, fmt.Sprintf("%d", id)) + if err != nil { + return "", xerrors.Errorf("failed to create temp file: %w", err) + } + + // close the file as we need to return the path here. + if err := file.Close(); err != nil { + return "", xerrors.Errorf("failed to close temp file: %w", err) + } + + return file.Name(), nil +}