feat(lotus-shed): improve duplicate-messages audit command

- Allow specifying a start/end epoch to bound the range of the chain that is checked.
- Print one line of JSON per set of duplicate messages.
- Allow filtering duplicate messages by method number.
- Make the number of parallel threads configurable.
This commit is contained in:
lotus 2021-03-19 03:30:22 +00:00 committed by Steven Allen
parent c8fcab5d22
commit 77eefcd6d8

View File

@ -3,9 +3,11 @@ package main
import (
"context"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"os"
"runtime"
"strconv"
"strings"
"sync"
@ -76,11 +78,37 @@ var auditsCmd = &cli.Command{
}
var duplicatedMessagesCmd = &cli.Command{
Name: "duplicate-messages",
Name: "duplicate-messages",
Usage: "Check for duplicate messages included in a tipset.",
UsageText: `Check for duplicate messages included in a tipset.
Due to Filecoin's expected consensus, a tipset may include the same message multiple times in
different blocks. The message will only be executed once.
This command will find such duplicate messages and print them to standard out as newline-delimited
JSON. Status messages in the form of "H: $HEIGHT ($PROGRESS%)" will be printed to standard error for
every day of chain processed.
`,
Flags: []cli.Flag{
&cli.IntFlag{
Name: "count",
Required: true,
Name: "parallel",
Usage: "the number of parallel threads for block processing",
DefaultText: "half the number of cores",
},
&cli.IntFlag{
Name: "start",
Usage: "the first epoch to check",
DefaultText: "genesis",
},
&cli.IntFlag{
Name: "end",
Usage: "the last epoch to check",
DefaultText: "the current head",
},
&cli.IntSliceFlag{
Name: "method",
Usage: "Filter results by method number.",
DefaultText: "all methods",
},
},
Action: func(cctx *cli.Context) error {
@ -92,19 +120,43 @@ var duplicatedMessagesCmd = &cli.Command{
defer closer()
ctx := lcli.ReqContext(cctx)
head, err := api.ChainHead(ctx)
var head *types.TipSet
if cctx.IsSet("end") {
epoch := abi.ChainEpoch(cctx.Int("end"))
head, err = api.ChainGetTipSetByHeight(ctx, epoch, types.EmptyTSK)
} else {
head, err = api.ChainHead(ctx)
}
if err != nil {
return err
}
var printLk sync.Mutex
threads := 64
mcount := 0
threads := runtime.NumCPU() / 2
if cctx.IsSet("parallel") {
threads = cctx.Int("int")
if threads <= 0 {
return fmt.Errorf("parallelism needs to be at least 1")
}
} else if threads == 0 {
threads = 1 // if we have one core, but who are we kidding...
}
throttle := make(chan struct{}, threads)
for i := 0; i < cctx.Int("count"); i++ {
methods := make(map[abi.MethodNum]bool)
for _, m := range cctx.IntSlice("method") {
if m < 0 {
return fmt.Errorf("expected method numbers to be non-negative")
}
methods[abi.MethodNum(m)] = true
}
target := abi.ChainEpoch(cctx.Int("start"))
totalEpochs := head.Height() - target
for target <= head.Height() {
select {
case throttle <- struct{}{}:
case <-ctx.Done():
@ -127,63 +179,80 @@ var duplicatedMessagesCmd = &cli.Command{
}
}
type mc struct {
m abi.MethodNum
c cid.Cid
}
msgs := map[addrNonce]map[cid.Cid]*types.Message{}
msgs := map[addrNonce]mc{}
encoder := json.NewEncoder(os.Stdout)
for _, bh := range ts.Blocks() {
bms, err := api.ChainGetBlockMessages(ctx, bh.Cid())
if err != nil {
fmt.Println("ERROR: ", err)
fmt.Fprintln(os.Stderr, "ERROR: ", err)
return
}
for _, m := range bms.SecpkMessages {
c, found := msgs[anonce(&m.Message)]
if found {
if c.c == m.Cid() {
continue
}
printLk.Lock()
fmt.Printf("DUPE: M:%d %s / %s ; val: %s\tto: %s\n", c.m, c.c, m.Message.Cid(), types.FIL(m.Message.Value), m.Message.To)
printLk.Unlock()
for i, m := range bms.BlsMessages {
if len(methods) > 0 && !methods[m.Method] {
continue
}
msgs[anonce(&m.Message)] = mc{
m: m.Message.Method,
c: m.Cid(),
c, ok := msgs[anonce(m)]
if !ok {
c = make(map[cid.Cid]*types.Message, 1)
msgs[anonce(m)] = c
}
c[bms.Cids[i]] = m
}
for _, m := range bms.BlsMessages {
c, found := msgs[anonce(m)]
if found {
if c.c == m.Cid() {
continue
}
printLk.Lock()
fmt.Printf("DUPE: M:%d %s / %s ; val: %s\tto: %s\n", c.m, c.c, m.Cid(), types.FIL(m.Value), m.To)
printLk.Unlock()
for i, m := range bms.SecpkMessages {
if len(methods) > 0 && !methods[m.Message.Method] {
continue
}
msgs[anonce(m)] = mc{
m: m.Method,
c: m.Cid(),
c, ok := msgs[anonce(&m.Message)]
if !ok {
c = make(map[cid.Cid]*types.Message, 1)
msgs[anonce(&m.Message)] = c
}
c[bms.Cids[len(bms.BlsMessages)+i]] = &m.Message
}
mcount += len(bms.SecpkMessages) + len(bms.BlsMessages)
}
for _, ms := range msgs {
if len(ms) == 1 {
continue
}
type Msg struct {
Cid string
Value string
Method uint64
}
grouped := map[string][]Msg{}
for c, m := range ms {
addr := m.To.String()
grouped[addr] = append(grouped[addr], Msg{
Cid: c.String(),
Value: types.FIL(m.Value).String(),
Method: uint64(m.Method),
})
}
printLk.Lock()
err := encoder.Encode(grouped)
if err != nil {
fmt.Fprintln(os.Stderr, "ERROR: ", err)
}
printLk.Unlock()
}
}(head)
if head.Parents().IsEmpty() {
break
}
head, err = api.ChainGetTipSet(ctx, head.Parents())
if err != nil {
return err
}
if head.Height() % 20 == 0 {
if head.Height()%2880 == 0 {
printLk.Lock()
//fmt.Printf("H:%d; Ms: %d\n", head.Height(), mcount)
fmt.Fprintf(os.Stderr, "H: %s (%d%%)\n", head.Height(), (100*(head.Height()-target))/totalEpochs)
printLk.Unlock()
}
}
@ -195,13 +264,12 @@ var duplicatedMessagesCmd = &cli.Command{
return ctx.Err()
}
if head.Height() % 20 == 0 {
printLk.Lock()
//fmt.Printf("finH:%d; Ms: %d\n", head.Height(), mcount)
printLk.Unlock()
}
}
printLk.Lock()
fmt.Fprintf(os.Stderr, "H: %s (100%%)\n", head.Height())
printLk.Unlock()
return nil
},
}