Merge pull request #6979 from filecoin-project/raulk/review-dagstore

raulk 2021-08-04 13:22:39 +01:00 committed by GitHub
commit e8e73e5374
23 changed files with 593 additions and 415 deletions


@ -709,7 +709,7 @@ Response:
"ID": 3
},
"SectorNumber": 9,
"CARv2FilePath": "string value"
"InboundCAR": "string value"
}
```

go.mod (4 changes)

@ -26,7 +26,7 @@ require (
github.com/elastic/gosigar v0.12.0
github.com/etclabscore/go-openrpc-reflect v0.0.36
github.com/fatih/color v1.9.0
github.com/filecoin-project/dagstore v0.4.0
github.com/filecoin-project/dagstore v0.4.1-0.20210803144142-2ef26aaf8e18
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f
github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-bitfield v0.2.4
@ -36,7 +36,7 @@ require (
github.com/filecoin-project/go-data-transfer v1.7.2
github.com/filecoin-project/go-fil-commcid v0.1.0
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0
github.com/filecoin-project/go-fil-markets v1.6.3-0.20210729173742-a44f98a4f8c1
github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804120345-cdd492e1a581
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20210723183308-812a16dc01b1

go.sum (8 changes)

@ -257,8 +257,8 @@ github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E=
github.com/filecoin-project/dagstore v0.3.1/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY=
github.com/filecoin-project/dagstore v0.4.0 h1:ZeoL5Gbgn4BSzKgghobfAO9a5nVqfVC4fQ/lNYn1GQo=
github.com/filecoin-project/dagstore v0.4.0/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY=
github.com/filecoin-project/dagstore v0.4.1-0.20210803144142-2ef26aaf8e18 h1:wn+3OyRwQzS+iYEkc3sO33XU3g5fEzP81Guh73ZHthw=
github.com/filecoin-project/dagstore v0.4.1-0.20210803144142-2ef26aaf8e18/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY=
github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM=
github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
@ -290,8 +290,8 @@ github.com/filecoin-project/go-fil-commcid v0.1.0/go.mod h1:Eaox7Hvus1JgPrL5+M3+
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 h1:imrrpZWEHRnNqqv0tN7LXep5bFEVOVmQWHJvl2mgsGo=
github.com/filecoin-project/go-fil-commp-hashhash v0.1.0/go.mod h1:73S8WSEWh9vr0fDJVnKADhfIv/d6dCbAGaAGWbdJEI8=
github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
github.com/filecoin-project/go-fil-markets v1.6.3-0.20210729173742-a44f98a4f8c1 h1:xXgCsfEaA3wdof/N5mkD7PphDolhc9MAsIrO/QBH5Pg=
github.com/filecoin-project/go-fil-markets v1.6.3-0.20210729173742-a44f98a4f8c1/go.mod h1:13+DUe7AaHekzgpQPbacdppRoqz0SyPlx48g0f/pRmA=
github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804120345-cdd492e1a581 h1:zubk4E8s5KLw5Y2Or39A3Ob8c7DAT6FL/mJBs1dMkrQ=
github.com/filecoin-project/go-fil-markets v1.6.3-0.20210804120345-cdd492e1a581/go.mod h1:13+DUe7AaHekzgpQPbacdppRoqz0SyPlx48g0f/pRmA=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=


@ -0,0 +1,33 @@
package dagstore
import (
"io"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore"
)
// Blockstore promotes a dagstore.ReadBlockstore to a full closeable Blockstore,
// stubbing out the write methods with erroring implementations.
type Blockstore struct {
dagstore.ReadBlockstore
io.Closer
}
var _ bstore.Blockstore = (*Blockstore)(nil)
func (b *Blockstore) DeleteBlock(c cid.Cid) error {
return xerrors.Errorf("DeleteBlock called but not implemented")
}
func (b *Blockstore) Put(block blocks.Block) error {
return xerrors.Errorf("Put called but not implemented")
}
func (b *Blockstore) PutMany(blocks []blocks.Block) error {
return xerrors.Errorf("PutMany called but not implemented")
}
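For context, this promotion is exactly what the wrapper below does when serving a shard: the shard accessor supplies the ReadBlockstore and doubles as the Closer. A minimal sketch (hypothetical helper, not part of this commit; `acc.Blockstore()` is assumed from its use in wrapper.go, imports as in this file):

```go
// toClosableBlockstore promotes the read-only blockstore of an acquired
// shard into a closeable blockstore; closing it releases the shard.
func toClosableBlockstore(acc *dagstore.ShardAccessor) (stores.ClosableBlockstore, error) {
	bs, err := acc.Blockstore() // yields a dagstore.ReadBlockstore
	if err != nil {
		return nil, err
	}
	return &Blockstore{ReadBlockstore: bs, Closer: acc}, nil
}
```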


@ -4,8 +4,6 @@ import (
"context"
"fmt"
"io"
"os"
"strconv"
"github.com/filecoin-project/dagstore/throttle"
"github.com/ipfs/go-cid"
@ -16,50 +14,36 @@ import (
"github.com/filecoin-project/go-fil-markets/shared"
)
// MaxConcurrentStorageCalls caps the amount of concurrent calls to the
// storage, so that we don't spam it during heavy processes like bulk migration.
var MaxConcurrentStorageCalls = func() int {
// TODO replace env with config.toml attribute.
v, ok := os.LookupEnv("LOTUS_DAGSTORE_MOUNT_CONCURRENCY")
if ok {
concurrency, err := strconv.Atoi(v)
if err == nil {
return concurrency
}
}
return 100
}()
type LotusAccessor interface {
type MinerAPI interface {
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error)
IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error)
Start(ctx context.Context) error
}
type lotusAccessor struct {
type minerAPI struct {
pieceStore piecestore.PieceStore
rm retrievalmarket.RetrievalProviderNode
throttle throttle.Throttler
readyMgr *shared.ReadyManager
}
var _ LotusAccessor = (*lotusAccessor)(nil)
var _ MinerAPI = (*minerAPI)(nil)
func NewLotusAccessor(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) LotusAccessor {
return &lotusAccessor{
func NewMinerAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode, concurrency int) MinerAPI {
return &minerAPI{
pieceStore: store,
rm: rm,
throttle: throttle.Fixed(MaxConcurrentStorageCalls),
throttle: throttle.Fixed(concurrency),
readyMgr: shared.NewReadyManager(),
}
}
func (m *lotusAccessor) Start(_ context.Context) error {
func (m *minerAPI) Start(_ context.Context) error {
return m.readyMgr.FireReady(nil)
}
func (m *lotusAccessor) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
func (m *minerAPI) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool, error) {
err := m.readyMgr.AwaitReady()
if err != nil {
return false, xerrors.Errorf("failed while waiting for accessor to start: %w", err)
@ -107,7 +91,7 @@ func (m *lotusAccessor) IsUnsealed(ctx context.Context, pieceCid cid.Cid) (bool,
return false, nil
}
func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
func (m *minerAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
err := m.readyMgr.AwaitReady()
if err != nil {
return nil, err
@ -179,7 +163,7 @@ func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid
return nil, lastErr
}
func (m *lotusAccessor) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
func (m *minerAPI) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) {
err := m.readyMgr.AwaitReady()
if err != nil {
return 0, err
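The env-var-driven global is gone; callers now pass the concurrency cap to NewMinerAPI and the dagstore throttle gates the storage calls. A standalone sketch of the gating behaviour, assuming the throttle package's `Do(ctx, fn)` signature (the cap of 3 mirrors the test below; numbers are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/filecoin-project/dagstore/throttle"
)

func main() {
	t := throttle.Fixed(3) // at most 3 concurrent "storage calls"

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		i := i // capture loop variable
		go func() {
			defer wg.Done()
			_ = t.Do(context.Background(), func(ctx context.Context) error {
				fmt.Println("storage call", i) // only 3 of these run at once
				time.Sleep(100 * time.Millisecond)
				return nil
			})
		}()
	}
	wg.Wait()
}
```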


@ -74,7 +74,7 @@ func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
rpn := &mockRPN{
sectors: mockData,
}
api := NewLotusAccessor(ps, rpn)
api := NewMinerAPI(ps, rpn, 100)
require.NoError(t, api.Start(ctx))
// Add deals to piece store
@ -114,7 +114,7 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {
ps := getPieceStore(t)
rpn := &mockRPN{}
api := NewLotusAccessor(ps, rpn)
api := NewMinerAPI(ps, rpn, 100)
require.NoError(t, api.Start(ctx))
// Add a deal with data Length 10
@ -131,8 +131,6 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {
}
func TestThrottle(t *testing.T) {
MaxConcurrentStorageCalls = 3
ctx := context.Background()
cid1, err := cid.Parse("bafkqaaa")
require.NoError(t, err)
@ -143,7 +141,7 @@ func TestThrottle(t *testing.T) {
unsealedSectorID: "foo",
},
}
api := NewLotusAccessor(ps, rpn)
api := NewMinerAPI(ps, rpn, 3)
require.NoError(t, api.Start(ctx))
// Add a deal with data Length 10
@ -170,7 +168,7 @@ func TestThrottle(t *testing.T) {
}
time.Sleep(500 * time.Millisecond)
require.EqualValues(t, MaxConcurrentStorageCalls, atomic.LoadInt32(&rpn.calls)) // throttled
require.EqualValues(t, 3, atomic.LoadInt32(&rpn.calls)) // throttled
// allow to proceed.
rpn.lk.Unlock()


@ -15,28 +15,29 @@ const lotusScheme = "lotus"
var _ mount.Mount = (*LotusMount)(nil)
// LotusMount is the Lotus implementation of a Sharded DAG Store Mount.
// A Filecoin Piece is treated as a Shard by this implementation.
type LotusMount struct {
Api LotusAccessor
PieceCid cid.Cid
}
// NewLotusMountTemplate is called when registering a mount with
// the DAG store registry.
// mountTemplate returns a templated LotusMount containing the supplied API.
//
// It is called when registering a mount type with the mount registry
// of the DAG store. It is used to reinstantiate mounts after a restart.
//
// The DAG store registry receives an instance of the mount (a "template").
// When the registry needs to deserialize a mount it clones the template then
// calls Deserialize on the cloned instance, which will have a reference to the
// lotus mount API supplied here.
func NewLotusMountTemplate(api LotusAccessor) *LotusMount {
return &LotusMount{Api: api}
func mountTemplate(api MinerAPI) *LotusMount {
return &LotusMount{API: api}
}
func NewLotusMount(pieceCid cid.Cid, api LotusAccessor) (*LotusMount, error) {
// LotusMount is a DAGStore mount implementation that fetches deal data
// for a given PieceCid.
type LotusMount struct {
API MinerAPI
PieceCid cid.Cid
}
func NewLotusMount(pieceCid cid.Cid, api MinerAPI) (*LotusMount, error) {
return &LotusMount{
PieceCid: pieceCid,
Api: api,
API: api,
}, nil
}
@ -51,13 +52,12 @@ func (l *LotusMount) Deserialize(u *url.URL) error {
if err != nil {
return xerrors.Errorf("failed to parse PieceCid from host '%s': %w", u.Host, err)
}
l.PieceCid = pieceCid
return nil
}
func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) {
r, err := l.Api.FetchUnsealedPiece(ctx, l.PieceCid)
r, err := l.API.FetchUnsealedPiece(ctx, l.PieceCid)
if err != nil {
return nil, xerrors.Errorf("failed to fetch unsealed piece %s: %w", l.PieceCid, err)
}
@ -78,11 +78,11 @@ func (l *LotusMount) Close() error {
}
func (l *LotusMount) Stat(ctx context.Context) (mount.Stat, error) {
size, err := l.Api.GetUnpaddedCARSize(ctx, l.PieceCid)
size, err := l.API.GetUnpaddedCARSize(ctx, l.PieceCid)
if err != nil {
return mount.Stat{}, xerrors.Errorf("failed to fetch piece size for piece %s: %w", l.PieceCid, err)
}
isUnsealed, err := l.Api.IsUnsealed(ctx, l.PieceCid)
isUnsealed, err := l.API.IsUnsealed(ctx, l.PieceCid)
if err != nil {
return mount.Stat{}, xerrors.Errorf("failed to verify if we have the unsealed piece %s: %w", l.PieceCid, err)
}
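Putting the template comment together with the registry calls exercised in the tests below, the lifecycle is roughly the following (a sketch; the piece CID is a placeholder and imports are as in this file):

```go
func exampleRegistryRoundtrip(api MinerAPI) (mount.Mount, error) {
	registry := mount.NewRegistry()

	// At startup: register the templated mount once per scheme.
	if err := registry.Register(lotusScheme, mountTemplate(api)); err != nil {
		return nil, err
	}

	// After a restart: a persisted shard's mount URL (lotus://<piece-cid>)
	// is rehydrated. The registry clones the template and calls Deserialize
	// on the clone, which therefore inherits the MinerAPI supplied above.
	u, err := url.Parse(lotusScheme + "://bafkqaaa") // placeholder piece CID
	if err != nil {
		return nil, err
	}
	return registry.Instantiate(u)
}
```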


@ -55,7 +55,7 @@ func TestLotusMount(t *testing.T) {
// serialize url then deserialize from mount template -> should get back
// the same mount
url := mnt.Serialize()
mnt2 := NewLotusMountTemplate(mockLotusMountAPI)
mnt2 := mountTemplate(mockLotusMountAPI)
err = mnt2.Deserialize(url)
require.NoError(t, err)
@ -69,7 +69,7 @@ func TestLotusMount(t *testing.T) {
}
func TestLotusMountDeserialize(t *testing.T) {
api := &lotusAccessor{}
api := &minerAPI{}
bgen := blocksutil.NewBlockGenerator()
cid := bgen.Next().Cid()
@ -79,12 +79,12 @@ func TestLotusMountDeserialize(t *testing.T) {
u, err := url.Parse(us)
require.NoError(t, err)
mnt := NewLotusMountTemplate(api)
mnt := mountTemplate(api)
err = mnt.Deserialize(u)
require.NoError(t, err)
require.Equal(t, cid, mnt.PieceCid)
require.Equal(t, api, mnt.Api)
require.Equal(t, api, mnt.API)
// fails if cid is not valid
us = lotusScheme + "://" + "rand"
@ -111,7 +111,7 @@ func TestLotusMountRegistration(t *testing.T) {
mockLotusMountAPI := mock_dagstore.NewMockLotusAccessor(mockCtrl)
registry := mount.NewRegistry()
err = registry.Register(lotusScheme, NewLotusMountTemplate(mockLotusMountAPI))
err = registry.Register(lotusScheme, mountTemplate(mockLotusMountAPI))
require.NoError(t, err)
mnt, err := registry.Instantiate(u)


@ -1,33 +0,0 @@
package dagstore
import (
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore"
)
// ReadOnlyBlockstore stubs out Blockstore mutators with methods that error out
type ReadOnlyBlockstore struct {
dagstore.ReadBlockstore
}
func NewReadOnlyBlockstore(rbs dagstore.ReadBlockstore) bstore.Blockstore {
return ReadOnlyBlockstore{ReadBlockstore: rbs}
}
func (r ReadOnlyBlockstore) DeleteBlock(c cid.Cid) error {
return xerrors.Errorf("DeleteBlock called but not implemented")
}
func (r ReadOnlyBlockstore) Put(block blocks.Block) error {
return xerrors.Errorf("Put called but not implemented")
}
func (r ReadOnlyBlockstore) PutMany(blocks []blocks.Block) error {
return xerrors.Errorf("PutMany called but not implemented")
}
var _ bstore.Blockstore = (*ReadOnlyBlockstore)(nil)


@ -3,89 +3,73 @@ package dagstore
import (
"context"
"errors"
"io"
"os"
"sync"
"time"
"github.com/filecoin-project/dagstore/index"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
bstore "github.com/ipfs/go-ipfs-blockstore"
levelds "github.com/ipfs/go-ds-leveldb"
measure "github.com/ipfs/go-ds-measure"
logging "github.com/ipfs/go-log/v2"
ldbopts "github.com/syndtr/goleveldb/leveldb/opt"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/index"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
"github.com/filecoin-project/go-fil-markets/carstore"
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/stores"
)
const maxRecoverAttempts = 1
var log = logging.Logger("dagstore-wrapper")
// MarketDAGStoreConfig is the config the market needs to then construct a DAG Store.
type MarketDAGStoreConfig struct {
TransientsDir string
IndexDir string
Datastore ds.Datastore
MaxConcurrentIndex int
MaxConcurrentReadyFetches int
GCInterval time.Duration
}
// DAGStore provides an interface for the DAG store that can be mocked out
// by tests
type DAGStore interface {
RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan dagstore.ShardResult, opts dagstore.RegisterOpts) error
AcquireShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.AcquireOpts) error
RecoverShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.RecoverOpts) error
GC(ctx context.Context) (*dagstore.GCResult, error)
Close() error
Start(ctx context.Context) error
}
type closableBlockstore struct {
bstore.Blockstore
io.Closer
}
type Wrapper struct {
ctx context.Context
cancel context.CancelFunc
backgroundWg sync.WaitGroup
dagStore DAGStore
mountApi LotusAccessor
dagst dagstore.Interface
mountApi MinerAPI
failureCh chan dagstore.ShardResult
traceCh chan dagstore.Trace
gcInterval time.Duration
}
var _ shared.DagStoreWrapper = (*Wrapper)(nil)
var _ stores.DAGStoreWrapper = (*Wrapper)(nil)
func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrapper, error) {
func NewDAGStore(cfg config.DAGStoreConfig, mountApi MinerAPI) (*dagstore.DAGStore, *Wrapper, error) {
// construct the DAG Store.
registry := mount.NewRegistry()
if err := registry.Register(lotusScheme, NewLotusMountTemplate(mountApi)); err != nil {
return nil, xerrors.Errorf("failed to create registry: %w", err)
if err := registry.Register(lotusScheme, mountTemplate(mountApi)); err != nil {
return nil, nil, xerrors.Errorf("failed to create registry: %w", err)
}
// The dagstore will write Shard failures to the `failureCh` here.
failureCh := make(chan dagstore.ShardResult, 1)
// The dagstore will write Trace events to the `traceCh` here.
traceCh := make(chan dagstore.Trace, 32)
dstore, err := newDatastore(cfg.DatastoreDir)
if err != nil {
return nil, nil, xerrors.Errorf("failed to create dagstore datastore in %s: %w", cfg.DatastoreDir, err)
}
irepo, err := index.NewFSRepo(cfg.IndexDir)
if err != nil {
return nil, xerrors.Errorf("failed to initialise dagstore index repo")
return nil, nil, xerrors.Errorf("failed to initialise dagstore index repo")
}
dcfg := dagstore.Config{
TransientsDir: cfg.TransientsDir,
IndexRepo: irepo,
Datastore: cfg.Datastore,
Datastore: dstore,
MountRegistry: registry,
FailureCh: failureCh,
TraceCh: traceCh,
@ -95,82 +79,109 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap
MaxConcurrentReadyFetches: cfg.MaxConcurrentReadyFetches,
RecoverOnStart: dagstore.RecoverOnAcquire,
}
dagStore, err := dagstore.NewDAGStore(dcfg)
dagst, err := dagstore.NewDAGStore(dcfg)
if err != nil {
return nil, xerrors.Errorf("failed to create DAG store: %w", err)
return nil, nil, xerrors.Errorf("failed to create DAG store: %w", err)
}
return &Wrapper{
dagStore: dagStore,
w := &Wrapper{
dagst: dagst,
mountApi: mountApi,
failureCh: failureCh,
traceCh: traceCh,
gcInterval: cfg.GCInterval,
}, nil
gcInterval: time.Duration(cfg.GCInterval),
}
func (ds *Wrapper) Start(ctx context.Context) error {
ds.ctx, ds.cancel = context.WithCancel(ctx)
return dagst, w, nil
}
// newDatastore creates a datastore under the given base directory
// for dagstore metadata.
func newDatastore(dir string) (ds.Batching, error) {
// Create the datastore directory if it doesn't exist yet.
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, xerrors.Errorf("failed to create directory %s for DAG store datastore: %w", dir, err)
}
// Create a new LevelDB datastore
dstore, err := levelds.NewDatastore(dir, &levelds.Options{
Compression: ldbopts.NoCompression,
NoSync: false,
Strict: ldbopts.StrictAll,
ReadOnly: false,
})
if err != nil {
return nil, xerrors.Errorf("failed to open datastore for DAG store: %w", err)
}
// Keep statistics about the datastore
mds := measure.New("measure.", dstore)
return mds, nil
}
func (w *Wrapper) Start(ctx context.Context) error {
w.ctx, w.cancel = context.WithCancel(ctx)
// Run a go-routine to do DagStore GC.
ds.backgroundWg.Add(1)
go ds.dagStoreGCLoop()
w.backgroundWg.Add(1)
go w.gcLoop()
// run a go-routine to read the trace for debugging.
ds.backgroundWg.Add(1)
go ds.traceLoop()
w.backgroundWg.Add(1)
go w.traceLoop()
// Run a go-routine for shard recovery
if dss, ok := ds.dagStore.(*dagstore.DAGStore); ok {
ds.backgroundWg.Add(1)
go dagstore.RecoverImmediately(ds.ctx, dss, ds.failureCh, maxRecoverAttempts, ds.backgroundWg.Done)
if dss, ok := w.dagst.(*dagstore.DAGStore); ok {
w.backgroundWg.Add(1)
go dagstore.RecoverImmediately(w.ctx, dss, w.failureCh, maxRecoverAttempts, w.backgroundWg.Done)
}
return ds.dagStore.Start(ctx)
return w.dagst.Start(ctx)
}
func (ds *Wrapper) traceLoop() {
defer ds.backgroundWg.Done()
func (w *Wrapper) traceLoop() {
defer w.backgroundWg.Done()
for ds.ctx.Err() == nil {
for w.ctx.Err() == nil {
select {
// Log trace events from the DAG store
case tr := <-ds.traceCh:
case tr := <-w.traceCh:
log.Debugw("trace",
"shard-key", tr.Key.String(),
"op-type", tr.Op.String(),
"after", tr.After.String())
case <-ds.ctx.Done():
case <-w.ctx.Done():
return
}
}
}
func (ds *Wrapper) dagStoreGCLoop() {
defer ds.backgroundWg.Done()
func (w *Wrapper) gcLoop() {
defer w.backgroundWg.Done()
gcTicker := time.NewTicker(ds.gcInterval)
defer gcTicker.Stop()
ticker := time.NewTicker(w.gcInterval)
defer ticker.Stop()
for ds.ctx.Err() == nil {
for w.ctx.Err() == nil {
select {
// GC the DAG store on every tick
case <-gcTicker.C:
_, _ = ds.dagStore.GC(ds.ctx)
case <-ticker.C:
_, _ = w.dagst.GC(w.ctx)
// Exit when the DAG store wrapper is shutdown
case <-ds.ctx.Done():
case <-w.ctx.Done():
return
}
}
}
func (ds *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) {
func (w *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (stores.ClosableBlockstore, error) {
log.Debugf("acquiring shard for piece CID %s", pieceCid)
key := shard.KeyFromCID(pieceCid)
resch := make(chan dagstore.ShardResult, 1)
err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{})
err := w.dagst.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{})
log.Debugf("sent message to acquire shard for piece CID %s", pieceCid)
if err != nil {
@ -185,13 +196,13 @@ func (ds *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.Cl
// we already have a transient file. However, we don't have it here
// and therefore we pass an empty file path.
carPath := ""
if err := shared.RegisterShardSync(ctx, ds, pieceCid, carPath, false); err != nil {
if err := stores.RegisterShardSync(ctx, w, pieceCid, carPath, false); err != nil {
return nil, xerrors.Errorf("failed to re-register shard during loading piece CID %s: %w", pieceCid, err)
}
log.Warnw("successfully re-registered shard", "pieceCID", pieceCid)
resch = make(chan dagstore.ShardResult, 1)
if err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}); err != nil {
if err := w.dagst.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}); err != nil {
return nil, xerrors.Errorf("failed to acquire Shard for piece CID %s after re-registering: %w", pieceCid, err)
}
}
@ -216,13 +227,13 @@ func (ds *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.Cl
}
log.Debugf("successfully loaded blockstore for piece CID %s", pieceCid)
return &closableBlockstore{Blockstore: NewReadOnlyBlockstore(bs), Closer: res.Accessor}, nil
return &Blockstore{ReadBlockstore: bs, Closer: res.Accessor}, nil
}
func (ds *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error {
func (w *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error {
// Create a lotus mount with the piece CID
key := shard.KeyFromCID(pieceCid)
mt, err := NewLotusMount(pieceCid, ds.mountApi)
mt, err := NewLotusMount(pieceCid, w.mountApi)
if err != nil {
return xerrors.Errorf("failed to create lotus mount for piece CID %s: %w", pieceCid, err)
}
@ -232,7 +243,7 @@ func (ds *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath
ExistingTransient: carPath,
LazyInitialization: !eagerInit,
}
err = ds.dagStore.RegisterShard(ctx, key, mt, resch, opts)
err = w.dagst.RegisterShard(ctx, key, mt, resch, opts)
if err != nil {
return xerrors.Errorf("failed to schedule register shard for piece CID %s: %w", pieceCid, err)
}
@ -241,20 +252,20 @@ func (ds *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath
return nil
}
func (ds *Wrapper) Close() error {
func (w *Wrapper) Close() error {
// Cancel the context
ds.cancel()
w.cancel()
// Close the DAG store
log.Info("will close the dagstore")
if err := ds.dagStore.Close(); err != nil {
if err := w.dagst.Close(); err != nil {
return xerrors.Errorf("failed to close DAG store: %w", err)
}
log.Info("dagstore closed")
// Wait for the background go routine to exit
log.Info("waiting for dagstore background wrapper routines to exist")
ds.backgroundWg.Wait()
w.backgroundWg.Wait()
log.Info("exited dagstore background warpper routines")
return nil


@ -10,6 +10,8 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
@ -26,13 +28,16 @@ func TestWrapperAcquireRecovery(t *testing.T) {
require.NoError(t, err)
// Create a DAG store wrapper
w, err := NewDagStoreWrapper(MarketDAGStoreConfig{
dagst, w, err := NewDAGStore(config.DAGStoreConfig{
TransientsDir: t.TempDir(),
IndexDir: t.TempDir(),
GCInterval: time.Millisecond,
DatastoreDir: t.TempDir(),
GCInterval: config.Duration(1 * time.Millisecond),
}, mockLotusMount{})
require.NoError(t, err)
defer dagst.Close() //nolint:errcheck
// Return an error from acquire shard the first time
acquireShardErr := make(chan error, 1)
acquireShardErr <- xerrors.Errorf("unknown shard: %w", dagstore.ErrShardUnknown)
@ -45,7 +50,7 @@ func TestWrapperAcquireRecovery(t *testing.T) {
},
register: make(chan shard.Key, 1),
}
w.dagStore = mock
w.dagst = mock
mybs, err := w.LoadShard(ctx, pieceCid)
require.NoError(t, err)
@ -76,20 +81,23 @@ func TestWrapperBackground(t *testing.T) {
ctx := context.Background()
// Create a DAG store wrapper
w, err := NewDagStoreWrapper(MarketDAGStoreConfig{
dagst, w, err := NewDAGStore(config.DAGStoreConfig{
TransientsDir: t.TempDir(),
IndexDir: t.TempDir(),
GCInterval: time.Millisecond,
DatastoreDir: t.TempDir(),
GCInterval: config.Duration(1 * time.Millisecond),
}, mockLotusMount{})
require.NoError(t, err)
defer dagst.Close() //nolint:errcheck
// Create a mock DAG store in place of the real DAG store
mock := &mockDagStore{
gc: make(chan struct{}, 1),
recover: make(chan shard.Key, 1),
close: make(chan struct{}, 1),
}
w.dagStore = mock
w.dagst = mock
// Start up the wrapper
err = w.Start(ctx)
@ -128,6 +136,18 @@ type mockDagStore struct {
close chan struct{}
}
func (m *mockDagStore) DestroyShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.DestroyOpts) error {
panic("implement me")
}
func (m *mockDagStore) GetShardInfo(k shard.Key) (dagstore.ShardInfo, error) {
panic("implement me")
}
func (m *mockDagStore) AllShardsInfo() dagstore.AllShardsInfo {
panic("implement me")
}
func (m *mockDagStore) Start(_ context.Context) error {
return nil
}


@ -66,6 +66,7 @@ var (
AutoNATSvcKey = special{10} // Libp2p option
BandwidthReporterKey = special{11} // Libp2p option
ConnGaterKey = special{12} // libp2p option
DAGStoreKey = special{13} // constructor returns multiple values
)
type invoke int


@ -148,8 +148,8 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)),
// DAG Store
Override(new(dagstore.LotusAccessor), modules.NewLotusAccessor),
Override(new(*dagstore.Wrapper), modules.DagStoreWrapper),
Override(new(dagstore.MinerAPI), modules.NewMinerAPI),
Override(DAGStoreKey, modules.DAGStore),
// Markets (retrieval)
Override(new(retrievalmarket.RetrievalProviderNode), retrievaladapter.NewRetrievalProviderNode),


@ -188,6 +188,16 @@ func DefaultStorageMiner() *StorageMiner {
TerminateControl: []string{},
DealPublishControl: []string{},
},
// The default DAGStoreConfig doesn't define any paths for transients,
// indices and the datastore. Empty values will lead to these being
// placed under <repo>/dagStore.
DAGStore: DAGStoreConfig{
MaxConcurrentIndex: 5,
MaxConcurrentReadyFetches: 2,
MaxConcurrencyStorageCalls: 100,
GCInterval: Duration(1 * time.Minute),
},
}
cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http"
cfg.Common.API.RemoteListenAddress = "127.0.0.1:2345"


@ -125,6 +125,66 @@ and storage providers`,
Comment: ``,
},
},
"DAGStoreConfig": []DocField{
{
Name: "TransientsDir",
Type: "string",
Comment: `Path to the transients directory. The transients directory caches
unsealed deals that have been fetched from the storage subsystem for
serving retrievals. When empty or omitted, the default value applies.
Default value: $LOTUS_MARKETS_PATH/dagStore/transients (split deployment)
or $LOTUS_MINER_PATH/dagStore/transients (monolith deployment)`,
},
{
Name: "IndexDir",
Type: "string",
Comment: `Path to indices directory. When empty or omitted, the default value applies.
Default value: $LOTUS_MARKETS_PATH/dagStore/index (split deployment)
or $LOTUS_MINER_PATH/dagStore/index (monolith deployment)`,
},
{
Name: "DatastoreDir",
Type: "string",
Comment: `Path to datastore directory. The datastore is a KV store tracking the
state of shards known to the DAG store.
Default value: $LOTUS_MARKETS_PATH/dagStore/datastore (split deployment)
or $LOTUS_MINER_PATH/dagStore/datastore (monolith deployment)`,
},
{
Name: "MaxConcurrentIndex",
Type: "int",
Comment: `The maximum number of indexing jobs that can run simultaneously.
Default value: 5.`,
},
{
Name: "MaxConcurrentReadyFetches",
Type: "int",
Comment: `The maximum number of unsealed deals that can be fetched simultaneously
from the storage subsystem.
Default value: 2.`,
},
{
Name: "MaxConcurrencyStorageCalls",
Type: "int",
Comment: `The maximum number of simultaneous inflight API calls to the storage
subsystem.
Default value: 100.`,
},
{
Name: "GCInterval",
Type: "Duration",
Comment: `The time between calls to periodic dagstore GC, in time.Duration string
representation, e.g. 1m, 5m, 1h.
Default value: 1 minute.`,
},
},
"DealmakingConfig": []DocField{
{
Name: "ConsiderOnlineStorageDeals",
@ -753,6 +813,12 @@ Default is 20 (about once a week).`,
Name: "Addresses",
Type: "MinerAddressConfig",
Comment: ``,
},
{
Name: "DAGStore",
Type: "DAGStoreConfig",
Comment: ``,
},
},


@ -50,6 +50,46 @@ type StorageMiner struct {
Storage sectorstorage.SealerConfig
Fees MinerFeeConfig
Addresses MinerAddressConfig
DAGStore DAGStoreConfig
}
type DAGStoreConfig struct {
// Path to the transients directory. The transients directory caches
// unsealed deals that have been fetched from the storage subsystem for
// serving retrievals. When empty or omitted, the default value applies.
// Default value: $LOTUS_MARKETS_PATH/dagStore/transients (split deployment)
// or $LOTUS_MINER_PATH/dagStore/transients (monolith deployment)
TransientsDir string
// Path to indices directory. When empty or omitted, the default value applies.
// Default value: $LOTUS_MARKETS_PATH/dagStore/index (split deployment)
// or $LOTUS_MINER_PATH/dagStore/index (monolith deployment)
IndexDir string
// Path to datastore directory. The datastore is a KV store tracking the
// state of shards known to the DAG store.
// Default value: $LOTUS_MARKETS_PATH/dagStore/datastore (split deployment)
// or $LOTUS_MINER_PATH/dagStore/datastore (monolith deployment)
DatastoreDir string
// The maximum number of indexing jobs that can run simultaneously.
// Default value: 5.
MaxConcurrentIndex int
// The maximum number of unsealed deals that can be fetched simultaneously
// from the storage subsystem.
// Default value: 2.
MaxConcurrentReadyFetches int
// The maximum number of simultaneous inflight API calls to the storage
// subsystem.
// Default value: 100.
MaxConcurrencyStorageCalls int
// The time between calls to periodic dagstore GC, in time.Duration string
// representation, e.g. 1m, 5m, 1h.
// Default value: 1 minute.
GCInterval Duration
}
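For reference, the defaults set by DefaultStorageMiner above, expressed against this struct (a sketch, not code from this commit; the directory fields stay empty so they resolve under <repo>/dagStore at runtime):

```go
cfg := DAGStoreConfig{
	// TransientsDir, IndexDir and DatastoreDir left empty: the node modules
	// default them to <repo>/dagStore/{transients,index,datastore}.
	MaxConcurrentIndex:         5,
	MaxConcurrentReadyFetches:  2,
	MaxConcurrencyStorageCalls: 100,
	GCInterval:                 Duration(1 * time.Minute),
}
```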
type MinerSubsystemConfig struct {


@ -10,8 +10,7 @@ import (
"sort"
"time"
"github.com/filecoin-project/go-fil-markets/filestorecaradapter"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/ipld/go-car"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
@ -26,7 +25,7 @@ import (
files "github.com/ipfs/go-ipfs-files"
"github.com/ipfs/go-merkledag"
unixfile "github.com/ipfs/go-unixfs/file"
"github.com/ipld/go-car"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
@ -47,6 +46,7 @@ import (
"github.com/filecoin-project/go-fil-markets/shared"
"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-fil-markets/storagemarket/network"
"github.com/filecoin-project/go-fil-markets/stores"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/v3/actors/builtin/market"
@ -54,6 +54,7 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/markets/utils"
@ -198,7 +199,7 @@ func (a *API) dealStarter(ctx context.Context, params *api.StartDealParams, isSt
Rt: st,
FastRetrieval: params.FastRetrieval,
VerifiedDeal: params.VerifiedDeal,
FilestoreCARv2FilePath: CARV2FilePath,
IndexedCAR: CARV2FilePath,
})
if err != nil {
@ -519,7 +520,7 @@ func (a *API) ClientImport(ctx context.Context, ref api.FileRef) (res *api.Impor
}
}()
root, err = a.importNormalFileToFilestoreCARv2(ctx, id, ref.Path, carFile)
root, err = a.doImport(ctx, ref.Path, carFile)
if err != nil {
return nil, xerrors.Errorf("failed to import normal file to CARv2: %w", err)
}
@ -1031,21 +1032,21 @@ func (w *lenWriter) Write(p []byte) (n int, err error) {
}
func (a *API) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, error) {
fc, err := a.imgr().FilestoreCARV2FilePathFor(root)
path, err := a.imgr().FilestoreCARV2FilePathFor(root)
if err != nil {
return api.DataSize{}, xerrors.Errorf("failed to find CARv2 file for root: %w", err)
}
if len(fc) == 0 {
if len(path) == 0 {
return api.DataSize{}, xerrors.New("no CARv2 file for root")
}
rdOnly, err := filestorecaradapter.NewReadOnlyFileStore(fc)
fs, err := stores.ReadOnlyFilestore(path)
if err != nil {
return api.DataSize{}, xerrors.Errorf("failed to open read only filestore: %w", err)
return api.DataSize{}, xerrors.Errorf("failed to open filestore from carv2 in path %s: %w", path, err)
}
defer rdOnly.Close() //nolint:errcheck
defer fs.Close() //nolint:errcheck
dag := merkledag.NewDAGService(blockservice.New(rdOnly, offline.Exchange(rdOnly)))
dag := merkledag.NewDAGService(blockservice.New(fs, offline.Exchange(fs)))
w := lenWriter(0)
err = car.WriteCar(ctx, dag, []cid.Cid{root}, &w)
@ -1062,21 +1063,21 @@ func (a *API) ClientDealSize(ctx context.Context, root cid.Cid) (api.DataSize, e
}
func (a *API) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCIDSize, error) {
fc, err := a.imgr().FilestoreCARV2FilePathFor(root)
path, err := a.imgr().FilestoreCARV2FilePathFor(root)
if err != nil {
return api.DataCIDSize{}, xerrors.Errorf("failed to find CARv2 file for root: %w", err)
}
if len(fc) == 0 {
if len(path) == 0 {
return api.DataCIDSize{}, xerrors.New("no CARv2 file for root")
}
rdOnly, err := filestorecaradapter.NewReadOnlyFileStore(fc)
fs, err := stores.ReadOnlyFilestore(path)
if err != nil {
return api.DataCIDSize{}, xerrors.Errorf("failed to open read only blockstore: %w", err)
return api.DataCIDSize{}, xerrors.Errorf("failed to open filestore from carv2 in path %s: %w", path, err)
}
defer rdOnly.Close() //nolint:errcheck
defer fs.Close() //nolint:errcheck
dag := merkledag.NewDAGService(blockservice.New(rdOnly, offline.Exchange(rdOnly)))
dag := merkledag.NewDAGService(blockservice.New(fs, offline.Exchange(fs)))
w := &writer.Writer{}
bw := bufio.NewWriterSize(w, int(writer.CommPBuf))
@ -1095,22 +1096,20 @@ func (a *API) ClientDealPieceCID(ctx context.Context, root cid.Cid) (api.DataCID
func (a *API) ClientGenCar(ctx context.Context, ref api.FileRef, outputPath string) error {
id := importmgr.ImportID(rand.Uint64())
tmpCARv2File, err := a.imgr().NewTempFile(id)
tmp, err := a.imgr().NewTempFile(id)
if err != nil {
return xerrors.Errorf("failed to create temp file: %w", err)
}
defer os.Remove(tmpCARv2File) //nolint:errcheck
defer os.Remove(tmp) //nolint:errcheck
root, err := a.importNormalFileToFilestoreCARv2(ctx, id, ref.Path, tmpCARv2File)
root, err := a.doImport(ctx, ref.Path, tmp)
if err != nil {
return xerrors.Errorf("failed to import normal file to CARv2")
}
// generate a deterministic CARv1 payload from the UnixFS DAG by doing an IPLD
// traversal over the Unixfs DAG in the CARv2 file using the "all selector" i.e the entire DAG selector.
fs, err := filestorecaradapter.NewReadOnlyFileStore(tmpCARv2File)
fs, err := stores.ReadOnlyFilestore(tmp)
if err != nil {
return xerrors.Errorf("failed to open read only CARv2 blockstore: %w", err)
return xerrors.Errorf("failed to open filestore from carv2 in path %s: %w", tmp, err)
}
defer fs.Close() //nolint:errcheck


@ -2,18 +2,13 @@ package client
import (
"context"
"io/ioutil"
"os"
"github.com/filecoin-project/go-fil-markets/filestorecaradapter"
bstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/node/repo/importmgr"
"github.com/filecoin-project/go-fil-markets/stores"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-cidutil"
"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-filestore"
chunker "github.com/ipfs/go-ipfs-chunker"
offline "github.com/ipfs/go-ipfs-exchange-offline"
files2 "github.com/ipfs/go-ipfs-files"
@ -21,56 +16,83 @@ import (
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-unixfs/importer/balanced"
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
"github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/build"
)
// importNormalFileToFilestoreCARv2 transforms the client's "normal file" to a Unixfs IPLD DAG and writes out the DAG to a CARv2 file
// that can be used to back a filestore.
func (a *API) importNormalFileToFilestoreCARv2(ctx context.Context, importID importmgr.ImportID, inputFilePath string, outputCARv2Path string) (c cid.Cid, finalErr error) {
// TODO: We've currently put in a hack to create the Unixfs DAG as a CARv2 without using Badger.
// We first create the Unixfs DAG using a filestore to get the root of the Unixfs DAG.
// We can't create the UnixfsDAG right away using a CARv2 read-write blockstore as the blockstore
// needs the root of the DAG during instantiation to write out a valid CARv2 file.
// doImport takes a standard file (src), forms a UnixFS DAG, and writes a
// CARv2 file with positional mapping (backed by the go-filestore library).
func (a *API) doImport(ctx context.Context, src string, dst string) (cid.Cid, error) {
// This method uses a two-phase approach with a staging CAR blockstore and
// a final CAR blockstore.
//
// In the second pass, we create a CARv2 file with the root present using the root node we get in the above step.
// This hack should be fixed when CARv2 allows specifying the root AFTER finishing the CARv2 streaming write.
fm := filestore.NewFileManager(ds_sync.MutexWrap(datastore.NewMapDatastore()), "/")
fm.AllowFiles = true
fstore := filestore.NewFilestore(bstore.NewMemorySync(), fm)
// This is necessary because of https://github.com/ipld/go-car/issues/196
//
// TODO: do we need to chunk twice? Isn't the first output already in the
// right order? Can't we just copy the CAR file and replace the header?
f, err := ioutil.TempFile("", "")
if err != nil {
return cid.Undef, xerrors.Errorf("failed to create temp file: %w", err)
}
_ = f.Close() // close; we only want the path.
tmp := f.Name()
defer os.Remove(tmp) //nolint:errcheck
// Step 1. Compute the UnixFS DAG and write it to a CARv2 file to get
// the root CID of the DAG.
fstore, err := stores.ReadWriteFilestore(tmp)
if err != nil {
return cid.Undef, xerrors.Errorf("failed to create temporary filestore: %w", err)
}
bsvc := blockservice.New(fstore, offline.Exchange(fstore))
defer bsvc.Close() //nolint:errcheck
dags := merkledag.NewDAGService(bsvc)
// ---- First Pass --- Write out the UnixFS DAG to a rootless CARv2 file by instantiating a read-write CARv2 blockstore without the root.
root, err := importNormalFileToUnixfsDAG(ctx, inputFilePath, merkledag.NewDAGService(bsvc))
root, err := buildUnixFS(ctx, src, dags)
if err != nil {
return cid.Undef, xerrors.Errorf("failed to import file to store: %w", err)
_ = fstore.Close()
return cid.Undef, xerrors.Errorf("failed to import file to store to compute root: %w", err)
}
//------ Second Pass --- Now that we have the root of the Unixfs DAG -> write out the Unixfs DAG to a CARv2 file with the root present by using a
// filestore backed by a read-write CARv2 blockstore.
fsb, err := filestorecaradapter.NewReadWriteFileStore(outputCARv2Path, []cid.Cid{root})
if err != nil {
return cid.Undef, xerrors.Errorf("failed to create a CARv2 read-write blockstore: %w", err)
}
defer fsb.Close() //nolint:errcheck
bsvc = blockservice.New(fsb, offline.Exchange(fsb))
root2, err := importNormalFileToUnixfsDAG(ctx, inputFilePath, merkledag.NewDAGService(bsvc))
if err != nil {
return cid.Undef, xerrors.Errorf("failed to create Unixfs DAG with CARv2 blockstore: %w", err)
if err := fstore.Close(); err != nil {
return cid.Undef, xerrors.Errorf("failed to finalize car filestore: %w", err)
}
if root != root2 {
// Step 2. We now have the root of the UnixFS DAG, and we can write the
// final CAR for real under `dst`.
bs, err := blockstore.OpenReadWrite(dst, []cid.Cid{root},
car.ZeroLengthSectionAsEOF(true),
blockstore.UseWholeCIDs(true))
if err != nil {
return cid.Undef, xerrors.Errorf("failed to create a carv2 read/write blockstore: %w", err)
}
bsvc = blockservice.New(bs, offline.Exchange(bs))
dags = merkledag.NewDAGService(bsvc)
finalRoot, err := buildUnixFS(ctx, src, dags)
if err != nil {
_ = bs.Close()
return cid.Undef, xerrors.Errorf("failed to create UnixFS DAG with carv2 blockstore: %w", err)
}
if err := bs.Finalize(); err != nil {
return cid.Undef, xerrors.Errorf("failed to finalize car blockstore: %w", err)
}
if root != finalRoot {
return cid.Undef, xerrors.New("roots do not match")
}
return root, nil
}
// importNormalFileToUnixfsDAG transforms a client's normal file to a UnixfsDAG and imports the DAG to the given DAG service.
func importNormalFileToUnixfsDAG(ctx context.Context, inputFilePath string, dag ipld.DAGService) (cid.Cid, error) {
f, err := os.Open(inputFilePath)
// buildUnixFS builds a UnixFS DAG out of the supplied ordinary file,
// and imports the DAG into the supplied service.
func buildUnixFS(ctx context.Context, src string, dag ipld.DAGService) (cid.Cid, error) {
f, err := os.Open(src)
if err != nil {
return cid.Undef, xerrors.Errorf("failed to open input file: %w", err)
}
@ -81,7 +103,7 @@ func importNormalFileToUnixfsDAG(ctx context.Context, inputFilePath string, dag
return cid.Undef, xerrors.Errorf("failed to stat file :%w", err)
}
file, err := files2.NewReaderPathFile(inputFilePath, f, stat)
file, err := files2.NewReaderPathFile(src, f, stat)
if err != nil {
return cid.Undef, xerrors.Errorf("failed to create reader path file: %w", err)
}
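Condensing the interleaved diff above: the two passes exist because a CARv2 read/write blockstore must be given its roots at open time (the go-car issue referenced in the comments), so a throwaway filestore pass computes the root first. A sketch under those assumptions (imports as in this file):

```go
func twoPassImport(ctx context.Context, src, dst, scratch string) (cid.Cid, error) {
	// Pass 1: throwaway CAR-backed filestore, used only to learn the DAG root.
	fstore, err := stores.ReadWriteFilestore(scratch)
	if err != nil {
		return cid.Undef, err
	}
	dags := merkledag.NewDAGService(blockservice.New(fstore, offline.Exchange(fstore)))
	root, err := buildUnixFS(ctx, src, dags)
	_ = fstore.Close()
	if err != nil {
		return cid.Undef, err
	}

	// Pass 2: with the root known, open the real CARv2 read/write and rebuild.
	bs, err := blockstore.OpenReadWrite(dst, []cid.Cid{root},
		car.ZeroLengthSectionAsEOF(true), blockstore.UseWholeCIDs(true))
	if err != nil {
		return cid.Undef, err
	}
	dags = merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
	if _, err := buildUnixFS(ctx, src, dags); err != nil {
		return cid.Undef, err
	}
	return root, bs.Finalize() // Finalize writes the header carrying the root
}
```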


@ -4,12 +4,11 @@ import (
"context"
"io"
"io/ioutil"
"math/rand"
"os"
"strings"
"testing"
"github.com/filecoin-project/go-fil-markets/filestorecaradapter"
"github.com/filecoin-project/go-fil-markets/stores"
"github.com/filecoin-project/lotus/node/repo/importmgr"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
@ -22,39 +21,50 @@ import (
"github.com/stretchr/testify/require"
)
func TestImportNormalFileToUnixfsDAG(t *testing.T) {
// This test uses a full "dense" CARv2, and not a filestore (positional mapping).
func TestRoundtripUnixFS_Dense(t *testing.T) {
ctx := context.Background()
inputPath, inputContents := genNormalInputFile(t)
inputPath, inputContents := genInputFile(t)
defer os.Remove(inputPath) //nolint:errcheck
carv2File := genTmpFile(t)
carv2File := newTmpFile(t)
defer os.Remove(carv2File) //nolint:errcheck
// import a normal file to a Unixfs DAG using a CARv2 read-write blockstore and flush it out to a CARv2 file.
tempCARv2Store, err := blockstore.OpenReadWrite(carv2File, []cid.Cid{}, carv2.ZeroLengthSectionAsEOF(true), blockstore.UseWholeCIDs(true))
// import a file to a Unixfs DAG using a CARv2 read/write blockstore.
path, err := blockstore.OpenReadWrite(carv2File, nil,
carv2.ZeroLengthSectionAsEOF(true),
blockstore.UseWholeCIDs(true))
require.NoError(t, err)
bsvc := blockservice.New(tempCARv2Store, offline.Exchange(tempCARv2Store))
root, err := importNormalFileToUnixfsDAG(ctx, inputPath, merkledag.NewDAGService(bsvc))
bsvc := blockservice.New(path, offline.Exchange(path))
dags := merkledag.NewDAGService(bsvc)
root, err := buildUnixFS(ctx, inputPath, dags)
require.NoError(t, err)
require.NotEqual(t, cid.Undef, root)
require.NoError(t, tempCARv2Store.Finalize())
require.NoError(t, path.Finalize())
// convert the CARv2 file to a normal file again and ensure the contents match.
readOnly, err := blockstore.OpenReadOnly(carv2File, carv2.ZeroLengthSectionAsEOF(true), blockstore.UseWholeCIDs(true))
// reconstruct the file.
readOnly, err := blockstore.OpenReadOnly(carv2File,
carv2.ZeroLengthSectionAsEOF(true),
blockstore.UseWholeCIDs(true))
require.NoError(t, err)
defer readOnly.Close() //nolint:errcheck
dag := merkledag.NewDAGService(blockservice.New(readOnly, offline.Exchange(readOnly)))
nd, err := dag.Get(ctx, root)
require.NoError(t, err)
file, err := unixfile.NewUnixfsFile(ctx, dag, nd)
dags = merkledag.NewDAGService(blockservice.New(readOnly, offline.Exchange(readOnly)))
nd, err := dags.Get(ctx, root)
require.NoError(t, err)
tmpOutput := genTmpFile(t)
file, err := unixfile.NewUnixfsFile(ctx, dags, nd)
require.NoError(t, err)
tmpOutput := newTmpFile(t)
defer os.Remove(tmpOutput) //nolint:errcheck
require.NoError(t, files.WriteTo(file, tmpOutput))
// ensure contents of the initial input file and the output file are identical.
fo, err := os.Open(tmpOutput)
require.NoError(t, err)
bz2, err := ioutil.ReadAll(fo)
@ -63,35 +73,36 @@ func TestImportNormalFileToUnixfsDAG(t *testing.T) {
require.Equal(t, inputContents, bz2)
}
func TestImportNormalFileToCARv2(t *testing.T) {
func TestRoundtripUnixFS_Filestore(t *testing.T) {
ctx := context.Background()
a := &API{
Imports: &importmgr.Mgr{},
}
importID := importmgr.ImportID(rand.Uint64())
inputFilePath, inputContents := genNormalInputFile(t)
inputFilePath, inputContents := genInputFile(t)
defer os.Remove(inputFilePath) //nolint:errcheck
outputCARv2 := genTmpFile(t)
defer os.Remove(outputCARv2) //nolint:errcheck
path := newTmpFile(t)
defer os.Remove(path) //nolint:errcheck
root, err := a.importNormalFileToFilestoreCARv2(ctx, importID, inputFilePath, outputCARv2)
root, err := a.doImport(ctx, inputFilePath, path)
require.NoError(t, err)
require.NotEqual(t, cid.Undef, root)
// convert the CARv2 to a normal file again and ensure the contents match
readOnly, err := filestorecaradapter.NewReadOnlyFileStore(outputCARv2)
fs, err := stores.ReadOnlyFilestore(path)
require.NoError(t, err)
defer readOnly.Close() //nolint:errcheck
dag := merkledag.NewDAGService(blockservice.New(readOnly, offline.Exchange(readOnly)))
defer fs.Close() //nolint:errcheck
nd, err := dag.Get(ctx, root)
require.NoError(t, err)
file, err := unixfile.NewUnixfsFile(ctx, dag, nd)
dags := merkledag.NewDAGService(blockservice.New(fs, offline.Exchange(fs)))
nd, err := dags.Get(ctx, root)
require.NoError(t, err)
tmpOutput := genTmpFile(t)
file, err := unixfile.NewUnixfsFile(ctx, dags, nd)
require.NoError(t, err)
tmpOutput := newTmpFile(t)
defer os.Remove(tmpOutput) //nolint:errcheck
require.NoError(t, files.WriteTo(file, tmpOutput))
@ -104,14 +115,14 @@ func TestImportNormalFileToCARv2(t *testing.T) {
require.Equal(t, inputContents, bz2)
}
func genTmpFile(t *testing.T) string {
func newTmpFile(t *testing.T) string {
f, err := os.CreateTemp("", "")
require.NoError(t, err)
require.NoError(t, f.Close())
return f.Name()
}
func genNormalInputFile(t *testing.T) (filepath string, contents []byte) {
func genInputFile(t *testing.T) (filepath string, contents []byte) {
s := strings.Repeat("abcde", 100)
tmp, err := os.CreateTemp("", "")
require.NoError(t, err)


@ -182,7 +182,7 @@ func StorageClient(lc fx.Lifecycle, h host.Host, dataTransfer dtypes.ClientDataT
func RetrievalClient(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dt dtypes.ClientDataTransfer, payAPI payapi.PaychAPI, resolver discovery.PeerResolver,
ds dtypes.MetadataDS, chainAPI full.ChainAPI, stateAPI full.StateAPI, j journal.Journal) (retrievalmarket.RetrievalClient, error) {
carsPath := filepath.Join(r.Path(), dagStore, "retrieval-cars")
carsPath := filepath.Join(r.Path(), DefaultDAGStoreDir, "retrieval-cars")
if err := os.MkdirAll(carsPath, 0755); err != nil {
return nil, xerrors.Errorf("failed to create dir")


@ -7,9 +7,7 @@ import (
"fmt"
"net/http"
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"
@ -17,22 +15,6 @@ import (
"go.uber.org/multierr"
"golang.org/x/xerrors"
"github.com/ipfs/go-bitswap"
"github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
levelds "github.com/ipfs/go-ds-leveldb"
measure "github.com/ipfs/go-ds-measure"
graphsync "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil"
"github.com/ipfs/go-merkledag"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing"
ldbopts "github.com/syndtr/goleveldb/leveldb/opt"
"github.com/filecoin-project/go-address"
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
dtnet "github.com/filecoin-project/go-data-transfer/network"
@ -52,6 +34,18 @@ import (
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statestore"
"github.com/filecoin-project/go-storedcounter"
"github.com/ipfs/go-bitswap"
"github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
graphsync "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil"
"github.com/ipfs/go-merkledag"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
@ -82,7 +76,6 @@ import (
)
var StorageCounterDSPrefix = "/storage/nextid"
var dagStore = "dagStore"
func minerAddrFromDS(ds dtypes.MetadataDS) (address.Address, error) {
maddrb, err := ds.Get(datastore.NewKey("miner-address"))
@ -581,101 +574,6 @@ func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.Conside
}
}
func NewLotusAccessor(lc fx.Lifecycle,
pieceStore dtypes.ProviderPieceStore,
rpn retrievalmarket.RetrievalProviderNode,
) (dagstore.LotusAccessor, error) {
mountApi := dagstore.NewLotusAccessor(pieceStore, rpn)
ready := make(chan error, 1)
pieceStore.OnReady(func(err error) {
ready <- err
})
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
if err := <-ready; err != nil {
return fmt.Errorf("aborting dagstore start; piecestore failed to start: %s", err)
}
return mountApi.Start(ctx)
},
OnStop: func(context.Context) error {
return nil
},
})
return mountApi, nil
}
func DagStoreWrapper(
lc fx.Lifecycle,
r repo.LockedRepo,
lotusAccessor dagstore.LotusAccessor,
) (*dagstore.Wrapper, error) {
dagStoreDir := filepath.Join(r.Path(), dagStore)
dagStoreDS, err := createDAGStoreDatastore(dagStoreDir)
if err != nil {
return nil, err
}
var maxCopies = 2
// TODO replace env with config.toml attribute.
v, ok := os.LookupEnv("LOTUS_DAGSTORE_COPY_CONCURRENCY")
if ok {
concurrency, err := strconv.Atoi(v)
if err == nil {
maxCopies = concurrency
}
}
cfg := dagstore.MarketDAGStoreConfig{
TransientsDir: filepath.Join(dagStoreDir, "transients"),
IndexDir: filepath.Join(dagStoreDir, "index"),
Datastore: dagStoreDS,
GCInterval: 1 * time.Minute,
MaxConcurrentIndex: 5,
MaxConcurrentReadyFetches: maxCopies,
}
dsw, err := dagstore.NewDagStoreWrapper(cfg, lotusAccessor)
if err != nil {
return nil, xerrors.Errorf("failed to create DAG store wrapper: %w", err)
}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return dsw.Start(ctx)
},
OnStop: func(context.Context) error {
return dsw.Close()
},
})
return dsw, nil
}
// createDAGStoreDatastore creates a datastore under the given base directory
// for DAG store metadata
func createDAGStoreDatastore(baseDir string) (datastore.Batching, error) {
// Create a datastore directory under the base dir if it doesn't already exist
dsDir := path.Join(baseDir, "datastore")
if err := os.MkdirAll(dsDir, 0755); err != nil {
return nil, xerrors.Errorf("failed to create directory %s for DAG store datastore: %w", dsDir, err)
}
// Create a new LevelDB datastore
ds, err := levelds.NewDatastore(dsDir, &levelds.Options{
Compression: ldbopts.NoCompression,
NoSync: false,
Strict: ldbopts.StrictAll,
ReadOnly: false,
})
if err != nil {
return nil, xerrors.Errorf("failed to open datastore for DAG store: %w", err)
}
// Keep statistics about the datastore
mds := measure.New("measure.", ds)
return mds, nil
}
func StorageProvider(minerAddress dtypes.MinerAddress,
storedAsk *storedask.StoredAsk,
h host.Host, ds dtypes.MetadataDS,
@ -692,7 +590,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
return nil, err
}
dagStorePath := filepath.Join(r.Path(), dagStore)
dagStorePath := filepath.Join(r.Path(), DefaultDAGStoreDir)
opt := storageimpl.CustomDealDecisionLogic(storageimpl.DealDeciderFunc(df))
shardMigrator := storageimpl.NewShardMigrator(address.Address(minerAddress), dagStorePath, dsw, pieceStore, spn)


@ -0,0 +1,118 @@
package modules
import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"go.uber.org/fx"
"golang.org/x/xerrors"
mdagstore "github.com/filecoin-project/lotus/markets/dagstore"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/repo"
)
const (
EnvDAGStoreCopyConcurrency = "LOTUS_DAGSTORE_COPY_CONCURRENCY"
DefaultDAGStoreDir = "dagStore"
)
// NewMinerAPI creates a new MinerAPI adaptor for the dagstore mounts.
func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, rpn retrievalmarket.RetrievalProviderNode) (mdagstore.MinerAPI, error) {
cfg, err := extractDAGStoreConfig(r)
if err != nil {
return nil, err
}
// caps the number of concurrent calls to the storage subsystem, so that
// we don't spam it during heavy processes like bulk migration.
if v, ok := os.LookupEnv("LOTUS_DAGSTORE_MOUNT_CONCURRENCY"); ok {
concurrency, err := strconv.Atoi(v)
if err == nil {
cfg.MaxConcurrencyStorageCalls = concurrency
}
}
mountApi := mdagstore.NewMinerAPI(pieceStore, rpn, cfg.MaxConcurrencyStorageCalls)
ready := make(chan error, 1)
pieceStore.OnReady(func(err error) {
ready <- err
})
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
if err := <-ready; err != nil {
return fmt.Errorf("aborting dagstore start; piecestore failed to start: %s", err)
}
return mountApi.Start(ctx)
},
OnStop: func(context.Context) error {
return nil
},
})
return mountApi, nil
}
// DAGStore constructs a DAG store using the supplied minerAPI and the user
// configuration. It returns both the DAGStore and a Wrapper suitable for
// passing to markets.
func DAGStore(lc fx.Lifecycle, r repo.LockedRepo, minerAPI mdagstore.MinerAPI) (*dagstore.DAGStore, *mdagstore.Wrapper, error) {
cfg, err := extractDAGStoreConfig(r)
if err != nil {
return nil, nil, err
}
// populate default directories if not explicitly set in the config.
defaultDir := filepath.Join(r.Path(), DefaultDAGStoreDir)
if cfg.TransientsDir == "" {
cfg.TransientsDir = filepath.Join(defaultDir, "transients")
}
if cfg.IndexDir == "" {
cfg.IndexDir = filepath.Join(defaultDir, "index")
}
if cfg.DatastoreDir == "" {
cfg.DatastoreDir = filepath.Join(defaultDir, "datastore")
}
v, ok := os.LookupEnv(EnvDAGStoreCopyConcurrency)
if ok {
concurrency, err := strconv.Atoi(v)
if err == nil {
cfg.MaxConcurrentReadyFetches = concurrency
}
}
dagst, w, err := mdagstore.NewDAGStore(cfg, minerAPI)
if err != nil {
return nil, nil, xerrors.Errorf("failed to create DAG store: %w", err)
}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return w.Start(ctx)
},
OnStop: func(context.Context) error {
return w.Close()
},
})
return dagst, w, nil
}
func extractDAGStoreConfig(r repo.LockedRepo) (config.DAGStoreConfig, error) {
cfg, err := r.Config()
if err != nil {
return config.DAGStoreConfig{}, xerrors.Errorf("could not load config: %w", err)
}
mcfg, ok := cfg.(*config.StorageMiner)
if !ok {
return config.DAGStoreConfig{}, xerrors.Errorf("config not expected type; expected config.StorageMiner, got: %T", cfg)
}
return mcfg.DAGStore, nil
}
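Taken together, the two constructors in this file wire up as follows outside of fx (a hand-wired sketch; it assumes a piece store that is already ready, a retrieval provider node, and a cfg whose directories are already populated):

```go
func wireDAGStore(ctx context.Context, cfg config.DAGStoreConfig,
	ps piecestore.PieceStore, rpn retrievalmarket.RetrievalProviderNode) (*mdagstore.Wrapper, error) {

	minerAPI := mdagstore.NewMinerAPI(ps, rpn, cfg.MaxConcurrencyStorageCalls)
	if err := minerAPI.Start(ctx); err != nil { // fx gates this on piecestore readiness
		return nil, err
	}

	_, w, err := mdagstore.NewDAGStore(cfg, minerAPI)
	if err != nil {
		return nil, err
	}
	// Wrapper.Start launches the GC, trace and recovery goroutines and starts
	// the underlying DAG store; Close tears everything down.
	return w, w.Start(ctx)
}
```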