Merge pull request #6793 from filecoin-project/refactor/mv-lotus-mount

move lotus mount, dag store etc from markets to lotus
This commit is contained in:
Aarsh Shah 2021-07-20 20:33:46 +05:30 committed by GitHub
commit 44496afc66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1018 additions and 13 deletions

4
go.mod
View File

@ -26,6 +26,7 @@ require (
github.com/elastic/gosigar v0.12.0
github.com/etclabscore/go-openrpc-reflect v0.0.36
github.com/fatih/color v1.9.0
github.com/filecoin-project/dagstore v0.1.0
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f
github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-bitfield v0.2.4
@ -34,7 +35,7 @@ require (
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v1.7.0
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
@ -78,6 +79,7 @@ require (
github.com/ipfs/go-fs-lock v0.0.6
github.com/ipfs/go-graphsync v0.6.4
github.com/ipfs/go-ipfs-blockstore v1.0.3
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/ipfs/go-ipfs-exchange-interface v0.0.1

4
go.sum
View File

@ -286,8 +286,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg=
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e h1:mJHQp7htPo04N1IQjnYneUoif1FcsN43KuHvMbeNF7E=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719084150-3111d5504a9e/go.mod h1:rfdpy6u0CdbknZNxRb5+7t7+yaPAAk4xLLhPaQICr0c=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd h1:5Gg9NyMV/5FauQu497je92yPbu8o2kbnb14eI7wKvBg=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210719131749-0459d0c576bd/go.mod h1:21Kl9Ml8XIueT5o1UIqjk9XX88UKkRqSSh+VmEqT7To=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=

Binary file not shown.

View File

@ -0,0 +1,91 @@
package dagstore
import (
"context"
"io"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
)
type LotusAccessor interface {
FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error)
GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error)
}
type lotusAccessor struct {
pieceStore piecestore.PieceStore
rm retrievalmarket.RetrievalProviderNode
}
var _ LotusAccessor = (*lotusAccessor)(nil)
func NewLotusMountAPI(store piecestore.PieceStore, rm retrievalmarket.RetrievalProviderNode) *lotusAccessor {
return &lotusAccessor{
pieceStore: store,
rm: rm,
}
}
func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
if err != nil {
return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
}
if len(pieceInfo.Deals) == 0 {
return nil, xerrors.Errorf("no storage deals found for Piece %s", pieceCid)
}
// prefer an unsealed sector containing the piece if one exists
for _, deal := range pieceInfo.Deals {
isUnsealed, err := m.rm.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err != nil {
log.Warnf("failed to check if deal %d unsealed: %s", deal.DealID, err)
continue
}
if isUnsealed {
// UnsealSector will NOT unseal a sector if we already have an unsealed copy lying around.
reader, err := m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err == nil {
return reader, nil
}
}
}
lastErr := xerrors.New("no sectors found to unseal from")
// if there is no unsealed sector containing the piece, just read the piece from the first sector we are able to unseal.
for _, deal := range pieceInfo.Deals {
// Note that if the deal data is not already unsealed, unsealing may
// block for a long time with the current PoRep
reader, err := m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded())
if err != nil {
lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err)
log.Warn(lastErr.Error())
continue
}
// Successfully fetched the deal data so return a reader over the data
return reader, nil
}
return nil, lastErr
}
func (m *lotusAccessor) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) {
pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid)
if err != nil {
return 0, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err)
}
if len(pieceInfo.Deals) == 0 {
return 0, xerrors.Errorf("no storage deals found for piece %s", pieceCid)
}
len := pieceInfo.Deals[0].Length
return uint64(len), nil
}

View File

@ -0,0 +1,166 @@
package dagstore
import (
"bytes"
"context"
"io"
"testing"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/paych"
"github.com/filecoin-project/go-fil-markets/piecestore"
piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/shared"
)
const unsealedSectorID = abi.SectorNumber(1)
const sealedSectorID = abi.SectorNumber(2)
func TestLotusAccessorFetchUnsealedPiece(t *testing.T) {
ctx := context.Background()
cid1, err := cid.Parse("bafkqaaa")
require.NoError(t, err)
unsealedSectorData := "unsealed"
sealedSectorData := "sealed"
mockData := map[abi.SectorNumber]string{
unsealedSectorID: unsealedSectorData,
sealedSectorID: sealedSectorData,
}
testCases := []struct {
name string
deals []abi.SectorNumber
fetchedData string
expectErr bool
}{{
// Expect error if there is no deal info for piece CID
name: "no deals",
expectErr: true,
}, {
// Expect the API to always fetch the unsealed deal (because it's
// cheaper than fetching the sealed deal)
name: "prefer unsealed deal",
deals: []abi.SectorNumber{unsealedSectorID, sealedSectorID},
fetchedData: unsealedSectorData,
}, {
// Expect the API to unseal the data if there are no unsealed deals
name: "unseal if necessary",
deals: []abi.SectorNumber{sealedSectorID},
fetchedData: sealedSectorData,
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ps := getPieceStore(t)
rpn := &mockRPN{
sectors: mockData,
}
api := NewLotusMountAPI(ps, rpn)
// Add deals to piece store
for _, sectorID := range tc.deals {
dealInfo := piecestore.DealInfo{
SectorID: sectorID,
}
err = ps.AddDealForPiece(cid1, dealInfo)
require.NoError(t, err)
}
// Fetch the piece
r, err := api.FetchUnsealedPiece(ctx, cid1)
if tc.expectErr {
require.Error(t, err)
return
}
// Check that the returned reader is for the correct piece
require.NoError(t, err)
bz, err := io.ReadAll(r)
require.NoError(t, err)
require.Equal(t, tc.fetchedData, string(bz))
})
}
}
func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) {
cid1, err := cid.Parse("bafkqaaa")
require.NoError(t, err)
ps := getPieceStore(t)
rpn := &mockRPN{}
api := NewLotusMountAPI(ps, rpn)
// Add a deal with data Length 10
dealInfo := piecestore.DealInfo{
Length: 10,
}
err = ps.AddDealForPiece(cid1, dealInfo)
require.NoError(t, err)
// Check that the data length is correct
len, err := api.GetUnpaddedCARSize(cid1)
require.NoError(t, err)
require.EqualValues(t, 10, len)
}
func getPieceStore(t *testing.T) piecestore.PieceStore {
ps, err := piecestoreimpl.NewPieceStore(ds_sync.MutexWrap(ds.NewMapDatastore()))
require.NoError(t, err)
err = ps.Start(context.Background())
require.NoError(t, err)
ready := make(chan error)
ps.OnReady(func(err error) {
ready <- err
})
err = <-ready
require.NoError(t, err)
return ps
}
type mockRPN struct {
sectors map[abi.SectorNumber]string
}
func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) {
data, ok := m.sectors[sectorID]
if !ok {
panic("sector not found")
}
return io.NopCloser(bytes.NewBuffer([]byte(data))), nil
}
func (m *mockRPN) IsUnsealed(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (bool, error) {
return sectorID == unsealedSectorID, nil
}
func (m *mockRPN) GetChainHead(ctx context.Context) (shared.TipSetToken, abi.ChainEpoch, error) {
panic("implement me")
}
func (m *mockRPN) GetMinerWorkerAddress(ctx context.Context, miner address.Address, tok shared.TipSetToken) (address.Address, error) {
panic("implement me")
}
func (m *mockRPN) SavePaymentVoucher(ctx context.Context, paymentChannel address.Address, voucher *paych.SignedVoucher, proof []byte, expectedAmount abi.TokenAmount, tok shared.TipSetToken) (abi.TokenAmount, error) {
panic("implement me")
}
func (m *mockRPN) GetRetrievalPricingInput(ctx context.Context, pieceCID cid.Cid, storageDeals []abi.DealID) (retrievalmarket.PricingInput, error) {
panic("implement me")
}
var _ retrievalmarket.RetrievalProviderNode = (*mockRPN)(nil)

View File

@ -0,0 +1,67 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: mount_api.go
// Package mock_dagstore is a generated GoMock package.
package mock_dagstore
import (
context "context"
io "io"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
cid "github.com/ipfs/go-cid"
)
// MockLotusMountAPI is a mock of LotusMountAPI interface.
type MockLotusMountAPI struct {
ctrl *gomock.Controller
recorder *MockLotusMountAPIMockRecorder
}
// MockLotusMountAPIMockRecorder is the mock recorder for MockLotusMountAPI.
type MockLotusMountAPIMockRecorder struct {
mock *MockLotusMountAPI
}
// NewMockLotusMountAPI creates a new mock instance.
func NewMockLotusMountAPI(ctrl *gomock.Controller) *MockLotusMountAPI {
mock := &MockLotusMountAPI{ctrl: ctrl}
mock.recorder = &MockLotusMountAPIMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockLotusMountAPI) EXPECT() *MockLotusMountAPIMockRecorder {
return m.recorder
}
// FetchUnsealedPiece mocks base method.
func (m *MockLotusMountAPI) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FetchUnsealedPiece", ctx, pieceCid)
ret0, _ := ret[0].(io.ReadCloser)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FetchUnsealedPiece indicates an expected call of FetchUnsealedPiece.
func (mr *MockLotusMountAPIMockRecorder) FetchUnsealedPiece(ctx, pieceCid interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchUnsealedPiece", reflect.TypeOf((*MockLotusMountAPI)(nil).FetchUnsealedPiece), ctx, pieceCid)
}
// GetUnpaddedCARSize mocks base method.
func (m *MockLotusMountAPI) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetUnpaddedCARSize", pieceCid)
ret0, _ := ret[0].(uint64)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetUnpaddedCARSize indicates an expected call of GetUnpaddedCARSize.
func (mr *MockLotusMountAPIMockRecorder) GetUnpaddedCARSize(pieceCid interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUnpaddedCARSize", reflect.TypeOf((*MockLotusMountAPI)(nil).GetUnpaddedCARSize), pieceCid)
}

103
markets/dagstore/mount.go Normal file
View File

@ -0,0 +1,103 @@
package dagstore
import (
"context"
"io"
"net/url"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore/mount"
)
const lotusScheme = "lotus"
var _ mount.Mount = (*LotusMount)(nil)
// LotusMount is the Lotus implementation of a Sharded DAG Store Mount.
// A Filecoin Piece is treated as a Shard by this implementation.
type LotusMount struct {
api LotusAccessor
pieceCid cid.Cid
}
// This method is called when registering a mount with the DAG store registry.
// The DAG store registry receives an instance of the mount (a "template").
// When the registry needs to deserialize a mount it clones the template then
// calls Deserialize on the cloned instance, which will have a reference to the
// lotus mount API supplied here.
func NewLotusMountTemplate(api LotusAccessor) *LotusMount {
return &LotusMount{api: api}
}
func NewLotusMount(pieceCid cid.Cid, api LotusAccessor) (*LotusMount, error) {
return &LotusMount{
pieceCid: pieceCid,
api: api,
}, nil
}
func (l *LotusMount) Serialize() *url.URL {
return &url.URL{
Host: l.pieceCid.String(),
}
}
func (l *LotusMount) Deserialize(u *url.URL) error {
pieceCid, err := cid.Decode(u.Host)
if err != nil {
return xerrors.Errorf("failed to parse PieceCid from host '%s': %w", u.Host, err)
}
l.pieceCid = pieceCid
return nil
}
func (l *LotusMount) Fetch(ctx context.Context) (mount.Reader, error) {
r, err := l.api.FetchUnsealedPiece(ctx, l.pieceCid)
if err != nil {
return nil, xerrors.Errorf("failed to fetch unsealed piece %s: %w", l.pieceCid, err)
}
return &readCloser{r}, nil
}
func (l *LotusMount) Info() mount.Info {
return mount.Info{
Kind: mount.KindRemote,
AccessSequential: true,
AccessSeek: false,
AccessRandom: false,
}
}
func (l *LotusMount) Close() error {
return nil
}
func (l *LotusMount) Stat(_ context.Context) (mount.Stat, error) {
size, err := l.api.GetUnpaddedCARSize(l.pieceCid)
if err != nil {
return mount.Stat{}, xerrors.Errorf("failed to fetch piece size for piece %s: %w", l.pieceCid, err)
}
// TODO Mark false when storage deal expires.
return mount.Stat{
Exists: true,
Size: int64(size),
}, nil
}
type readCloser struct {
io.ReadCloser
}
var _ mount.Reader = (*readCloser)(nil)
func (r *readCloser) ReadAt(p []byte, off int64) (n int, err error) {
return 0, xerrors.Errorf("ReadAt called but not implemented")
}
func (r *readCloser) Seek(offset int64, whence int) (int64, error) {
return 0, xerrors.Errorf("Seek called but not implemented")
}

View File

@ -0,0 +1,93 @@
package dagstore
import (
"context"
"io/ioutil"
"net/url"
"strings"
"testing"
"github.com/golang/mock/gomock"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/dagstore/mount"
mock_dagstore "github.com/filecoin-project/lotus/markets/dagstore/mocks"
)
func TestLotusMount(t *testing.T) {
ctx := context.Background()
bgen := blocksutil.NewBlockGenerator()
cid := bgen.Next().Cid()
mockCtrl := gomock.NewController(t)
// when test is done, assert expectations on all mock objects.
defer mockCtrl.Finish()
// create a mock lotus api that returns the reader we want
mockLotusMountAPI := mock_dagstore.NewMockLotusMountAPI(mockCtrl)
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1)
mockLotusMountAPI.EXPECT().FetchUnsealedPiece(gomock.Any(), cid).Return(&readCloser{ioutil.NopCloser(strings.NewReader("testing"))}, nil).Times(1)
mockLotusMountAPI.EXPECT().GetUnpaddedCARSize(cid).Return(uint64(100), nil).Times(1)
mnt, err := NewLotusMount(cid, mockLotusMountAPI)
require.NoError(t, err)
info := mnt.Info()
require.Equal(t, info.Kind, mount.KindRemote)
// fetch and assert success
rd, err := mnt.Fetch(context.Background())
require.NoError(t, err)
bz, err := ioutil.ReadAll(rd)
require.NoError(t, err)
require.NoError(t, rd.Close())
require.Equal(t, []byte("testing"), bz)
stat, err := mnt.Stat(ctx)
require.NoError(t, err)
require.EqualValues(t, 100, stat.Size)
// serialize url then deserialize from mount template -> should get back
// the same mount
url := mnt.Serialize()
mnt2 := NewLotusMountTemplate(mockLotusMountAPI)
err = mnt2.Deserialize(url)
require.NoError(t, err)
// fetching on this mount should get us back the same data.
rd, err = mnt2.Fetch(context.Background())
require.NoError(t, err)
bz, err = ioutil.ReadAll(rd)
require.NoError(t, err)
require.NoError(t, rd.Close())
require.Equal(t, []byte("testing"), bz)
}
func TestLotusMountDeserialize(t *testing.T) {
api := &lotusAccessor{}
bgen := blocksutil.NewBlockGenerator()
cid := bgen.Next().Cid()
// success
us := lotusScheme + "://" + cid.String()
u, err := url.Parse(us)
require.NoError(t, err)
mnt := NewLotusMountTemplate(api)
err = mnt.Deserialize(u)
require.NoError(t, err)
require.Equal(t, cid, mnt.pieceCid)
require.Equal(t, api, mnt.api)
// fails if cid is not valid
us = lotusScheme + "://" + "rand"
u, err = url.Parse(us)
require.NoError(t, err)
err = mnt.Deserialize(u)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to parse PieceCid")
}

View File

@ -0,0 +1,33 @@
package dagstore
import (
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
bstore "github.com/ipfs/go-ipfs-blockstore"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore"
)
// ReadOnlyBlockstore stubs out Blockstore mutators with methods that error out
type ReadOnlyBlockstore struct {
dagstore.ReadBlockstore
}
func NewReadOnlyBlockstore(rbs dagstore.ReadBlockstore) bstore.Blockstore {
return ReadOnlyBlockstore{ReadBlockstore: rbs}
}
func (r ReadOnlyBlockstore) DeleteBlock(c cid.Cid) error {
return xerrors.Errorf("DeleteBlock called but not implemented")
}
func (r ReadOnlyBlockstore) Put(block blocks.Block) error {
return xerrors.Errorf("Put called but not implemented")
}
func (r ReadOnlyBlockstore) PutMany(blocks []blocks.Block) error {
return xerrors.Errorf("PutMany called but not implemented")
}
var _ bstore.Blockstore = (*ReadOnlyBlockstore)(nil)

233
markets/dagstore/wrapper.go Normal file
View File

@ -0,0 +1,233 @@
package dagstore
import (
"context"
"errors"
"io"
"sync"
"time"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
bstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
"github.com/filecoin-project/go-fil-markets/carstore"
"github.com/filecoin-project/go-fil-markets/shared"
)
var log = logging.Logger("dagstore-wrapper")
// MarketDAGStoreConfig is the config the market needs to then construct a DAG Store.
type MarketDAGStoreConfig struct {
TransientsDir string
IndexDir string
Datastore ds.Datastore
MaxConcurrentFetch int
MaxConcurrentIndex int
GCInterval time.Duration
}
// DAGStore provides an interface for the DAG store that can be mocked out
// by tests
type DAGStore interface {
RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan dagstore.ShardResult, opts dagstore.RegisterOpts) error
AcquireShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.AcquireOpts) error
RecoverShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.RecoverOpts) error
GC(ctx context.Context) (map[shard.Key]error, error)
Close() error
}
type closableBlockstore struct {
bstore.Blockstore
io.Closer
}
type Wrapper struct {
ctx context.Context
cancel context.CancelFunc
backgroundWg sync.WaitGroup
dagStore DAGStore
mountApi LotusAccessor
failureCh chan dagstore.ShardResult
traceCh chan dagstore.Trace
gcInterval time.Duration
}
var _ shared.DagStoreWrapper = (*Wrapper)(nil)
func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrapper, error) {
// construct the DAG Store.
registry := mount.NewRegistry()
if err := registry.Register(lotusScheme, NewLotusMountTemplate(mountApi)); err != nil {
return nil, xerrors.Errorf("failed to create registry: %w", err)
}
// The dagstore will write Shard failures to the `failureCh` here.
failureCh := make(chan dagstore.ShardResult, 1)
// The dagstore will write Trace events to the `traceCh` here.
traceCh := make(chan dagstore.Trace, 32)
dcfg := dagstore.Config{
TransientsDir: cfg.TransientsDir,
IndexDir: cfg.IndexDir,
Datastore: cfg.Datastore,
MountRegistry: registry,
FailureCh: failureCh,
TraceCh: traceCh,
MaxConcurrentFetch: cfg.MaxConcurrentFetch,
MaxConcurrentIndex: cfg.MaxConcurrentIndex,
}
dagStore, err := dagstore.NewDAGStore(dcfg)
if err != nil {
return nil, xerrors.Errorf("failed to create DAG store: %w", err)
}
return &Wrapper{
dagStore: dagStore,
mountApi: mountApi,
failureCh: failureCh,
traceCh: traceCh,
gcInterval: cfg.GCInterval,
}, nil
}
func (ds *Wrapper) Start(ctx context.Context) {
ds.ctx, ds.cancel = context.WithCancel(ctx)
ds.backgroundWg.Add(1)
// Run a go-routine to handle failures, traces and GC
go ds.background()
}
func (ds *Wrapper) background() {
defer ds.backgroundWg.Done()
gcTicker := time.NewTicker(ds.gcInterval)
defer gcTicker.Stop()
recoverShardResults := make(chan dagstore.ShardResult, 32)
for ds.ctx.Err() == nil {
select {
// GC the DAG store on every tick
case <-gcTicker.C:
_, _ = ds.dagStore.GC(ds.ctx)
// Log trace events from the DAG store
case tr := <-ds.traceCh:
log.Debugw("trace",
"shard-key", tr.Key.String(),
"op-type", tr.Op.String(),
"after", tr.After.String())
// Handle shard failures by attempting to recover the shard
case f := <-ds.failureCh:
log.Warnw("shard failed", "shard-key", f.Key.String(), "error", f.Error)
if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, recoverShardResults, dagstore.RecoverOpts{}); err != nil {
log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err)
}
// Consume recover shard results
case res := <-recoverShardResults:
if res.Error != nil {
log.Warnw("shard recovery failed", "shard-key", res.Key.String(), "error", res.Error)
}
// Exit when the DAG store wrapper is shutdown
case <-ds.ctx.Done():
return
}
}
}
func (ds *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.ClosableBlockstore, error) {
key := shard.KeyFromCID(pieceCid)
resch := make(chan dagstore.ShardResult, 1)
err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{})
if err != nil {
if !errors.Is(err, dagstore.ErrShardUnknown) {
return nil, xerrors.Errorf("failed to schedule acquire shard for piece CID %s: %w", pieceCid, err)
}
// if the DAGStore does not know about the Shard -> register it and then try to acquire it again.
log.Warnw("failed to load shard as shard is not registered, will re-register", "pieceCID", pieceCid)
// The path of a transient file that we can ask the DAG Store to use
// to perform the Indexing rather than fetching it via the Mount if
// we already have a transient file. However, we don't have it here
// and therefore we pass an empty file path.
carPath := ""
if err := shared.RegisterShardSync(ctx, ds, pieceCid, carPath, false); err != nil {
return nil, xerrors.Errorf("failed to re-register shard during loading piece CID %s: %w", pieceCid, err)
}
log.Warnw("successfully re-registered shard", "pieceCID", pieceCid)
resch = make(chan dagstore.ShardResult, 1)
if err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}); err != nil {
return nil, xerrors.Errorf("failed to acquire Shard for piece CID %s after re-registering: %w", pieceCid, err)
}
}
// TODO: The context is not yet being actively monitored by the DAG store,
// so we need to select against ctx.Done() until the following issue is
// implemented:
// https://github.com/filecoin-project/dagstore/issues/39
var res dagstore.ShardResult
select {
case <-ctx.Done():
return nil, ctx.Err()
case res = <-resch:
if res.Error != nil {
return nil, xerrors.Errorf("failed to acquire shard for piece CID %s: %w", pieceCid, res.Error)
}
}
bs, err := res.Accessor.Blockstore()
if err != nil {
return nil, err
}
return &closableBlockstore{Blockstore: NewReadOnlyBlockstore(bs), Closer: res.Accessor}, nil
}
func (ds *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath string, eagerInit bool, resch chan dagstore.ShardResult) error {
// Create a lotus mount with the piece CID
key := shard.KeyFromCID(pieceCid)
mt, err := NewLotusMount(pieceCid, ds.mountApi)
if err != nil {
return xerrors.Errorf("failed to create lotus mount for piece CID %s: %w", pieceCid, err)
}
// Register the shard
opts := dagstore.RegisterOpts{
ExistingTransient: carPath,
LazyInitialization: !eagerInit,
}
err = ds.dagStore.RegisterShard(ctx, key, mt, resch, opts)
if err != nil {
return xerrors.Errorf("failed to schedule register shard for piece CID %s: %w", pieceCid, err)
}
return nil
}
func (ds *Wrapper) Close() error {
// Cancel the context
ds.cancel()
// Close the DAG store
if err := ds.dagStore.Close(); err != nil {
return xerrors.Errorf("failed to close DAG store: %w", err)
}
// Wait for the background go routine to exit
ds.backgroundWg.Wait()
return nil
}

View File

@ -0,0 +1,200 @@
package dagstore
import (
"bytes"
"context"
"io"
"os"
"testing"
"time"
"golang.org/x/xerrors"
"github.com/filecoin-project/dagstore"
"github.com/filecoin-project/dagstore/mount"
"github.com/filecoin-project/dagstore/shard"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
)
// TestWrapperAcquireRecovery verifies that if acquire shard returns a "not found"
// error, the wrapper will attempt to register the shard then reacquire
func TestWrapperAcquireRecovery(t *testing.T) {
ctx := context.Background()
pieceCid, err := cid.Parse("bafkqaaa")
require.NoError(t, err)
// Create a DAG store wrapper
w, err := NewDagStoreWrapper(MarketDAGStoreConfig{
TransientsDir: t.TempDir(),
IndexDir: t.TempDir(),
GCInterval: time.Millisecond,
}, mockLotusMount{})
require.NoError(t, err)
// Return an error from acquire shard the first time
acquireShardErr := make(chan error, 1)
acquireShardErr <- xerrors.Errorf("unknown shard: %w", dagstore.ErrShardUnknown)
// Create a mock DAG store in place of the real DAG store
mock := &mockDagStore{
acquireShardErr: acquireShardErr,
acquireShardRes: dagstore.ShardResult{
Accessor: getShardAccessor(t),
},
register: make(chan shard.Key, 1),
}
w.dagStore = mock
mybs, err := w.LoadShard(ctx, pieceCid)
require.NoError(t, err)
// Expect the wrapper to try to recover from the error returned from
// acquire shard by calling register shard with the same key
tctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
select {
case <-tctx.Done():
require.Fail(t, "failed to call register")
case k := <-mock.register:
require.Equal(t, k.String(), pieceCid.String())
}
// Verify that we can get things from the acquired blockstore
var count int
ch, err := mybs.AllKeysChan(ctx)
require.NoError(t, err)
for range ch {
count++
}
require.Greater(t, count, 0)
}
// TestWrapperBackground verifies the behaviour of the background go routine
func TestWrapperBackground(t *testing.T) {
ctx := context.Background()
// Create a DAG store wrapper
w, err := NewDagStoreWrapper(MarketDAGStoreConfig{
TransientsDir: t.TempDir(),
IndexDir: t.TempDir(),
GCInterval: time.Millisecond,
}, mockLotusMount{})
require.NoError(t, err)
// Create a mock DAG store in place of the real DAG store
mock := &mockDagStore{
gc: make(chan struct{}, 1),
recover: make(chan shard.Key, 1),
close: make(chan struct{}, 1),
}
w.dagStore = mock
// Start up the wrapper
w.Start(ctx)
// Expect GC to be called automatically
tctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
select {
case <-tctx.Done():
require.Fail(t, "failed to call GC")
case <-mock.gc:
}
// Expect that when a result is sent on the failure channel, the wrapper
// will attempt to recover the shard
shardKey := shard.KeyFromString("key")
w.failureCh <- dagstore.ShardResult{
Key: shardKey,
}
tctx, cancel2 := context.WithTimeout(ctx, time.Second)
defer cancel2()
select {
case <-tctx.Done():
require.Fail(t, "failed to call recover")
case k := <-mock.recover:
require.Equal(t, shardKey, k)
}
// Expect that when the wrapper is closed it will call close on the
// DAG store
err = w.Close()
require.NoError(t, err)
tctx, cancel3 := context.WithTimeout(ctx, time.Second)
defer cancel3()
select {
case <-tctx.Done():
require.Fail(t, "failed to call close")
case <-mock.close:
}
}
type mockDagStore struct {
acquireShardErr chan error
acquireShardRes dagstore.ShardResult
register chan shard.Key
gc chan struct{}
recover chan shard.Key
close chan struct{}
}
func (m *mockDagStore) RegisterShard(ctx context.Context, key shard.Key, mnt mount.Mount, out chan dagstore.ShardResult, opts dagstore.RegisterOpts) error {
m.register <- key
out <- dagstore.ShardResult{Key: key}
return nil
}
func (m *mockDagStore) AcquireShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.AcquireOpts) error {
select {
case err := <-m.acquireShardErr:
return err
default:
}
out <- m.acquireShardRes
return nil
}
func (m *mockDagStore) RecoverShard(ctx context.Context, key shard.Key, out chan dagstore.ShardResult, _ dagstore.RecoverOpts) error {
m.recover <- key
return nil
}
func (m *mockDagStore) GC(ctx context.Context) (map[shard.Key]error, error) {
select {
case m.gc <- struct{}{}:
default:
}
return nil, nil
}
func (m *mockDagStore) Close() error {
m.close <- struct{}{}
return nil
}
type mockLotusMount struct {
}
func (m mockLotusMount) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) {
panic("implement me")
}
func (m mockLotusMount) GetUnpaddedCARSize(pieceCid cid.Cid) (uint64, error) {
panic("implement me")
}
func getShardAccessor(t *testing.T) *dagstore.ShardAccessor {
data, err := os.ReadFile("./fixtures/sample-rw-bs-v2.car")
require.NoError(t, err)
buff := bytes.NewReader(data)
reader := &mount.NopCloser{Reader: buff, ReaderAt: buff, Seeker: buff}
shardAccessor, err := dagstore.NewShardAccessor(reader, nil, nil)
require.NoError(t, err)
return shardAccessor
}

View File

@ -7,7 +7,6 @@ import (
"go.uber.org/fx"
"golang.org/x/xerrors"
mktdagstore "github.com/filecoin-project/go-fil-markets/dagstore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/storagemarket"
@ -23,6 +22,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/markets/dagstore"
"github.com/filecoin-project/lotus/markets/dealfilter"
"github.com/filecoin-project/lotus/markets/retrievaladapter"
"github.com/filecoin-project/lotus/markets/storageadapter"
@ -147,7 +147,7 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)),
// DAG Store
Override(new(mktdagstore.DagStoreWrapper), modules.DagStoreWrapper),
Override(new(*dagstore.Wrapper), modules.DagStoreWrapper),
// Markets (retrieval)
Override(new(retrievalmarket.RetrievalProviderNode), retrievaladapter.NewRetrievalProviderNode),

View File

@ -32,7 +32,6 @@ import (
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
dtnet "github.com/filecoin-project/go-data-transfer/network"
dtgstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
mktdagstore "github.com/filecoin-project/go-fil-markets/dagstore"
piecefilestore "github.com/filecoin-project/go-fil-markets/filestore"
piecestoreimpl "github.com/filecoin-project/go-fil-markets/piecestore/impl"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
@ -66,6 +65,7 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/journal"
"github.com/filecoin-project/lotus/markets"
"github.com/filecoin-project/lotus/markets/dagstore"
marketevents "github.com/filecoin-project/lotus/markets/loggers"
"github.com/filecoin-project/lotus/markets/pricing"
lotusminer "github.com/filecoin-project/lotus/miner"
@ -576,20 +576,36 @@ func BasicDealFilter(user dtypes.StorageDealFilter) func(onlineOk dtypes.Conside
}
func DagStoreWrapper(
lc fx.Lifecycle,
ds dtypes.MetadataDS,
r repo.LockedRepo,
pieceStore dtypes.ProviderPieceStore,
rpn retrievalmarket.RetrievalProviderNode,
) (mktdagstore.DagStoreWrapper, error) {
) (*dagstore.Wrapper, error) {
dagStoreDir := filepath.Join(r.Path(), "dagstore")
dagStoreDS := namespace.Wrap(ds, datastore.NewKey("/dagstore/provider"))
cfg := mktdagstore.MarketDAGStoreConfig{
cfg := dagstore.MarketDAGStoreConfig{
TransientsDir: filepath.Join(dagStoreDir, "transients"),
IndexDir: filepath.Join(dagStoreDir, "index"),
Datastore: dagStoreDS,
GCInterval: 5 * time.Minute,
}
mountApi := mktdagstore.NewLotusMountAPI(pieceStore, rpn)
return mktdagstore.NewDagStoreWrapper(cfg, mountApi)
mountApi := dagstore.NewLotusMountAPI(pieceStore, rpn)
dsw, err := dagstore.NewDagStoreWrapper(cfg, mountApi)
if err != nil {
return nil, xerrors.Errorf("failed to create DAG store wrapper: %w", err)
}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
dsw.Start(ctx)
return nil
},
OnStop: func(context.Context) error {
return dsw.Close()
},
})
return dsw, nil
}
func StorageProvider(minerAddress dtypes.MinerAddress,
@ -600,7 +616,7 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
dataTransfer dtypes.ProviderDataTransfer,
spn storagemarket.StorageProviderNode,
df dtypes.StorageDealFilter,
dagStore mktdagstore.DagStoreWrapper,
dagStore *dagstore.Wrapper,
) (storagemarket.StorageProvider, error) {
net := smnet.NewFromLibp2pHost(h)
store, err := piecefilestore.NewLocalFileStore(piecefilestore.OsPath(r.Path()))
@ -609,9 +625,10 @@ func StorageProvider(minerAddress dtypes.MinerAddress,
}
opt := storageimpl.CustomDealDecisionLogic(storageimpl.DealDeciderFunc(df))
shardMigrator := storageimpl.NewShardMigrator(address.Address(minerAddress), ds, dagStore, pieceStore, spn)
return storageimpl.NewProvider(net, namespace.Wrap(ds, datastore.NewKey("/deals/provider")), store, dagStore, pieceStore,
dataTransfer, spn, address.Address(minerAddress), storedAsk, opt)
dataTransfer, spn, address.Address(minerAddress), storedAsk, shardMigrator, opt)
}
func RetrievalDealFilter(userFilter dtypes.RetrievalDealFilter) func(onlineOk dtypes.ConsiderOnlineRetrievalDealsConfigFunc,
@ -675,7 +692,7 @@ func RetrievalProvider(
dt dtypes.ProviderDataTransfer,
pricingFnc dtypes.RetrievalPricingFunc,
userFilter dtypes.RetrievalDealFilter,
dagStore mktdagstore.DagStoreWrapper,
dagStore *dagstore.Wrapper,
) (retrievalmarket.RetrievalProvider, error) {
opt := retrievalimpl.DealDeciderOpt(retrievalimpl.DealDecider(userFilter))
return retrievalimpl.NewProvider(