From c8d328d386cc4f21b3ae13b509244665c30193dc Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Thu, 22 Jul 2021 09:44:14 +0530 Subject: [PATCH] fix bugs in the dagstore wrapper --- go.mod | 4 +-- go.sum | 8 ++--- markets/dagstore/wrapper.go | 61 +++++++++++++++++++------------- markets/dagstore/wrapper_test.go | 15 -------- 4 files changed, 43 insertions(+), 45 deletions(-) diff --git a/go.mod b/go.mod index ff9e99b8d..50db65e67 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/elastic/gosigar v0.12.0 github.com/etclabscore/go-openrpc-reflect v0.0.36 github.com/fatih/color v1.9.0 - github.com/filecoin-project/dagstore v0.2.2-0.20210721184657-325a1dab778c + github.com/filecoin-project/dagstore v0.2.2-0.20210722035743-7ddd92f518b4 github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f github.com/filecoin-project/go-address v0.0.5 github.com/filecoin-project/go-bitfield v0.2.4 @@ -35,7 +35,7 @@ require ( github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-data-transfer v1.7.0 github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a - github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210721184833-f8c185f35f19 + github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210722035959-f8b9c7d0e643 github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20 diff --git a/go.sum b/go.sum index 2cfd01595..c6f7bd6c2 100644 --- a/go.sum +++ b/go.sum @@ -256,8 +256,8 @@ github.com/fatih/color v1.8.0/go.mod h1:3l45GVGkyrnYNl9HoIjnp2NnNWvh6hLAqD8yTfGj github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E= -github.com/filecoin-project/dagstore v0.2.2-0.20210721184657-325a1dab778c h1:GTiVztvL5i019pAZIuaWMGXXeeJ7h066NEDHcIq6P5E= -github.com/filecoin-project/dagstore v0.2.2-0.20210721184657-325a1dab778c/go.mod h1:N0DVt3djIIzUpvab9Ja5D3dLgBVftWwC6idgFG2tZRI= +github.com/filecoin-project/dagstore v0.2.2-0.20210722035743-7ddd92f518b4 h1:Yr16AAror3Snqg6kwtSrzA5mJPQKb1ss0+A+y6nev9A= +github.com/filecoin-project/dagstore v0.2.2-0.20210722035743-7ddd92f518b4/go.mod h1:N0DVt3djIIzUpvab9Ja5D3dLgBVftWwC6idgFG2tZRI= github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM= github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= @@ -286,8 +286,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg= github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= -github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210721184833-f8c185f35f19 h1:LdiK7gWwUqkZ9rvAF17ah3iROjTYayQcGlgpUPD6vfM= -github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210721184833-f8c185f35f19/go.mod h1:q1PQZl/eGMgL00Q7cG0xFyg/VGWAkC3F/fLrM2M4s2E= +github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210722035959-f8b9c7d0e643 h1:mtEdzSxp90Ci5xGYTWaJcfG9BNVXRMi0+bQuA5wwF2Q= +github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210722035959-f8b9c7d0e643/go.mod h1:h1133jjcAEDHmWWh8izwQDnHk7IqhbEc7dSUwkOmqsE= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM= diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index 42b0dbdc2..54830a7ae 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -20,6 +20,8 @@ import ( "github.com/filecoin-project/go-fil-markets/shared" ) +const maxRecoverAttempts = 1 + var log = logging.Logger("dagstore-wrapper") // MarketDAGStoreConfig is the config the market needs to then construct a DAG Store. @@ -81,6 +83,7 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap TraceCh: traceCh, MaxConcurrentFetch: cfg.MaxConcurrentFetch, MaxConcurrentIndex: cfg.MaxConcurrentIndex, + RecoverStrategy: dagstore.RecoverLazy, } dagStore, err := dagstore.NewDAGStore(dcfg) if err != nil { @@ -99,26 +102,26 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap func (ds *Wrapper) Start(ctx context.Context) { ds.ctx, ds.cancel = context.WithCancel(ctx) + // Run a go-routine to do DagStore GC. ds.backgroundWg.Add(1) + go ds.dagStoreGCLoop() - // Run a go-routine to handle failures, traces and GC - go ds.background() + // run a go-routine to read the trace for debugging. + ds.backgroundWg.Add(1) + go ds.traceLoop() + + // Run a go-routine for shard recovery + if dss, ok := ds.dagStore.(*dagstore.DAGStore); ok { + ds.backgroundWg.Add(1) + go dagstore.RecoverImmediately(ds.ctx, dss, ds.failureCh, maxRecoverAttempts, ds.backgroundWg.Done) + } } -func (ds *Wrapper) background() { +func (ds *Wrapper) traceLoop() { defer ds.backgroundWg.Done() - gcTicker := time.NewTicker(ds.gcInterval) - defer gcTicker.Stop() - - recoverShardResults := make(chan dagstore.ShardResult, 32) for ds.ctx.Err() == nil { select { - - // GC the DAG store on every tick - case <-gcTicker.C: - _, _ = ds.dagStore.GC(ds.ctx) - // Log trace events from the DAG store case tr := <-ds.traceCh: log.Debugw("trace", @@ -126,18 +129,23 @@ func (ds *Wrapper) background() { "op-type", tr.Op.String(), "after", tr.After.String()) - // Handle shard failures by attempting to recover the shard - case f := <-ds.failureCh: - log.Warnw("shard failed", "shard-key", f.Key.String(), "error", f.Error) - if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, recoverShardResults, dagstore.RecoverOpts{}); err != nil { - log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err) - } + case <-ds.ctx.Done(): + return + } + } +} - // Consume recover shard results - case res := <-recoverShardResults: - if res.Error != nil { - log.Warnw("shard recovery failed", "shard-key", res.Key.String(), "error", res.Error) - } +func (ds *Wrapper) dagStoreGCLoop() { + defer ds.backgroundWg.Done() + + gcTicker := time.NewTicker(ds.gcInterval) + defer gcTicker.Stop() + + for ds.ctx.Err() == nil { + select { + // GC the DAG store on every tick + case <-gcTicker.C: + _, _ = ds.dagStore.GC(ds.ctx) // Exit when the DAG store wrapper is shutdown case <-ds.ctx.Done(): @@ -151,7 +159,7 @@ func (ds *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.Cl key := shard.KeyFromCID(pieceCid) resch := make(chan dagstore.ShardResult, 1) err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}) - log.Info("sent message to acquire shard") + log.Debugf("sent message to acquire shard for piece CID %s", pieceCid) if err != nil { if !errors.Is(err, dagstore.ErrShardUnknown) { @@ -216,6 +224,7 @@ func (ds *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath if err != nil { return xerrors.Errorf("failed to schedule register shard for piece CID %s: %w", pieceCid, err) } + log.Debugf("successfully submitted Register Shard request for piece CID %s with eagerInit=%t", pieceCid, eagerInit) return nil } @@ -225,12 +234,16 @@ func (ds *Wrapper) Close() error { ds.cancel() // Close the DAG store + log.Info("will close the dagstore") if err := ds.dagStore.Close(); err != nil { return xerrors.Errorf("failed to close DAG store: %w", err) } + log.Info("dagstore closed") // Wait for the background go routine to exit + log.Info("waiting for dagstore background wrapper routines to exist") ds.backgroundWg.Wait() + log.Info("exited dagstore background warpper routines") return nil } diff --git a/markets/dagstore/wrapper_test.go b/markets/dagstore/wrapper_test.go index faf63a2a8..3b3858550 100644 --- a/markets/dagstore/wrapper_test.go +++ b/markets/dagstore/wrapper_test.go @@ -103,21 +103,6 @@ func TestWrapperBackground(t *testing.T) { case <-mock.gc: } - // Expect that when a result is sent on the failure channel, the wrapper - // will attempt to recover the shard - shardKey := shard.KeyFromString("key") - w.failureCh <- dagstore.ShardResult{ - Key: shardKey, - } - tctx, cancel2 := context.WithTimeout(ctx, time.Second) - defer cancel2() - select { - case <-tctx.Done(): - require.Fail(t, "failed to call recover") - case k := <-mock.recover: - require.Equal(t, shardKey, k) - } - // Expect that when the wrapper is closed it will call close on the // DAG store err = w.Close()