lotus/cmd/lotus-health/main.go
ognots 0c6e4c6c40 fixes health agent bug
was passing wrong variable to updateWindow function argument.
also updates duplicate function comment
2020-01-20 20:34:13 -05:00

212 lines
4.4 KiB
Go

package main
import (
"context"
"os"
"time"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"gopkg.in/urfave/cli.v2"
)
// CidWindow is a list of chain head observations. Each element is the slice
// of Cids recorded for one observation; new observations are appended at the
// end, so the newest entry is last.
type CidWindow [][]cid.Cid
// log is the package-level logger, created under the name "lotus-health".
var log = logging.Logger("lotus-health")
/*
 * entry point of the health agent
 * sets the log level to INFO for all loggers, builds the cli app
 * (watch-head command plus the global `repo` flag) and runs it
 * an error from app.Run is logged as a warning only
 */
func main() {
	logging.SetLogLevel("*", "INFO")

	log.Info("Starting health agent")

	// global flag: location of the lotus repo, overridable via LOTUS_PATH
	repoFlag := &cli.StringFlag{
		Name:    "repo",
		EnvVars: []string{"LOTUS_PATH"},
		Value:   "~/.lotus", // TODO: Consider XDG_DATA_HOME
	}

	app := &cli.App{
		Name:     "lotus-health",
		Usage:    "Tools for monitoring lotus daemon health",
		Version:  build.UserVersion,
		Commands: []*cli.Command{watchHeadCmd},
		Flags:    []cli.Flag{repoFlag},
	}

	err := app.Run(os.Args)
	if err != nil {
		log.Warn(err)
	}
}
/*
 * watch-head command
 * waits for the node to sync, then polls the chain head every `interval`
 * if the head stays unchanged for `threshold` checks, signals the alert
 * handler so the configured systemd unit gets restarted
 */
var watchHeadCmd = &cli.Command{
	Name: "watch-head",
	Flags: []cli.Flag{
		&cli.IntFlag{
			Name:  "threshold",
			Value: 3,
			Usage: "number of times head remains unchanged before failing health check",
		},
		&cli.IntFlag{
			Name:  "interval",
			Value: build.BlockDelay,
			Usage: "interval in seconds between chain head checks",
		},
		&cli.StringFlag{
			Name:  "systemd-unit",
			Value: "lotus-daemon.service",
			Usage: "systemd unit name to restart on health check failure",
		},
	},
	Action: func(c *cli.Context) error {
		threshold := c.Int("threshold")
		interval := time.Duration(c.Int("interval")) * time.Second
		name := c.String("systemd-unit")

		api, closer, err := lcli.GetFullNodeAPI(c)
		if err != nil {
			return err
		}
		defer closer()
		ctx := lcli.ReqContext(c)

		if err := WaitForSyncComplete(ctx, api); err != nil {
			log.Fatal(err)
		}

		// ch carries the latest window from the poller to the checker below
		ch := make(chan CidWindow, 1)
		// aCh is handed to alertHandler; a value is sent on it when the
		// health check fails
		aCh := make(chan interface{}, 1)

		// poller: records the chain head once per interval
		go func() {
			// the window and the error are owned by this goroutine only, so
			// nothing is written to variables shared with the Action body
			var headCheckWindow CidWindow
			for {
				w, err := updateWindow(ctx, api, headCheckWindow, threshold, ch)
				if err != nil {
					log.Fatal(err)
				}
				headCheckWindow = w
				time.Sleep(interval)
			}
		}()

		// alerter: every path through this goroutine ends the process
		// NOTE(review): alertHandler is defined outside this file; presumably
		// it blocks until a value arrives on aCh - confirm
		go func() {
			result, err := alertHandler(name, aCh)
			if err != nil {
				log.Fatal(err)
			}
			if result != "done" {
				log.Fatal("systemd unit failed to restart:", result)
			}
			log.Info("restarting health agent")
			// Exit health agent and let supervisor restart health agent
			// Restarting lotus systemd unit kills api connection
			os.Exit(130)
		}()

		for {
			if checkWindow(ch, threshold) {
				log.Info("chain head is healthy")
				continue
			}
			log.Warn("chain head has not updated. Restarting systemd service")
			aCh <- nil
			// Do not return right away: once this Action returns, app.Run and
			// main return and the process exits without waiting for the
			// alerter goroutine, so the restart could be cut short or never
			// happen. Wait here instead; the alerter terminates the process
			// itself, and a cancelled request context still lets us leave.
			<-ctx.Done()
			return ctx.Err()
		}
	},
}
/*
 * reads one window (slice of slices of Cids) from the channel, blocking
 * until one is available
 * when the window holds at least `t` - threshold - entries, compares
 * neighbouring entries starting from the newest one
 * returns false if the newest `t` entries are all equal (head has not updated)
 * returns true otherwise, including when fewer than `t` entries are present
 */
func checkWindow(ch chan CidWindow, t int) bool {
	window := <-ch
	windowLen := len(window)
	if windowLen < t {
		// not enough observations yet to judge
		return true
	}

	// dup counts consecutive equal neighbour pairs, newest first, capped at t-1
	var dup int
cidWindow:
	for i := range window {
		next := windowLen - 1 - i
		if next >= 1 {
			newer, older := window[next], window[next-1]
			// if array length is different, head is changing
			if len(newer) != len(older) {
				break cidWindow
			}
			// if cids are different, head is changing
			// iterate over the entries being compared, not over window[i]:
			// window[i] may hold more Cids than they do, and indexing with
			// its range would run out of bounds
			for j := range newer {
				if newer[j] != older[j] {
					break cidWindow
				}
			}
		}
		if i < (t - 1) {
			dup++
		}
	}
	return dup != (t - 1)
}
/*
 * fetches the current chain head from the API
 * appends its Cids to window `w`, which is kept at no more than
 * `t` - threshold - entries
 * sends the resulting window on `ch` (blocks while `ch` cannot accept it)
 * returns the resulting window, or a nil window and the API error
 */
func updateWindow(ctx context.Context, a api.FullNode, w CidWindow, t int, ch chan CidWindow) (CidWindow, error) {
	tipset, err := a.ChainHead(ctx)
	if err != nil {
		return nil, err
	}

	latest := tipset.Cids()
	updated := appendCIDsToWindow(w, latest, t)

	// publish before returning so the checker sees every observation
	ch <- updated
	return updated, nil
}
/*
 * appends slice of Cids `c` to the end of window `w`
 * when `w` already holds `t`-1 or more entries, the oldest ones are dropped
 * first so that the result holds exactly `t` entries
 * `t` is expected to be >= 1; a smaller value makes the slice bound
 * exceed len(w)
 * returns the new window
 */
func appendCIDsToWindow(w CidWindow, c []cid.Cid, t int) CidWindow {
	// number of leading (oldest) entries that must go to make room for c
	if drop := len(w) - t + 1; drop >= 0 {
		w = w[drop:]
	}
	return append(w, c)
}
/*
 * wait for node to sync
 * every 3 seconds, fetches the chain head and compares its MinTimestamp
 * (treated as unix seconds) with the local clock
 * returns nil once the head is less than build.BlockDelay seconds old
 * returns the context error if ctx is done, or the API error if ChainHead fails
 */
func WaitForSyncComplete(ctx context.Context, napi api.FullNode) error {
	const pollEvery = 3 * time.Second

	for {
		// wait out one poll period unless the context ends first
		select {
		case <-time.After(pollEvery):
		case <-ctx.Done():
			return ctx.Err()
		}

		head, err := napi.ChainHead(ctx)
		if err != nil {
			return err
		}

		age := time.Now().Unix() - int64(head.MinTimestamp())
		if age < build.BlockDelay {
			return nil
		}
	}
}