From 02db37bf373fdb55f025a6a5b492a30b17316afb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 20 Mar 2023 10:37:48 +0100 Subject: [PATCH 1/2] feat: shed: incoming block-sub chainwatch tool --- cmd/lotus-shed/chainwatch.go | 377 +++++++++++++++++++++++++++++++++++ cmd/lotus-shed/main.go | 1 + 2 files changed, 378 insertions(+) create mode 100644 cmd/lotus-shed/chainwatch.go diff --git a/cmd/lotus-shed/chainwatch.go b/cmd/lotus-shed/chainwatch.go new file mode 100644 index 000000000..9f6897027 --- /dev/null +++ b/cmd/lotus-shed/chainwatch.go @@ -0,0 +1,377 @@ +package main + +import ( + "container/list" + "context" + "database/sql" + "fmt" + "hash/crc32" + "strconv" + "sync" + "time" + + "github.com/ipfs/go-cid" + "github.com/urfave/cli/v2" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/store" + "github.com/filecoin-project/lotus/chain/types" + cliutil "github.com/filecoin-project/lotus/cli/util" +) + +var chainwatchCmd = &cli.Command{ + Name: "chainwatch", + Usage: "lotus chainwatch", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "db", + EnvVars: []string{"CHAINWATCH_DB"}, + Value: "./chainwatch.db", + }, + }, + Subcommands: []*cli.Command{ + chainwatchRunCmd, + chainwatchDotCmd, + }, +} + +var chainwatchDotCmd = &cli.Command{ + Name: "dot", + Usage: "generate dot graphs", + ArgsUsage: " ", + Action: func(cctx *cli.Context) error { + st, err := cwOpenStorage(cctx.String("db")) + if err != nil { + return err + } + + minH, err := strconv.ParseInt(cctx.Args().Get(0), 10, 32) + tosee, err := strconv.ParseInt(cctx.Args().Get(1), 10, 32) + maxH := minH + tosee + + res, err := st.db.Query(`select block, parent, b.miner, b.height, p.height from block_parents + inner join blocks b on block_parents.block = b.cid + inner join blocks p on block_parents.parent = p.cid +where b.height > ? and b.height < ?`, minH, maxH) + + if err != nil { + return err + } + + fmt.Println("digraph D {") + + for res.Next() { + var block, parent, miner string + var height, ph uint64 + if err := res.Scan(&block, &parent, &miner, &height, &ph); err != nil { + return err + } + + bc, err := cid.Parse(block) + if err != nil { + return err + } + + has := st.hasBlock(bc) + + col := crc32.Checksum([]byte(miner), crc32.MakeTable(crc32.Castagnoli))&0xc0c0c0c0 + 0x30303030 + + hasstr := "" + if !has { + //col = 0xffffffff + hasstr = " UNSYNCED" + } + + nulls := height - ph - 1 + for i := uint64(0); i < nulls; i++ { + name := block + "NP" + fmt.Sprint(i) + + fmt.Printf("%s [label = \"NULL:%d\", fillcolor = \"#ffddff\", style=filled, forcelabels=true]\n%s -> %s\n", + name, height-nulls+i, name, parent) + + parent = name + } + + fmt.Printf("%s [label = \"%s:%d%s\", fillcolor = \"#%06x\", style=filled, forcelabels=true]\n%s -> %s\n", block, miner, height, hasstr, col, block, parent) + } + if res.Err() != nil { + return res.Err() + } + + fmt.Println("}") + + return nil + }, +} + +var chainwatchRunCmd = &cli.Command{ + Name: "run", + Usage: "Start lotus chainwatch", + + Action: func(cctx *cli.Context) error { + api, closer, err := cliutil.GetFullNodeAPIV1(cctx) + if err != nil { + return err + } + defer closer() + ctx := cliutil.ReqContext(cctx) + + v, err := api.Version(ctx) + if err != nil { + return err + } + + log.Infof("Remote version: %s", v.Version) + + st, err := cwOpenStorage(cctx.String("db")) // todo flag + if err != nil { + return err + } + defer st.close() + + cwRunSyncer(ctx, api, st) + go cwSubBlocks(ctx, api, st) + + <-ctx.Done() + return nil + }, +} + +func cwSubBlocks(ctx context.Context, api api.FullNode, st *cwStorage) { + sub, err := api.SyncIncomingBlocks(ctx) + if err != nil { + log.Error(err) + return + } + + for bh := range sub { + err := st.storeHeaders(map[cid.Cid]*types.BlockHeader{ + bh.Cid(): bh, + }, false) + if err != nil { + log.Error(err) + } + } +} + +func cwRunSyncer(ctx context.Context, api api.FullNode, st *cwStorage) { + notifs, err := api.ChainNotify(ctx) + if err != nil { + panic(err) + } + go func() { + for notif := range notifs { + for _, change := range notif { + switch change.Type { + case store.HCCurrent: + fallthrough + case store.HCApply: + syncHead(ctx, api, st, change.Val) + case store.HCRevert: + log.Warnf("revert todo") + } + } + } + }() +} + +func syncHead(ctx context.Context, api api.FullNode, st *cwStorage, ts *types.TipSet) { + log.Infof("Getting headers / actors") + + toSync := map[cid.Cid]*types.BlockHeader{} + toVisit := list.New() + + for _, header := range ts.Blocks() { + toVisit.PushBack(header) + } + + for toVisit.Len() > 0 { + bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader) + + if _, seen := toSync[bh.Cid()]; seen || st.hasBlock(bh.Cid()) { + continue + } + + toSync[bh.Cid()] = bh + + if len(toSync)%500 == 10 { + log.Infof("todo: (%d) %s", len(toSync), bh.Cid()) + } + + if len(bh.Parents) == 0 { + continue + } + + if bh.Height <= 530000 { + continue + } + + pts, err := api.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...)) + if err != nil { + log.Error(err) + continue + } + + for _, header := range pts.Blocks() { + toVisit.PushBack(header) + } + } + + log.Infof("Syncing %d blocks", len(toSync)) + + log.Infof("Persisting headers") + if err := st.storeHeaders(toSync, true); err != nil { + log.Error(err) + return + } + + log.Infof("Sync done") +} + +type cwStorage struct { + db *sql.DB + + headerLk sync.Mutex +} + +func cwOpenStorage(dbSource string) (*cwStorage, error) { + db, err := sql.Open("sqlite3", dbSource) + if err != nil { + return nil, err + } + + st := &cwStorage{db: db} + + return st, st.setup() +} + +func (st *cwStorage) setup() error { + tx, err := st.db.Begin() + if err != nil { + return err + } + _, err = tx.Exec(` +create table if not exists blocks +( + cid text not null + constraint blocks_pk + primary key, + parentWeight numeric not null, + parentStateRoot text not null, + height int not null, + miner text not null + constraint blocks_id_address_map_miner_fk + references id_address_map (address), + timestamp int not null, + vrfproof blob +); + +create unique index if not exists block_cid_uindex + on blocks (cid); + +create table if not exists blocks_synced +( + cid text not null + constraint blocks_synced_pk + primary key + constraint blocks_synced_blocks_cid_fk + references blocks, + add_ts int not null +); + +create unique index if not exists blocks_synced_cid_uindex + on blocks_synced (cid); + +create table if not exists block_parents +( + block text not null + constraint block_parents_blocks_cid_fk + references blocks, + parent text not null + constraint block_parents_blocks_cid_fk_2 + references blocks +); + +create unique index if not exists block_parents_block_parent_uindex + on block_parents (block, parent); + +create unique index if not exists blocks_cid_uindex + on blocks (cid); +`) + if err != nil { + return err + } + return tx.Commit() +} + +func (st *cwStorage) hasBlock(bh cid.Cid) bool { + var exitsts bool + err := st.db.QueryRow(`select exists (select 1 FROM blocks_synced where cid=?)`, bh.String()).Scan(&exitsts) + if err != nil { + log.Error(err) + return false + } + return exitsts +} + +func (st *cwStorage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error { + st.headerLk.Lock() + defer st.headerLk.Unlock() + + tx, err := st.db.Begin() + if err != nil { + return err + } + + stmt, err := tx.Prepare(`insert into blocks (cid, parentWeight, parentStateRoot, height, miner, "timestamp", vrfproof) values (?, ?, ?, ?, ?, ?, ?) on conflict do nothing`) + if err != nil { + return err + } + defer stmt.Close() + for _, bh := range bhs { + if _, err := stmt.Exec(bh.Cid().String(), + bh.ParentWeight.String(), + bh.ParentStateRoot.String(), + bh.Height, + bh.Miner.String(), + bh.Timestamp, + bh.Ticket.VRFProof, + ); err != nil { + return err + } + } + + stmt2, err := tx.Prepare(`insert into block_parents (block, parent) values (?, ?) on conflict do nothing`) + if err != nil { + return err + } + defer stmt2.Close() + for _, bh := range bhs { + for _, parent := range bh.Parents { + if _, err := stmt2.Exec(bh.Cid().String(), parent.String()); err != nil { + return err + } + } + } + + if sync { + stmt, err := tx.Prepare(`insert into blocks_synced (cid, add_ts) values (?, ?) on conflict do nothing`) + if err != nil { + return err + } + defer stmt.Close() + now := time.Now().Unix() + + for _, bh := range bhs { + if _, err := stmt.Exec(bh.Cid().String(), now); err != nil { + return err + } + } + } + + return tx.Commit() +} + +func (st *cwStorage) close() error { + return st.db.Close() +} diff --git a/cmd/lotus-shed/main.go b/cmd/lotus-shed/main.go index 19072dd71..d984aa6bd 100644 --- a/cmd/lotus-shed/main.go +++ b/cmd/lotus-shed/main.go @@ -26,6 +26,7 @@ func main() { base32Cmd, base16Cmd, bitFieldCmd, + chainwatchCmd, cronWcCmd, frozenMinersCmd, dealLabelCmd, From e2ff9027f95bf2d63705e895073e4fb7856a0bc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Mar 2023 12:33:00 +0100 Subject: [PATCH 2/2] shed chainwatch: Appease the linter --- cmd/lotus-shed/chainwatch.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/cmd/lotus-shed/chainwatch.go b/cmd/lotus-shed/chainwatch.go index 9f6897027..a7435a63f 100644 --- a/cmd/lotus-shed/chainwatch.go +++ b/cmd/lotus-shed/chainwatch.go @@ -46,7 +46,13 @@ var chainwatchDotCmd = &cli.Command{ } minH, err := strconv.ParseInt(cctx.Args().Get(0), 10, 32) + if err != nil { + return err + } tosee, err := strconv.ParseInt(cctx.Args().Get(1), 10, 32) + if err != nil { + return err + } maxH := minH + tosee res, err := st.db.Query(`select block, parent, b.miner, b.height, p.height from block_parents @@ -127,7 +133,7 @@ var chainwatchRunCmd = &cli.Command{ if err != nil { return err } - defer st.close() + defer st.close() // nolint:errcheck cwRunSyncer(ctx, api, st) go cwSubBlocks(ctx, api, st) @@ -327,7 +333,7 @@ func (st *cwStorage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) if err != nil { return err } - defer stmt.Close() + defer stmt.Close() // nolint:errcheck for _, bh := range bhs { if _, err := stmt.Exec(bh.Cid().String(), bh.ParentWeight.String(), @@ -345,7 +351,7 @@ func (st *cwStorage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) if err != nil { return err } - defer stmt2.Close() + defer stmt2.Close() // nolint:errcheck for _, bh := range bhs { for _, parent := range bh.Parents { if _, err := stmt2.Exec(bh.Cid().String(), parent.String()); err != nil { @@ -359,7 +365,7 @@ func (st *cwStorage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) if err != nil { return err } - defer stmt.Close() + defer stmt.Close() // nolint:errcheck now := time.Now().Unix() for _, bh := range bhs {