diff --git a/go.mod b/go.mod index 049c9c952..3ea7dcbe7 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/elastic/gosigar v0.12.0 github.com/etclabscore/go-openrpc-reflect v0.0.36 github.com/fatih/color v1.9.0 - github.com/filecoin-project/dagstore v0.3.0 + github.com/filecoin-project/dagstore v0.3.1-0.20210727155220-5db1798dc4c8 github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f github.com/filecoin-project/go-address v0.0.5 github.com/filecoin-project/go-bitfield v0.2.4 diff --git a/go.sum b/go.sum index 06b3ac6bc..0fe1d45c2 100644 --- a/go.sum +++ b/go.sum @@ -256,8 +256,9 @@ github.com/fatih/color v1.8.0/go.mod h1:3l45GVGkyrnYNl9HoIjnp2NnNWvh6hLAqD8yTfGj github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E= -github.com/filecoin-project/dagstore v0.3.0 h1:jYqycoe6WxK6CbARco17J4uqJr3pskC/8t6fqN/atbQ= github.com/filecoin-project/dagstore v0.3.0/go.mod h1:N0DVt3djIIzUpvab9Ja5D3dLgBVftWwC6idgFG2tZRI= +github.com/filecoin-project/dagstore v0.3.1-0.20210727155220-5db1798dc4c8 h1:uKrlFJ7k7PIfbAQPpuRHqFqjA/soAkfyPOHrkCg7tYw= +github.com/filecoin-project/dagstore v0.3.1-0.20210727155220-5db1798dc4c8/go.mod h1:WY5OoLfnwISCk6eASSF927KKPqLPIlTwmG1qHpA08KY= github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM= github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= diff --git a/markets/dagstore/lotusaccessor.go b/markets/dagstore/lotusaccessor.go index 153a6ba88..7cd363dc7 100644 --- a/markets/dagstore/lotusaccessor.go +++ b/markets/dagstore/lotusaccessor.go @@ -4,6 +4,7 @@ import ( "context" "io" + "github.com/filecoin-project/dagstore/throttle" "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -12,6 +13,11 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" ) +// MaxConcurrentUnsealedFetches caps the amount of concurrent fetches for +// unsealed pieces, so that we don't saturate IO or the network too much, +// especially when bulk processing (e.g. at migration). +var MaxConcurrentUnsealedFetches = 3 + type LotusAccessor interface { FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid) (io.ReadCloser, error) GetUnpaddedCARSize(ctx context.Context, pieceCid cid.Cid) (uint64, error) @@ -21,8 +27,8 @@ type LotusAccessor interface { type lotusAccessor struct { pieceStore piecestore.PieceStore rm retrievalmarket.RetrievalProviderNode - - readyMgr *shared.ReadyManager + throttle throttle.Throttler + readyMgr *shared.ReadyManager } var _ LotusAccessor = (*lotusAccessor)(nil) @@ -31,6 +37,7 @@ func NewLotusAccessor(store piecestore.PieceStore, rm retrievalmarket.RetrievalP return &lotusAccessor{ pieceStore: store, rm: rm, + throttle: throttle.Fixed(MaxConcurrentUnsealedFetches), readyMgr: shared.NewReadyManager(), } } @@ -84,11 +91,16 @@ func (m *lotusAccessor) FetchUnsealedPiece(ctx context.Context, pieceCid cid.Cid continue } if isUnsealed { - // UnsealSector will NOT unseal a sector if we already have an unsealed copy lying around. - reader, err := m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) - if err == nil { - return reader, nil - } + var reader io.ReadCloser + // We want to throttle this path, as these copies will be downloaded + // immediately from the storage cluster without any unsealing + // necessary. + err := m.throttle.Do(ctx, func(ctx context.Context) (err error) { + // UnsealSector will NOT unseal a sector if we already have an unsealed copy lying around. + reader, err = m.rm.UnsealSector(ctx, deal.SectorID, deal.Offset.Unpadded(), deal.Length.Unpadded()) + return err + }) + return reader, err } } diff --git a/markets/dagstore/lotusaccessor_test.go b/markets/dagstore/lotusaccessor_test.go index 666e881f4..2485064f3 100644 --- a/markets/dagstore/lotusaccessor_test.go +++ b/markets/dagstore/lotusaccessor_test.go @@ -4,12 +4,16 @@ import ( "bytes" "context" "io" + "sync" + "sync/atomic" "testing" + "time" "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" ds_sync "github.com/ipfs/go-datastore/sync" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" @@ -118,6 +122,57 @@ func TestLotusAccessorGetUnpaddedCARSize(t *testing.T) { require.EqualValues(t, 10, len) } +func TestThrottle(t *testing.T) { + ctx := context.Background() + cid1, err := cid.Parse("bafkqaaa") + require.NoError(t, err) + + ps := getPieceStore(t) + rpn := &mockRPN{ + sectors: map[abi.SectorNumber]string{ + unsealedSectorID: "foo", + }, + } + api := NewLotusAccessor(ps, rpn) + require.NoError(t, api.Start(ctx)) + + // Add a deal with data Length 10 + dealInfo := piecestore.DealInfo{ + SectorID: unsealedSectorID, + Length: 10, + } + err = ps.AddDealForPiece(cid1, dealInfo) + require.NoError(t, err) + + // hold the lock to block. + rpn.lk.Lock() + + // fetch the piece concurrently. + errgrp, ctx := errgroup.WithContext(context.Background()) + for i := 0; i < 10; i++ { + errgrp.Go(func() error { + r, err := api.FetchUnsealedPiece(ctx, cid1) + if err == nil { + _ = r.Close() + } + return err + }) + } + + time.Sleep(500 * time.Millisecond) + require.EqualValues(t, MaxConcurrentUnsealedFetches, atomic.LoadInt32(&rpn.calls)) // throttled + + // allow to proceed. + rpn.lk.Unlock() + + // allow all to finish. + err = errgrp.Wait() + require.NoError(t, err) + + require.EqualValues(t, 10, atomic.LoadInt32(&rpn.calls)) // throttled + +} + func getPieceStore(t *testing.T) piecestore.PieceStore { ps, err := piecestoreimpl.NewPieceStore(ds_sync.MutexWrap(ds.NewMapDatastore())) require.NoError(t, err) @@ -128,10 +183,16 @@ func getPieceStore(t *testing.T) piecestore.PieceStore { } type mockRPN struct { + calls int32 // guarded by atomic + lk sync.RWMutex // lock to simulate blocks. sectors map[abi.SectorNumber]string } func (m *mockRPN) UnsealSector(ctx context.Context, sectorID abi.SectorNumber, offset abi.UnpaddedPieceSize, length abi.UnpaddedPieceSize) (io.ReadCloser, error) { + atomic.AddInt32(&m.calls, 1) + m.lk.RLock() + defer m.lk.RUnlock() + data, ok := m.sectors[sectorID] if !ok { panic("sector not found") diff --git a/markets/dagstore/mount.go b/markets/dagstore/mount.go index b31d82ab1..e87914732 100644 --- a/markets/dagstore/mount.go +++ b/markets/dagstore/mount.go @@ -22,7 +22,9 @@ type LotusMount struct { PieceCid cid.Cid } -// This method is called when registering a mount with the DAG store registry. +// NewLotusMountTemplate is called when registering a mount with +// the DAG store registry. +// // The DAG store registry receives an instance of the mount (a "template"). // When the registry needs to deserialize a mount it clones the template then // calls Deserialize on the cloned instance, which will have a reference to the diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index 9868463f7..878e30427 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -82,13 +82,14 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap } dcfg := dagstore.Config{ - TransientsDir: cfg.TransientsDir, - IndexRepo: irepo, - Datastore: cfg.Datastore, - MountRegistry: registry, - FailureCh: failureCh, - TraceCh: traceCh, - MaxConcurrentFetch: cfg.MaxConcurrentFetch, + TransientsDir: cfg.TransientsDir, + IndexRepo: irepo, + Datastore: cfg.Datastore, + MountRegistry: registry, + FailureCh: failureCh, + TraceCh: traceCh, + // not limiting fetches globally, as the Lotus mount does + // conditional throttling. MaxConcurrentIndex: cfg.MaxConcurrentIndex, RecoverOnStart: dagstore.RecoverOnAcquire, }