package cli import ( "encoding/json" "fmt" "sort" "github.com/urfave/cli/v2" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" "github.com/filecoin-project/lotus/chain/types" ) var mpoolCmd = &cli.Command{ Name: "mpool", Usage: "Manage message pool", Subcommands: []*cli.Command{ mpoolPending, mpoolSub, mpoolStat, }, } var mpoolPending = &cli.Command{ Name: "pending", Usage: "Get pending messages", Flags: []cli.Flag{ &cli.BoolFlag{ Name: "local", Usage: "print pending messages for addresses in local wallet only", }, }, Action: func(cctx *cli.Context) error { api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } defer closer() ctx := ReqContext(cctx) var filter map[address.Address]struct{} if cctx.Bool("local") { filter = map[address.Address]struct{}{} addrss, err := api.WalletList(ctx) if err != nil { return xerrors.Errorf("getting local addresses: %w", err) } for _, a := range addrss { filter[a] = struct{}{} } } msgs, err := api.MpoolPending(ctx, types.EmptyTSK) if err != nil { return err } for _, msg := range msgs { if filter != nil { if _, has := filter[msg.Message.From]; !has { continue } } out, err := json.MarshalIndent(msg, "", " ") if err != nil { return err } fmt.Println(string(out)) } return nil }, } var mpoolSub = &cli.Command{ Name: "sub", Usage: "Subscribe to mpool changes", Action: func(cctx *cli.Context) error { api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } defer closer() ctx := ReqContext(cctx) sub, err := api.MpoolSub(ctx) if err != nil { return err } for { select { case update := <-sub: out, err := json.MarshalIndent(update, "", " ") if err != nil { return err } fmt.Println(string(out)) case <-ctx.Done(): return nil } } }, } type statBucket struct { msgs map[uint64]*types.SignedMessage } type mpStat struct { addr string past, cur, future uint64 } var mpoolStat = &cli.Command{ Name: "stat", Usage: "print mempool stats", Flags: []cli.Flag{ &cli.BoolFlag{ Name: "local", Usage: "print stats for addresses in local wallet only", }, }, Action: func(cctx *cli.Context) error { api, closer, err := GetFullNodeAPI(cctx) if err != nil { return err } defer closer() ctx := ReqContext(cctx) ts, err := api.ChainHead(ctx) if err != nil { return xerrors.Errorf("getting chain head: %w", err) } var filter map[address.Address]struct{} if cctx.Bool("local") { filter = map[address.Address]struct{}{} addrss, err := api.WalletList(ctx) if err != nil { return xerrors.Errorf("getting local addresses: %w", err) } for _, a := range addrss { filter[a] = struct{}{} } } msgs, err := api.MpoolPending(ctx, types.EmptyTSK) if err != nil { return err } buckets := map[address.Address]*statBucket{} for _, v := range msgs { if filter != nil { if _, has := filter[v.Message.From]; !has { continue } } bkt, ok := buckets[v.Message.From] if !ok { bkt = &statBucket{ msgs: map[uint64]*types.SignedMessage{}, } buckets[v.Message.From] = bkt } bkt.msgs[v.Message.Nonce] = v } var out []mpStat for a, bkt := range buckets { act, err := api.StateGetActor(ctx, a, ts.Key()) if err != nil { fmt.Printf("%s, err: %s\n", a, err) continue } cur := act.Nonce for { _, ok := bkt.msgs[cur] if !ok { break } cur++ } past := uint64(0) future := uint64(0) for _, m := range bkt.msgs { if m.Message.Nonce < act.Nonce { past++ } if m.Message.Nonce > cur { future++ } } out = append(out, mpStat{ addr: a.String(), past: past, cur: cur - act.Nonce, future: future, }) } sort.Slice(out, func(i, j int) bool { return out[i].addr < out[j].addr }) for _, stat := range out { fmt.Printf("%s, past: %d, cur: %d, future: %d\n", stat.addr, stat.past, stat.cur, stat.future) } return nil }, }