post worker sched: Retry on alternative worker on RPC errors
This commit is contained in:
parent
4b99472b35
commit
9a295e58b0
@ -2,6 +2,9 @@ package sealer
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
"github.com/filecoin-project/go-jsonrpc"
|
||||||
|
"github.com/hashicorp/go-multierror"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -102,15 +105,31 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
selected := candidates[0]
|
var rpcErrs error
|
||||||
|
|
||||||
|
for i, selected := range candidates {
|
||||||
worker := ps.workers[selected.id]
|
worker := ps.workers[selected.id]
|
||||||
|
|
||||||
return worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
|
err := worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
|
||||||
ps.lk.Unlock()
|
ps.lk.Unlock()
|
||||||
defer ps.lk.Lock()
|
defer ps.lk.Lock()
|
||||||
|
|
||||||
return work(ctx, worker.workerRpc)
|
return work(ctx, worker.workerRpc)
|
||||||
})
|
})
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// if the error is RPCConnectionError, try another worker, if not, return the error
|
||||||
|
if !errors.As(err, new(*jsonrpc.RPCConnectionError)) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Warnw("worker RPC connection error, will retry with another candidate if possible", "error", err, "worker", selected.id, "candidate", i, "candidates", len(candidates))
|
||||||
|
rpcErrs = multierror.Append(rpcErrs, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return xerrors.Errorf("got RPC errors from all workers: %w", rpcErrs)
|
||||||
}
|
}
|
||||||
|
|
||||||
type candidateWorker struct {
|
type candidateWorker struct {
|
||||||
|
Loading…
Reference in New Issue
Block a user