diff --git a/Makefile b/Makefile index b81840767..b839347dc 100644 --- a/Makefile +++ b/Makefile @@ -131,7 +131,7 @@ BINS+=fountain chainwatch: rm -f chainwatch go build -o chainwatch ./cmd/lotus-chainwatch - go run github.com/GeertJohan/go.rice/rice append --exec chainwatch -i ./cmd/lotus-chainwatch + go run github.com/GeertJohan/go.rice/rice append --exec chainwatch -i ./cmd/lotus-chainwatch -i ./build .PHONY: chainwatch BINS+=chainwatch @@ -149,6 +149,14 @@ stats: .PHONY: stats BINS+=stats +health: + rm -f lotus-health + go build -o lotus-health ./cmd/lotus-health + go run github.com/GeertJohan/go.rice/rice append --exec lotus-health -i ./build + +.PHONY: health +BINS+=health + # MISC buildall: $(BINS) diff --git a/build/version.go b/build/version.go index 01c209b1d..3ca9bda56 100644 --- a/build/version.go +++ b/build/version.go @@ -5,7 +5,7 @@ import "fmt" var CurrentCommit string // BuildVersion is the local build version, set by build system -const BuildVersion = "0.2.1" +const BuildVersion = "0.2.2" var UserVersion = BuildVersion + CurrentCommit diff --git a/chain/types/blockheader.go b/chain/types/blockheader.go index c91d029fc..ff4c6467f 100644 --- a/chain/types/blockheader.go +++ b/chain/types/blockheader.go @@ -120,6 +120,10 @@ func (blk *BlockHeader) CheckBlockSignature(ctx context.Context, worker address. _, span := trace.StartSpan(ctx, "checkBlockSignature") defer span.End() + if blk.BlockSig == nil { + return xerrors.New("block signature not present") + } + sigb, err := blk.SigningBytes() if err != nil { return xerrors.Errorf("failed to get block signing bytes: %w", err) diff --git a/chain/types/signature.go b/chain/types/signature.go index f3a61327b..3b0862206 100644 --- a/chain/types/signature.go +++ b/chain/types/signature.go @@ -116,5 +116,8 @@ func (s *Signature) UnmarshalCBOR(br io.Reader) error { } func (s *Signature) Equals(o *Signature) bool { + if s == nil || o == nil { + return s == o + } return s.Type == o.Type && bytes.Equal(s.Data, o.Data) } diff --git a/chain/types/signature_cgo.go b/chain/types/signature_cgo.go index d86e41d72..a7423d946 100644 --- a/chain/types/signature_cgo.go +++ b/chain/types/signature_cgo.go @@ -9,9 +9,14 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-crypto" "github.com/minio/blake2b-simd" + "golang.org/x/xerrors" ) func (s *Signature) Verify(addr address.Address, msg []byte) error { + if s == nil { + return xerrors.Errorf("signature is nil") + } + if addr.Protocol() == address.ID { return fmt.Errorf("must resolve ID addresses before using them to verify a signature") } diff --git a/cmd/lotus-health/main.go b/cmd/lotus-health/main.go new file mode 100644 index 000000000..be5c496b4 --- /dev/null +++ b/cmd/lotus-health/main.go @@ -0,0 +1,211 @@ +package main + +import ( + "context" + "os" + "time" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/build" + lcli "github.com/filecoin-project/lotus/cli" + cid "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log" + "gopkg.in/urfave/cli.v2" +) + +type CidWindow [][]cid.Cid + +var log = logging.Logger("lotus-health") + +func main() { + logging.SetLogLevel("*", "INFO") + + log.Info("Starting health agent") + + local := []*cli.Command{ + watchHeadCmd, + } + + app := &cli.App{ + Name: "lotus-health", + Usage: "Tools for monitoring lotus daemon health", + Version: build.UserVersion, + Commands: local, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "repo", + EnvVars: []string{"LOTUS_PATH"}, + Value: "~/.lotus", // TODO: Consider XDG_DATA_HOME + }, + }, + } + + if err := app.Run(os.Args); err != nil { + log.Warn(err) + return + } +} + +var watchHeadCmd = &cli.Command{ + Name: "watch-head", + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "threshold", + Value: 3, + Usage: "number of times head remains unchanged before failing health check", + }, + &cli.IntFlag{ + Name: "interval", + Value: build.BlockDelay, + Usage: "interval in seconds between chain head checks", + }, + &cli.StringFlag{ + Name: "systemd-unit", + Value: "lotus-daemon.service", + Usage: "systemd unit name to restart on health check failure", + }, + }, + Action: func(c *cli.Context) error { + threshold := c.Int("threshold") + interval := time.Duration(c.Int("interval")) * time.Second + name := c.String("systemd-unit") + + var headCheckWindow CidWindow + + api, closer, err := lcli.GetFullNodeAPI(c) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(c) + + if err := WaitForSyncComplete(ctx, api); err != nil { + log.Fatal(err) + } + + ch := make(chan CidWindow, 1) + aCh := make(chan interface{}, 1) + + go func() { + for { + headCheckWindow, err = updateWindow(ctx, api, headCheckWindow, threshold, ch) + if err != nil { + log.Fatal(err) + } + time.Sleep(interval) + } + }() + + go func() { + result, err := alertHandler(name, aCh) + if err != nil { + log.Fatal(err) + } + if result != "done" { + log.Fatal("systemd unit failed to restart:", result) + } + log.Info("restarting health agent") + // Exit health agent and let supervisor restart health agent + // Restarting lotus systemd unit kills api connection + os.Exit(130) + }() + + for { + ok := checkWindow(ch, int(interval)) + if !ok { + log.Warn("chain head has not updated. Restarting systemd service") + aCh <- nil + break + } + log.Info("chain head is healthy") + } + return nil + }, +} + +/* + * reads channel of slices of Cids + * compares slices of Cids when len is greater or equal to `t` - threshold + * if all slices are equal, head has not updated and returns false + */ +func checkWindow(ch chan CidWindow, t int) bool { + select { + case window := <-ch: + var dup int + windowLen := len(window) + if windowLen >= t { + cidWindow: + for i, cids := range window { + next := windowLen - 1 - i + // if array length is different, head is changing + if next >= 1 && len(window[next]) != len(window[next-1]) { + break cidWindow + } + // if cids are different, head is changing + for j := range cids { + if next >= 1 && window[next][j] != window[next-1][j] { + break cidWindow + } + } + if i < (t - 1) { + dup++ + } + } + + if dup == (t - 1) { + return false + } + } + return true + } +} + +/* + * reads channel of slices of slices of Cids + * compares Cids when len of window is greater or equal to `t` - threshold + * if all slices are the equal, head has not updated and returns false + */ +func updateWindow(ctx context.Context, a api.FullNode, w CidWindow, t int, ch chan CidWindow) (CidWindow, error) { + head, err := a.ChainHead(ctx) + if err != nil { + return nil, err + } + + window := appendCIDsToWindow(w, head.Cids(), t) + ch <- window + return window, nil +} + +/* + * appends slice of Cids to window slice + * keeps a fixed window slice size, dropping older slices + * returns new window + */ +func appendCIDsToWindow(w CidWindow, c []cid.Cid, t int) CidWindow { + offset := len(w) - t + 1 + if offset >= 0 { + return append(w[offset:], c) + } + return append(w, c) +} + +/* + * wait for node to sync + */ +func WaitForSyncComplete(ctx context.Context, napi api.FullNode) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(3 * time.Second): + head, err := napi.ChainHead(ctx) + if err != nil { + return err + } + + if time.Now().Unix()-int64(head.MinTimestamp()) < build.BlockDelay { + return nil + } + } + } +} diff --git a/cmd/lotus-health/main_test.go b/cmd/lotus-health/main_test.go new file mode 100644 index 000000000..a31115cba --- /dev/null +++ b/cmd/lotus-health/main_test.go @@ -0,0 +1,84 @@ +package main + +import ( + "testing" + + cid "github.com/ipfs/go-cid" + mh "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/assert" +) + +func TestAppendCIDsToWindow(t *testing.T) { + assert := assert.New(t) + var window CidWindow + threshold := 3 + cid0 := makeCID("0") + cid1 := makeCID("1") + cid2 := makeCID("2") + cid3 := makeCID("3") + window = appendCIDsToWindow(window, []cid.Cid{cid0}, threshold) + window = appendCIDsToWindow(window, []cid.Cid{cid1}, threshold) + window = appendCIDsToWindow(window, []cid.Cid{cid2}, threshold) + window = appendCIDsToWindow(window, []cid.Cid{cid3}, threshold) + assert.Len(window, 3) + assert.Equal(window[0][0], cid1) + assert.Equal(window[1][0], cid2) + assert.Equal(window[2][0], cid3) +} + +func TestCheckWindow(t *testing.T) { + assert := assert.New(t) + ch := make(chan CidWindow, 1) + och := make(chan bool, 1) + threshold := 3 + go func() { + och <- checkWindow(ch, threshold) + }() + var healthyHeadCheckWindow CidWindow + healthyHeadCheckWindow = appendCIDsToWindow(healthyHeadCheckWindow, []cid.Cid{ + makeCID("abcd"), + }, threshold) + healthyHeadCheckWindow = appendCIDsToWindow(healthyHeadCheckWindow, []cid.Cid{ + makeCID("bbcd"), + makeCID("bbfe"), + }, threshold) + healthyHeadCheckWindow = appendCIDsToWindow(healthyHeadCheckWindow, []cid.Cid{ + makeCID("bbcd"), + makeCID("bbfe"), + }, threshold) + ch <- healthyHeadCheckWindow + select { + case ok := <-och: + assert.True(ok) + } + go func() { + och <- checkWindow(ch, threshold) + }() + var unhealthyHeadCheckWindow CidWindow + unhealthyHeadCheckWindow = appendCIDsToWindow(unhealthyHeadCheckWindow, []cid.Cid{ + makeCID("abcd"), + makeCID("fbcd"), + }, threshold) + unhealthyHeadCheckWindow = appendCIDsToWindow(unhealthyHeadCheckWindow, []cid.Cid{ + makeCID("abcd"), + makeCID("fbcd"), + }, threshold) + unhealthyHeadCheckWindow = appendCIDsToWindow(unhealthyHeadCheckWindow, []cid.Cid{ + makeCID("abcd"), + makeCID("fbcd"), + }, threshold) + ch <- unhealthyHeadCheckWindow + select { + case ok := <-och: + assert.False(ok) + } + +} + +func makeCID(s string) cid.Cid { + h1, err := mh.Sum([]byte(s), mh.SHA2_256, -1) + if err != nil { + log.Fatal(err) + } + return cid.NewCidV1(0x55, h1) +} diff --git a/cmd/lotus-health/systemd.go b/cmd/lotus-health/systemd.go new file mode 100644 index 000000000..0287204bc --- /dev/null +++ b/cmd/lotus-health/systemd.go @@ -0,0 +1,24 @@ +package main + +import ( + "github.com/coreos/go-systemd/dbus" +) + +func alertHandler(n string, ch chan interface{}) (string, error) { + select { + case <-ch: + statusCh := make(chan string, 1) + c, err := dbus.New() + if err != nil { + return "", err + } + _, err = c.TryRestartUnit(n, "fail", statusCh) + if err != nil { + return "", err + } + select { + case result := <-statusCh: + return result, nil + } + } +} diff --git a/go.mod b/go.mod index 84efee20d..1daa92ac8 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/GeertJohan/go.rice v1.0.0 github.com/Gurpartap/async v0.0.0-20180927173644-4f7f499dd9ee github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect + github.com/coreos/go-systemd v0.0.0-00010101000000-000000000000 github.com/docker/go-units v0.4.0 github.com/filecoin-project/chain-validation v0.0.3 github.com/filecoin-project/filecoin-ffi v0.0.0-20191219131535-bb699517a590 @@ -109,3 +110,5 @@ require ( replace github.com/golangci/golangci-lint => github.com/golangci/golangci-lint v1.18.0 replace github.com/filecoin-project/filecoin-ffi => ./extern/filecoin-ffi + +replace github.com/coreos/go-systemd => github.com/coreos/go-systemd/v22 v22.0.0 diff --git a/go.sum b/go.sum index 29f05f2c1..bd9ae759f 100644 --- a/go.sum +++ b/go.sum @@ -61,6 +61,8 @@ github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee github.com/coreos/go-semver v0.2.1-0.20180108230905-e214231b295a/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd/v22 v22.0.0 h1:XJIw/+VlJ+87J+doOxznsAWIdmWuViOVhkQamW5YV28= +github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY= @@ -148,6 +150,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/godbus/dbus/v5 v5.0.3 h1:ZqHaoEF7TBzh4jzPmqVhE/5A1z9of6orkAe5uHoAeME= +github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= diff --git a/miner/miner.go b/miner/miner.go index d9d4880f3..16e2a1cc1 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -403,7 +403,8 @@ func selectMessages(ctx context.Context, al actorLookup, base *MiningBase, msgs if _, ok := inclNonces[from]; !ok { act, err := al(ctx, from, base.ts) if err != nil { - return nil, xerrors.Errorf("failed to check message sender balance: %w", err) + log.Warnf("failed to check message sender balance, skipping message: %+v", err) + continue } inclNonces[from] = act.Nonce diff --git a/node/node_test.go b/node/node_test.go index ea92a92b6..078f0ba14 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -393,7 +393,7 @@ func TestAPIRPC(t *testing.T) { } func TestAPIDealFlow(t *testing.T) { - test.TestDealFlow(t, mockSbBuilder, 10 * time.Millisecond) + test.TestDealFlow(t, mockSbBuilder, 10*time.Millisecond) } func TestAPIDealFlowReal(t *testing.T) {