diff --git a/markets/dagstore/wrapper.go b/markets/dagstore/wrapper.go index f920f566d..b49ceda00 100644 --- a/markets/dagstore/wrapper.go +++ b/markets/dagstore/wrapper.go @@ -25,9 +25,11 @@ var gcInterval = 5 * time.Minute // MarketDAGStoreConfig is the config the market needs to then construct a DAG Store. type MarketDAGStoreConfig struct { - TransientsDir string - IndexDir string - Datastore ds.Datastore + TransientsDir string + IndexDir string + Datastore ds.Datastore + MaxConcurrentFetch int + MaxConcurrentIndex int } type closableBlockstore struct { @@ -43,6 +45,7 @@ type dagStoreWrapper struct { dagStore *dagstore.DAGStore mountApi LotusAccessor failureCh chan dagstore.ShardResult + traceCh chan dagstore.Trace } var _ shared.DagStoreWrapper = (*dagStoreWrapper)(nil) @@ -56,12 +59,17 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*dagS // The dagstore will write Shard failures to the `failureCh` here. failureCh := make(chan dagstore.ShardResult, 1) + // The dagstore will write Trace events to the `traceCh` here. + traceCh := make(chan dagstore.Trace, 32) dcfg := dagstore.Config{ - TransientsDir: cfg.TransientsDir, - IndexDir: cfg.IndexDir, - Datastore: cfg.Datastore, - MountRegistry: registry, - FailureCh: failureCh, + TransientsDir: cfg.TransientsDir, + IndexDir: cfg.IndexDir, + Datastore: cfg.Datastore, + MountRegistry: registry, + FailureCh: failureCh, + TraceCh: traceCh, + MaxConcurrentFetch: cfg.MaxConcurrentFetch, + MaxConcurrentIndex: cfg.MaxConcurrentIndex, } dagStore, err := dagstore.NewDAGStore(dcfg) if err != nil { @@ -72,6 +80,7 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*dagS dagStore: dagStore, mountApi: mountApi, failureCh: failureCh, + traceCh: traceCh, }, nil } @@ -80,11 +89,11 @@ func (ds *dagStoreWrapper) Start(ctx context.Context) { ds.backgroundWg.Add(1) - // Run a go-routine to handle failures and GC - go ds.background(ds.failureCh) + // Run a go-routine to handle failures, traces and GC + go ds.background() } -func (ds *dagStoreWrapper) background(failureCh chan dagstore.ShardResult) { +func (ds *dagStoreWrapper) background() { defer ds.backgroundWg.Done() gcTicker := time.NewTicker(gcInterval) @@ -98,8 +107,15 @@ func (ds *dagStoreWrapper) background(failureCh chan dagstore.ShardResult) { case <-gcTicker.C: _, _ = ds.dagStore.GC(ds.ctx) + // Log trace events from the DAG store + case tr := <-ds.traceCh: + log.Debugw("trace", + "shard-key", tr.Key.String(), + "op-type", tr.Op.String(), + "after", tr.After.String()) + // Handle shard failures by attempting to recover the shard - case f := <-failureCh: + case f := <-ds.failureCh: log.Warnw("shard failed", "shard-key", f.Key.String(), "error", f.Error) if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, recoverShardResults, dagstore.RecoverOpts{}); err != nil { log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err)