From 650d6efb1a5e337aea9455f00070509ec1f71b27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 28 Jul 2021 15:39:23 +0100 Subject: [PATCH 1/2] dagstore: throttle calls to storage except actual unseals. --- markets/dagstore/lotusaccessor.go | 72 ++++++++++++++++++-------- markets/dagstore/lotusaccessor_test.go | 4 +- 2 files changed, 53 insertions(+), 23 deletions(-) diff --git a/markets/dagstore/lotusaccessor.go b/markets/dagstore/lotusaccessor.go index 551ec79bd..151e9fa4c 100644 --- a/markets/dagstore/lotusaccessor.go +++ b/markets/dagstore/lotusaccessor.go @@ -2,7 +2,10 @@ package dagstore import ( "context" + "fmt" "io" + "os" + "strconv" "github.com/filecoin-project/dagstore/throttle" "github.com/ipfs/go-cid" @@ -13,10 +16,19 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" ) -// MaxConcurrentUnsealedFetches caps the amount of concurrent fetches for -// unsealed pieces, so that we don't saturate IO or the network too much, -// especially when bulk processing (e.g. at migration). -var MaxConcurrentUnsealedFetches = 3 +// MaxConcurrentStorageCalls caps the amount of concurrent calls to the +// storage, so that we don't spam it during heavy processes like bulk migration. +var MaxConcurrentStorageCalls = func() int { + // TODO replace env with config.toml attribute. + v, ok := os.LookupEnv("LOTUS_DAGSTORE_MOUNT_CONCURRENCY") + if ok { + concurrency, err := strconv.Atoi(v) + if err == nil { + return concurrency + } + } + return 100 +}() type LotusAccessor interface { FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) @@ -37,7 +49,7 @@ func NewLotusAccessor(store piecestore.PieceStore, rm retrievalmarket.RetrievalP return &lotusAccessor{ pieceStore: store, rm: rm, - throttle: throttle.Fixed(MaxConcurrentUnsealedFetches), + throttle: throttle.Fixed(MaxConcurrentStorageCalls), readyMgr: shared.NewReadyManager(), } } @@ -74,34 +86,48 @@ func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid return nil, err } - pieceInfo, err := m.pieceStore.GetPieceInfo(pieceCid) + // Throttle this path to avoid flooding the storage subsystem. + var pieceInfo piecestore.PieceInfo + err = m.throttle.Do(ctx, func(ctx context.Context) (err error) { + pieceInfo, err = m.pieceStore.GetPieceInfo(pieceCid) + return err + }) + if err != nil { return nil, xerrors.Errorf("failed to fetch pieceInfo for piece %s: %w", pieceCid, err) } if len(pieceInfo.Deals) == 0 { - return nil, xerrors.Errorf("no storage deals found for Piece %s", pieceCid) + return nil, xerrors.Errorf("no storage deals found for piece %s", pieceCid) } // prefer an unsealed sector containing the piece if one exists for _, deal := range pieceInfo.Deals { - isUnsealed, err := m.rm.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) + deal := deal + + // Throttle this path to avoid flooding the storage subsystem. + var reader io.ReadCloser + err := m.throttle.Do(ctx, func(ctx context.Context) (err error) { + isUnsealed, err := m.rm.IsUnsealed(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) + if err != nil { + return fmt.Errorf("failed to check if sector %d for deal %d was unsealed: %w", deal.SectorID, deal.DealID, err) + } + if !isUnsealed { + return nil + } + // Because we know we have an unsealed copy, this UnsealSector call will actually not perform any unsealing. + reader, err = m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) + return err + }) + if err != nil { - log.Warnf("failed to check if deal %d unsealed: %s", deal.DealID, err) - continue + log.Warnf("failed to check/retrieve unsealed sector: %s", err) + continue // move on to the next match. } - if isUnsealed { - var reader io.ReadCloser - // We want to throttle this path, as these copies will be downloaded - // immediately from the storage cluster without any unsealing - // necessary. - deal := deal // calm the linter - err := m.throttle.Do(ctx, func(ctx context.Context) (err error) { - // UnsealSector will NOT unseal a sector if we already have an unsealed copy lying around. - reader, err = m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) - return err - }) - return reader, err + + if reader != nil { + // we were able to obtain a reader for an already unsealed piece + return reader, nil } } @@ -110,6 +136,8 @@ func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid for _, deal := range pieceInfo.Deals { // Note that if the deal data is not already unsealed, unsealing may // block for a long time with the current PoRep + // + // This path is unthrottled. reader, err := m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) if err != nil { lastErr = xerrors.Errorf("failed to unseal deal %d: %w", deal.DealID, err) diff --git a/markets/dagstore/lotusaccessor_test.go b/markets/dagstore/lotusaccessor_test.go index 2485064f3..b6a7a4b25 100644 --- a/markets/dagstore/lotusaccessor_test.go +++ b/markets/dagstore/lotusaccessor_test.go @@ -123,6 +123,8 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) { } func TestThrottle(t *testing.T) { + MaxConcurrentStorageCalls = 3 + ctx := context.Background() cid1, err := cid.Parse("bafkqaaa") require.NoError(t, err) @@ -160,7 +162,7 @@ func TestThrottle(t *testing.T) { } time.Sleep(500 * time.Millisecond) - require.EqualValues(t, MaxConcurrentUnsealedFetches, atomic.LoadInt32(&rpn.calls)) // throttled + require.EqualValues(t, MaxConcurrentStorageCalls, atomic.LoadInt32(&rpn.calls)) // throttled // allow to proceed. rpn.lk.Unlock() From b6112c6fc9531037066f2b1d79ed3d0b50bb0d1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Wed, 28 Jul 2021 16:28:27 +0100 Subject: [PATCH 2/2] wire in copy throttling. --- go.mod | 2 +- go.sum | 3 ++- markets/dagstore/wrapper.go | 16 +++++++++------- node/modules/storageminer.go | 11 ++++++----- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index d18d11edb..8ea02ccc3 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/elastic/gosigar v0.12.0 github.com/etclabscore/go-openrpc-reflect v0.0.36 github.com/fatih/color v1.9.0 - github.com/filecoin-project/dagstore v0.3.1 + github.com/filecoin-project/dagstore v0.3.2-0.20210728152352-8e0b04b9b384 github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f github.com/filecoin-project/go-address v0.0.5 github.com/filecoin-project/go-bitfield v0.2.4 diff --git a/go.sum b/go.sum index a1b022f38..b228c25ed 100644 --- a/go.sum +++ b/go.sum @@ -256,8 +256,9 @@ github.com/fatih/color v1.8.0/go.mod h1:3l45GVGkyrnYNl9HoIjnp2NnNWvh6hLAqD8yTfGj github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E= -github.com/filecoin-project/dagstore v0.3.1 h1:IShahbR3zEPE1pfHa5DiYMt/S40b9JuWZx6+kZPuyTQ= github.com/filecoin-project/dagstore v0.3.1/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY= +github.com/filecoin-project/dagstore v0.3.2-0.20210728152352-8e0b04b9b384 h1:0LrfJHR6YAa8t7VEZmbNECW7Deov8Di9CZHsZaamiM4= +github.com/filecoin-project/dagstore v0.3.2-0.20210728152352-8e0b04b9b384/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY= github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM= github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index e35dc5e6d..65c6fce91 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -27,11 +27,12 @@ var log = logging.Logger("dagstore-wrapper") // MarketDAGStoreConfig is the config the market needs to then construct a DAG Store. type MarketDAGStoreConfig struct { - TransientsDir string - IndexDir string - Datastore ds.Datastore - MaxConcurrentIndex int - GCInterval time.Duration + TransientsDir string + IndexDir string + Datastore ds.Datastore + MaxConcurrentIndex int + MaxConcurrentCopies int + GCInterval time.Duration } // DAGStore provides an interface for the DAG store that can be mocked out @@ -89,8 +90,9 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap TraceCh: traceCh, // not limiting fetches globally, as the Lotus mount does // conditional throttling. - MaxConcurrentIndex: cfg.MaxConcurrentIndex, - RecoverOnStart: dagstore.RecoverOnAcquire, + MaxConcurrentIndex: cfg.MaxConcurrentIndex, + MaxConcurrentCopies: cfg.MaxConcurrentCopies, + RecoverOnStart: dagstore.RecoverOnAcquire, } dagStore, err := dagstore.NewDAGStore(dcfg) if err != nil { diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 923350b65..8d61b13a7 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -609,11 +609,12 @@ func DagStoreWrapper( } cfg := dagstore.MarketDAGStoreConfig{ - TransientsDir: filepath.Join(dagStoreDir, "transients"), - IndexDir: filepath.Join(dagStoreDir, "index"), - Datastore: dagStoreDS, - GCInterval: 1 * time.Minute, - MaxConcurrentIndex: 5, + TransientsDir: filepath.Join(dagStoreDir, "transients"), + IndexDir: filepath.Join(dagStoreDir, "index"), + Datastore: dagStoreDS, + GCInterval: 1 * time.Minute, + MaxConcurrentIndex: 5, + MaxConcurrentCopies: 2, } dsw, err := dagstore.NewDagStoreWrapper(cfg, lotusAccessor)