extract unixfs filestore into lib
This commit is contained in:
parent
a94e47c6be
commit
0ed8647b6f
@ -1,4 +1,4 @@
|
|||||||
package client
|
package unixfs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -19,12 +19,15 @@ import (
|
|||||||
"github.com/ipfs/go-merkledag"
|
"github.com/ipfs/go-merkledag"
|
||||||
"github.com/ipfs/go-unixfs/importer/balanced"
|
"github.com/ipfs/go-unixfs/importer/balanced"
|
||||||
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
|
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
|
||||||
|
mh "github.com/multiformats/go-multihash"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
)
|
)
|
||||||
|
|
||||||
func unixFSCidBuilder() (cid.Builder, error) {
|
var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31)
|
||||||
|
|
||||||
|
func CidBuilder() (cid.Builder, error) {
|
||||||
prefix, err := merkledag.PrefixForCidVersion(1)
|
prefix, err := merkledag.PrefixForCidVersion(1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to initialize UnixFS CID Builder: %w", err)
|
return nil, fmt.Errorf("failed to initialize UnixFS CID Builder: %w", err)
|
||||||
@ -37,9 +40,9 @@ func unixFSCidBuilder() (cid.Builder, error) {
|
|||||||
return b, nil
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// createUnixFSFilestore takes a standard file whose path is src, forms a UnixFS DAG, and
|
// CreateFilestore takes a standard file whose path is src, forms a UnixFS DAG, and
|
||||||
// writes a CARv2 file with positional mapping (backed by the go-filestore library).
|
// writes a CARv2 file with positional mapping (backed by the go-filestore library).
|
||||||
func (a *API) createUnixFSFilestore(ctx context.Context, srcPath string, dstPath string) (cid.Cid, error) {
|
func CreateFilestore(ctx context.Context, srcPath string, dstPath string) (cid.Cid, error) {
|
||||||
// This method uses a two-phase approach with a staging CAR blockstore and
|
// This method uses a two-phase approach with a staging CAR blockstore and
|
||||||
// a final CAR blockstore.
|
// a final CAR blockstore.
|
||||||
//
|
//
|
||||||
@ -80,7 +83,7 @@ func (a *API) createUnixFSFilestore(ctx context.Context, srcPath string, dstPath
|
|||||||
return cid.Undef, xerrors.Errorf("failed to create temporary filestore: %w", err)
|
return cid.Undef, xerrors.Errorf("failed to create temporary filestore: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
finalRoot1, err := buildUnixFS(ctx, file, fstore, true)
|
finalRoot1, err := Build(ctx, file, fstore, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = fstore.Close()
|
_ = fstore.Close()
|
||||||
return cid.Undef, xerrors.Errorf("failed to import file to store to compute root: %w", err)
|
return cid.Undef, xerrors.Errorf("failed to import file to store to compute root: %w", err)
|
||||||
@ -102,7 +105,7 @@ func (a *API) createUnixFSFilestore(ctx context.Context, srcPath string, dstPath
|
|||||||
return cid.Undef, xerrors.Errorf("failed to rewind file: %w", err)
|
return cid.Undef, xerrors.Errorf("failed to rewind file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
finalRoot2, err := buildUnixFS(ctx, file, bs, true)
|
finalRoot2, err := Build(ctx, file, bs, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = bs.Close()
|
_ = bs.Close()
|
||||||
return cid.Undef, xerrors.Errorf("failed to create UnixFS DAG with carv2 blockstore: %w", err)
|
return cid.Undef, xerrors.Errorf("failed to create UnixFS DAG with carv2 blockstore: %w", err)
|
||||||
@ -119,10 +122,10 @@ func (a *API) createUnixFSFilestore(ctx context.Context, srcPath string, dstPath
|
|||||||
return finalRoot1, nil
|
return finalRoot1, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// buildUnixFS builds a UnixFS DAG out of the supplied reader,
|
// Build builds a UnixFS DAG out of the supplied reader,
|
||||||
// and imports the DAG into the supplied service.
|
// and imports the DAG into the supplied service.
|
||||||
func buildUnixFS(ctx context.Context, reader io.Reader, into bstore.Blockstore, filestore bool) (cid.Cid, error) {
|
func Build(ctx context.Context, reader io.Reader, into bstore.Blockstore, filestore bool) (cid.Cid, error) {
|
||||||
b, err := unixFSCidBuilder()
|
b, err := CidBuilder()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, err
|
return cid.Undef, err
|
||||||
}
|
}
|
@ -1,5 +1,5 @@
|
|||||||
//stm: #unit
|
//stm: #unit
|
||||||
package client
|
package unixfs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
@ -21,8 +21,6 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-fil-markets/stores"
|
"github.com/filecoin-project/go-fil-markets/stores"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/node/repo/imports"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// This test uses a full "dense" CARv2, and not a filestore (positional mapping).
|
// This test uses a full "dense" CARv2, and not a filestore (positional mapping).
|
||||||
@ -42,7 +40,7 @@ func TestRoundtripUnixFS_Dense(t *testing.T) {
|
|||||||
blockstore.UseWholeCIDs(true))
|
blockstore.UseWholeCIDs(true))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
root, err := buildUnixFS(ctx, bytes.NewBuffer(inputContents), bs, false)
|
root, err := Build(ctx, bytes.NewBuffer(inputContents), bs, false)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotEqual(t, cid.Undef, root)
|
require.NotEqual(t, cid.Undef, root)
|
||||||
require.NoError(t, bs.Finalize())
|
require.NoError(t, bs.Finalize())
|
||||||
@ -78,9 +76,6 @@ func TestRoundtripUnixFS_Dense(t *testing.T) {
|
|||||||
func TestRoundtripUnixFS_Filestore(t *testing.T) {
|
func TestRoundtripUnixFS_Filestore(t *testing.T) {
|
||||||
//stm: @CLIENT_DATA_IMPORT_001
|
//stm: @CLIENT_DATA_IMPORT_001
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
a := &API{
|
|
||||||
Imports: &imports.Manager{},
|
|
||||||
}
|
|
||||||
|
|
||||||
inputPath, inputContents := genInputFile(t)
|
inputPath, inputContents := genInputFile(t)
|
||||||
defer os.Remove(inputPath) //nolint:errcheck
|
defer os.Remove(inputPath) //nolint:errcheck
|
||||||
@ -88,7 +83,7 @@ func TestRoundtripUnixFS_Filestore(t *testing.T) {
|
|||||||
dst := newTmpFile(t)
|
dst := newTmpFile(t)
|
||||||
defer os.Remove(dst) //nolint:errcheck
|
defer os.Remove(dst) //nolint:errcheck
|
||||||
|
|
||||||
root, err := a.createUnixFSFilestore(ctx, inputPath, dst)
|
root, err := CreateFilestore(ctx, inputPath, dst)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotEqual(t, cid.Undef, root)
|
require.NotEqual(t, cid.Undef, root)
|
||||||
|
|
@ -42,7 +42,6 @@ import (
|
|||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
"github.com/multiformats/go-multibase"
|
"github.com/multiformats/go-multibase"
|
||||||
mh "github.com/multiformats/go-multihash"
|
|
||||||
"go.uber.org/fx"
|
"go.uber.org/fx"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
@ -56,6 +55,7 @@ import (
|
|||||||
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
|
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
|
||||||
"github.com/filecoin-project/go-fil-markets/stores"
|
"github.com/filecoin-project/go-fil-markets/stores"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/lib/unixfs"
|
||||||
"github.com/filecoin-project/lotus/markets/retrievaladapter"
|
"github.com/filecoin-project/lotus/markets/retrievaladapter"
|
||||||
"github.com/filecoin-project/lotus/markets/storageadapter"
|
"github.com/filecoin-project/lotus/markets/storageadapter"
|
||||||
|
|
||||||
@ -79,7 +79,7 @@ import (
|
|||||||
|
|
||||||
var log = logging.Logger("client")
|
var log = logging.Logger("client")
|
||||||
|
|
||||||
var DefaultHashFunction = uint64(mh.BLAKE2B_MIN + 31)
|
var DefaultHashFunction = unixfs.DefaultHashFunction
|
||||||
|
|
||||||
// 8 days ~= SealDuration + PreCommit + MaxProveCommitDuration + 8 hour buffer
|
// 8 days ~= SealDuration + PreCommit + MaxProveCommitDuration + 8 hour buffer
|
||||||
const dealStartBufferHours uint64 = 8 * 24
|
const dealStartBufferHours uint64 = 8 * 24
|
||||||
@ -548,7 +548,7 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (res *api.Impor
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// perform the unixfs chunking.
|
// perform the unixfs chunking.
|
||||||
root, err = a.createUnixFSFilestore(ctx, ref.Path, carPath)
|
root, err = unixfs.CreateFilestore(ctx, ref.Path, carPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("failed to import file using unixfs: %w", err)
|
return nil, xerrors.Errorf("failed to import file using unixfs: %w", err)
|
||||||
}
|
}
|
||||||
@ -618,7 +618,7 @@ func (a *API) ClientImportLocal(ctx context.Context, r io.Reader) (cid.Cid, erro
|
|||||||
// once the DAG is formed and the root is calculated, we overwrite the
|
// once the DAG is formed and the root is calculated, we overwrite the
|
||||||
// inner carv1 header with the final root.
|
// inner carv1 header with the final root.
|
||||||
|
|
||||||
b, err := unixFSCidBuilder()
|
b, err := unixfs.CidBuilder()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, err
|
return cid.Undef, err
|
||||||
}
|
}
|
||||||
@ -635,7 +635,7 @@ func (a *API) ClientImportLocal(ctx context.Context, r io.Reader) (cid.Cid, erro
|
|||||||
return cid.Undef, xerrors.Errorf("failed to create carv2 read/write blockstore: %w", err)
|
return cid.Undef, xerrors.Errorf("failed to create carv2 read/write blockstore: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
root, err := buildUnixFS(ctx, file, bs, false)
|
root, err := unixfs.Build(ctx, file, bs, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return cid.Undef, xerrors.Errorf("failed to build unixfs dag: %w", err)
|
return cid.Undef, xerrors.Errorf("failed to build unixfs dag: %w", err)
|
||||||
}
|
}
|
||||||
@ -1364,7 +1364,7 @@ func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath stri
|
|||||||
defer os.Remove(tmp) //nolint:errcheck
|
defer os.Remove(tmp) //nolint:errcheck
|
||||||
|
|
||||||
// generate and import the UnixFS DAG into a filestore (positional reference) CAR.
|
// generate and import the UnixFS DAG into a filestore (positional reference) CAR.
|
||||||
root, err := a.createUnixFSFilestore(ctx, ref.Path, tmp)
|
root, err := unixfs.CreateFilestore(ctx, ref.Path, tmp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("failed to import file using unixfs: %w", err)
|
return xerrors.Errorf("failed to import file using unixfs: %w", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user