From 445b7f3388b16558c056620c940e1298f50ea274 Mon Sep 17 00:00:00 2001 From: ognots Date: Wed, 22 Jan 2020 18:04:20 -0500 Subject: [PATCH] refactor lotus-health agent for robustness add retry logic when calls to API fail. if API reconnects fail, restart lotus-daemon as it means lotus-daemon is likely unhealthy. wait for lotus node's chain to sync during each check cycle, to avoid restarting lotus-daemon if needing to sync. handle SIGTERM properly. general cleanup and refactor of code, getting rid of unnecessary channels --- .gitignore | 1 + cmd/lotus-health/main.go | 193 +++++++++++++-------- cmd/lotus-health/main_test.go | 92 ++-------- cmd/lotus-health/{systemd.go => notify.go} | 9 +- 4 files changed, 149 insertions(+), 146 deletions(-) rename cmd/lotus-health/{systemd.go => notify.go} (64%) diff --git a/.gitignore b/.gitignore index ce84e704d..9989ac40b 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ /lotus-storage-miner /lotus-seal-worker /lotus-seed +/lotus-health /pond /townhall /fountain diff --git a/cmd/lotus-health/main.go b/cmd/lotus-health/main.go index fc6b7de09..b37aee29f 100644 --- a/cmd/lotus-health/main.go +++ b/cmd/lotus-health/main.go @@ -2,12 +2,17 @@ package main import ( "context" + "errors" "os" + "os/signal" + "syscall" "time" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/types" lcli "github.com/filecoin-project/lotus/cli" + "github.com/filecoin-project/lotus/lib/jsonrpc" cid "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" "gopkg.in/urfave/cli.v2" @@ -41,7 +46,7 @@ func main() { } if err := app.Run(os.Args); err != nil { - log.Warn(err) + log.Fatal(err) return } } @@ -64,61 +69,72 @@ var watchHeadCmd = &cli.Command{ Value: "lotus-daemon.service", Usage: "systemd unit name to restart on health check failure", }, + &cli.IntFlag{ + Name: "api-timeout", + Value: build.BlockDelay, + Usage: "timeout between API retries", + }, + &cli.IntFlag{ + Name: 
"api-retries", + Value: 8, + Usage: "number of API retry attemps", + }, }, Action: func(c *cli.Context) error { + var headCheckWindow CidWindow threshold := c.Int("threshold") interval := time.Duration(c.Int("interval")) * time.Second name := c.String("systemd-unit") + apiRetries := c.Int("api-retries") + apiTimeout := time.Duration(c.Int("api-timeout")) * time.Second - var headCheckWindow CidWindow + nCh := make(chan interface{}, 1) + sCh := make(chan os.Signal, 1) + signal.Notify(sCh, os.Interrupt, syscall.SIGTERM) - api, closer, err := lcli.GetFullNodeAPI(c) + api, closer, err := getFullNodeAPI(c, apiRetries, apiTimeout) if err != nil { return err } defer closer() ctx := lcli.ReqContext(c) - if err := WaitForSyncComplete(ctx, api); err != nil { - log.Fatal(err) - } - - ch := make(chan CidWindow, 1) - aCh := make(chan interface{}, 1) - go func() { for { - headCheckWindow, err = updateWindow(ctx, api, headCheckWindow, threshold, ch) - if err != nil { - log.Fatal(err) + log.Info("Waiting for sync to complete") + if err := waitForSyncComplete(ctx, api, apiRetries, apiTimeout); err != nil { + nCh <- err + return } + headCheckWindow, err = updateWindow(ctx, api, headCheckWindow, threshold, apiRetries, apiTimeout) + if err != nil { + log.Warn("Failed to connect to API. Restarting systemd service") + nCh <- nil + return + } + ok := checkWindow(headCheckWindow, threshold) + if !ok { + log.Warn("Chain head has not updated. 
Restarting systemd service") + nCh <- nil + break + } + log.Info("Chain head is healthy") time.Sleep(interval) } + return }() - go func() { - result, err := alertHandler(name, aCh) - if err != nil { - log.Fatal(err) - } - if result != "done" { - log.Fatal("systemd unit failed to restart:", result) - } - log.Info("restarting health agent") - // Exit health agent and let supervisor restart health agent - // Restarting lotus systemd unit kills api connection - os.Exit(130) - }() - - for { - ok := checkWindow(ch, threshold) - if !ok { - log.Warn("chain head has not updated. Restarting systemd service") - aCh <- nil - break - } - log.Info("chain head is healthy") + restart, err := notifyHandler(name, nCh, sCh) + if err != nil { + return err } + if restart != "done" { + return errors.New("Systemd unit failed to restart:" + restart) + } + log.Info("Restarting health agent") + // Exit health agent and let supervisor restart health agent + // Restarting lotus systemd unit kills api connection + os.Exit(130) return nil }, } @@ -128,52 +144,67 @@ var watchHeadCmd = &cli.Command{ * compares slices of Cids when len is greater or equal to `t` - threshold * if all slices are equal, head has not updated and returns false */ -func checkWindow(ch chan CidWindow, t int) bool { - select { - case window := <-ch: - var dup int - windowLen := len(window) - if windowLen >= t { - cidWindow: - for i := range window { - next := windowLen - 1 - i - // if array length is different, head is changing - if next >= 1 && len(window[next]) != len(window[next-1]) { +func checkWindow(window CidWindow, t int) bool { + var dup int + windowLen := len(window) + if windowLen >= t { + cidWindow: + for i := range window { + next := windowLen - 1 - i + // if array length is different, head is changing + if next >= 1 && len(window[next]) != len(window[next-1]) { + break cidWindow + } + // if cids are different, head is changing + for j := range window[next] { + if next >= 1 && window[next][j] != 
window[next-1][j] { break cidWindow } - // if cids are different, head is changing - for j := range window[next] { - if next >= 1 && window[next][j] != window[next-1][j] { - break cidWindow - } - } - if i < (t - 1) { - dup++ - } } - - if dup == (t - 1) { - return false + if i < (t - 1) { + dup++ } } - return true + + if dup == (t - 1) { + return false + } } + return true +} + +/* + * returns a slice of slices of Cids + * len of slice <= `t` - threshold + */ +func updateWindow(ctx context.Context, a api.FullNode, w CidWindow, t int, r int, to time.Duration) (CidWindow, error) { + head, err := getHead(ctx, a, r, to) + if err != nil { + return nil, err + } + window := appendCIDsToWindow(w, head.Cids(), t) + return window, err } /* * get chain head from API - * returns a slice of slices of Cids - * len of slice <= `t` - threshold + * retries if API is not available + * returns tipset */ -func updateWindow(ctx context.Context, a api.FullNode, w CidWindow, t int, ch chan CidWindow) (CidWindow, error) { - head, err := a.ChainHead(ctx) - if err != nil { - return nil, err +func getHead(ctx context.Context, a api.FullNode, r int, t time.Duration) (*types.TipSet, error) { + for i := 0; i < r; i++ { + head, err := a.ChainHead(ctx) + if err != nil && i == (r-1) { + return nil, err + } + if err != nil { + log.Warnf("Call to API failed. 
Retrying in %.0fs", t.Seconds()) + time.Sleep(t) + continue + } + return head, err } - - window := appendCIDsToWindow(w, head.Cids(), t) - ch <- window - return window, nil + return nil, nil } /* @@ -192,13 +223,13 @@ func appendCIDsToWindow(w CidWindow, c []cid.Cid, t int) CidWindow { /* * wait for node to sync */ -func WaitForSyncComplete(ctx context.Context, napi api.FullNode) error { +func waitForSyncComplete(ctx context.Context, a api.FullNode, r int, t time.Duration) error { for { select { case <-ctx.Done(): return ctx.Err() case <-time.After(3 * time.Second): - head, err := napi.ChainHead(ctx) + head, err := getHead(ctx, a, r, t) if err != nil { return err } @@ -209,3 +240,23 @@ func WaitForSyncComplete(ctx context.Context, napi api.FullNode) error { } } } + +/* + * A thin wrapper around lotus cli GetFullNodeAPI + * Adds retry logic + */ +func getFullNodeAPI(ctx *cli.Context, r int, t time.Duration) (api.FullNode, jsonrpc.ClientCloser, error) { + for i := 0; i < r; i++ { + api, closer, err := lcli.GetFullNodeAPI(ctx) + if err != nil && i == (r-1) { + return nil, nil, err + } + if err != nil { + log.Warnf("API connection failed. 
Retrying in %.0fs", t.Seconds()) + time.Sleep(t) + continue + } + return api, closer, err + } + return nil, nil, nil +} diff --git a/cmd/lotus-health/main_test.go b/cmd/lotus-health/main_test.go index 568877994..346376167 100644 --- a/cmd/lotus-health/main_test.go +++ b/cmd/lotus-health/main_test.go @@ -28,13 +28,8 @@ func TestAppendCIDsToWindow(t *testing.T) { func TestCheckWindow(t *testing.T) { assert := assert.New(t) - ch := make(chan CidWindow, 1) - och := make(chan bool, 1) threshold := 3 - go func() { - och <- checkWindow(ch, threshold) - }() var healthyHeadCheckWindow CidWindow healthyHeadCheckWindow = appendCIDsToWindow(healthyHeadCheckWindow, []cid.Cid{ makeCID("abcd"), @@ -47,15 +42,9 @@ func TestCheckWindow(t *testing.T) { makeCID("bbcd"), makeCID("bbfe"), }, threshold) - ch <- healthyHeadCheckWindow - select { - case ok := <-och: - assert.True(ok) - } + ok := checkWindow(healthyHeadCheckWindow, threshold) + assert.True(ok) - go func() { - och <- checkWindow(ch, threshold) - }() var healthyHeadCheckWindow1 CidWindow healthyHeadCheckWindow1 = appendCIDsToWindow(healthyHeadCheckWindow1, []cid.Cid{ makeCID("bbcd"), @@ -69,15 +58,9 @@ func TestCheckWindow(t *testing.T) { healthyHeadCheckWindow1 = appendCIDsToWindow(healthyHeadCheckWindow1, []cid.Cid{ makeCID("abcd"), }, threshold) - ch <- healthyHeadCheckWindow1 - select { - case ok := <-och: - assert.True(ok) - } + ok = checkWindow(healthyHeadCheckWindow1, threshold) + assert.True(ok) - go func() { - och <- checkWindow(ch, threshold) - }() var healthyHeadCheckWindow2 CidWindow healthyHeadCheckWindow2 = appendCIDsToWindow(healthyHeadCheckWindow2, []cid.Cid{ makeCID("bbcd"), @@ -86,15 +69,9 @@ func TestCheckWindow(t *testing.T) { healthyHeadCheckWindow2 = appendCIDsToWindow(healthyHeadCheckWindow2, []cid.Cid{ makeCID("abcd"), }, threshold) - ch <- healthyHeadCheckWindow2 - select { - case ok := <-och: - assert.True(ok) - } + ok = checkWindow(healthyHeadCheckWindow2, threshold) + assert.True(ok) - go func() { 
- och <- checkWindow(ch, threshold) - }() var healthyHeadCheckWindow3 CidWindow healthyHeadCheckWindow3 = appendCIDsToWindow(healthyHeadCheckWindow3, []cid.Cid{ makeCID("abcd"), @@ -103,29 +80,17 @@ func TestCheckWindow(t *testing.T) { makeCID("bbcd"), makeCID("bbfe"), }, threshold) - ch <- healthyHeadCheckWindow3 - select { - case ok := <-och: - assert.True(ok) - } + ok = checkWindow(healthyHeadCheckWindow3, threshold) + assert.True(ok) - go func() { - och <- checkWindow(ch, threshold) - }() var healthyHeadCheckWindow4 CidWindow healthyHeadCheckWindow4 = appendCIDsToWindow(healthyHeadCheckWindow4, []cid.Cid{ makeCID("bbcd"), makeCID("bbfe"), }, threshold) - ch <- healthyHeadCheckWindow4 - select { - case ok := <-och: - assert.True(ok) - } + ok = checkWindow(healthyHeadCheckWindow4, threshold) + assert.True(ok) - go func() { - och <- checkWindow(ch, 5) - }() var healthyHeadCheckWindow5 CidWindow healthyHeadCheckWindow5 = appendCIDsToWindow(healthyHeadCheckWindow5, []cid.Cid{ makeCID("bbcd"), @@ -147,15 +112,9 @@ func TestCheckWindow(t *testing.T) { makeCID("cbcd"), makeCID("cbfe"), }, 5) - ch <- healthyHeadCheckWindow5 - select { - case ok := <-och: - assert.True(ok) - } + ok = checkWindow(healthyHeadCheckWindow5, threshold) + assert.True(ok) - go func() { - och <- checkWindow(ch, threshold) - }() var unhealthyHeadCheckWindow CidWindow unhealthyHeadCheckWindow = appendCIDsToWindow(unhealthyHeadCheckWindow, []cid.Cid{ makeCID("abcd"), @@ -169,15 +128,9 @@ func TestCheckWindow(t *testing.T) { makeCID("abcd"), makeCID("fbcd"), }, threshold) - ch <- unhealthyHeadCheckWindow - select { - case ok := <-och: - assert.False(ok) - } + ok = checkWindow(unhealthyHeadCheckWindow, threshold) + assert.False(ok) - go func() { - och <- checkWindow(ch, threshold) - }() var unhealthyHeadCheckWindow1 CidWindow unhealthyHeadCheckWindow1 = appendCIDsToWindow(unhealthyHeadCheckWindow1, []cid.Cid{ makeCID("abcd"), @@ -187,15 +140,9 @@ func TestCheckWindow(t *testing.T) { makeCID("abcd"), 
makeCID("fbcd"), }, threshold) - ch <- unhealthyHeadCheckWindow1 - select { - case ok := <-och: - assert.True(ok) - } + ok = checkWindow(unhealthyHeadCheckWindow1, threshold) + assert.True(ok) - go func() { - och <- checkWindow(ch, threshold) - }() var unhealthyHeadCheckWindow2 CidWindow unhealthyHeadCheckWindow2 = appendCIDsToWindow(unhealthyHeadCheckWindow2, []cid.Cid{ makeCID("abcd"), @@ -206,11 +153,8 @@ func TestCheckWindow(t *testing.T) { unhealthyHeadCheckWindow2 = appendCIDsToWindow(unhealthyHeadCheckWindow2, []cid.Cid{ makeCID("abcd"), }, threshold) - ch <- unhealthyHeadCheckWindow2 - select { - case ok := <-och: - assert.False(ok) - } + ok = checkWindow(unhealthyHeadCheckWindow2, threshold) + assert.False(ok) } func makeCID(s string) cid.Cid { diff --git a/cmd/lotus-health/systemd.go b/cmd/lotus-health/notify.go similarity index 64% rename from cmd/lotus-health/systemd.go rename to cmd/lotus-health/notify.go index 0287204bc..a80dcf2ee 100644 --- a/cmd/lotus-health/systemd.go +++ b/cmd/lotus-health/notify.go @@ -1,11 +1,14 @@ package main import ( + "os" + "github.com/coreos/go-systemd/dbus" ) -func alertHandler(n string, ch chan interface{}) (string, error) { +func notifyHandler(n string, ch chan interface{}, sCh chan os.Signal) (string, error) { select { + // alerts to restart systemd unit case <-ch: statusCh := make(chan string, 1) c, err := dbus.New() @@ -20,5 +23,9 @@ func alertHandler(n string, ch chan interface{}) (string, error) { case result := <-statusCh: return result, nil } + // SIGTERM + case <-sCh: + os.Exit(1) + return "", nil } }