refactor lotus-health agent for robustness

add retry logic when calls to API fail.
if API reconnects fail, restart lotus-daemon as it means lotus-daemon is likely unhealthy.

wait for lotus node's chain to sync during each check cycle, to avoid restarting lotus-daemon if needing to sync.

handle SIGTERM properly.

general cleanup and refactor of code, getting ready of unnecessary channels
This commit is contained in:
ognots 2020-01-22 18:04:20 -05:00
parent 4310cc23ce
commit 445b7f3388
4 changed files with 149 additions and 146 deletions

1
.gitignore vendored
View File

@ -2,6 +2,7 @@
/lotus-storage-miner /lotus-storage-miner
/lotus-seal-worker /lotus-seal-worker
/lotus-seed /lotus-seed
/lotus-health
/pond /pond
/townhall /townhall
/fountain /fountain

View File

@ -2,12 +2,17 @@ package main
import ( import (
"context" "context"
"errors"
"os" "os"
"os/signal"
"syscall"
"time" "time"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli" lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/jsonrpc"
cid "github.com/ipfs/go-cid" cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
"gopkg.in/urfave/cli.v2" "gopkg.in/urfave/cli.v2"
@ -41,7 +46,7 @@ func main() {
} }
if err := app.Run(os.Args); err != nil { if err := app.Run(os.Args); err != nil {
log.Warn(err) log.Fatal(err)
return return
} }
} }
@ -64,61 +69,72 @@ var watchHeadCmd = &cli.Command{
Value: "lotus-daemon.service", Value: "lotus-daemon.service",
Usage: "systemd unit name to restart on health check failure", Usage: "systemd unit name to restart on health check failure",
}, },
&cli.IntFlag{
Name: "api-timeout",
Value: build.BlockDelay,
Usage: "timeout between API retries",
},
&cli.IntFlag{
Name: "api-retries",
Value: 8,
Usage: "number of API retry attemps",
},
}, },
Action: func(c *cli.Context) error { Action: func(c *cli.Context) error {
var headCheckWindow CidWindow
threshold := c.Int("threshold") threshold := c.Int("threshold")
interval := time.Duration(c.Int("interval")) * time.Second interval := time.Duration(c.Int("interval")) * time.Second
name := c.String("systemd-unit") name := c.String("systemd-unit")
apiRetries := c.Int("api-retries")
apiTimeout := time.Duration(c.Int("api-timeout")) * time.Second
var headCheckWindow CidWindow nCh := make(chan interface{}, 1)
sCh := make(chan os.Signal, 1)
signal.Notify(sCh, os.Interrupt, syscall.SIGTERM)
api, closer, err := lcli.GetFullNodeAPI(c) api, closer, err := getFullNodeAPI(c, apiRetries, apiTimeout)
if err != nil { if err != nil {
return err return err
} }
defer closer() defer closer()
ctx := lcli.ReqContext(c) ctx := lcli.ReqContext(c)
if err := WaitForSyncComplete(ctx, api); err != nil {
log.Fatal(err)
}
ch := make(chan CidWindow, 1)
aCh := make(chan interface{}, 1)
go func() { go func() {
for { for {
headCheckWindow, err = updateWindow(ctx, api, headCheckWindow, threshold, ch) log.Info("Waiting for sync to complete")
if err != nil { if err := waitForSyncComplete(ctx, api, apiRetries, apiTimeout); err != nil {
log.Fatal(err) nCh <- err
return
} }
headCheckWindow, err = updateWindow(ctx, api, headCheckWindow, threshold, apiRetries, apiTimeout)
if err != nil {
log.Warn("Failed to connect to API. Restarting systemd service")
nCh <- nil
return
}
ok := checkWindow(headCheckWindow, threshold)
if !ok {
log.Warn("Chain head has not updated. Restarting systemd service")
nCh <- nil
break
}
log.Info("Chain head is healthy")
time.Sleep(interval) time.Sleep(interval)
} }
return
}() }()
go func() { restart, err := notifyHandler(name, nCh, sCh)
result, err := alertHandler(name, aCh)
if err != nil { if err != nil {
log.Fatal(err) return err
} }
if result != "done" { if restart != "done" {
log.Fatal("systemd unit failed to restart:", result) return errors.New("Systemd unit failed to restart:" + restart)
} }
log.Info("restarting health agent") log.Info("Restarting health agent")
// Exit health agent and let supervisor restart health agent // Exit health agent and let supervisor restart health agent
// Restarting lotus systemd unit kills api connection // Restarting lotus systemd unit kills api connection
os.Exit(130) os.Exit(130)
}()
for {
ok := checkWindow(ch, threshold)
if !ok {
log.Warn("chain head has not updated. Restarting systemd service")
aCh <- nil
break
}
log.Info("chain head is healthy")
}
return nil return nil
}, },
} }
@ -128,9 +144,7 @@ var watchHeadCmd = &cli.Command{
* compares slices of Cids when len is greater or equal to `t` - threshold * compares slices of Cids when len is greater or equal to `t` - threshold
* if all slices are equal, head has not updated and returns false * if all slices are equal, head has not updated and returns false
*/ */
func checkWindow(ch chan CidWindow, t int) bool { func checkWindow(window CidWindow, t int) bool {
select {
case window := <-ch:
var dup int var dup int
windowLen := len(window) windowLen := len(window)
if windowLen >= t { if windowLen >= t {
@ -158,22 +172,39 @@ func checkWindow(ch chan CidWindow, t int) bool {
} }
return true return true
} }
/*
* returns a slice of slices of Cids
* len of slice <= `t` - threshold
*/
func updateWindow(ctx context.Context, a api.FullNode, w CidWindow, t int, r int, to time.Duration) (CidWindow, error) {
head, err := getHead(ctx, a, r, to)
if err != nil {
return nil, err
}
window := appendCIDsToWindow(w, head.Cids(), t)
return window, err
} }
/* /*
* get chain head from API * get chain head from API
* returns a slice of slices of Cids * retries if API no available
* len of slice <= `t` - threshold * returns tipset
*/ */
func updateWindow(ctx context.Context, a api.FullNode, w CidWindow, t int, ch chan CidWindow) (CidWindow, error) { func getHead(ctx context.Context, a api.FullNode, r int, t time.Duration) (*types.TipSet, error) {
for i := 0; i < r; i++ {
head, err := a.ChainHead(ctx) head, err := a.ChainHead(ctx)
if err != nil { if err != nil && i == (r-1) {
return nil, err return nil, err
} }
if err != nil {
window := appendCIDsToWindow(w, head.Cids(), t) log.Warnf("Call to API failed. Retrying in %.0fs", t.Seconds())
ch <- window time.Sleep(t)
return window, nil continue
}
return head, err
}
return nil, nil
} }
/* /*
@ -192,13 +223,13 @@ func appendCIDsToWindow(w CidWindow, c []cid.Cid, t int) CidWindow {
/* /*
* wait for node to sync * wait for node to sync
*/ */
func WaitForSyncComplete(ctx context.Context, napi api.FullNode) error { func waitForSyncComplete(ctx context.Context, a api.FullNode, r int, t time.Duration) error {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
case <-time.After(3 * time.Second): case <-time.After(3 * time.Second):
head, err := napi.ChainHead(ctx) head, err := getHead(ctx, a, r, t)
if err != nil { if err != nil {
return err return err
} }
@ -209,3 +240,23 @@ func WaitForSyncComplete(ctx context.Context, napi api.FullNode) error {
} }
} }
} }
/*
* A thin wrapper around lotus cli GetFullNodeAPI
* Adds retry logic
*/
func getFullNodeAPI(ctx *cli.Context, r int, t time.Duration) (api.FullNode, jsonrpc.ClientCloser, error) {
for i := 0; i < r; i++ {
api, closer, err := lcli.GetFullNodeAPI(ctx)
if err != nil && i == (r-1) {
return nil, nil, err
}
if err != nil {
log.Warnf("API connection failed. Retrying in %.0fs", t.Seconds())
time.Sleep(t)
continue
}
return api, closer, err
}
return nil, nil, nil
}

View File

@ -28,13 +28,8 @@ func TestAppendCIDsToWindow(t *testing.T) {
func TestCheckWindow(t *testing.T) { func TestCheckWindow(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
ch := make(chan CidWindow, 1)
och := make(chan bool, 1)
threshold := 3 threshold := 3
go func() {
och <- checkWindow(ch, threshold)
}()
var healthyHeadCheckWindow CidWindow var healthyHeadCheckWindow CidWindow
healthyHeadCheckWindow = appendCIDsToWindow(healthyHeadCheckWindow, []cid.Cid{ healthyHeadCheckWindow = appendCIDsToWindow(healthyHeadCheckWindow, []cid.Cid{
makeCID("abcd"), makeCID("abcd"),
@ -47,15 +42,9 @@ func TestCheckWindow(t *testing.T) {
makeCID("bbcd"), makeCID("bbcd"),
makeCID("bbfe"), makeCID("bbfe"),
}, threshold) }, threshold)
ch <- healthyHeadCheckWindow ok := checkWindow(healthyHeadCheckWindow, threshold)
select {
case ok := <-och:
assert.True(ok) assert.True(ok)
}
go func() {
och <- checkWindow(ch, threshold)
}()
var healthyHeadCheckWindow1 CidWindow var healthyHeadCheckWindow1 CidWindow
healthyHeadCheckWindow1 = appendCIDsToWindow(healthyHeadCheckWindow1, []cid.Cid{ healthyHeadCheckWindow1 = appendCIDsToWindow(healthyHeadCheckWindow1, []cid.Cid{
makeCID("bbcd"), makeCID("bbcd"),
@ -69,15 +58,9 @@ func TestCheckWindow(t *testing.T) {
healthyHeadCheckWindow1 = appendCIDsToWindow(healthyHeadCheckWindow1, []cid.Cid{ healthyHeadCheckWindow1 = appendCIDsToWindow(healthyHeadCheckWindow1, []cid.Cid{
makeCID("abcd"), makeCID("abcd"),
}, threshold) }, threshold)
ch <- healthyHeadCheckWindow1 ok = checkWindow(healthyHeadCheckWindow1, threshold)
select {
case ok := <-och:
assert.True(ok) assert.True(ok)
}
go func() {
och <- checkWindow(ch, threshold)
}()
var healthyHeadCheckWindow2 CidWindow var healthyHeadCheckWindow2 CidWindow
healthyHeadCheckWindow2 = appendCIDsToWindow(healthyHeadCheckWindow2, []cid.Cid{ healthyHeadCheckWindow2 = appendCIDsToWindow(healthyHeadCheckWindow2, []cid.Cid{
makeCID("bbcd"), makeCID("bbcd"),
@ -86,15 +69,9 @@ func TestCheckWindow(t *testing.T) {
healthyHeadCheckWindow2 = appendCIDsToWindow(healthyHeadCheckWindow2, []cid.Cid{ healthyHeadCheckWindow2 = appendCIDsToWindow(healthyHeadCheckWindow2, []cid.Cid{
makeCID("abcd"), makeCID("abcd"),
}, threshold) }, threshold)
ch <- healthyHeadCheckWindow2 ok = checkWindow(healthyHeadCheckWindow2, threshold)
select {
case ok := <-och:
assert.True(ok) assert.True(ok)
}
go func() {
och <- checkWindow(ch, threshold)
}()
var healthyHeadCheckWindow3 CidWindow var healthyHeadCheckWindow3 CidWindow
healthyHeadCheckWindow3 = appendCIDsToWindow(healthyHeadCheckWindow3, []cid.Cid{ healthyHeadCheckWindow3 = appendCIDsToWindow(healthyHeadCheckWindow3, []cid.Cid{
makeCID("abcd"), makeCID("abcd"),
@ -103,29 +80,17 @@ func TestCheckWindow(t *testing.T) {
makeCID("bbcd"), makeCID("bbcd"),
makeCID("bbfe"), makeCID("bbfe"),
}, threshold) }, threshold)
ch <- healthyHeadCheckWindow3 ok = checkWindow(healthyHeadCheckWindow3, threshold)
select {
case ok := <-och:
assert.True(ok) assert.True(ok)
}
go func() {
och <- checkWindow(ch, threshold)
}()
var healthyHeadCheckWindow4 CidWindow var healthyHeadCheckWindow4 CidWindow
healthyHeadCheckWindow4 = appendCIDsToWindow(healthyHeadCheckWindow4, []cid.Cid{ healthyHeadCheckWindow4 = appendCIDsToWindow(healthyHeadCheckWindow4, []cid.Cid{
makeCID("bbcd"), makeCID("bbcd"),
makeCID("bbfe"), makeCID("bbfe"),
}, threshold) }, threshold)
ch <- healthyHeadCheckWindow4 ok = checkWindow(healthyHeadCheckWindow4, threshold)
select {
case ok := <-och:
assert.True(ok) assert.True(ok)
}
go func() {
och <- checkWindow(ch, 5)
}()
var healthyHeadCheckWindow5 CidWindow var healthyHeadCheckWindow5 CidWindow
healthyHeadCheckWindow5 = appendCIDsToWindow(healthyHeadCheckWindow5, []cid.Cid{ healthyHeadCheckWindow5 = appendCIDsToWindow(healthyHeadCheckWindow5, []cid.Cid{
makeCID("bbcd"), makeCID("bbcd"),
@ -147,15 +112,9 @@ func TestCheckWindow(t *testing.T) {
makeCID("cbcd"), makeCID("cbcd"),
makeCID("cbfe"), makeCID("cbfe"),
}, 5) }, 5)
ch <- healthyHeadCheckWindow5 ok = checkWindow(healthyHeadCheckWindow5, threshold)
select {
case ok := <-och:
assert.True(ok) assert.True(ok)
}
go func() {
och <- checkWindow(ch, threshold)
}()
var unhealthyHeadCheckWindow CidWindow var unhealthyHeadCheckWindow CidWindow
unhealthyHeadCheckWindow = appendCIDsToWindow(unhealthyHeadCheckWindow, []cid.Cid{ unhealthyHeadCheckWindow = appendCIDsToWindow(unhealthyHeadCheckWindow, []cid.Cid{
makeCID("abcd"), makeCID("abcd"),
@ -169,15 +128,9 @@ func TestCheckWindow(t *testing.T) {
makeCID("abcd"), makeCID("abcd"),
makeCID("fbcd"), makeCID("fbcd"),
}, threshold) }, threshold)
ch <- unhealthyHeadCheckWindow ok = checkWindow(unhealthyHeadCheckWindow, threshold)
select {
case ok := <-och:
assert.False(ok) assert.False(ok)
}
go func() {
och <- checkWindow(ch, threshold)
}()
var unhealthyHeadCheckWindow1 CidWindow var unhealthyHeadCheckWindow1 CidWindow
unhealthyHeadCheckWindow1 = appendCIDsToWindow(unhealthyHeadCheckWindow1, []cid.Cid{ unhealthyHeadCheckWindow1 = appendCIDsToWindow(unhealthyHeadCheckWindow1, []cid.Cid{
makeCID("abcd"), makeCID("abcd"),
@ -187,15 +140,9 @@ func TestCheckWindow(t *testing.T) {
makeCID("abcd"), makeCID("abcd"),
makeCID("fbcd"), makeCID("fbcd"),
}, threshold) }, threshold)
ch <- unhealthyHeadCheckWindow1 ok = checkWindow(unhealthyHeadCheckWindow1, threshold)
select {
case ok := <-och:
assert.True(ok) assert.True(ok)
}
go func() {
och <- checkWindow(ch, threshold)
}()
var unhealthyHeadCheckWindow2 CidWindow var unhealthyHeadCheckWindow2 CidWindow
unhealthyHeadCheckWindow2 = appendCIDsToWindow(unhealthyHeadCheckWindow2, []cid.Cid{ unhealthyHeadCheckWindow2 = appendCIDsToWindow(unhealthyHeadCheckWindow2, []cid.Cid{
makeCID("abcd"), makeCID("abcd"),
@ -206,12 +153,9 @@ func TestCheckWindow(t *testing.T) {
unhealthyHeadCheckWindow2 = appendCIDsToWindow(unhealthyHeadCheckWindow2, []cid.Cid{ unhealthyHeadCheckWindow2 = appendCIDsToWindow(unhealthyHeadCheckWindow2, []cid.Cid{
makeCID("abcd"), makeCID("abcd"),
}, threshold) }, threshold)
ch <- unhealthyHeadCheckWindow2 ok = checkWindow(unhealthyHeadCheckWindow2, threshold)
select {
case ok := <-och:
assert.False(ok) assert.False(ok)
} }
}
func makeCID(s string) cid.Cid { func makeCID(s string) cid.Cid {
h1, err := mh.Sum([]byte(s), mh.SHA2_256, -1) h1, err := mh.Sum([]byte(s), mh.SHA2_256, -1)

View File

@ -1,11 +1,14 @@
package main package main
import ( import (
"os"
"github.com/coreos/go-systemd/dbus" "github.com/coreos/go-systemd/dbus"
) )
func alertHandler(n string, ch chan interface{}) (string, error) { func notifyHandler(n string, ch chan interface{}, sCh chan os.Signal) (string, error) {
select { select {
// alerts to restart systemd unit
case <-ch: case <-ch:
statusCh := make(chan string, 1) statusCh := make(chan string, 1)
c, err := dbus.New() c, err := dbus.New()
@ -20,5 +23,9 @@ func alertHandler(n string, ch chan interface{}) (string, error) {
case result := <-statusCh: case result := <-statusCh:
return result, nil return result, nil
} }
// SIGTERM
case <-sCh:
os.Exit(1)
return "", nil
} }
} }