feat: log trace events

This commit is contained in:
Dirk McCormick 2021-07-20 12:37:52 +02:00
parent ffe7185443
commit ffb995035f

View File

@ -28,6 +28,8 @@ type MarketDAGStoreConfig struct {
TransientsDir string TransientsDir string
IndexDir string IndexDir string
Datastore ds.Datastore Datastore ds.Datastore
MaxConcurrentFetch int
MaxConcurrentIndex int
} }
type closableBlockstore struct { type closableBlockstore struct {
@ -43,6 +45,7 @@ type dagStoreWrapper struct {
dagStore *dagstore.DAGStore dagStore *dagstore.DAGStore
mountApi LotusAccessor mountApi LotusAccessor
failureCh chan dagstore.ShardResult failureCh chan dagstore.ShardResult
traceCh chan dagstore.Trace
} }
var _ shared.DagStoreWrapper = (*dagStoreWrapper)(nil) var _ shared.DagStoreWrapper = (*dagStoreWrapper)(nil)
@ -56,12 +59,17 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*dagS
// The dagstore will write Shard failures to the `failureCh` here. // The dagstore will write Shard failures to the `failureCh` here.
failureCh := make(chan dagstore.ShardResult, 1) failureCh := make(chan dagstore.ShardResult, 1)
// The dagstore will write Trace events to the `traceCh` here.
traceCh := make(chan dagstore.Trace, 32)
dcfg := dagstore.Config{ dcfg := dagstore.Config{
TransientsDir: cfg.TransientsDir, TransientsDir: cfg.TransientsDir,
IndexDir: cfg.IndexDir, IndexDir: cfg.IndexDir,
Datastore: cfg.Datastore, Datastore: cfg.Datastore,
MountRegistry: registry, MountRegistry: registry,
FailureCh: failureCh, FailureCh: failureCh,
TraceCh: traceCh,
MaxConcurrentFetch: cfg.MaxConcurrentFetch,
MaxConcurrentIndex: cfg.MaxConcurrentIndex,
} }
dagStore, err := dagstore.NewDAGStore(dcfg) dagStore, err := dagstore.NewDAGStore(dcfg)
if err != nil { if err != nil {
@ -72,6 +80,7 @@ func NewDagStoreWrapper(cfg MarketDAGStoreConfig, mountApi LotusAccessor) (*dagS
dagStore: dagStore, dagStore: dagStore,
mountApi: mountApi, mountApi: mountApi,
failureCh: failureCh, failureCh: failureCh,
traceCh: traceCh,
}, nil }, nil
} }
@ -80,11 +89,11 @@ func (ds *dagStoreWrapper) Start(ctx context.Context) {
ds.backgroundWg.Add(1) ds.backgroundWg.Add(1)
// Run a go-routine to handle failures and GC // Run a go-routine to handle failures, traces and GC
go ds.background(ds.failureCh) go ds.background()
} }
func (ds *dagStoreWrapper) background(failureCh chan dagstore.ShardResult) { func (ds *dagStoreWrapper) background() {
defer ds.backgroundWg.Done() defer ds.backgroundWg.Done()
gcTicker := time.NewTicker(gcInterval) gcTicker := time.NewTicker(gcInterval)
@ -98,8 +107,15 @@ func (ds *dagStoreWrapper) background(failureCh chan dagstore.ShardResult) {
case <-gcTicker.C: case <-gcTicker.C:
_, _ = ds.dagStore.GC(ds.ctx) _, _ = ds.dagStore.GC(ds.ctx)
// Log trace events from the DAG store
case tr := <-ds.traceCh:
log.Debugw("trace",
"shard-key", tr.Key.String(),
"op-type", tr.Op.String(),
"after", tr.After.String())
// Handle shard failures by attempting to recover the shard // Handle shard failures by attempting to recover the shard
case f := <-failureCh: case f := <-ds.failureCh:
log.Warnw("shard failed", "shard-key", f.Key.String(), "error", f.Error) log.Warnw("shard failed", "shard-key", f.Key.String(), "error", f.Error)
if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, recoverShardResults, dagstore.RecoverOpts{}); err != nil { if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, recoverShardResults, dagstore.RecoverOpts{}); err != nil {
log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err) log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err)