fix: simplify recover shard processing

This commit is contained in:
Dirk McCormick 2021-07-20 11:50:40 +02:00
parent 44036d9a4a
commit ffe7185443

View File

@ -5,7 +5,6 @@ import (
"errors"
"io"
"sync"
"sync/atomic"
"time"
"github.com/ipfs/go-cid"
@ -92,34 +91,6 @@ func (ds *dagStoreWrapper) background(failureCh chan dagstore.ShardResult) {
defer gcTicker.Stop()
recoverShardResults := make(chan dagstore.ShardResult, 32)
var recShardResCount int32
done := make(chan struct{})
defer close(done)
go func() {
// Consume recover shard results
for {
select {
// When the DAG store wrapper shuts down, drain the channel so as
// not to block the DAG store
case <-done:
for i := atomic.LoadInt32(&recShardResCount); i > 0; i-- {
res := <-recoverShardResults
if res.Error != nil {
log.Warnw("shard recovery failed", "shard-key", res.Key.String(), "error", res.Error)
}
}
return
case res := <-recoverShardResults:
atomic.AddInt32(&recShardResCount, -1)
if res.Error != nil {
log.Warnw("shard recovery failed", "shard-key", res.Key.String(), "error", res.Error)
}
}
}
}()
for ds.ctx.Err() != nil {
select {
@ -129,11 +100,15 @@ func (ds *dagStoreWrapper) background(failureCh chan dagstore.ShardResult) {
// Handle shard failures by attempting to recover the shard
case f := <-failureCh:
atomic.AddInt32(&recShardResCount, 1)
log.Warnw("shard failed", "shard-key", f.Key.String(), "error", f.Error)
if err := ds.dagStore.RecoverShard(ds.ctx, f.Key, recoverShardResults, dagstore.RecoverOpts{}); err != nil {
log.Warnw("shard recovery failed", "shard-key", f.Key.String(), "error", err)
atomic.AddInt32(&recShardResCount, -1)
}
// Consume recover shard results
case res := <-recoverShardResults:
if res.Error != nil {
log.Warnw("shard recovery failed", "shard-key", res.Key.String(), "error", res.Error)
}
// Exit when the DAG store wrapper is shutdown