Merge pull request #11191 from filecoin-project/nonsense/remove-trace-goroutine
fix: markets/dagstore: remove trace goroutine for dagstore wrapper
This commit is contained in:
commit
5f5c6a08a4
@ -48,7 +48,6 @@ type Wrapper struct {
|
|||||||
dagst dagstore.Interface
|
dagst dagstore.Interface
|
||||||
minerAPI MinerAPI
|
minerAPI MinerAPI
|
||||||
failureCh chan dagstore.ShardResult
|
failureCh chan dagstore.ShardResult
|
||||||
traceCh chan dagstore.Trace
|
|
||||||
gcInterval time.Duration
|
gcInterval time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -64,9 +63,6 @@ func NewDAGStore(cfg config.DAGStoreConfig, minerApi MinerAPI, h host.Host) (*da
|
|||||||
// The dagstore will write Shard failures to the `failureCh` here.
|
// The dagstore will write Shard failures to the `failureCh` here.
|
||||||
failureCh := make(chan dagstore.ShardResult, 1)
|
failureCh := make(chan dagstore.ShardResult, 1)
|
||||||
|
|
||||||
// The dagstore will write Trace events to the `traceCh` here.
|
|
||||||
traceCh := make(chan dagstore.Trace, 32)
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
transientsDir = filepath.Join(cfg.RootDir, "transients")
|
transientsDir = filepath.Join(cfg.RootDir, "transients")
|
||||||
datastoreDir = filepath.Join(cfg.RootDir, "datastore")
|
datastoreDir = filepath.Join(cfg.RootDir, "datastore")
|
||||||
@ -90,7 +86,6 @@ func NewDAGStore(cfg config.DAGStoreConfig, minerApi MinerAPI, h host.Host) (*da
|
|||||||
Datastore: dstore,
|
Datastore: dstore,
|
||||||
MountRegistry: registry,
|
MountRegistry: registry,
|
||||||
FailureCh: failureCh,
|
FailureCh: failureCh,
|
||||||
TraceCh: traceCh,
|
|
||||||
TopLevelIndex: topIndex,
|
TopLevelIndex: topIndex,
|
||||||
// not limiting fetches globally, as the Lotus mount does
|
// not limiting fetches globally, as the Lotus mount does
|
||||||
// conditional throttling.
|
// conditional throttling.
|
||||||
@ -109,7 +104,6 @@ func NewDAGStore(cfg config.DAGStoreConfig, minerApi MinerAPI, h host.Host) (*da
|
|||||||
dagst: dagst,
|
dagst: dagst,
|
||||||
minerAPI: minerApi,
|
minerAPI: minerApi,
|
||||||
failureCh: failureCh,
|
failureCh: failureCh,
|
||||||
traceCh: traceCh,
|
|
||||||
gcInterval: time.Duration(cfg.GCInterval),
|
gcInterval: time.Duration(cfg.GCInterval),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -146,10 +140,6 @@ func (w *Wrapper) Start(ctx context.Context) error {
|
|||||||
w.backgroundWg.Add(1)
|
w.backgroundWg.Add(1)
|
||||||
go w.gcLoop()
|
go w.gcLoop()
|
||||||
|
|
||||||
// run a go-routine to read the trace for debugging.
|
|
||||||
w.backgroundWg.Add(1)
|
|
||||||
go w.traceLoop()
|
|
||||||
|
|
||||||
// Run a go-routine for shard recovery
|
// Run a go-routine for shard recovery
|
||||||
if dss, ok := w.dagst.(*dagstore.DAGStore); ok {
|
if dss, ok := w.dagst.(*dagstore.DAGStore); ok {
|
||||||
w.backgroundWg.Add(1)
|
w.backgroundWg.Add(1)
|
||||||
@ -159,24 +149,6 @@ func (w *Wrapper) Start(ctx context.Context) error {
|
|||||||
return w.dagst.Start(ctx)
|
return w.dagst.Start(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Wrapper) traceLoop() {
|
|
||||||
defer w.backgroundWg.Done()
|
|
||||||
|
|
||||||
for w.ctx.Err() == nil {
|
|
||||||
select {
|
|
||||||
// Log trace events from the DAG store
|
|
||||||
case tr := <-w.traceCh:
|
|
||||||
log.Debugw("trace",
|
|
||||||
"shard-key", tr.Key.String(),
|
|
||||||
"op-type", tr.Op.String(),
|
|
||||||
"after", tr.After.String())
|
|
||||||
|
|
||||||
case <-w.ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Wrapper) gcLoop() {
|
func (w *Wrapper) gcLoop() {
|
||||||
defer w.backgroundWg.Done()
|
defer w.backgroundWg.Done()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user