lotus/cmd/lotus-health/main.go
ognots effacec817 range over right index to prevent bounds errors
the test scenario 'healthyHeadCheckWindow5' was causing index out of bounds errors.
the second range function in checkWindow was iterating over the incorrect slice of cids.
should be comparing latest items in slices first
2020-01-21 16:07:53 -05:00

212 lines
4.4 KiB
Go

package main
import (
"context"
"os"
"time"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
lcli "github.com/filecoin-project/lotus/cli"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"gopkg.in/urfave/cli.v2"
)
// CidWindow is an ordered list of Cid slices, oldest first; each element holds
// the Cids returned by one chain head query (entries are added by appendCIDsToWindow).
type CidWindow [][]cid.Cid
// log is the package logger, registered under the name "lotus-health".
var log = logging.Logger("lotus-health")
/*
 * main sets the log level, registers the lotus-health commands and runs the CLI app
 * exits with status 1 when the app returns an error, so that a supervisor
 * watching the process can tell a failed run from a clean one
 */
func main() {
	logging.SetLogLevel("*", "INFO")

	log.Info("Starting health agent")

	local := []*cli.Command{
		watchHeadCmd,
	}

	app := &cli.App{
		Name:     "lotus-health",
		Usage:    "Tools for monitoring lotus daemon health",
		Version:  build.UserVersion,
		Commands: local,
		Flags: []cli.Flag{
			&cli.StringFlag{
				Name:    "repo",
				EnvVars: []string{"LOTUS_PATH"},
				Value:   "~/.lotus", // TODO: Consider XDG_DATA_HOME
			},
		},
	}

	if err := app.Run(os.Args); err != nil {
		log.Warn(err)
		// returning here would end the process with status 0 and hide the failure
		os.Exit(1)
	}
}
/*
 * watchHeadCmd implements the `watch-head` command
 * waits for the node to sync, then polls the chain head every `interval` seconds
 * when the head is unchanged across `threshold` polls, signals the alert handler
 * for the configured systemd unit and waits for it to terminate the process
 */
var watchHeadCmd = &cli.Command{
	Name: "watch-head",
	Flags: []cli.Flag{
		&cli.IntFlag{
			Name:  "threshold",
			Value: 3,
			Usage: "number of times head remains unchanged before failing health check",
		},
		&cli.IntFlag{
			Name:  "interval",
			Value: build.BlockDelay,
			Usage: "interval in seconds between chain head checks",
		},
		&cli.StringFlag{
			Name:  "systemd-unit",
			Value: "lotus-daemon.service",
			Usage: "systemd unit name to restart on health check failure",
		},
	},
	Action: func(c *cli.Context) error {
		threshold := c.Int("threshold")
		interval := time.Duration(c.Int("interval")) * time.Second
		name := c.String("systemd-unit")

		var headCheckWindow CidWindow

		api, closer, err := lcli.GetFullNodeAPI(c)
		if err != nil {
			return err
		}
		defer closer()
		ctx := lcli.ReqContext(c)

		if err := WaitForSyncComplete(ctx, api); err != nil {
			log.Fatal(err)
		}

		ch := make(chan CidWindow, 1)
		aCh := make(chan interface{}, 1)

		// poller: fetches the chain head and publishes the updated window on ch
		go func() {
			for {
				// local error value; the goroutine must not write to the
				// enclosing function's err
				window, err := updateWindow(ctx, api, headCheckWindow, threshold, ch)
				if err != nil {
					log.Fatal(err)
				}
				headCheckWindow = window
				time.Sleep(interval)
			}
		}()

		// alerter: every path through this goroutine ends the process
		// NOTE(review): alertHandler is defined elsewhere; presumably it blocks
		// until a value arrives on aCh and then restarts the unit - confirm
		go func() {
			result, err := alertHandler(name, aCh)
			if err != nil {
				log.Fatal(err)
			}
			if result != "done" {
				log.Fatal("systemd unit failed to restart:", result)
			}
			log.Info("restarting health agent")
			// Exit health agent and let supervisor restart health agent
			// Restarting lotus systemd unit kills api connection
			os.Exit(130)
		}()

		for {
			if ok := checkWindow(ch, threshold); !ok {
				log.Warn("chain head has not updated. Restarting systemd service")
				aCh <- nil
				break
			}
			log.Info("chain head is healthy")
		}

		// Returning right after the send would let main return and end the
		// process before the alerter goroutine has handled the alert.
		// Block here instead: the alerter exits the process when it is done;
		// cancellation of the request context is the only other way out.
		<-ctx.Done()
		return nil
	},
}
/*
 * checkWindow receives one window (slice of slices of Cids) from `ch`
 * a window shorter than `t` - threshold - is always reported healthy
 * otherwise walks the window from the newest entry backwards, comparing each
 * entry with the one before it, and stops at the first pair that differs
 * returns false when the newest `t` entries are all equal (head has not updated)
 * returns true in every other case
 */
func checkWindow(ch chan CidWindow, t int) bool {
	heads := <-ch
	if len(heads) < t {
		return true
	}

	unchanged := 0
	for step := 0; step < len(heads); step++ {
		cur := len(heads) - 1 - step
		// the oldest entry has no predecessor to compare against
		if cur > 0 {
			newer, older := heads[cur], heads[cur-1]
			// a different number of cids means the head is changing
			if len(newer) != len(older) {
				break
			}
			// any differing cid means the head is changing
			differs := false
			for k := range newer {
				if newer[k] != older[k] {
					differs = true
					break
				}
			}
			if differs {
				break
			}
		}
		// only the first t-1 steps count towards the threshold
		if step < (t - 1) {
			unchanged++
		}
	}
	return unchanged != (t - 1)
}
/*
 * updateWindow fetches the current chain head from the API
 * appends the head's Cids to window `w` via appendCIDsToWindow, using `t` - threshold
 * sends the resulting window on `ch` (blocks until the send completes)
 * returns the resulting window, or a nil window and the API error
 */
func updateWindow(ctx context.Context, a api.FullNode, w CidWindow, t int, ch chan CidWindow) (CidWindow, error) {
	tipset, err := a.ChainHead(ctx)
	if err != nil {
		return nil, err
	}

	updated := appendCIDsToWindow(w, tipset.Cids(), t)
	ch <- updated

	return updated, nil
}
/*
 * appendCIDsToWindow appends slice of Cids `c` to window slice `w`
 * keeps at most `t` entries, dropping the oldest entries first
 * a `t` below 1 is handled like 1: the result holds only `c`
 * (without the clamp below, slicing `w` past its length would panic)
 * returns new window, which may share its backing array with `w`
 */
func appendCIDsToWindow(w CidWindow, c []cid.Cid, t int) CidWindow {
	// number of oldest entries to drop so the result has at most t entries
	offset := len(w) - t + 1
	if offset > len(w) {
		// only reachable when t < 1; never slice beyond the end of w
		offset = len(w)
	}
	if offset >= 0 {
		return append(w[offset:], c)
	}
	return append(w, c)
}
/*
 * WaitForSyncComplete blocks until the node looks synced
 * every 3 seconds it fetches the chain head and returns nil once the current
 * unix time minus the head's MinTimestamp is below build.BlockDelay
 * returns the context error when `ctx` is done, or the API error when
 * fetching the head fails
 */
func WaitForSyncComplete(ctx context.Context, napi api.FullNode) error {
	const pollInterval = 3 * time.Second

	for {
		select {
		case <-time.After(pollInterval):
			// fall through to the head check below
		case <-ctx.Done():
			return ctx.Err()
		}

		tipset, err := napi.ChainHead(ctx)
		if err != nil {
			return err
		}

		age := time.Now().Unix() - int64(tipset.MinTimestamp())
		if age < build.BlockDelay {
			return nil
		}
	}
}