From 7655e660f3934a57dc9d957ece5e43e3c75b4195 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Thu, 18 Nov 2021 12:07:12 +0400 Subject: [PATCH] integrate store-the-hash --- go.mod | 3 ++- go.sum | 8 ++++++-- itests/kit/client.go | 11 ++++++++--- itests/kit/deals_state.go | 2 +- markets/dagstore/wrapper.go | 17 ++++++++++------- markets/dagstore/wrapper_migration_test.go | 7 ++++++- markets/dagstore/wrapper_test.go | 10 ++++++++-- node/modules/storageminer_dagstore.go | 6 ++++-- 8 files changed, 45 insertions(+), 19 deletions(-) diff --git a/go.mod b/go.mod index 06a6799b6..6b0f24f3e 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/elastic/gosigar v0.14.1 github.com/etclabscore/go-openrpc-reflect v0.0.36 github.com/fatih/color v1.13.0 - github.com/filecoin-project/dagstore v0.5.0 + github.com/filecoin-project/dagstore v0.5.1-0.20211118071820-a75afb62d8ce github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f github.com/filecoin-project/go-address v0.0.6 github.com/filecoin-project/go-bitfield v0.2.4 @@ -37,6 +37,7 @@ require ( github.com/filecoin-project/go-fil-commcid v0.1.0 github.com/filecoin-project/go-fil-commp-hashhash v0.1.0 github.com/filecoin-project/go-fil-markets v1.13.3-0.20211117072527-8713155662ff + github.com/filecoin-project/go-indexer-core v0.2.4 github.com/filecoin-project/go-jsonrpc v0.1.5 github.com/filecoin-project/go-padreader v0.0.1 github.com/filecoin-project/go-paramfetch v0.0.2 diff --git a/go.sum b/go.sum index 4831d889b..be18c532d 100644 --- a/go.sum +++ b/go.sum @@ -302,8 +302,9 @@ github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL github.com/fatih/color v1.12.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/filecoin-project/dagstore v0.5.0 h1:akiheGtSCU7aGon6T74bwNbDhYW0XZCTsnORKNmzSgU= github.com/filecoin-project/dagstore v0.5.0/go.mod h1:Id2VG7uZxnF6NWR3JX1evNA83JonnU0U7Fc2r14Vltc= +github.com/filecoin-project/dagstore v0.5.1-0.20211118071820-a75afb62d8ce h1:cYK19JNHrVQD4nkh6b41wLU1TnxByaln+sc8AEtZT9I= +github.com/filecoin-project/dagstore v0.5.1-0.20211118071820-a75afb62d8ce/go.mod h1:bW1kjUKlVsQkITxKYlIdXHC8j/+wIgEl3caxUFJijGI= github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= github.com/filecoin-project/go-address v0.0.6 h1:DWQtj38ax+ogHwyH3VULRIoT8E6loyXqsk/p81xoY7M= @@ -351,8 +352,9 @@ github.com/filecoin-project/go-hamt-ipld/v3 v3.0.1/go.mod h1:gXpNmr3oQx8l3o7qkGy github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0 h1:rVVNq0x6RGQIzCo1iiJlGFm9AGIZzeifggxtKMU7zmI= github.com/filecoin-project/go-hamt-ipld/v3 v3.1.0/go.mod h1:bxmzgT8tmeVQA1/gvBwFmYdT8SOFUwB3ovSUfG1Ux0g= github.com/filecoin-project/go-indexer-core v0.2.2/go.mod h1:wV+NmrF8fHG6Xii3ecoZf2JW3laGTe5xtsWz609jo+Y= -github.com/filecoin-project/go-indexer-core v0.2.3 h1:kaUL2r8CuihK53lhmtCScffb7Bzs+N1yRGpwvxzCN+U= github.com/filecoin-project/go-indexer-core v0.2.3/go.mod h1:MSe5aRWmfRB+5syR4yDV+OApgJU+MUJ4rl9VUuzwCfc= +github.com/filecoin-project/go-indexer-core v0.2.4 h1:90vvxoBeNZN+h4W+vZ+VsoxKaDBr/bfZJJNByapGeM0= +github.com/filecoin-project/go-indexer-core v0.2.4/go.mod h1:MSe5aRWmfRB+5syR4yDV+OApgJU+MUJ4rl9VUuzwCfc= github.com/filecoin-project/go-jsonrpc v0.1.5 h1:ckxqZ09ivBAVf5CSmxxrqqNHC7PJm3GYGtYKiNQ+vGk= github.com/filecoin-project/go-jsonrpc v0.1.5/go.mod h1:XBBpuKIMaXIIzeqzO1iucq4GvbF8CxmXRFoezRh+Cx4= github.com/filecoin-project/go-legs v0.0.0-20211013165050-9ab325b6d2eb/go.mod h1:lKwBnslfNGG7JnsP9uQZl3yK7f74fit1MyHcwuuOP3k= @@ -427,6 +429,7 @@ github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09 github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/gammazero/keymutex v0.0.2 h1:cmpLBJHdEwn+WlR5Z/o9/BN92znSZTp5AKPQDpu1QcI= github.com/gammazero/keymutex v0.0.2/go.mod h1:qtzWCCLMisQUmVa4dvqHVgwfh4BP2YB7JxNDGXnsKrs= github.com/gammazero/radixtree v0.2.5 h1:muPQ4eEgCkUymFWPiVQRuXOQv4IhWg8YXH2r71MoqPM= github.com/gammazero/radixtree v0.2.5/go.mod h1:VPqqCDZ3YZZxAzUUsIF/ytFBigVWV7JIV1Stld8hri0= @@ -909,6 +912,7 @@ github.com/ipld/go-ipld-prime-proto v0.0.0-20200922192210-9a2bfd4440a6/go.mod h1 github.com/ipld/go-ipld-prime-proto v0.1.0/go.mod h1:11zp8f3sHVgIqtb/c9Kr5ZGqpnCLF1IVTNOez9TopzE= github.com/ipld/go-ipld-selector-text-lite v0.0.0 h1:MLU1YUAgd3Z+RfVCXUbvxH1RQjEe+larJ9jmlW1aMgA= github.com/ipld/go-ipld-selector-text-lite v0.0.0/go.mod h1:U2CQmFb+uWzfIEF3I1arrDa5rwtj00PrpiwwCO+k1RM= +github.com/ipld/go-storethehash v0.0.0-20210915160027-d72ca9b0968c h1:izfvqCuEqk2V7BRkh7GCm7lyKC2ItyAbzUu4WgNmggc= github.com/ipld/go-storethehash v0.0.0-20210915160027-d72ca9b0968c/go.mod h1:PwE6iq8TiWJRI3zMGA1RtkFAnrDMK93dLA5SUeu0lH8= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4= diff --git a/itests/kit/client.go b/itests/kit/client.go index c9f8946ec..9fb546d8c 100644 --- a/itests/kit/client.go +++ b/itests/kit/client.go @@ -106,9 +106,14 @@ func RunClientTest(t *testing.T, cmds []*lcli.Command, clientNode *TestFullNode) tmpdir, err := ioutil.TempDir(os.TempDir(), "test-cli-Client") require.NoError(t, err) path := filepath.Join(tmpdir, "outfile.dat") - out = clientCLI.RunCmd("client", "retrieve", dataCid.String(), path) - fmt.Println("retrieve:\n", out) - require.Regexp(t, regexp.MustCompile("Success"), out) + + for { + out = clientCLI.RunCmd("client", "retrieve", dataCid.String(), path) + fmt.Println("retrieve:\n", out) + if strings.Contains(out, "Success") { + break + } + } } func CreateImportFile(ctx context.Context, client api.FullNode, rseed int, size int) (res *api.ImportRes, path string, data []byte, err error) { diff --git a/itests/kit/deals_state.go b/itests/kit/deals_state.go index 617a6d28e..1a69e20b9 100644 --- a/itests/kit/deals_state.go +++ b/itests/kit/deals_state.go @@ -14,7 +14,7 @@ func CategorizeDealState(dealStatus string) TestDealState { switch dealStatus { case "StorageDealFailing", "StorageDealError": return TestDealStateFailed - case "StorageDealStaged", "StorageDealAwaitingPreCommit", "StorageDealSealing", "StorageDealActive", "StorageDealExpired", "StorageDealSlashed": + case "StorageDealAwaitingPreCommit", "StorageDealSealing", "StorageDealActive", "StorageDealExpired", "StorageDealSlashed": return TestDealStateComplete } return TestDealStateInProgress diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index 98c63361f..34e848844 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -10,6 +10,9 @@ import ( "sync" "time" + "github.com/filecoin-project/go-indexer-core/store/storethehash" + "github.com/libp2p/go-libp2p-core/host" + carindex "github.com/ipld/go-car/v2/index" "github.com/ipfs/go-cid" @@ -56,7 +59,7 @@ type Wrapper struct { var _ stores.DAGStoreWrapper = (*Wrapper)(nil) -func NewDAGStore(cfg config.DAGStoreConfig, minerApi MinerAPI) (*dagstore.DAGStore, *Wrapper, error) { +func NewDAGStore(cfg config.DAGStoreConfig, minerApi MinerAPI, h host.Host) (*dagstore.DAGStore, *Wrapper, error) { // construct the DAG Store. registry := mount.NewRegistry() if err := registry.Register(lotusScheme, mountTemplate(minerApi)); err != nil { @@ -85,11 +88,11 @@ func NewDAGStore(cfg config.DAGStoreConfig, minerApi MinerAPI) (*dagstore.DAGSto return nil, nil, xerrors.Errorf("failed to initialise dagstore index repo: %w", err) } - //store, err := storethehash.New(indexDir) - //if err != nil { - //return nil, nil, xerrors.Errorf("failed to initialise store the index: %w", err) - //} - //topIndex := index.NewInverted(store) + store, err := storethehash.New(indexDir) + if err != nil { + return nil, nil, xerrors.Errorf("failed to initialise store the index: %w", err) + } + topIndex := index.NewInverted(store, h.ID()) dcfg := dagstore.Config{ TransientsDir: transientsDir, @@ -98,7 +101,7 @@ func NewDAGStore(cfg config.DAGStoreConfig, minerApi MinerAPI) (*dagstore.DAGSto MountRegistry: registry, FailureCh: failureCh, TraceCh: traceCh, - //TopLevelIndex: topIndex, + TopLevelIndex: topIndex, // not limiting fetches globally, as the Lotus mount does // conditional throttling. MaxConcurrentIndex: cfg.MaxConcurrentIndex, diff --git a/markets/dagstore/wrapper_migration_test.go b/markets/dagstore/wrapper_migration_test.go index 13d8db876..eaa782590 100644 --- a/markets/dagstore/wrapper_migration_test.go +++ b/markets/dagstore/wrapper_migration_test.go @@ -4,6 +4,8 @@ import ( "context" "testing" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "github.com/filecoin-project/dagstore" "github.com/stretchr/testify/require" @@ -93,8 +95,11 @@ func TestShardRegistration(t *testing.T) { cfg := config.DefaultStorageMiner().DAGStore cfg.RootDir = t.TempDir() + h, err := mocknet.New(ctx).GenPeer() + require.NoError(t, err) + mapi := NewMinerAPI(ps, sa, 10) - dagst, w, err := NewDAGStore(cfg, mapi) + dagst, w, err := NewDAGStore(cfg, mapi, h) require.NoError(t, err) require.NotNil(t, dagst) require.NotNil(t, w) diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go index 4cf537373..235edecb1 100644 --- a/markets/dagstore/wrapper_test.go +++ b/markets/dagstore/wrapper_test.go @@ -8,6 +8,8 @@ import ( "testing" "time" + mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + mh "github.com/multiformats/go-multihash" carindex "github.com/ipld/go-car/v2/index" @@ -31,11 +33,13 @@ func TestWrapperAcquireRecovery(t *testing.T) { pieceCid, err := cid.Parse("bafkqaaa") require.NoError(t, err) + h, err := mocknet.New(ctx).GenPeer() + require.NoError(t, err) // Create a DAG store wrapper dagst, w, err := NewDAGStore(config.DAGStoreConfig{ RootDir: t.TempDir(), GCInterval: config.Duration(1 * time.Millisecond), - }, mockLotusMount{}) + }, mockLotusMount{}, h) require.NoError(t, err) defer dagst.Close() //nolint:errcheck @@ -81,12 +85,14 @@ func TestWrapperAcquireRecovery(t *testing.T) { // TestWrapperBackground verifies the behaviour of the background go routine func TestWrapperBackground(t *testing.T) { ctx := context.Background() + h, err := mocknet.New(ctx).GenPeer() + require.NoError(t, err) // Create a DAG store wrapper dagst, w, err := NewDAGStore(config.DAGStoreConfig{ RootDir: t.TempDir(), GCInterval: config.Duration(1 * time.Millisecond), - }, mockLotusMount{}) + }, mockLotusMount{}, h) require.NoError(t, err) defer dagst.Close() //nolint:errcheck diff --git a/node/modules/storageminer_dagstore.go b/node/modules/storageminer_dagstore.go index 1f72a49b9..3d0b101bd 100644 --- a/node/modules/storageminer_dagstore.go +++ b/node/modules/storageminer_dagstore.go @@ -7,6 +7,8 @@ import ( "path/filepath" "strconv" + "github.com/libp2p/go-libp2p-core/host" + "go.uber.org/fx" "golang.org/x/xerrors" @@ -63,7 +65,7 @@ func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderP // DAGStore constructs a DAG store using the supplied minerAPI, and the // user configuration. It returns both the DAGStore and the Wrapper suitable for // passing to markets. -func DAGStore(lc fx.Lifecycle, r repo.LockedRepo, minerAPI mdagstore.MinerAPI) (*dagstore.DAGStore, *mdagstore.Wrapper, error) { +func DAGStore(lc fx.Lifecycle, r repo.LockedRepo, minerAPI mdagstore.MinerAPI, h host.Host) (*dagstore.DAGStore, *mdagstore.Wrapper, error) { cfg, err := extractDAGStoreConfig(r) if err != nil { return nil, nil, err @@ -82,7 +84,7 @@ func DAGStore(lc fx.Lifecycle, r repo.LockedRepo, minerAPI mdagstore.MinerAPI) ( } } - dagst, w, err := mdagstore.NewDAGStore(cfg, minerAPI) + dagst, w, err := mdagstore.NewDAGStore(cfg, minerAPI, h) if err != nil { return nil, nil, xerrors.Errorf("failed to create DAG store: %w", err) }