fix bugs in the dagstore wrapper

This commit is contained in:
aarshkshah1992 2021-07-22 09:44:14 +05:30
parent fd879e0426
commit c8d328d386
4 changed files with 43 additions and 45 deletions

4
go.mod
View File

@ -26,7 +26,7 @@ require (
github.com/elastic/gosigar v0.12.0 github.com/elastic/gosigar v0.12.0
github.com/etclabscore/go-openrpc-reflect v0.0.36 github.com/etclabscore/go-openrpc-reflect v0.0.36
github.com/fatih/color v1.9.0 github.com/fatih/color v1.9.0
github.com/filecoin-project/dagstore v0.2.2-0.20210721184657-325a1dab778c github.com/filecoin-project/dagstore v0.2.2-0.20210722035743-7ddd92f518b4
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f
github.com/filecoin-project/go-address v0.0.5 github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-bitfield v0.2.4 github.com/filecoin-project/go-bitfield v0.2.4
@ -35,7 +35,7 @@ require (
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
github.com/filecoin-project/go-data-transfer v1.7.0 github.com/filecoin-project/go-data-transfer v1.7.0
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210721184833-f8c185f35f19 github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210722035959-f8b9c7d0e643
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
github.com/filecoin-project/go-multistore v0.0.3 github.com/filecoin-project/go-multistore v0.0.3
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20 github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20

8
go.sum
View File

@ -256,8 +256,8 @@ github.com/fatih/color v1.8.0/go.mod h1:3l45GVGkyrnYNl9HoIjnp2NnNWvh6hLAqD8yTfGj
github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s= github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU= github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E= github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E=
github.com/filecoin-project/dagstore v0.2.2-0.20210721184657-325a1dab778c h1:GTiVztvL5i019pAZIuaWMGXXeeJ7h066NEDHcIq6P5E= github.com/filecoin-project/dagstore v0.2.2-0.20210722035743-7ddd92f518b4 h1:Yr16AAror3Snqg6kwtSrzA5mJPQKb1ss0+A+y6nev9A=
github.com/filecoin-project/dagstore v0.2.2-0.20210721184657-325a1dab778c/go.mod h1:N0DVt3djIIzUpvab9Ja5D3dLgBVftWwC6idgFG2tZRI= github.com/filecoin-project/dagstore v0.2.2-0.20210722035743-7ddd92f518b4/go.mod h1:N0DVt3djIIzUpvab9Ja5D3dLgBVftWwC6idgFG2tZRI=
github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM= github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM=
github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8= github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
@ -286,8 +286,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg= github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg=
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ= github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c= github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210721184833-f8c185f35f19 h1:LdiK7gWwUqkZ9rvAF17ah3iROjTYayQcGlgpUPD6vfM= github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210722035959-f8b9c7d0e643 h1:mtEdzSxp90Ci5xGYTWaJcfG9BNVXRMi0+bQuA5wwF2Q=
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210721184833-f8c185f35f19/go.mod h1:q1PQZl/eGMgL00Q7cG0xFyg/VGWAkC3F/fLrM2M4s2E= github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210722035959-f8b9c7d0e643/go.mod h1:h1133jjcAEDHmWWh8izwQDnHk7IqhbEc7dSUwkOmqsE=
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM= github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24= github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM= github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=

View File

@ -20,6 +20,8 @@ import (
"github.com/filecoin-project/go-fil-markets/shared" "github.com/filecoin-project/go-fil-markets/shared"
) )
const maxRecoverAttempts = 1
var log = logging.Logger("dagstore-wrapper") var log = logging.Logger("dagstore-wrapper")
// MarketDAGStoreConfig is the config the market needs to then construct a DAG Store. // MarketDAGStoreConfig is the config the market needs to then construct a DAG Store.
@ -81,6 +83,7 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap
TraceCh: traceCh, TraceCh: traceCh,
MaxConcurrentFetch: cfg.MaxConcurrentFetch, MaxConcurrentFetch: cfg.MaxConcurrentFetch,
MaxConcurrentIndex: cfg.MaxConcurrentIndex, MaxConcurrentIndex: cfg.MaxConcurrentIndex,
RecoverStrategy: dagstore.RecoverLazy,
} }
dagStore, err := dagstore.NewDAGStore(dcfg) dagStore, err := dagstore.NewDAGStore(dcfg)
if err != nil { if err != nil {
@ -99,26 +102,26 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap
func (ds *Wrapper) Start(ctx context.Context) { func (ds *Wrapper) Start(ctx context.Context) {
ds.ctx, ds.cancel = context.WithCancel(ctx) ds.ctx, ds.cancel = context.WithCancel(ctx)
// Run a go-routine to do DagStore GC.
ds.backgroundWg.Add(1) ds.backgroundWg.Add(1)
go ds.dagStoreGCLoop()
// Run a go-routine to handle failures, traces and GC // run a go-routine to read the trace for debugging.
go ds.background() ds.backgroundWg.Add(1)
go ds.traceLoop()
// Run a go-routine for shard recovery
if dss, ok := ds.dagStore.(*dagstore.DAGStore); ok {
ds.backgroundWg.Add(1)
go dagstore.RecoverImmediately(ds.ctx, dss, ds.failureCh, maxRecoverAttempts, ds.backgroundWg.Done)
}
} }
func (ds *Wrapper) background() { func (ds *Wrapper) traceLoop() {
defer ds.backgroundWg.Done() defer ds.backgroundWg.Done()
gcTicker := time.NewTicker(ds.gcInterval)
defer gcTicker.Stop()
recoverShardResults := make(chan dagstore.ShardResult, 32)
for ds.ctx.Err() == nil { for ds.ctx.Err() == nil {
select { select {
// GC the DAG store on every tick
case <-gcTicker.C:
_, _ = ds.dagStore.GC(ds.ctx)
// Log trace events from the DAG store // Log trace events from the DAG store
case tr := <-ds.traceCh: case tr := <-ds.traceCh:
log.Debugw("trace", log.Debugw("trace",
@ -126,18 +129,23 @@ func (ds *Wrapper) background() {
"op-type", tr.Op.String(), "op-type", tr.Op.String(),
"after", tr.After.String()) "after", tr.After.String())
// Handle shard failures by attempting to recover the shard case <-ds.ctx.Done():
case f := <-ds.failureCh: return
log.Warnw("shard failed", "shard-key", f.Key.String(), "error", f.Error) }
if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, recoverShardResults, dagstore.RecoverOpts{}); err != nil { }
log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err)
} }
// Consume recover shard results func (ds *Wrapper) dagStoreGCLoop() {
case res := <-recoverShardResults: defer ds.backgroundWg.Done()
if res.Error != nil {
log.Warnw("shard recovery failed", "shard-key", res.Key.String(), "error", res.Error) gcTicker := time.NewTicker(ds.gcInterval)
} defer gcTicker.Stop()
for ds.ctx.Err() == nil {
select {
// GC the DAG store on every tick
case <-gcTicker.C:
_, _ = ds.dagStore.GC(ds.ctx)
// Exit when the DAG store wrapper is shutdown // Exit when the DAG store wrapper is shutdown
case <-ds.ctx.Done(): case <-ds.ctx.Done():
@ -151,7 +159,7 @@ func (ds *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.Cl
key := shard.KeyFromCID(pieceCid) key := shard.KeyFromCID(pieceCid)
resch := make(chan dagstore.ShardResult, 1) resch := make(chan dagstore.ShardResult, 1)
err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{}) err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{})
log.Info("sent message to acquire shard") log.Debugf("sent message to acquire shard for piece CID %s", pieceCid)
if err != nil { if err != nil {
if !errors.Is(err, dagstore.ErrShardUnknown) { if !errors.Is(err, dagstore.ErrShardUnknown) {
@ -216,6 +224,7 @@ func (ds *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath
if err != nil { if err != nil {
return xerrors.Errorf("failed to schedule register shard for piece CID %s: %w", pieceCid, err) return xerrors.Errorf("failed to schedule register shard for piece CID %s: %w", pieceCid, err)
} }
log.Debugf("successfully submitted Register Shard request for piece CID %s with eagerInit=%t", pieceCid, eagerInit)
return nil return nil
} }
@ -225,12 +234,16 @@ func (ds *Wrapper) Close() error {
ds.cancel() ds.cancel()
// Close the DAG store // Close the DAG store
log.Info("will close the dagstore")
if err := ds.dagStore.Close(); err != nil { if err := ds.dagStore.Close(); err != nil {
return xerrors.Errorf("failed to close DAG store: %w", err) return xerrors.Errorf("failed to close DAG store: %w", err)
} }
log.Info("dagstore closed")
// Wait for the background go routine to exit // Wait for the background go routine to exit
log.Info("waiting for dagstore background wrapper routines to exist")
ds.backgroundWg.Wait() ds.backgroundWg.Wait()
log.Info("exited dagstore background warpper routines")
return nil return nil
} }

View File

@ -103,21 +103,6 @@ func TestWrapperBackground(t *testing.T) {
case <-mock.gc: case <-mock.gc:
} }
// Expect that when a result is sent on the failure channel, the wrapper
// will attempt to recover the shard
shardKey := shard.KeyFromString("key")
w.failureCh <- dagstore.ShardResult{
Key: shardKey,
}
tctx, cancel2 := context.WithTimeout(ctx, time.Second)
defer cancel2()
select {
case <-tctx.Done():
require.Fail(t, "failed to call recover")
case k := <-mock.recover:
require.Equal(t, shardKey, k)
}
// Expect that when the wrapper is closed it will call close on the // Expect that when the wrapper is closed it will call close on the
// DAG store // DAG store
err = w.Close() err = w.Close()