From c8fcab5d22ae832488aeaf50f9f034aaee946ce3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 18 Mar 2021 23:12:57 +0100 Subject: [PATCH 1/4] shed: Command to list duplicate messages in tipsets --- cmd/lotus-shed/balances.go | 133 +++++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) diff --git a/cmd/lotus-shed/balances.go b/cmd/lotus-shed/balances.go index bce7be3d5..76ca9d6ae 100644 --- a/cmd/lotus-shed/balances.go +++ b/cmd/lotus-shed/balances.go @@ -8,6 +8,7 @@ import ( "os" "strconv" "strings" + "sync" "time" "github.com/filecoin-project/lotus/build" @@ -70,6 +71,138 @@ var auditsCmd = &cli.Command{ chainBalanceStateCmd, chainPledgeCmd, fillBalancesCmd, + duplicatedMessagesCmd, + }, +} + +var duplicatedMessagesCmd = &cli.Command{ + Name: "duplicate-messages", + Flags: []cli.Flag{ + &cli.IntFlag{ + Name: "count", + Required: true, + }, + }, + Action: func(cctx *cli.Context) error { + api, closer, err := lcli.GetFullNodeAPI(cctx) + if err != nil { + return err + } + + defer closer() + ctx := lcli.ReqContext(cctx) + + head, err := api.ChainHead(ctx) + if err != nil { + return err + } + + var printLk sync.Mutex + + threads := 64 + mcount := 0 + + throttle := make(chan struct{}, threads) + + for i := 0; i < cctx.Int("count"); i++ { + select { + case throttle <- struct{}{}: + case <-ctx.Done(): + return ctx.Err() + } + + go func(ts *types.TipSet) { + defer func() { + <-throttle + }() + + type addrNonce struct { + s address.Address + n uint64 + } + anonce := func(m *types.Message) addrNonce { + return addrNonce{ + s: m.From, + n: m.Nonce, + } + } + + type mc struct { + m abi.MethodNum + c cid.Cid + } + + msgs := map[addrNonce]mc{} + + for _, bh := range ts.Blocks() { + bms, err := api.ChainGetBlockMessages(ctx, bh.Cid()) + if err != nil { + fmt.Println("ERROR: ", err) + return + } + + for _, m := range bms.SecpkMessages { + c, found := msgs[anonce(&m.Message)] + if found { + if c.c == m.Cid() { + continue + } + 
printLk.Lock() + fmt.Printf("DUPE: M:%d %s / %s ; val: %s\tto: %s\n", c.m, c.c, m.Message.Cid(), types.FIL(m.Message.Value), m.Message.To) + printLk.Unlock() + } + msgs[anonce(&m.Message)] = mc{ + m: m.Message.Method, + c: m.Cid(), + } + } + + for _, m := range bms.BlsMessages { + c, found := msgs[anonce(m)] + if found { + if c.c == m.Cid() { + continue + } + printLk.Lock() + fmt.Printf("DUPE: M:%d %s / %s ; val: %s\tto: %s\n", c.m, c.c, m.Cid(), types.FIL(m.Value), m.To) + printLk.Unlock() + } + msgs[anonce(m)] = mc{ + m: m.Method, + c: m.Cid(), + } + } + + mcount += len(bms.SecpkMessages) + len(bms.BlsMessages) + } + }(head) + head, err = api.ChainGetTipSet(ctx, head.Parents()) + if err != nil { + return err + } + + if head.Height() % 20 == 0 { + printLk.Lock() + //fmt.Printf("H:%d; Ms: %d\n", head.Height(), mcount) + printLk.Unlock() + } + } + + for i := 0; i < threads; i++ { + select { + case throttle <- struct{}{}: + case <-ctx.Done(): + return ctx.Err() + } + + if head.Height() % 20 == 0 { + printLk.Lock() + //fmt.Printf("finH:%d; Ms: %d\n", head.Height(), mcount) + printLk.Unlock() + } + } + + return nil }, } From 77eefcd6d8905e0cd0c2386c650c4d94582e6d80 Mon Sep 17 00:00:00 2001 From: lotus Date: Fri, 19 Mar 2021 03:30:22 +0000 Subject: [PATCH 2/4] feat(lotus-shed): improve duplicate-messages audit command - Allow a start/end epoch. - Print one line per duplicate set. - Allow filtering duplicate messages by method number. - Make the number of threads configurable. 
--- cmd/lotus-shed/balances.go | 160 ++++++++++++++++++++++++++----------- 1 file changed, 114 insertions(+), 46 deletions(-) diff --git a/cmd/lotus-shed/balances.go b/cmd/lotus-shed/balances.go index 76ca9d6ae..248257c7b 100644 --- a/cmd/lotus-shed/balances.go +++ b/cmd/lotus-shed/balances.go @@ -3,9 +3,11 @@ package main import ( "context" "encoding/csv" + "encoding/json" "fmt" "io" "os" + "runtime" "strconv" "strings" "sync" @@ -76,11 +78,37 @@ var auditsCmd = &cli.Command{ } var duplicatedMessagesCmd = &cli.Command{ - Name: "duplicate-messages", + Name: "duplicate-messages", + Usage: "Check for duplicate messages included in a tipset.", + UsageText: `Check for duplicate messages included in a tipset. + +Due to Filecoin's expected consensus, a tipset may include the same message multiple times in +different blocks. The message will only be executed once. + +This command will find such duplicate messages and print them to standard out as newline-delimited +JSON. Status messages in the form of "H: $HEIGHT ($PROGRESS%)" will be printed to standard error for +every day of chain processed. 
+`, Flags: []cli.Flag{ &cli.IntFlag{ - Name: "count", - Required: true, + Name: "parallel", + Usage: "the number of parallel threads for block processing", + DefaultText: "half the number of cores", + }, + &cli.IntFlag{ + Name: "start", + Usage: "the first epoch to check", + DefaultText: "genesis", + }, + &cli.IntFlag{ + Name: "end", + Usage: "the last epoch to check", + DefaultText: "the current head", + }, + &cli.IntSliceFlag{ + Name: "method", + Usage: "Filter results by method number.", + DefaultText: "all methods", }, }, Action: func(cctx *cli.Context) error { @@ -92,19 +120,43 @@ var duplicatedMessagesCmd = &cli.Command{ defer closer() ctx := lcli.ReqContext(cctx) - head, err := api.ChainHead(ctx) + var head *types.TipSet + if cctx.IsSet("end") { + epoch := abi.ChainEpoch(cctx.Int("end")) + head, err = api.ChainGetTipSetByHeight(ctx, epoch, types.EmptyTSK) + } else { + head, err = api.ChainHead(ctx) + } if err != nil { return err } var printLk sync.Mutex - threads := 64 - mcount := 0 + threads := runtime.NumCPU() / 2 + if cctx.IsSet("parallel") { + threads = cctx.Int("parallel") + if threads <= 0 { + return fmt.Errorf("parallelism needs to be at least 1") + } + } else if threads == 0 { + threads = 1 // if we have one core, but who are we kidding... 
+ } throttle := make(chan struct{}, threads) - for i := 0; i < cctx.Int("count"); i++ { + methods := make(map[abi.MethodNum]bool) + for _, m := range cctx.IntSlice("method") { + if m < 0 { + return fmt.Errorf("expected method numbers to be non-negative") + } + methods[abi.MethodNum(m)] = true + } + + target := abi.ChainEpoch(cctx.Int("start")) + totalEpochs := head.Height() - target + + for target <= head.Height() { select { case throttle <- struct{}{}: case <-ctx.Done(): @@ -127,63 +179,80 @@ var duplicatedMessagesCmd = &cli.Command{ } } - type mc struct { - m abi.MethodNum - c cid.Cid - } + msgs := map[addrNonce]map[cid.Cid]*types.Message{} - msgs := map[addrNonce]mc{} + encoder := json.NewEncoder(os.Stdout) for _, bh := range ts.Blocks() { bms, err := api.ChainGetBlockMessages(ctx, bh.Cid()) if err != nil { - fmt.Println("ERROR: ", err) + fmt.Fprintln(os.Stderr, "ERROR: ", err) return } - for _, m := range bms.SecpkMessages { - c, found := msgs[anonce(&m.Message)] - if found { - if c.c == m.Cid() { - continue - } - printLk.Lock() - fmt.Printf("DUPE: M:%d %s / %s ; val: %s\tto: %s\n", c.m, c.c, m.Message.Cid(), types.FIL(m.Message.Value), m.Message.To) - printLk.Unlock() + for i, m := range bms.BlsMessages { + if len(methods) > 0 && !methods[m.Method] { + continue } - msgs[anonce(&m.Message)] = mc{ - m: m.Message.Method, - c: m.Cid(), + c, ok := msgs[anonce(m)] + if !ok { + c = make(map[cid.Cid]*types.Message, 1) + msgs[anonce(m)] = c } + c[bms.Cids[i]] = m } - for _, m := range bms.BlsMessages { - c, found := msgs[anonce(m)] - if found { - if c.c == m.Cid() { - continue - } - printLk.Lock() - fmt.Printf("DUPE: M:%d %s / %s ; val: %s\tto: %s\n", c.m, c.c, m.Cid(), types.FIL(m.Value), m.To) - printLk.Unlock() + for i, m := range bms.SecpkMessages { + if len(methods) > 0 && !methods[m.Message.Method] { + continue } - msgs[anonce(m)] = mc{ - m: m.Method, - c: m.Cid(), + c, ok := msgs[anonce(&m.Message)] + if !ok { + c = make(map[cid.Cid]*types.Message, 1) + 
msgs[anonce(&m.Message)] = c } + c[bms.Cids[len(bms.BlsMessages)+i]] = &m.Message } - - mcount += len(bms.SecpkMessages) + len(bms.BlsMessages) + } + for _, ms := range msgs { + if len(ms) == 1 { + continue + } + type Msg struct { + Cid string + Value string + Method uint64 + } + grouped := map[string][]Msg{} + for c, m := range ms { + addr := m.To.String() + grouped[addr] = append(grouped[addr], Msg{ + Cid: c.String(), + Value: types.FIL(m.Value).String(), + Method: uint64(m.Method), + }) + } + printLk.Lock() + err := encoder.Encode(grouped) + if err != nil { + fmt.Fprintln(os.Stderr, "ERROR: ", err) + } + printLk.Unlock() } }(head) + + if head.Parents().IsEmpty() { + break + } + head, err = api.ChainGetTipSet(ctx, head.Parents()) if err != nil { return err } - if head.Height() % 20 == 0 { + if head.Height()%2880 == 0 { printLk.Lock() - //fmt.Printf("H:%d; Ms: %d\n", head.Height(), mcount) + fmt.Fprintf(os.Stderr, "H: %s (%d%%)\n", head.Height(), (100*(head.Height()-target))/totalEpochs) printLk.Unlock() } } @@ -195,13 +264,12 @@ var duplicatedMessagesCmd = &cli.Command{ return ctx.Err() } - if head.Height() % 20 == 0 { - printLk.Lock() - //fmt.Printf("finH:%d; Ms: %d\n", head.Height(), mcount) - printLk.Unlock() - } } + printLk.Lock() + fmt.Fprintf(os.Stderr, "H: %s (100%%)\n", head.Height()) + printLk.Unlock() + return nil }, } From 701682c98ad12ef90f82c35c7fe76c50e829cbeb Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 28 Apr 2021 17:41:29 -0700 Subject: [PATCH 3/4] feat(lotus-shed): make it possible to filter by to/from when checking dups --- cmd/lotus-shed/balances.go | 99 ++++++++++++++++++++++++++++++-------- 1 file changed, 79 insertions(+), 20 deletions(-) diff --git a/cmd/lotus-shed/balances.go b/cmd/lotus-shed/balances.go index 248257c7b..669e8590d 100644 --- a/cmd/lotus-shed/balances.go +++ b/cmd/lotus-shed/balances.go @@ -107,9 +107,27 @@ every day of chain processed. 
}, &cli.IntSliceFlag{ Name: "method", - Usage: "Filter results by method number.", + Usage: "filter results by method number", DefaultText: "all methods", }, + &cli.StringSliceFlag{ + Name: "include-to", + Usage: "include only messages to the given address (does not perform address resolution)", + DefaultText: "all recipients", + }, + &cli.StringSliceFlag{ + Name: "include-from", + Usage: "include only messages from the given address (does not perform address resolution)", + DefaultText: "all senders", + }, + &cli.StringSliceFlag{ + Name: "exclude-to", + Usage: "exclude messages to the given address (does not perform address resolution)", + }, + &cli.StringSliceFlag{ + Name: "exclude-from", + Usage: "exclude messages from the given address (does not perform address resolution)", + }, }, Action: func(cctx *cli.Context) error { api, closer, err := lcli.GetFullNodeAPI(cctx) @@ -145,7 +163,7 @@ every day of chain processed. throttle := make(chan struct{}, threads) - methods := make(map[abi.MethodNum]bool) + methods := map[abi.MethodNum]bool{} for _, m := range cctx.IntSlice("method") { if m < 0 { return fmt.Errorf("expected method numbers to be non-negative") @@ -153,6 +171,39 @@ every day of chain processed. 
methods[abi.MethodNum(m)] = true } + addressSet := func(flag string) (map[address.Address]bool, error) { + if !cctx.IsSet(flag) { + return nil, nil + } + addrs := cctx.StringSlice(flag) + set := make(map[address.Address]bool, len(addrs)) + for _, addrStr := range addrs { + addr, err := address.NewFromString(addrStr) + if err != nil { + return nil, fmt.Errorf("failed to parse address %s: %w", addrStr, err) + } + set[addr] = true + } + return set, nil + } + + onlyFrom, err := addressSet("include-from") + if err != nil { + return err + } + onlyTo, err := addressSet("include-to") + if err != nil { + return err + } + excludeFrom, err := addressSet("exclude-from") + if err != nil { + return err + } + excludeTo, err := addressSet("exclude-to") + if err != nil { + return err + } + target := abi.ChainEpoch(cctx.Int("start")) totalEpochs := head.Height() - target @@ -181,6 +232,30 @@ every day of chain processed. msgs := map[addrNonce]map[cid.Cid]*types.Message{} + processMessage := func(c cid.Cid, m *types.Message) { + // Filter + if len(methods) > 0 && !methods[m.Method] { + return + } + if len(onlyFrom) > 0 && !onlyFrom[m.From] { + return + } + if len(onlyTo) > 0 && !onlyTo[m.To] { + return + } + if excludeFrom[m.From] || excludeTo[m.To] { + return + } + + // Record + msgSet, ok := msgs[anonce(m)] + if !ok { + msgSet = make(map[cid.Cid]*types.Message, 1) + msgs[anonce(m)] = msgSet + } + msgSet[c] = m + } + encoder := json.NewEncoder(os.Stdout) for _, bh := range ts.Blocks() { @@ -191,27 +266,11 @@ every day of chain processed. 
} for i, m := range bms.BlsMessages { - if len(methods) > 0 && !methods[m.Method] { - continue - } - c, ok := msgs[anonce(m)] - if !ok { - c = make(map[cid.Cid]*types.Message, 1) - msgs[anonce(m)] = c - } - c[bms.Cids[i]] = m + processMessage(bms.Cids[i], m) } for i, m := range bms.SecpkMessages { - if len(methods) > 0 && !methods[m.Message.Method] { - continue - } - c, ok := msgs[anonce(&m.Message)] - if !ok { - c = make(map[cid.Cid]*types.Message, 1) - msgs[anonce(&m.Message)] = c - } - c[bms.Cids[len(bms.BlsMessages)+i]] = &m.Message + processMessage(bms.Cids[len(bms.BlsMessages)+i], &m.Message) } } for _, ms := range msgs { From bcfad6b2bbba7b345d671e18e8c8016f89d18a9a Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Wed, 28 Apr 2021 17:51:25 -0700 Subject: [PATCH 4/4] fix(lotus-shed): sanity check start height for dup check --- cmd/lotus-shed/balances.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/lotus-shed/balances.go b/cmd/lotus-shed/balances.go index 669e8590d..87530c666 100644 --- a/cmd/lotus-shed/balances.go +++ b/cmd/lotus-shed/balances.go @@ -205,6 +205,9 @@ every day of chain processed. } target := abi.ChainEpoch(cctx.Int("start")) + if target < 0 || target > head.Height() { + return fmt.Errorf("start height must be non-negative and no greater than the end height") + } totalEpochs := head.Height() - target for target <= head.Height() {