Merge pull request #6826 from filecoin-project/fix/dagstore-wrapper-bugs
Fix bugs in the DAGStore wrapper
This commit is contained in:
commit
62044c34b1
4
go.mod
4
go.mod
@ -26,7 +26,7 @@ require (
|
||||
github.com/elastic/gosigar v0.12.0
|
||||
github.com/etclabscore/go-openrpc-reflect v0.0.36
|
||||
github.com/fatih/color v1.9.0
|
||||
github.com/filecoin-project/dagstore v0.2.2-0.20210721184657-325a1dab778c
|
||||
github.com/filecoin-project/dagstore v0.2.2-0.20210722035743-7ddd92f518b4
|
||||
github.com/filecoin-project/filecoin-ffi v0.30.4-0.20200910194244-f640612a1a1f
|
||||
github.com/filecoin-project/go-address v0.0.5
|
||||
github.com/filecoin-project/go-bitfield v0.2.4
|
||||
@ -35,7 +35,7 @@ require (
|
||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
|
||||
github.com/filecoin-project/go-data-transfer v1.7.0
|
||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a
|
||||
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210721184833-f8c185f35f19
|
||||
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210722035959-f8b9c7d0e643
|
||||
github.com/filecoin-project/go-jsonrpc v0.1.4-0.20210217175800-45ea43ac2bec
|
||||
github.com/filecoin-project/go-multistore v0.0.3
|
||||
github.com/filecoin-project/go-padreader v0.0.0-20200903213702-ed5fae088b20
|
||||
|
8
go.sum
8
go.sum
@ -256,8 +256,8 @@ github.com/fatih/color v1.8.0/go.mod h1:3l45GVGkyrnYNl9HoIjnp2NnNWvh6hLAqD8yTfGj
|
||||
github.com/fatih/color v1.9.0 h1:8xPHl4/q1VyqGIPif1F+1V3Y3lSmrq01EabUW3CoW5s=
|
||||
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
|
||||
github.com/fd/go-nat v1.0.0/go.mod h1:BTBu/CKvMmOMUPkKVef1pngt2WFH/lg7E6yQnulfp6E=
|
||||
github.com/filecoin-project/dagstore v0.2.2-0.20210721184657-325a1dab778c h1:GTiVztvL5i019pAZIuaWMGXXeeJ7h066NEDHcIq6P5E=
|
||||
github.com/filecoin-project/dagstore v0.2.2-0.20210721184657-325a1dab778c/go.mod h1:N0DVt3djIIzUpvab9Ja5D3dLgBVftWwC6idgFG2tZRI=
|
||||
github.com/filecoin-project/dagstore v0.2.2-0.20210722035743-7ddd92f518b4 h1:Yr16AAror3Snqg6kwtSrzA5mJPQKb1ss0+A+y6nev9A=
|
||||
github.com/filecoin-project/dagstore v0.2.2-0.20210722035743-7ddd92f518b4/go.mod h1:N0DVt3djIIzUpvab9Ja5D3dLgBVftWwC6idgFG2tZRI=
|
||||
github.com/filecoin-project/go-address v0.0.3/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
|
||||
github.com/filecoin-project/go-address v0.0.5 h1:SSaFT/5aLfPXycUlFyemoHYhRgdyXClXCyDdNJKPlDM=
|
||||
github.com/filecoin-project/go-address v0.0.5/go.mod h1:jr8JxKsYx+lQlQZmF5i2U0Z+cGQ59wMIps/8YW/lDj8=
|
||||
@ -286,8 +286,8 @@ github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go
|
||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a h1:hyJ+pUm/4U4RdEZBlg6k8Ma4rDiuvqyGpoICXAxwsTg=
|
||||
github.com/filecoin-project/go-fil-commcid v0.0.0-20201016201715-d41df56b4f6a/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
|
||||
github.com/filecoin-project/go-fil-markets v1.0.5-0.20201113164554-c5eba40d5335/go.mod h1:AJySOJC00JRWEZzRG2KsfUnqEf5ITXxeX09BE9N4f9c=
|
||||
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210721184833-f8c185f35f19 h1:LdiK7gWwUqkZ9rvAF17ah3iROjTYayQcGlgpUPD6vfM=
|
||||
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210721184833-f8c185f35f19/go.mod h1:q1PQZl/eGMgL00Q7cG0xFyg/VGWAkC3F/fLrM2M4s2E=
|
||||
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210722035959-f8b9c7d0e643 h1:mtEdzSxp90Ci5xGYTWaJcfG9BNVXRMi0+bQuA5wwF2Q=
|
||||
github.com/filecoin-project/go-fil-markets v1.6.0-rc1.0.20210722035959-f8b9c7d0e643/go.mod h1:h1133jjcAEDHmWWh8izwQDnHk7IqhbEc7dSUwkOmqsE=
|
||||
github.com/filecoin-project/go-hamt-ipld v0.1.5 h1:uoXrKbCQZ49OHpsTCkrThPNelC4W3LPEk0OrS/ytIBM=
|
||||
github.com/filecoin-project/go-hamt-ipld v0.1.5/go.mod h1:6Is+ONR5Cd5R6XZoCse1CWaXZc0Hdb/JeX+EQCQzX24=
|
||||
github.com/filecoin-project/go-hamt-ipld/v2 v2.0.0 h1:b3UDemBYN2HNfk3KOXNuxgTTxlWi3xVvbQP0IT38fvM=
|
||||
|
@ -20,6 +20,8 @@ import (
|
||||
"github.com/filecoin-project/go-fil-markets/shared"
|
||||
)
|
||||
|
||||
const maxRecoverAttempts = 1
|
||||
|
||||
var log = logging.Logger("dagstore-wrapper")
|
||||
|
||||
// MarketDAGStoreConfig is the config the market needs to then construct a DAG Store.
|
||||
@ -81,6 +83,7 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap
|
||||
TraceCh: traceCh,
|
||||
MaxConcurrentFetch: cfg.MaxConcurrentFetch,
|
||||
MaxConcurrentIndex: cfg.MaxConcurrentIndex,
|
||||
RecoverStrategy: dagstore.RecoverLazy,
|
||||
}
|
||||
dagStore, err := dagstore.NewDAGStore(dcfg)
|
||||
if err != nil {
|
||||
@ -99,26 +102,26 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*Wrap
|
||||
func (ds *Wrapper) Start(ctx context.Context) {
|
||||
ds.ctx, ds.cancel = context.WithCancel(ctx)
|
||||
|
||||
// Run a go-routine to do DagStore GC.
|
||||
ds.backgroundWg.Add(1)
|
||||
go ds.dagStoreGCLoop()
|
||||
|
||||
// Run a go-routine to handle failures, traces and GC
|
||||
go ds.background()
|
||||
// Run a go-routine to read the trace for debugging.
|
||||
ds.backgroundWg.Add(1)
|
||||
go ds.traceLoop()
|
||||
|
||||
// Run a go-routine for shard recovery
|
||||
if dss, ok := ds.dagStore.(*dagstore.DAGStore); ok {
|
||||
ds.backgroundWg.Add(1)
|
||||
go dagstore.RecoverImmediately(ds.ctx, dss, ds.failureCh, maxRecoverAttempts, ds.backgroundWg.Done)
|
||||
}
|
||||
}
|
||||
|
||||
func (ds *Wrapper) background() {
|
||||
func (ds *Wrapper) traceLoop() {
|
||||
defer ds.backgroundWg.Done()
|
||||
|
||||
gcTicker := time.NewTicker(ds.gcInterval)
|
||||
defer gcTicker.Stop()
|
||||
|
||||
recoverShardResults := make(chan dagstore.ShardResult, 32)
|
||||
for ds.ctx.Err() == nil {
|
||||
select {
|
||||
|
||||
// GC the DAG store on every tick
|
||||
case <-gcTicker.C:
|
||||
_, _ = ds.dagStore.GC(ds.ctx)
|
||||
|
||||
// Log trace events from the DAG store
|
||||
case tr := <-ds.traceCh:
|
||||
log.Debugw("trace",
|
||||
@ -126,18 +129,23 @@ func (ds *Wrapper) background() {
|
||||
"op-type", tr.Op.String(),
|
||||
"after", tr.After.String())
|
||||
|
||||
// Handle shard failures by attempting to recover the shard
|
||||
case f := <-ds.failureCh:
|
||||
log.Warnw("shard failed", "shard-key", f.Key.String(), "error", f.Error)
|
||||
if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, recoverShardResults, dagstore.RecoverOpts{}); err != nil {
|
||||
log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err)
|
||||
}
|
||||
case <-ds.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Consume recover shard results
|
||||
case res := <-recoverShardResults:
|
||||
if res.Error != nil {
|
||||
log.Warnw("shard recovery failed", "shard-key", res.Key.String(), "error", res.Error)
|
||||
}
|
||||
func (ds *Wrapper) dagStoreGCLoop() {
|
||||
defer ds.backgroundWg.Done()
|
||||
|
||||
gcTicker := time.NewTicker(ds.gcInterval)
|
||||
defer gcTicker.Stop()
|
||||
|
||||
for ds.ctx.Err() == nil {
|
||||
select {
|
||||
// GC the DAG store on every tick
|
||||
case <-gcTicker.C:
|
||||
_, _ = ds.dagStore.GC(ds.ctx)
|
||||
|
||||
// Exit when the DAG store wrapper is shutdown
|
||||
case <-ds.ctx.Done():
|
||||
@ -151,7 +159,7 @@ func (ds *Wrapper) LoadShard(ctx context.Context, pieceCid cid.Cid) (carstore.Cl
|
||||
key := shard.KeyFromCID(pieceCid)
|
||||
resch := make(chan dagstore.ShardResult, 1)
|
||||
err := ds.dagStore.AcquireShard(ctx, key, resch, dagstore.AcquireOpts{})
|
||||
log.Info("sent message to acquire shard")
|
||||
log.Debugf("sent message to acquire shard for piece CID %s", pieceCid)
|
||||
|
||||
if err != nil {
|
||||
if !errors.Is(err, dagstore.ErrShardUnknown) {
|
||||
@ -216,6 +224,7 @@ func (ds *Wrapper) RegisterShard(ctx context.Context, pieceCid cid.Cid, carPath
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to schedule register shard for piece CID %s: %w", pieceCid, err)
|
||||
}
|
||||
log.Debugf("successfully submitted Register Shard request for piece CID %s with eagerInit=%t", pieceCid, eagerInit)
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -225,12 +234,16 @@ func (ds *Wrapper) Close() error {
|
||||
ds.cancel()
|
||||
|
||||
// Close the DAG store
|
||||
log.Info("will close the dagstore")
|
||||
if err := ds.dagStore.Close(); err != nil {
|
||||
return xerrors.Errorf("failed to close DAG store: %w", err)
|
||||
}
|
||||
log.Info("dagstore closed")
|
||||
|
||||
// Wait for the background goroutines to exit
|
||||
log.Info("waiting for dagstore background wrapper routines to exist")
|
||||
ds.backgroundWg.Wait()
|
||||
log.Info("exited dagstore background warpper routines")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -103,21 +103,6 @@ func TestWrapperBackground(t *testing.T) {
|
||||
case <-mock.gc:
|
||||
}
|
||||
|
||||
// Expect that when a result is sent on the failure channel, the wrapper
|
||||
// will attempt to recover the shard
|
||||
shardKey := shard.KeyFromString("key")
|
||||
w.failureCh <- dagstore.ShardResult{
|
||||
Key: shardKey,
|
||||
}
|
||||
tctx, cancel2 := context.WithTimeout(ctx, time.Second)
|
||||
defer cancel2()
|
||||
select {
|
||||
case <-tctx.Done():
|
||||
require.Fail(t, "failed to call recover")
|
||||
case k := <-mock.recover:
|
||||
require.Equal(t, shardKey, k)
|
||||
}
|
||||
|
||||
// Expect that when the wrapper is closed it will call close on the
|
||||
// DAG store
|
||||
err = w.Close()
|
||||
|
Loading…
Reference in New Issue
Block a user