Merge pull request #1151 from filecoin-project/lotus-health-v2

refactor lotus-health agent for robustness
This commit is contained in:
Whyrusleeping 2020-01-24 11:44:47 -08:00 committed by GitHub
commit 351a683812
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 149 additions and 146 deletions

1
.gitignore vendored
View File

@ -2,6 +2,7 @@
/lotus-storage-miner
/lotus-seal-worker
/lotus-seed
/lotus-health
/pond
/townhall
/fountain

View File

@ -2,12 +2,17 @@ package main
import (
"context"
"errors"
"os"
"os/signal"
"syscall"
"time"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/jsonrpc"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"gopkg.in/urfave/cli.v2"
@ -41,7 +46,7 @@ func main() {
}
if err := app.Run(os.Args); err != nil {
log.Warn(err)
log.Fatal(err)
return
}
}
@ -64,61 +69,72 @@ var watchHeadCmd = &cli.Command{
Value: "lotus-daemon.service",
Usage: "systemd unit name to restart on health check failure",
},
&cli.IntFlag{
Name: "api-timeout",
Value: build.BlockDelay,
Usage: "timeout between API retries",
},
&cli.IntFlag{
Name: "api-retries",
Value: 8,
Usage: "number of API retry attemps",
},
},
Action: func(c *cli.Context) error {
var headCheckWindow CidWindow
threshold := c.Int("threshold")
interval := time.Duration(c.Int("interval")) * time.Second
name := c.String("systemd-unit")
apiRetries := c.Int("api-retries")
apiTimeout := time.Duration(c.Int("api-timeout")) * time.Second
var headCheckWindow CidWindow
nCh := make(chan interface{}, 1)
sCh := make(chan os.Signal, 1)
signal.Notify(sCh, os.Interrupt, syscall.SIGTERM)
api, closer, err := lcli.GetFullNodeAPI(c)
api, closer, err := getFullNodeAPI(c, apiRetries, apiTimeout)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(c)
if err := WaitForSyncComplete(ctx, api); err != nil {
log.Fatal(err)
}
ch := make(chan CidWindow, 1)
aCh := make(chan interface{}, 1)
go func() {
for {
headCheckWindow, err = updateWindow(ctx, api, headCheckWindow, threshold, ch)
if err != nil {
log.Fatal(err)
log.Info("Waiting for sync to complete")
if err := waitForSyncComplete(ctx, api, apiRetries, apiTimeout); err != nil {
nCh <- err
return
}
headCheckWindow, err = updateWindow(ctx, api, headCheckWindow, threshold, apiRetries, apiTimeout)
if err != nil {
log.Warn("Failed to connect to API. Restarting systemd service")
nCh <- nil
return
}
ok := checkWindow(headCheckWindow, threshold)
if !ok {
log.Warn("Chain head has not updated. Restarting systemd service")
nCh <- nil
break
}
log.Info("Chain head is healthy")
time.Sleep(interval)
}
return
}()
go func() {
result, err := alertHandler(name, aCh)
if err != nil {
log.Fatal(err)
}
if result != "done" {
log.Fatal("systemd unit failed to restart:", result)
}
log.Info("restarting health agent")
// Exit health agent and let supervisor restart health agent
// Restarting lotus systemd unit kills api connection
os.Exit(130)
}()
for {
ok := checkWindow(ch, threshold)
if !ok {
log.Warn("chain head has not updated. Restarting systemd service")
aCh <- nil
break
}
log.Info("chain head is healthy")
restart, err := notifyHandler(name, nCh, sCh)
if err != nil {
return err
}
if restart != "done" {
return errors.New("Systemd unit failed to restart:" + restart)
}
log.Info("Restarting health agent")
// Exit health agent and let supervisor restart health agent
// Restarting lotus systemd unit kills api connection
os.Exit(130)
return nil
},
}
@ -128,52 +144,67 @@ var watchHeadCmd = &cli.Command{
* compares slices of Cids when len is greater or equal to `t` - threshold
* if all slices are equal, head has not updated and returns false
*/
func checkWindow(ch chan CidWindow, t int) bool {
select {
case window := <-ch:
var dup int
windowLen := len(window)
if windowLen >= t {
cidWindow:
for i := range window {
next := windowLen - 1 - i
// if array length is different, head is changing
if next >= 1 && len(window[next]) != len(window[next-1]) {
func checkWindow(window CidWindow, t int) bool {
var dup int
windowLen := len(window)
if windowLen >= t {
cidWindow:
for i := range window {
next := windowLen - 1 - i
// if array length is different, head is changing
if next >= 1 && len(window[next]) != len(window[next-1]) {
break cidWindow
}
// if cids are different, head is changing
for j := range window[next] {
if next >= 1 && window[next][j] != window[next-1][j] {
break cidWindow
}
// if cids are different, head is changing
for j := range window[next] {
if next >= 1 && window[next][j] != window[next-1][j] {
break cidWindow
}
}
if i < (t - 1) {
dup++
}
}
if dup == (t - 1) {
return false
if i < (t - 1) {
dup++
}
}
return true
if dup == (t - 1) {
return false
}
}
return true
}
/*
* returns a slice of slices of Cids
* len of slice <= `t` - threshold
*/
func updateWindow(ctx context.Context, a api.FullNode, w CidWindow, t int, r int, to time.Duration) (CidWindow, error) {
head, err := getHead(ctx, a, r, to)
if err != nil {
return nil, err
}
window := appendCIDsToWindow(w, head.Cids(), t)
return window, err
}
/*
* get chain head from API
* returns a slice of slices of Cids
* len of slice <= `t` - threshold
* retries if API no available
* returns tipset
*/
func updateWindow(ctx context.Context, a api.FullNode, w CidWindow, t int, ch chan CidWindow) (CidWindow, error) {
head, err := a.ChainHead(ctx)
if err != nil {
return nil, err
func getHead(ctx context.Context, a api.FullNode, r int, t time.Duration) (*types.TipSet, error) {
for i := 0; i < r; i++ {
head, err := a.ChainHead(ctx)
if err != nil && i == (r-1) {
return nil, err
}
if err != nil {
log.Warnf("Call to API failed. Retrying in %.0fs", t.Seconds())
time.Sleep(t)
continue
}
return head, err
}
window := appendCIDsToWindow(w, head.Cids(), t)
ch <- window
return window, nil
return nil, nil
}
/*
@ -192,13 +223,13 @@ func appendCIDsToWindow(w CidWindow, c []cid.Cid, t int) CidWindow {
/*
* wait for node to sync
*/
func WaitForSyncComplete(ctx context.Context, napi api.FullNode) error {
func waitForSyncComplete(ctx context.Context, a api.FullNode, r int, t time.Duration) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(3 * time.Second):
head, err := napi.ChainHead(ctx)
head, err := getHead(ctx, a, r, t)
if err != nil {
return err
}
@ -209,3 +240,23 @@ func WaitForSyncComplete(ctx context.Context, napi api.FullNode) error {
}
}
}
/*
* A thin wrapper around lotus cli GetFullNodeAPI
* Adds retry logic
*/
func getFullNodeAPI(ctx *cli.Context, r int, t time.Duration) (api.FullNode, jsonrpc.ClientCloser, error) {
for i := 0; i < r; i++ {
api, closer, err := lcli.GetFullNodeAPI(ctx)
if err != nil && i == (r-1) {
return nil, nil, err
}
if err != nil {
log.Warnf("API connection failed. Retrying in %.0fs", t.Seconds())
time.Sleep(t)
continue
}
return api, closer, err
}
return nil, nil, nil
}

View File

@ -28,13 +28,8 @@ func TestAppendCIDsToWindow(t *testing.T) {
func TestCheckWindow(t *testing.T) {
assert := assert.New(t)
ch := make(chan CidWindow, 1)
och := make(chan bool, 1)
threshold := 3
go func() {
och <- checkWindow(ch, threshold)
}()
var healthyHeadCheckWindow CidWindow
healthyHeadCheckWindow = appendCIDsToWindow(healthyHeadCheckWindow, []cid.Cid{
makeCID("abcd"),
@ -47,15 +42,9 @@ func TestCheckWindow(t *testing.T) {
makeCID("bbcd"),
makeCID("bbfe"),
}, threshold)
ch <- healthyHeadCheckWindow
select {
case ok := <-och:
assert.True(ok)
}
ok := checkWindow(healthyHeadCheckWindow, threshold)
assert.True(ok)
go func() {
och <- checkWindow(ch, threshold)
}()
var healthyHeadCheckWindow1 CidWindow
healthyHeadCheckWindow1 = appendCIDsToWindow(healthyHeadCheckWindow1, []cid.Cid{
makeCID("bbcd"),
@ -69,15 +58,9 @@ func TestCheckWindow(t *testing.T) {
healthyHeadCheckWindow1 = appendCIDsToWindow(healthyHeadCheckWindow1, []cid.Cid{
makeCID("abcd"),
}, threshold)
ch <- healthyHeadCheckWindow1
select {
case ok := <-och:
assert.True(ok)
}
ok = checkWindow(healthyHeadCheckWindow1, threshold)
assert.True(ok)
go func() {
och <- checkWindow(ch, threshold)
}()
var healthyHeadCheckWindow2 CidWindow
healthyHeadCheckWindow2 = appendCIDsToWindow(healthyHeadCheckWindow2, []cid.Cid{
makeCID("bbcd"),
@ -86,15 +69,9 @@ func TestCheckWindow(t *testing.T) {
healthyHeadCheckWindow2 = appendCIDsToWindow(healthyHeadCheckWindow2, []cid.Cid{
makeCID("abcd"),
}, threshold)
ch <- healthyHeadCheckWindow2
select {
case ok := <-och:
assert.True(ok)
}
ok = checkWindow(healthyHeadCheckWindow2, threshold)
assert.True(ok)
go func() {
och <- checkWindow(ch, threshold)
}()
var healthyHeadCheckWindow3 CidWindow
healthyHeadCheckWindow3 = appendCIDsToWindow(healthyHeadCheckWindow3, []cid.Cid{
makeCID("abcd"),
@ -103,29 +80,17 @@ func TestCheckWindow(t *testing.T) {
makeCID("bbcd"),
makeCID("bbfe"),
}, threshold)
ch <- healthyHeadCheckWindow3
select {
case ok := <-och:
assert.True(ok)
}
ok = checkWindow(healthyHeadCheckWindow3, threshold)
assert.True(ok)
go func() {
och <- checkWindow(ch, threshold)
}()
var healthyHeadCheckWindow4 CidWindow
healthyHeadCheckWindow4 = appendCIDsToWindow(healthyHeadCheckWindow4, []cid.Cid{
makeCID("bbcd"),
makeCID("bbfe"),
}, threshold)
ch <- healthyHeadCheckWindow4
select {
case ok := <-och:
assert.True(ok)
}
ok = checkWindow(healthyHeadCheckWindow4, threshold)
assert.True(ok)
go func() {
och <- checkWindow(ch, 5)
}()
var healthyHeadCheckWindow5 CidWindow
healthyHeadCheckWindow5 = appendCIDsToWindow(healthyHeadCheckWindow5, []cid.Cid{
makeCID("bbcd"),
@ -147,15 +112,9 @@ func TestCheckWindow(t *testing.T) {
makeCID("cbcd"),
makeCID("cbfe"),
}, 5)
ch <- healthyHeadCheckWindow5
select {
case ok := <-och:
assert.True(ok)
}
ok = checkWindow(healthyHeadCheckWindow5, threshold)
assert.True(ok)
go func() {
och <- checkWindow(ch, threshold)
}()
var unhealthyHeadCheckWindow CidWindow
unhealthyHeadCheckWindow = appendCIDsToWindow(unhealthyHeadCheckWindow, []cid.Cid{
makeCID("abcd"),
@ -169,15 +128,9 @@ func TestCheckWindow(t *testing.T) {
makeCID("abcd"),
makeCID("fbcd"),
}, threshold)
ch <- unhealthyHeadCheckWindow
select {
case ok := <-och:
assert.False(ok)
}
ok = checkWindow(unhealthyHeadCheckWindow, threshold)
assert.False(ok)
go func() {
och <- checkWindow(ch, threshold)
}()
var unhealthyHeadCheckWindow1 CidWindow
unhealthyHeadCheckWindow1 = appendCIDsToWindow(unhealthyHeadCheckWindow1, []cid.Cid{
makeCID("abcd"),
@ -187,15 +140,9 @@ func TestCheckWindow(t *testing.T) {
makeCID("abcd"),
makeCID("fbcd"),
}, threshold)
ch <- unhealthyHeadCheckWindow1
select {
case ok := <-och:
assert.True(ok)
}
ok = checkWindow(unhealthyHeadCheckWindow1, threshold)
assert.True(ok)
go func() {
och <- checkWindow(ch, threshold)
}()
var unhealthyHeadCheckWindow2 CidWindow
unhealthyHeadCheckWindow2 = appendCIDsToWindow(unhealthyHeadCheckWindow2, []cid.Cid{
makeCID("abcd"),
@ -206,11 +153,8 @@ func TestCheckWindow(t *testing.T) {
unhealthyHeadCheckWindow2 = appendCIDsToWindow(unhealthyHeadCheckWindow2, []cid.Cid{
makeCID("abcd"),
}, threshold)
ch <- unhealthyHeadCheckWindow2
select {
case ok := <-och:
assert.False(ok)
}
ok = checkWindow(unhealthyHeadCheckWindow2, threshold)
assert.False(ok)
}
func makeCID(s string) cid.Cid {

View File

@ -1,11 +1,14 @@
package main
import (
"os"
"github.com/coreos/go-systemd/dbus"
)
func alertHandler(n string, ch chan interface{}) (string, error) {
func notifyHandler(n string, ch chan interface{}, sCh chan os.Signal) (string, error) {
select {
// alerts to restart systemd unit
case <-ch:
statusCh := make(chan string, 1)
c, err := dbus.New()
@ -20,5 +23,9 @@ func alertHandler(n string, ch chan interface{}) (string, error) {
case result := <-statusCh:
return result, nil
}
// SIGTERM
case <-sCh:
os.Exit(1)
return "", nil
}
}