Merge pull request #5847 from filecoin-project/steb/audit-dups

shed: command to list duplicate messages in tipsets (steb)
This commit is contained in:
Steven Allen 2021-04-28 18:31:01 -07:00 committed by GitHub
commit b23837a7d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -3,11 +3,14 @@ package main
import (
"context"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"os"
"runtime"
"strconv"
"strings"
"sync"
"time"
"github.com/filecoin-project/lotus/build"
@ -70,6 +73,266 @@ var auditsCmd = &cli.Command{
chainBalanceStateCmd,
chainPledgeCmd,
fillBalancesCmd,
duplicatedMessagesCmd,
},
}
var duplicatedMessagesCmd = &cli.Command{
Name: "duplicate-messages",
Usage: "Check for duplicate messages included in a tipset.",
UsageText: `Check for duplicate messages included in a tipset.
Due to Filecoin's expected consensus, a tipset may include the same message multiple times in
different blocks. The message will only be executed once.
This command will find such duplicate messages and print them to standard out as newline-delimited
JSON. Status messages in the form of "H: $HEIGHT ($PROGRESS%)" will be printed to standard error for
every day of chain processed.
`,
Flags: []cli.Flag{
&cli.IntFlag{
Name: "parallel",
Usage: "the number of parallel threads for block processing",
DefaultText: "half the number of cores",
},
&cli.IntFlag{
Name: "start",
Usage: "the first epoch to check",
DefaultText: "genesis",
},
&cli.IntFlag{
Name: "end",
Usage: "the last epoch to check",
DefaultText: "the current head",
},
&cli.IntSliceFlag{
Name: "method",
Usage: "filter results by method number",
DefaultText: "all methods",
},
&cli.StringSliceFlag{
Name: "include-to",
Usage: "include only messages to the given address (does not perform address resolution)",
DefaultText: "all recipients",
},
&cli.StringSliceFlag{
Name: "include-from",
Usage: "include only messages from the given address (does not perform address resolution)",
DefaultText: "all senders",
},
&cli.StringSliceFlag{
Name: "exclude-to",
Usage: "exclude messages to the given address (does not perform address resolution)",
},
&cli.StringSliceFlag{
Name: "exclude-from",
Usage: "exclude messages from the given address (does not perform address resolution)",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
var head *types.TipSet
if cctx.IsSet("end") {
epoch := abi.ChainEpoch(cctx.Int("end"))
head, err = api.ChainGetTipSetByHeight(ctx, epoch, types.EmptyTSK)
} else {
head, err = api.ChainHead(ctx)
}
if err != nil {
return err
}
var printLk sync.Mutex
threads := runtime.NumCPU() / 2
if cctx.IsSet("parallel") {
threads = cctx.Int("int")
if threads <= 0 {
return fmt.Errorf("parallelism needs to be at least 1")
}
} else if threads == 0 {
threads = 1 // if we have one core, but who are we kidding...
}
throttle := make(chan struct{}, threads)
methods := map[abi.MethodNum]bool{}
for _, m := range cctx.IntSlice("method") {
if m < 0 {
return fmt.Errorf("expected method numbers to be non-negative")
}
methods[abi.MethodNum(m)] = true
}
addressSet := func(flag string) (map[address.Address]bool, error) {
if !cctx.IsSet(flag) {
return nil, nil
}
addrs := cctx.StringSlice(flag)
set := make(map[address.Address]bool, len(addrs))
for _, addrStr := range addrs {
addr, err := address.NewFromString(addrStr)
if err != nil {
return nil, fmt.Errorf("failed to parse address %s: %w", addrStr, err)
}
set[addr] = true
}
return set, nil
}
onlyFrom, err := addressSet("include-from")
if err != nil {
return err
}
onlyTo, err := addressSet("include-to")
if err != nil {
return err
}
excludeFrom, err := addressSet("exclude-from")
if err != nil {
return err
}
excludeTo, err := addressSet("exclude-to")
if err != nil {
return err
}
target := abi.ChainEpoch(cctx.Int("start"))
if target < 0 || target > head.Height() {
return fmt.Errorf("start height must be greater than 0 and less than the end height")
}
totalEpochs := head.Height() - target
for target <= head.Height() {
select {
case throttle <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
}
go func(ts *types.TipSet) {
defer func() {
<-throttle
}()
type addrNonce struct {
s address.Address
n uint64
}
anonce := func(m *types.Message) addrNonce {
return addrNonce{
s: m.From,
n: m.Nonce,
}
}
msgs := map[addrNonce]map[cid.Cid]*types.Message{}
processMessage := func(c cid.Cid, m *types.Message) {
// Filter
if len(methods) > 0 && !methods[m.Method] {
return
}
if len(onlyFrom) > 0 && !onlyFrom[m.From] {
return
}
if len(onlyTo) > 0 && !onlyTo[m.To] {
return
}
if excludeFrom[m.From] || excludeTo[m.To] {
return
}
// Record
msgSet, ok := msgs[anonce(m)]
if !ok {
msgSet = make(map[cid.Cid]*types.Message, 1)
msgs[anonce(m)] = msgSet
}
msgSet[c] = m
}
encoder := json.NewEncoder(os.Stdout)
for _, bh := range ts.Blocks() {
bms, err := api.ChainGetBlockMessages(ctx, bh.Cid())
if err != nil {
fmt.Fprintln(os.Stderr, "ERROR: ", err)
return
}
for i, m := range bms.BlsMessages {
processMessage(bms.Cids[i], m)
}
for i, m := range bms.SecpkMessages {
processMessage(bms.Cids[len(bms.BlsMessages)+i], &m.Message)
}
}
for _, ms := range msgs {
if len(ms) == 1 {
continue
}
type Msg struct {
Cid string
Value string
Method uint64
}
grouped := map[string][]Msg{}
for c, m := range ms {
addr := m.To.String()
grouped[addr] = append(grouped[addr], Msg{
Cid: c.String(),
Value: types.FIL(m.Value).String(),
Method: uint64(m.Method),
})
}
printLk.Lock()
err := encoder.Encode(grouped)
if err != nil {
fmt.Fprintln(os.Stderr, "ERROR: ", err)
}
printLk.Unlock()
}
}(head)
if head.Parents().IsEmpty() {
break
}
head, err = api.ChainGetTipSet(ctx, head.Parents())
if err != nil {
return err
}
if head.Height()%2880 == 0 {
printLk.Lock()
fmt.Fprintf(os.Stderr, "H: %s (%d%%)\n", head.Height(), (100*(head.Height()-target))/totalEpochs)
printLk.Unlock()
}
}
for i := 0; i < threads; i++ {
select {
case throttle <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
}
}
printLk.Lock()
fmt.Fprintf(os.Stderr, "H: %s (100%%)\n", head.Height())
printLk.Unlock()
return nil
},
}