fix(poller): graceful shutdown
This commit is contained in:
parent
8cffac65d9
commit
9830f34d36
@ -12,11 +12,6 @@ import (
|
|||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
errorRetryCounterLimit = 3
|
|
||||||
errorRetryTimeSleepSecs = 30
|
|
||||||
)
|
|
||||||
|
|
||||||
var ErrDataLock = errors.New("Data Lock Error")
|
var ErrDataLock = errors.New("Data Lock Error")
|
||||||
|
|
||||||
func New(cli client.Client, dispatch func(context.Context, *runnerv1.Task) error) *Poller {
|
func New(cli client.Client, dispatch func(context.Context, *runnerv1.Task) error) *Poller {
|
||||||
@ -33,9 +28,8 @@ type Poller struct {
|
|||||||
Filter *client.Filter
|
Filter *client.Filter
|
||||||
Dispatch func(context.Context, *runnerv1.Task) error
|
Dispatch func(context.Context, *runnerv1.Task) error
|
||||||
|
|
||||||
routineGroup *routineGroup
|
routineGroup *routineGroup
|
||||||
metric *metric
|
metric *metric
|
||||||
errorRetryCounter int
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Poller) Wait() {
|
func (p *Poller) Wait() {
|
||||||
@ -59,10 +53,10 @@ func (p *Poller) Poll(ctx context.Context, n int) error {
|
|||||||
if err := p.poll(ctx, i+1); err != nil {
|
if err := p.poll(ctx, i+1); err != nil {
|
||||||
log.WithField("thread", i+1).
|
log.WithField("thread", i+1).
|
||||||
WithError(err).Error("poll error")
|
WithError(err).Error("poll error")
|
||||||
if p.errorRetryCounter > errorRetryCounterLimit {
|
select {
|
||||||
log.WithField("thread", i+1).Error("poller: too many errors, sleeping for 30 seconds")
|
case <-ctx.Done():
|
||||||
// FIXME: it makes ctrl+c hang up
|
return
|
||||||
time.Sleep(time.Second * errorRetryTimeSleepSecs)
|
case <-time.After(5 * time.Second):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -86,19 +80,16 @@ func (p *Poller) poll(ctx context.Context, thread int) error {
|
|||||||
resp, err := p.Client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{}))
|
resp, err := p.Client.FetchTask(reqCtx, connect.NewRequest(&runnerv1.FetchTaskRequest{}))
|
||||||
if err == context.Canceled || err == context.DeadlineExceeded {
|
if err == context.Canceled || err == context.DeadlineExceeded {
|
||||||
l.WithError(err).Trace("poller: no stage returned")
|
l.WithError(err).Trace("poller: no stage returned")
|
||||||
p.errorRetryCounter++
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil && err == ErrDataLock {
|
if err != nil && err == ErrDataLock {
|
||||||
l.WithError(err).Info("task accepted by another runner")
|
l.WithError(err).Info("task accepted by another runner")
|
||||||
p.errorRetryCounter++
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.WithError(err).Error("cannot accept task")
|
l.WithError(err).Error("cannot accept task")
|
||||||
p.errorRetryCounter++
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -108,8 +99,6 @@ func (p *Poller) poll(ctx context.Context, thread int) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
p.errorRetryCounter = 0
|
|
||||||
|
|
||||||
runCtx, cancel := context.WithTimeout(ctx, time.Hour)
|
runCtx, cancel := context.WithTimeout(ctx, time.Hour)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user