lotus changes for the DAG Store

This commit is contained in:
aarshkshah1992 2021-07-03 10:10:56 +05:30
parent 31ecce3a72
commit dae6c757b5
8 changed files with 137 additions and 100 deletions

View File

@ -719,9 +719,10 @@ type Import struct {
Key multistore.StoreID
Err string
Root *cid.Cid
Source string
FilePath string
Root *cid.Cid
Source string
FilePath string
CARv2FilePath string
}
type DealInfo struct {
@ -904,7 +905,7 @@ type RetrievalOrder struct {
Piece *cid.Cid
Size uint64
LocalStore *multistore.StoreID // if specified, get data from local store
LocalCARV2FilePath string // if specified, get data from a local CARv2 file.
// TODO: support offset
Total types.BigInt
UnsealPrice types.BigInt

View File

@ -1104,8 +1104,8 @@ var clientRetrieveCmd = &cli.Command{
for _, i := range imports {
if i.Root != nil && i.Root.Equals(file) {
order = &lapi.RetrievalOrder{
Root: file,
LocalStore: &i.Key,
Root: file,
LocalCARV2FilePath: i.CARv2FilePath,
Total: big.Zero(),
UnsealPrice: big.Zero(),

4
go.mod
View File

@ -28,14 +28,13 @@ require (
github.com/fatih/color v1.9.0
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f
github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-amt-ipld/v2 v2.1.1-0.20201006184820-924ee87a1349 // indirect
github.com/filecoin-project/go-bitfield v0.2.4
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
github.com/filecoin-project/go-commp-utils v0.1.1-0.20210427191551-70bf140d31c7
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v1.7.0
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210701085128-940bb3f8c536
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210702071641-2b0175bfe1ed
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
@ -134,7 +133,6 @@ require (
github.com/raulk/clock v1.1.0
github.com/raulk/go-watchdog v1.0.1
github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.7.0
github.com/syndtr/goleveldb v1.0.0
github.com/urfave/cli/v2 v2.2.0

15
go.sum
View File

@ -256,6 +256,8 @@ github.com/fatih/color v1.8.0/go.mod h1:3l45GVGkyrnYNl9HoIjnp2NnNWvh6hLAqD8yTfGj
github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E=
github.com/filecoin-project/dagstore v0.0.0-20210624112130-607a5a2a5bd4 h1:AcL0EUgq37GqpsP085Ib+mJGBhNqAydeS/C1u59Prbg=
github.com/filecoin-project/dagstore v0.0.0-20210624112130-607a5a2a5bd4/go.mod h1:b8ZvtSY5ggpvS2AkeF+Hbdcu5/Xm1P6RIR5NKJj7OhE=
github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM=
github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
@ -271,7 +273,7 @@ github.com/filecoin-project/go-bitfield v0.2.4 h1:uZ7MeE+XfM5lqrHJZ93OnhQKc/rveW
github.com/filecoin-project/go-bitfield v0.2.4/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM=
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8=
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg=
github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434/go.mod h1:6s95K91mCyHY51RPWECZieD3SGWTqIFLf1mPOes9l5U=
github.com/filecoin-project/go-commp-utils v0.1.0/go.mod h1:6s95K91mCyHY51RPWECZieD3SGWTqIFLf1mPOes9l5U=
github.com/filecoin-project/go-commp-utils v0.1.1-0.20210427191551-70bf140d31c7 h1:U9Z+76pHCKBmtdxFV7JFZJj7OVm12I6dEKwtMVbq5p0=
github.com/filecoin-project/go-commp-utils v0.1.1-0.20210427191551-70bf140d31c7/go.mod h1:6s95K91mCyHY51RPWECZieD3SGWTqIFLf1mPOes9l5U=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
@ -285,8 +287,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg=
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210701085128-940bb3f8c536 h1:jp/LnSNTrxAeVIMAIM06U1qN3m0SpPxigX6/SIAXnTQ=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210701085128-940bb3f8c536/go.mod h1:Arrf7syON79ni4amsGtzV3Pl3qtUrCV5dMX6U21Gxzw=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210702071641-2b0175bfe1ed h1:nX5MS1+yE0gmkFXdPaPYCVKjYrKCDmMc3BINC2QZRWY=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210702071641-2b0175bfe1ed/go.mod h1:bxcU7Y+88x0yToWbNGtiH6JrWMklCMIiF/LluKua1eo=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=
@ -322,7 +324,6 @@ github.com/filecoin-project/specs-actors v0.9.13/go.mod h1:TS1AW/7LbG+615j4NsjMK
github.com/filecoin-project/specs-actors v0.9.14 h1:68PVstg2UB3ZsMLF+DKFTAs/YKsqhKWynkr0IqmVRQY=
github.com/filecoin-project/specs-actors v0.9.14/go.mod h1:TS1AW/7LbG+615j4NsjMK1qlpAwaFsG9w0V2tg2gSao=
github.com/filecoin-project/specs-actors/v2 v2.0.1/go.mod h1:v2NZVYinNIKA9acEMBm5wWXxqv5+frFEbekBFemYghY=
github.com/filecoin-project/specs-actors/v2 v2.3.2/go.mod h1:UuJQLoTx/HPvvWeqlIFmC/ywlOLHNe8SNQ3OunFbu2Y=
github.com/filecoin-project/specs-actors/v2 v2.3.5-0.20210114162132-5b58b773f4fb/go.mod h1:LljnY2Mn2homxZsmokJZCpRuhOPxfXhvcek5gWkmqAc=
github.com/filecoin-project/specs-actors/v2 v2.3.5 h1:PbT4tPlSXZ8sRgajhb4D8AOEmiaaZ+jg6tc6BBv8VQc=
github.com/filecoin-project/specs-actors/v2 v2.3.5/go.mod h1:LljnY2Mn2homxZsmokJZCpRuhOPxfXhvcek5gWkmqAc=
@ -996,6 +997,7 @@ github.com/libp2p/go-libp2p-netutil v0.0.1/go.mod h1:GdusFvujWZI9Vt0X5BKqwWWmZFx
github.com/libp2p/go-libp2p-netutil v0.1.0 h1:zscYDNVEcGxyUpMd0JReUZTrpMfia8PmLKcKF72EAMQ=
github.com/libp2p/go-libp2p-netutil v0.1.0/go.mod h1:3Qv/aDqtMLTUyQeundkKsA+YCThNdbQD54k3TqjpbFU=
github.com/libp2p/go-libp2p-noise v0.1.1/go.mod h1:QDFLdKX7nluB7DEnlVPbz7xlLHdwHFA9HiohJRr3vwM=
github.com/libp2p/go-libp2p-noise v0.1.2/go.mod h1:9B10b7ueo7TIxZHHcjcDCo5Hd6kfKT2m77by82SFRfE=
github.com/libp2p/go-libp2p-noise v0.2.0 h1:wmk5nhB9a2w2RxMOyvsoKjizgJOEaJdfAakr0jN8gds=
github.com/libp2p/go-libp2p-noise v0.2.0/go.mod h1:IEbYhBBzGyvdLBoxxULL/SGbJARhUeqlO8lVSREYu2Q=
github.com/libp2p/go-libp2p-peer v0.0.1/go.mod h1:nXQvOBbwVqoP+T5Y5nCjeH4sP9IX/J0AMzcDUVruVoo=
@ -1317,6 +1319,7 @@ github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OS
github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo=
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
github.com/ngdinhtoan/glide-cleanup v0.2.0/go.mod h1:UQzsmiDOb8YV3nOsCxK/c9zPpCZVNoHScRE3EO9pVMM=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nikkolasg/hexjson v0.0.0-20181101101858-78e39397e00c h1:5bFTChQxSKNwy8ALwOebjekYExl9HTT9urdawqC95tA=
github.com/nikkolasg/hexjson v0.0.0-20181101101858-78e39397e00c/go.mod h1:7qN3Y0BvzRUf4LofcoJplQL10lsFDb4PYlePTVwrP28=
github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229 h1:E2B8qYyeSgv5MXpmzZXRNp8IAQ4vjxIjhpAf5hv/tAg=
@ -1522,7 +1525,6 @@ github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25 h1:7z3LSn867ex6
github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25/go.mod h1:lbP8tGiBjZ5YWIc2fzuRpTaz0b/53vT6PEs3QuAWzuU=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
@ -1656,6 +1658,7 @@ go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo=
@ -1835,6 +1838,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180202135801-37707fdb30a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@ -2057,6 +2061,7 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=

View File

@ -5,12 +5,13 @@ import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"sort"
"time"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
"golang.org/x/xerrors"
@ -60,7 +61,6 @@ import (
"github.com/filecoin-project/lotus/node/impl/paych"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo/importmgr"
"github.com/filecoin-project/lotus/node/repo/retrievalstoremgr"
)
var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31)
@ -128,7 +128,11 @@ func (a *API) dealStarter(ctx context.Context, params *api.StartDealParams, isSt
return nil, xerrors.New("stateless storage deals can only be initiated with storage price of 0")
}
} else if params.Data.TransferType == storagemarket.TTGraphsync {
importIDs := a.imgr().List()
importIDs, err := a.imgr().List()
if err != nil {
return nil, xerrors.Errorf("failed to fetch import IDs: %w", err)
}
for _, importID := range importIDs {
info, err := a.imgr().Info(importID)
if err != nil {
@ -491,10 +495,10 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (res *api.Impor
// we don't need the store any more after we return from here. clean it up completely.
// we only need to retain the metadata related to this import which is identified by the storeID/importID.
defer func() {
//_ = ((*multistore.MultiStore)(a.Mds)).Delete(id)
_ = ((*multistore.MultiStore)(a.Mds)).Delete(id)
}()
carV2File, err := a.newTempFilePath(id)
carV2File, err := a.imgr().NewTempFile(id)
if err != nil {
return nil, xerrors.Errorf("failed to create temp CARv2 file: %w", err)
}
@ -527,7 +531,6 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (res *api.Impor
if err := a.imgr().AddLabel(id, importmgr.LCARv2FilePath, carV2File); err != nil {
return nil, err
}
if err := a.imgr().AddLabel(id, importmgr.LRootCid, root.String()); err != nil {
return nil, err
}
@ -538,19 +541,6 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (res *api.Impor
}, nil
}
func (a *API) newTempFilePath(id multistore.StoreID) (string, error) {
// TODO Get the repo path here.
file, err := ioutil.TempFile("", fmt.Sprintf("%d", id))
if err != nil {
return "nil", xerrors.Errorf("failed to create temp CARv2 file: %w", err)
}
if err := file.Close(); err != nil {
return "", xerrors.Errorf("failed to close CARv2 file")
}
return file.Name(), nil
}
func (a *API) ClientRemoveImport(ctx context.Context, importID multistore.StoreID) error {
info, err := a.imgr().Info(importID)
if err != nil {
@ -577,7 +567,7 @@ func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (c cid.Cid, fi
_ = ((*multistore.MultiStore)(a.Mds)).Delete(id)
}()
carV2File, err := a.newTempFilePath(id)
carV2File, err := a.imgr().NewTempFile(id)
if err != nil {
return cid.Undef, xerrors.Errorf("failed to create temp CARv2 file: %w", err)
}
@ -608,7 +598,10 @@ func (a *API) ClientImportLocal(ctx context.Context, f io.Reader) (c cid.Cid, fi
}
func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) {
importIDs := a.imgr().List()
importIDs, err := a.imgr().List()
if err != nil {
return nil, xerrors.Errorf("failed to fetch imports: %w", err)
}
out := make([]api.Import, len(importIDs))
for i, id := range importIDs {
@ -622,9 +615,10 @@ func (a *API) ClientListImports(ctx context.Context) ([]api.Import, error) {
}
ai := api.Import{
Key: id,
Source: info.Labels[importmgr.LSource],
FilePath: info.Labels[importmgr.LFileName],
Key: id,
Source: info.Labels[importmgr.LSource],
FilePath: info.Labels[importmgr.LFileName],
CARv2FilePath: info.Labels[importmgr.LCARv2FilePath],
}
if info.Labels[importmgr.LRootCid] != "" {
@ -666,6 +660,10 @@ func (a *API) ClientCancelRetrievalDeal(ctx context.Context, dealID retrievalmar
}
func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) error {
if ref == nil || ref.Path == "" {
return xerrors.New("must pass output file path for the retrieval deal")
}
events := make(chan marketevents.RetrievalEvent)
go a.clientRetrieve(ctx, order, ref, events)
@ -686,6 +684,10 @@ func (a *API) ClientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
}
func (a *API) ClientRetrieveWithEvents(ctx context.Context, order api.RetrievalOrder, ref *api.FileRef) (<-chan marketevents.RetrievalEvent, error) {
if ref == nil || ref.Path == "" {
return nil, xerrors.New("must pass output file path for the retrieval deal")
}
events := make(chan marketevents.RetrievalEvent)
go a.clientRetrieve(ctx, order, ref, events)
return events, nil
@ -745,9 +747,8 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
}
}
var store retrievalstoremgr.RetrievalStore
if order.LocalStore == nil {
var carV2FilePath string
if order.LocalCARV2FilePath == "" {
if order.MinerPeer == nil || order.MinerPeer.ID == "" {
mi, err := a.StateMinerInfo(ctx, order.Miner, types.EmptyTSK)
if err != nil {
@ -771,14 +772,6 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return
}
/*id, st, err := a.imgr().NewStore()
if err != nil {
return err
}
if err := a.imgr().AddLabel(id, "source", "retrieval"); err != nil {
return err
}*/
ppb := types.BigDiv(order.Total, types.NewInt(order.Size))
params, err := rm.NewParamsV1(ppb, order.PaymentInterval, order.PaymentIntervalIncrease, shared.AllSelector(), order.Piece, order.UnsealPrice)
@ -787,16 +780,6 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return
}
store, err = a.RetrievalStoreMgr.NewStore()
if err != nil {
finish(xerrors.Errorf("Error setting up new store: %w", err))
return
}
defer func() {
_ = a.RetrievalStoreMgr.ReleaseStore(store)
}()
// Subscribe to events before retrieving to avoid losing events.
subscribeEvents := make(chan retrievalSubscribeEvent, 1)
subscribeCtx, cancel := context.WithCancel(ctx)
@ -811,15 +794,14 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
}
})
dealID, err := a.Retrieval.Retrieve(
resp, err := a.Retrieval.Retrieve(
ctx,
order.Root,
params,
order.Total,
*order.MinerPeer,
order.Client,
order.Miner,
store.StoreID())
order.Miner)
if err != nil {
unsubscribe()
@ -827,56 +809,59 @@ func (a *API) clientRetrieve(ctx context.Context, order api.RetrievalOrder, ref
return
}
err = readSubscribeEvents(ctx, dealID, subscribeEvents, events)
err = readSubscribeEvents(ctx, resp.DealID, subscribeEvents, events)
unsubscribe()
if err != nil {
finish(xerrors.Errorf("Retrieve: %w", err))
return
}
carV2FilePath = resp.CarFilePath
// remove the temp CARv2 fil when retrieval is complete
defer os.Remove(carV2FilePath)
} else {
// local retrieval
st, err := ((*multistore.MultiStore)(a.Mds)).Get(*order.LocalStore)
if err != nil {
finish(xerrors.Errorf("Retrieve: %w", err))
return
}
store = &multiStoreRetrievalStore{
storeID: *order.LocalStore,
store: st,
}
carV2FilePath = order.LocalCARV2FilePath
}
// If ref is nil, it only fetches the data into the configured blockstore.
if ref == nil {
finish(nil)
return
}
rdag := store.DAGService()
if ref.IsCAR {
// user wants a CAR file, transform the CARv2 to a CARv1 and write it out.
f, err := os.OpenFile(ref.Path, os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
finish(err)
return
}
err = car.WriteCar(ctx, rdag, []cid.Cid{order.Root}, f)
carv2Reader, err := carv2.NewReaderMmap(carV2FilePath)
if err != nil {
finish(err)
return
}
defer carv2Reader.Close()
if _, err := io.Copy(f, carv2Reader.CarV1Reader()); err != nil {
finish(err)
return
}
finish(f.Close())
return
}
nd, err := rdag.Get(ctx, order.Root)
rw, err := blockstore.OpenReadOnly(carV2FilePath, false)
if err != nil {
finish(err)
return
}
defer rw.Close()
bsvc := blockservice.New(rw, offline.Exchange(rw))
dag := merkledag.NewDAGService(bsvc)
nd, err := dag.Get(ctx, order.Root)
if err != nil {
finish(xerrors.Errorf("ClientRetrieve: %w", err))
return
}
file, err := unixfile.NewUnixfsFile(ctx, rdag, nd)
file, err := unixfile.NewUnixfsFile(ctx, dag, nd)
if err != nil {
finish(xerrors.Errorf("ClientRetrieve: %w", err))
return
@ -1107,7 +1092,7 @@ func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath stri
return err
}
if err = sc.Write(f); err != nil {
return err
return xerrors.Errorf("failed to write CAR to output file: %w", err)
}
return f.Close()

View File

@ -47,10 +47,8 @@ func importNormalFileToCARv2(ctx context.Context, st *multistore.Store, inputFil
// and releases the file handle it acquires.
defer func() {
err := rw.Finalize()
if finalErr != nil {
finalErr = xerrors.Errorf("failed to import file to CARv2, err=%w, also failed to finalize rw CARv2 blockstore, err=%w", finalErr,
err)
finalErr = xerrors.Errorf("failed to import file to CARv2, err=%w", finalErr)
} else {
finalErr = err
}

View File

@ -99,8 +99,8 @@ func ClientMultiDatastore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.Locke
return mds, nil
}
func ClientImportMgr(mds dtypes.ClientMultiDstore, ds dtypes.MetadataDS) dtypes.ClientImportMgr {
return importmgr.New(mds, namespace.Wrap(ds, datastore.NewKey("/client")))
func ClientImportMgr(mds dtypes.ClientMultiDstore, ds dtypes.MetadataDS, r repo.LockedRepo) dtypes.ClientImportMgr {
return importmgr.New(mds, namespace.Wrap(ds, datastore.NewKey("/client")), r.Path())
}
func ClientBlockstore(imgr dtypes.ClientImportMgr) dtypes.ClientBlockstore {
@ -173,7 +173,7 @@ func NewClientDatastore(ds dtypes.MetadataDS) dtypes.ClientDatastore {
return namespace.Wrap(ds, datastore.NewKey("/deals/client"))
}
func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, mds dtypes.ClientMultiDstore, r repo.LockedRepo, dataTransfer dtypes.ClientDataTransfer, discovery *discoveryimpl.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode, j journal.Journal) (storagemarket.StorageClient, error) {
func StorageClient(lc fx.Lifecycle, h host.Host, dataTransfer dtypes.ClientDataTransfer, discovery *discoveryimpl.Local, deals dtypes.ClientDatastore, scn storagemarket.StorageClientNode, j journal.Journal) (storagemarket.StorageClient, error) {
// go-fil-markets protocol retries:
// 1s, 5s, 25s, 2m5s, 5m x 11 ~= 1 hour
marketsRetryParams := smnet.RetryParameters(time.Second, 5*time.Minute, 15, 5)
@ -200,11 +200,22 @@ func StorageClient(lc fx.Lifecycle, h host.Host, ibs dtypes.ClientBlockstore, md
return c, nil
}
type CARStore struct {
dir string
}
func (c *CARStore) Path(key string) string {
return filepath.Join(c.dir, key)
}
// RetrievalClient creates a new retrieval client attached to the client blockstore
func RetrievalClient(lc fx.Lifecycle, h host.Host, mds dtypes.ClientMultiDstore, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver, ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
func RetrievalClient(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver,
ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
adapter := retrievaladapter.NewRetrievalClientNode(payAPI, chainAPI, stateAPI)
network := rmnet.NewFromLibp2pHost(h)
client, err := retrievalimpl.NewClient(network, mds, dt, adapter, resolver, namespace.Wrap(ds, datastore.NewKey("/retrievals/client")))
client, err := retrievalimpl.NewClient(network,
&CARStore{r.Path()}, dt, adapter, resolver, namespace.Wrap(ds, datastore.NewKey("/retrievals/client")))
if err != nil {
return nil, err
}

View File

@ -3,7 +3,10 @@ package importmgr
import (
"encoding/json"
"fmt"
"io/ioutil"
"strconv"
"github.com/ipfs/go-datastore/query"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-multistore"
@ -13,8 +16,9 @@ import (
)
type Mgr struct {
mds *multistore.MultiStore
ds datastore.Batching
mds *multistore.MultiStore
ds datastore.Batching
repoPath string
Blockstore blockstore.BasicBlockstore
}
@ -25,14 +29,14 @@ const (
LSource = "source" // Function which created the import
LRootCid = "root" // Root CID
LFileName = "filename" // Local file path
LMTime = "mtime" // File modification timestamp
LCARv2FilePath = "CARv2Path" // path of the CARv2 file.
)
func New(mds *multistore.MultiStore, ds datastore.Batching) *Mgr {
func New(mds *multistore.MultiStore, ds datastore.Batching, repoPath string) *Mgr {
return &Mgr{
mds: mds,
Blockstore: blockstore.Adapt(mds.MultiReadBlockstore()),
repoPath: repoPath,
ds: datastore.NewLogDatastore(namespace.Wrap(ds, datastore.NewKey("/stores")), "storess"),
}
@ -81,8 +85,29 @@ func (m *Mgr) AddLabel(id multistore.StoreID, key, value string) error { // sour
return m.ds.Put(datastore.NewKey(fmt.Sprintf("%d", id)), meta)
}
func (m *Mgr) List() []multistore.StoreID {
return m.mds.List()
func (m *Mgr) List() ([]multistore.StoreID, error) {
var keys []multistore.StoreID
qres, err := m.ds.Query(query.Query{KeysOnly: true})
if err != nil {
return nil, xerrors.Errorf("query error: %w", err)
}
defer qres.Close() //nolint:errcheck
for r := range qres.Next() {
k := r.Key
if string(k[0]) == "/" {
k = k[1:]
}
id, err := strconv.ParseUint(k, 10, 64)
if err != nil {
return nil, xerrors.Errorf("failed to parse key %s to uint64, err=%w", r.Key, err)
}
keys = append(keys, multistore.StoreID(id))
}
return keys, nil
}
func (m *Mgr) Info(id multistore.StoreID) (*StoreMeta, error) {
@ -110,3 +135,17 @@ func (m *Mgr) Remove(id multistore.StoreID) error {
return nil
}
func (a *Mgr) NewTempFile(id multistore.StoreID) (string, error) {
file, err := ioutil.TempFile(a.repoPath, fmt.Sprintf("%d", id))
if err != nil {
return "", xerrors.Errorf("failed to create temp file: %w", err)
}
// close the file as we need to return the path here.
if err := file.Close(); err != nil {
return "", xerrors.Errorf("failed to close temp file: %w", err)
}
return file.Name(), nil
}