From 3edc93f117354e7395eba7cdaf7e60508b3206d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 7 Dec 2019 23:44:45 +0100 Subject: [PATCH 01/15] wip chainwatch ui --- cmd/lotus-chainwatch/site/block.html | 61 +++++++++++++++++++++++++++ cmd/lotus-chainwatch/site/blocks.html | 43 +++++++++++++++++++ cmd/lotus-chainwatch/site/index.html | 5 +++ cmd/lotus-chainwatch/site/key.html | 4 +- cmd/lotus-chainwatch/site/keys.html | 7 ++- cmd/lotus-chainwatch/site/main.css | 4 ++ cmd/lotus-chainwatch/templates.go | 46 +++++++++++++++++++- 7 files changed, 166 insertions(+), 4 deletions(-) create mode 100644 cmd/lotus-chainwatch/site/block.html create mode 100644 cmd/lotus-chainwatch/site/blocks.html diff --git a/cmd/lotus-chainwatch/site/block.html b/cmd/lotus-chainwatch/site/block.html new file mode 100644 index 000000000..b9aa09c32 --- /dev/null +++ b/cmd/lotus-chainwatch/site/block.html @@ -0,0 +1,61 @@ + + + + Lotus ChainWatch + + + +{{$cid := param "cid"}} + +
+
+
+ Lotus ChainWatch - Wallets +
+
+
+
+
Miner: {{index (strings "blocks" "miner" "cid=?" $cid) 0}}
+
Parents:
+
+ {{range strings "block_parents" "parent" "block=?" $cid}} + {{$parent := .}} + {{. | substr 54 62}} + {{end}} +
+
Messages:
+ + {{range strings "block_messages" "message" "block=?" $cid}} + {{$msg := .}} + + + + + + + {{$rec := qstrs `select r.exit, r.gas_used from messages + inner join block_messages bm on messages.cid = bm.message + inner join blocks b on bm.block = b.cid + inner join block_parents bp on b.cid = bp.parent + inner join blocks chd on bp.block = chd.cid + inner join receipts r on messages.cid = r.msg and chd.parentStateRoot = r.state + where messages.cid=? and b.cid=?` 2 $msg $cid}} + + + + {{end}} +
{{$msg | substr 54 62}} + {{$from := qstr "select \"from\" from messages where cid=?" $msg}} + {{$nonce := qstr "select nonce from messages where cid=?" $msg}} + {{$from}} (N:{{$nonce}}) + -> + {{$to := qstr "select \"to\" from messages where cid=?" $msg}} + {{$to}} + + Method:{{qstr "select method from messages where cid=?" $msg}} + exit:{{index $rec 0}}gasUsed:{{index $rec 1}}
+
+
+
+ + \ No newline at end of file diff --git a/cmd/lotus-chainwatch/site/blocks.html b/cmd/lotus-chainwatch/site/blocks.html new file mode 100644 index 000000000..6e38d9e2d --- /dev/null +++ b/cmd/lotus-chainwatch/site/blocks.html @@ -0,0 +1,43 @@ + + + + Lotus ChainWatch + + + +{{$start := param "start" | parseInt}} + +
+
+
+ Lotus ChainWatch - Wallets +
+
+
+
+ + {{range pageDown $start 50}} + + + + + + {{end}} +
+ {{$h := .}} + {{$h}}; + + {{qstr `select count(distinct block_messages.message) from block_messages + inner join blocks b on block_messages.block = b.cid + where b.height = ?` $h}} Msgs + + {{range strings "blocks" "cid" "height = ?" $h}} + {{. | substr 54 62}} + {{end}} +
+ Next 50 +
+
+
+ + diff --git a/cmd/lotus-chainwatch/site/index.html b/cmd/lotus-chainwatch/site/index.html index 1a1c2813a..315ba5705 100644 --- a/cmd/lotus-chainwatch/site/index.html +++ b/cmd/lotus-chainwatch/site/index.html @@ -26,6 +26,11 @@ {{count "id_address_map" "id != address"}} Keys; E% FIL in wallets; F% FIL in miners; M% in market; %G Other actors; %H FIL it treasury +
+ {{$maxH := queryNum "select max(height) from blocks inner join blocks_synced bs on blocks.cid = bs.cid"}} + + {{count "blocks"}} Blocks; Current Height: {{$maxH}}; +
diff --git a/cmd/lotus-chainwatch/site/key.html b/cmd/lotus-chainwatch/site/key.html index 697838ed2..7ae65b13f 100644 --- a/cmd/lotus-chainwatch/site/key.html +++ b/cmd/lotus-chainwatch/site/key.html @@ -24,9 +24,9 @@ {{ range messages "`from` = ? or `to` = ?" $wallet $wallet $wallet}} {{ if eq .From.String $wallet }} - To{{.To.String}} + To{{.To.String}} {{else}} - From{{.From.String}} + From{{.From.String}} {{end}} {{.Nonce}} {{.Value}} diff --git a/cmd/lotus-chainwatch/site/keys.html b/cmd/lotus-chainwatch/site/keys.html index adf96adb1..01ba3d241 100644 --- a/cmd/lotus-chainwatch/site/keys.html +++ b/cmd/lotus-chainwatch/site/keys.html @@ -14,7 +14,12 @@
{{range strings "id_address_map" "address" "address != id"}} -
{{.}}
+ {{$addr := .}} +
+ {{$addr}} + {{qstr "select count(distinct cid) from messages where \"from\"=?" $addr}} outmsgs; + {{qstr "select count(distinct cid) from messages where \"to\"=?" $addr}} inmsgs +
{{end}}
diff --git a/cmd/lotus-chainwatch/site/main.css b/cmd/lotus-chainwatch/site/main.css index 820625360..cc9da2f35 100644 --- a/cmd/lotus-chainwatch/site/main.css +++ b/cmd/lotus-chainwatch/site/main.css @@ -6,6 +6,10 @@ body { margin: 0; } +b { + color: #aff; +} + .Index { width: 100vw; height: 100vh; diff --git a/cmd/lotus-chainwatch/templates.go b/cmd/lotus-chainwatch/templates.go index 02e85d6b0..c255e8399 100644 --- a/cmd/lotus-chainwatch/templates.go +++ b/cmd/lotus-chainwatch/templates.go @@ -6,6 +6,7 @@ import ( "net/http" "os" "path/filepath" + "strconv" rice "github.com/GeertJohan/go.rice" "github.com/ipfs/go-cid" @@ -42,8 +43,15 @@ func newHandler(api api.FullNode, st *storage) (*handler, error) { "queryNum": h.queryNum, "sizeStr": sizeStr, "strings": h.strings, + "qstr": h.qstr, + "qstrs": h.qstrs, "messages": h.messages, + "pageDown": pageDown, + "parseInt": func(s string) (int, error) {i, e := strconv.ParseInt(s, 10, 64); return int(i), e}, + "substr": func(i, j int, s string,) string {return s[i:j]}, + "sub": func(a, b int) int {return a - b}, // TODO: really not builtin? + "param": func(string) string { return "" }, // replaced in request handler } @@ -193,7 +201,7 @@ func (h *handler) strings(table string, col string, filter string, args ...inter if len(filter) > 0 { filter = " where " + filter } - log.Info("strings qstr ", "select "+col+" from "+table+filter) + log.Info("strings qstr ", "select "+col+" from "+table+filter, args) rws, err := h.st.db.Query("select "+col+" from "+table+filter, args...) if err != nil { return nil, err @@ -209,6 +217,33 @@ func (h *handler) strings(table string, col string, filter string, args ...inter return } +func (h *handler) qstr(q string, p ...interface{}) (string, error) { + // explicitly not caring about sql injection too much, this doesn't take user input + + r, err := h.qstrs(q, 1, p...) + if err != nil { + return "", err + } + return r[0], nil +} + +func (h *handler) qstrs(q string, n int, p ...interface{}) ([]string, error) { + // explicitly not caring about sql injection too much, this doesn't take user input + + c := make([]string, n) + ia := make([]interface{}, n) + for i := range c { + ia[i] = &c[i] + } + err := h.st.db.QueryRow(q, p...).Scan(ia...) + if err != nil { + log.Error("qnum ", q, p, err) + return nil, err + } + + return c, nil +} + func (h *handler) messages(filter string, args ...interface{}) (out []types.Message, err error) { if len(filter) > 0 { filter = " where " + filter @@ -250,4 +285,13 @@ func (h *handler) messages(filter string, args ...interface{}) (out []types.Mess return } +func pageDown(base, n int) []int { + out := make([]int, n) + for i := range out { + out[i] = base - i + } + + return out +} + var _ http.Handler = &handler{} From b6b06f67dc80afb246c9bf5b7d04a63a54a331bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 11 Dec 2019 00:42:36 +0100 Subject: [PATCH 02/15] chainwatch: Postgres --- cmd/lotus-chainwatch/blockssub.go | 2 +- cmd/lotus-chainwatch/main.go | 2 +- cmd/lotus-chainwatch/storage.go | 186 ++++++++++++++---------------- cmd/lotus-chainwatch/sync.go | 40 ++++--- go.mod | 1 + go.sum | 2 + 6 files changed, 113 insertions(+), 120 deletions(-) diff --git a/cmd/lotus-chainwatch/blockssub.go b/cmd/lotus-chainwatch/blockssub.go index 9c5957f67..c569f1885 100644 --- a/cmd/lotus-chainwatch/blockssub.go +++ b/cmd/lotus-chainwatch/blockssub.go @@ -21,7 +21,7 @@ func subBlocks(ctx context.Context, api aapi.FullNode, st *storage) { bh.Cid(): bh, }, false) if err != nil { - log.Error(err) + log.Errorf("%+v", err) } } } diff --git a/cmd/lotus-chainwatch/main.go b/cmd/lotus-chainwatch/main.go index 187f2a17d..5d8df92b3 100644 --- a/cmd/lotus-chainwatch/main.go +++ b/cmd/lotus-chainwatch/main.go @@ -38,7 +38,7 @@ func main() { &cli.StringFlag{ Name: "db", EnvVars: []string{"LOTUS_DB"}, - Value: "./chainwatch.db", + Value: "", }, }, diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 59183eb18..2d2d56656 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -6,7 +6,7 @@ import ( "time" "github.com/ipfs/go-cid" - _ "github.com/mattn/go-sqlite3" + _ "github.com/lib/pq" "github.com/filecoin-project/lotus/chain/address" "github.com/filecoin-project/lotus/chain/types" @@ -19,7 +19,7 @@ type storage struct { } func openStorage(dbSource string) (*storage, error) { - db, err := sql.Open("sqlite3", dbSource) + db, err := sql.Open("postgres", dbSource) if err != nil { return nil, err } @@ -35,33 +35,76 @@ func (st *storage) setup() error { return err } _, err = tx.Exec(` +create table if not exists blocks_synced +( + cid text not null + constraint blocks_synced_pk + primary key, + add_ts int not null +); + +create unique index if not exists blocks_synced_cid_uindex + on blocks_synced (cid); + +create table if not exists block_parents +( + block text not null + constraint block_parents_pk + primary key, + parent text not null +); + +create unique index if not exists block_parents_block_parent_uindex + on block_parents (block, parent); + +create table if not exists blocks +( + cid text not null + constraint blocks_pk + primary key + constraint blocks_synced_blocks_cid_fk + references blocks_synced (cid) + constraint block_parents_blocks_cid_fk + references block_parents (block), + parentWeight numeric not null, + parentStateRoot text not null, + height int not null, + miner text not null, + timestamp int not null +); + +create unique index if not exists block_cid_uindex + on blocks (cid); + +create table if not exists id_address_map +( + id text not null, + address text not null, + constraint id_address_map_pk + primary key (id, address) +); + +create unique index if not exists id_address_map_id_uindex + on id_address_map (id); + +create unique index if not exists id_address_map_address_uindex + on id_address_map (address); + create table if not exists actors ( - id text not null, + id text not null + constraint id_address_map_actors_id_fk + references id_address_map (id), code text not null, head text not null, nonce int not null, balance text not null, stateroot text - constraint actors_blocks_stateroot_fk - references blocks (parentStateRoot), - constraint actors_pk - primary key (id, nonce, balance, stateroot) ); create index if not exists actors_id_index on actors (id); -create table if not exists id_address_map -( - id text not null - constraint id_address_map_actors_id_fk - references actors (id), - address text not null, - constraint id_address_map_pk - primary key (id, address) -); - create index if not exists id_address_map_address_index on id_address_map (address); @@ -84,57 +127,11 @@ create table if not exists messages gasprice int not null, gaslimit int not null, method int, - params blob + params bytea ); create unique index if not exists messages_cid_uindex on messages (cid); - -create table if not exists blocks -( - cid text not null - constraint blocks_pk - primary key, - parentWeight numeric not null, - parentStateRoot text not null, - height int not null, - miner text not null - constraint blocks_id_address_map_miner_fk - references id_address_map (address), - timestamp int not null -); - -create unique index if not exists block_cid_uindex - on blocks (cid); - -create table if not exists blocks_synced -( - cid text not null - constraint blocks_synced_pk - primary key - constraint blocks_synced_blocks_cid_fk - references blocks, - add_ts int not null -); - -create unique index if not exists blocks_synced_cid_uindex - on blocks_synced (cid); - -create table if not exists block_parents -( - block text not null - constraint block_parents_blocks_cid_fk - references blocks, - parent text not null - constraint block_parents_blocks_cid_fk_2 - references blocks -); - -create unique index if not exists block_parents_block_parent_uindex - on block_parents (block, parent); - -create unique index if not exists blocks_cid_uindex - on blocks (cid); create table if not exists block_messages ( @@ -166,13 +163,11 @@ create table if not exists receipts msg text not null constraint receipts_messages_cid_fk references messages, - state text not null - constraint receipts_blocks_parentStateRoot_fk - references blocks (parentStateRoot), + state text not null, idx int not null, exit int not null, gas_used int not null, - return blob, + return bytea, constraint receipts_pk primary key (msg, state) ); @@ -180,18 +175,11 @@ create table if not exists receipts create index if not exists receipts_msg_state_index on receipts (msg, state); - create table if not exists miner_heads ( - head text not null - constraint miner_heads_actors_head_fk - references actors (head), - addr text not null - constraint miner_heads_actors_id_fk - references actors (id), - stateroot text not null - constraint miner_heads_blocks_stateroot_fk - references blocks (parentStateRoot), + head text not null, + addr text not null, + stateroot text not null, sectorset text not null, provingset text not null, owner text not null, @@ -219,7 +207,7 @@ create table if not exists miner_heads func (st *storage) hasBlock(bh cid.Cid) bool { var exitsts bool - err := st.db.QueryRow(`select exists (select 1 FROM blocks_synced where cid=?)`, bh.String()).Scan(&exitsts) + err := st.db.QueryRow(`select exists (select 1 FROM blocks_synced where cid=$1)`, bh.String()).Scan(&exitsts) if err != nil { log.Error(err) return false @@ -233,7 +221,7 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Ci return err } - stmt, err := tx.Prepare(`insert into actors (id, code, head, nonce, balance, stateroot) values (?, ?, ?, ?, ?, ?) on conflict do nothing`) + stmt, err := tx.Prepare(`insert into actors (id, code, head, nonce, balance, stateroot) values ($1, $2, $3, $4, $5, $6) on conflict do nothing`) if err != nil { return err } @@ -255,7 +243,7 @@ func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error { return err } - stmt, err := tx.Prepare(`insert into miner_heads (head, addr, stateroot, sectorset, provingset, owner, worker, peerid, sectorsize, power, active, ppe, slashed_at) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict do nothing`) + stmt, err := tx.Prepare(`insert into miner_heads (head, addr, stateroot, sectorset, provingset, owner, worker, peerid, sectorsize, power, active, ppe, slashed_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) on conflict do nothing`) if err != nil { return err } @@ -292,18 +280,7 @@ func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) e return err } - stmt, err := tx.Prepare(`insert into blocks (cid, parentWeight, parentStateRoot, height, miner, "timestamp") values (?, ?, ?, ?, ?, ?) on conflict do nothing`) - if err != nil { - return err - } - defer stmt.Close() - for _, bh := range bhs { - if _, err := stmt.Exec(bh.Cid().String(), bh.ParentWeight.String(), bh.ParentStateRoot.String(), bh.Height, bh.Miner.String(), bh.Timestamp); err != nil { - return err - } - } - - stmt2, err := tx.Prepare(`insert into block_parents (block, parent) values (?, ?) on conflict do nothing`) + stmt2, err := tx.Prepare(`insert into block_parents (block, parent) values ($1, $2) on conflict do nothing`) if err != nil { return err } @@ -317,7 +294,7 @@ func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) e } if sync { - stmt, err := tx.Prepare(`insert into blocks_synced (cid, add_ts) values (?, ?) on conflict do nothing`) + stmt, err := tx.Prepare(`insert into blocks_synced (cid, add_ts) values ($1, $2) on conflict do nothing`) if err != nil { return err } @@ -331,6 +308,17 @@ func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) e } } + stmt, err := tx.Prepare(`insert into blocks (cid, parentWeight, parentStateRoot, height, miner, "timestamp") values ($1, $2, $3, $4, $5, $6) on conflict do nothing`) + if err != nil { + return err + } + defer stmt.Close() + for _, bh := range bhs { + if _, err := stmt.Exec(bh.Cid().String(), bh.ParentWeight.String(), bh.ParentStateRoot.String(), bh.Height, bh.Miner.String(), bh.Timestamp); err != nil { + return err + } + } + return tx.Commit() } @@ -340,7 +328,7 @@ func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error { return err } - stmt, err := tx.Prepare(`insert into messages (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict do nothing`) + stmt, err := tx.Prepare(`insert into messages (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) on conflict do nothing`) if err != nil { return err } @@ -371,7 +359,7 @@ func (st *storage) storeReceipts(recs map[mrec]*types.MessageReceipt) error { return err } - stmt, err := tx.Prepare(`insert into receipts (msg, state, idx, exit, gas_used, return) VALUES (?, ?, ?, ?, ?, ?) on conflict do nothing`) + stmt, err := tx.Prepare(`insert into receipts (msg, state, idx, exit, gas_used, return) VALUES ($1,$2,$3,$4,$5,$6) on conflict do nothing`) if err != nil { return err } @@ -399,7 +387,7 @@ func (st *storage) storeAddressMap(addrs map[address.Address]address.Address) er return err } - stmt, err := tx.Prepare(`insert into id_address_map (id, address) VALUES (?, ?) on conflict do nothing`) + stmt, err := tx.Prepare(`insert into id_address_map (id, address) VALUES ($1, $2) on conflict do nothing`) if err != nil { return err } @@ -426,7 +414,7 @@ func (st *storage) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error { return err } - stmt, err := tx.Prepare(`insert into block_messages (block, message) VALUES (?, ?) on conflict do nothing`) + stmt, err := tx.Prepare(`insert into block_messages (block, message) VALUES ($1, $2) on conflict do nothing`) if err != nil { return err } @@ -452,7 +440,7 @@ func (st *storage) storeMpoolInclusion(msg cid.Cid) error { return err } - stmt, err := tx.Prepare(`insert into mpool_messages (msg, add_ts) VALUES (?, ?) on conflict do nothing`) + stmt, err := tx.Prepare(`insert into mpool_messages (msg, add_ts) VALUES ($1, $2) on conflict do nothing`) if err != nil { return err } diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index f9dcf44e3..3f92c99d6 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -223,25 +223,6 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS msgs, incls := fetchMessages(ctx, api, toSync) - if err := st.storeMessages(msgs); err != nil { - log.Error(err) - return - } - - if err := st.storeMsgInclusions(incls); err != nil { - log.Error(err) - return - } - - log.Infof("Getting parent receipts") - - receipts := fetchParentReceipts(ctx, api, toSync) - - if err := st.storeReceipts(receipts); err != nil { - log.Error(err) - return - } - log.Infof("Resolving addresses") for _, message := range msgs { @@ -265,6 +246,27 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS return } + log.Infof("Persisting messages") + + if err := st.storeMessages(msgs); err != nil { + log.Error(err) + return + } + + if err := st.storeMsgInclusions(incls); err != nil { + log.Error(err) + return + } + + log.Infof("Getting parent receipts") + + receipts := fetchParentReceipts(ctx, api, toSync) + + if err := st.storeReceipts(receipts); err != nil { + log.Error(err) + return + } + log.Infof("Sync done") } diff --git a/go.mod b/go.mod index 1dfb070ee..fbe1dded4 100644 --- a/go.mod +++ b/go.mod @@ -46,6 +46,7 @@ require ( github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb github.com/ipld/go-ipld-prime v0.0.2-0.20191025154717-8dff1cbec43b github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 + github.com/lib/pq v1.2.0 github.com/libp2p/go-libp2p v0.3.0 github.com/libp2p/go-libp2p-circuit v0.1.1 github.com/libp2p/go-libp2p-connmgr v0.1.0 diff --git a/go.sum b/go.sum index 9edc75a8c..f592fff7b 100644 --- a/go.sum +++ b/go.sum @@ -282,6 +282,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/libp2p/go-addr-util v0.0.1 h1:TpTQm9cXVRVSKsYbgQ7GKc3KbbHVTnbostgGaDEP+88= github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ= github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ= From 66ee9d209d2490428cf8fe1cc5dd40743d4402b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 11 Dec 2019 23:17:44 +0100 Subject: [PATCH 03/15] wip --- cmd/lotus-chainwatch/dot.go | 4 +- cmd/lotus-chainwatch/mpool.go | 2 +- cmd/lotus-chainwatch/storage.go | 114 ++++++++++++++++++++------------ cmd/lotus-chainwatch/sync.go | 19 ++++-- 4 files changed, 89 insertions(+), 50 deletions(-) diff --git a/cmd/lotus-chainwatch/dot.go b/cmd/lotus-chainwatch/dot.go index dc93c0c58..374b5e8f5 100644 --- a/cmd/lotus-chainwatch/dot.go +++ b/cmd/lotus-chainwatch/dot.go @@ -30,6 +30,8 @@ var dotCmd = &cli.Command{ fmt.Println("digraph D {") + hl := st.hasList() + for res.Next() { var block, parent, miner string var height uint64 @@ -42,7 +44,7 @@ var dotCmd = &cli.Command{ return err } - has := st.hasBlock(bc) + _, has := hl[bc] col := crc32.Checksum([]byte(miner), crc32.MakeTable(crc32.Castagnoli))&0xc0c0c0c0 + 0x30303030 diff --git a/cmd/lotus-chainwatch/mpool.go b/cmd/lotus-chainwatch/mpool.go index ce9e39b84..aced80324 100644 --- a/cmd/lotus-chainwatch/mpool.go +++ b/cmd/lotus-chainwatch/mpool.go @@ -26,7 +26,7 @@ func subMpool(ctx context.Context, api aapi.FullNode, st *storage) { change.Message.Message.Cid(): &change.Message.Message, }) if err != nil { - log.Error(err) + //log.Error(err) continue } diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 2d2d56656..1efac40d9 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -61,11 +61,7 @@ create table if not exists blocks ( cid text not null constraint blocks_pk - primary key - constraint blocks_synced_blocks_cid_fk - references blocks_synced (cid) - constraint block_parents_blocks_cid_fk - references block_parents (block), + primary key, parentWeight numeric not null, parentStateRoot text not null, height int not null, @@ -205,14 +201,31 @@ create table if not exists miner_heads return tx.Commit() } -func (st *storage) hasBlock(bh cid.Cid) bool { - var exitsts bool - err := st.db.QueryRow(`select exists (select 1 FROM blocks_synced where cid=$1)`, bh.String()).Scan(&exitsts) +func (st *storage) hasList() map[cid.Cid]struct{} { + rws, err := st.db.Query(`select cid FROM blocks_synced`) if err != nil { log.Error(err) - return false + return map[cid.Cid]struct{}{} } - return exitsts + out := map[cid.Cid]struct{}{} + + for rws.Next() { + var c string + if err := rws.Scan(&c); err != nil { + log.Error(err) + continue + } + + ci, err := cid.Parse(c) + if err != nil { + log.Error(err) + continue + } + + out[ci] = struct{}{} + } + + return out } func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Cid) error { @@ -271,55 +284,74 @@ func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error { return tx.Commit() } +func (st *storage) batch(n int) chan *sql.Tx { + out := make(chan *sql.Tx, n) + for i := 0; i < n; i++ { + tx, err := st.db.Begin() + if err != nil { + log.Error(err) + } + + out <- tx + } + + return out +} + +func endbatch(b chan *sql.Tx) { + n := len(b) + for i := 0; i < n; i++ { + tx := <- b + if err := tx.Commit(); err != nil { + log.Error(err) + } + } +} + func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error { st.headerLk.Lock() defer st.headerLk.Unlock() - tx, err := st.db.Begin() - if err != nil { - return err - } + tb := st.batch(50) - stmt2, err := tx.Prepare(`insert into block_parents (block, parent) values ($1, $2) on conflict do nothing`) - if err != nil { - return err - } - defer stmt2.Close() - for _, bh := range bhs { + par(100, maparr(bhs), func(bh *types.BlockHeader) { for _, parent := range bh.Parents { - if _, err := stmt2.Exec(bh.Cid().String(), parent.String()); err != nil { + log.Info("bps ", bh.Cid()) + tx := <-tb + stmt2, err := tx.Prepare(`insert into block_parents (block, parent) values ($1, $2) on conflict do nothing`) + if err != nil { return err } + if _, err := tx.Exec(`insert into block_parents (block, parent) values ($1, $2) on conflict do nothing`, bh.Cid().String(), parent.String()); err != nil { + log.Error(err) + } + tb <- tx } - } + }) if sync { - stmt, err := tx.Prepare(`insert into blocks_synced (cid, add_ts) values ($1, $2) on conflict do nothing`) - if err != nil { - return err - } - defer stmt.Close() now := time.Now().Unix() - for _, bh := range bhs { - if _, err := stmt.Exec(bh.Cid().String(), now); err != nil { - return err + par(50, maparr(bhs), func(bh *types.BlockHeader) { + tx := <-tb + if _, err := tx.Exec(`insert into blocks_synced (cid, add_ts) values ($1, $2) on conflict do nothing`, bh.Cid().String(), now); err != nil { + log.Error(err) } - } + tb <- tx + }) } - stmt, err := tx.Prepare(`insert into blocks (cid, parentWeight, parentStateRoot, height, miner, "timestamp") values ($1, $2, $3, $4, $5, $6) on conflict do nothing`) - if err != nil { - return err - } - defer stmt.Close() - for _, bh := range bhs { - if _, err := stmt.Exec(bh.Cid().String(), bh.ParentWeight.String(), bh.ParentStateRoot.String(), bh.Height, bh.Miner.String(), bh.Timestamp); err != nil { - return err + par(50, maparr(bhs), func(bh *types.BlockHeader) { + log.Info("bh", bh.Cid()) + tx := <-tb + if _, err := tx.Exec(`insert into blocks (cid, parentWeight, parentStateRoot, height, miner, "timestamp") values ($1, $2, $3, $4, $5, $6) on conflict do nothing`, bh.Cid().String(), bh.ParentWeight.String(), bh.ParentStateRoot.String(), bh.Height, bh.Miner.String(), bh.Timestamp); err != nil { + log.Error(err) } - } + tb <- tx + }) - return tx.Commit() + endbatch(tb) + return nil } func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error { diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 3f92c99d6..f2ce31c12 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -53,6 +53,10 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS actors := map[address.Address]map[types.Actor]cid.Cid{} var alk sync.Mutex + log.Infof("Getting synced block list") + + hazlist := st.hasList() + log.Infof("Getting headers / actors") toSync := map[cid.Cid]*types.BlockHeader{} @@ -65,7 +69,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS for toVisit.Len() > 0 { bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader) - if _, seen := toSync[bh.Cid()]; seen || st.hasBlock(bh.Cid()) { + _, has := hazlist[bh.Cid()] + if _, seen := toSync[bh.Cid()]; seen || has { continue } @@ -93,6 +98,12 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Infof("Syncing %d blocks", len(toSync)) + log.Infof("Persisting headers") + if err := st.storeHeaders(toSync, true); err != nil { + log.Error(err) + return + } + log.Infof("Persisting actors") paDone := 0 @@ -213,12 +224,6 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS return } - log.Infof("Persisting headers") - if err := st.storeHeaders(toSync, true); err != nil { - log.Error(err) - return - } - log.Infof("Getting messages") msgs, incls := fetchMessages(ctx, api, toSync) From 04691a13dae09516bb68ee3fabef3663bd90ea51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 12 Dec 2019 19:34:28 +0100 Subject: [PATCH 04/15] chainwatch: Fix postgres data import --- cmd/lotus-chainwatch/blockssub.go | 2 +- cmd/lotus-chainwatch/storage.go | 214 +++++++++++++++++++++++------- cmd/lotus-chainwatch/sync.go | 87 ++++++------ 3 files changed, 213 insertions(+), 90 deletions(-) diff --git a/cmd/lotus-chainwatch/blockssub.go b/cmd/lotus-chainwatch/blockssub.go index c569f1885..2147639a3 100644 --- a/cmd/lotus-chainwatch/blockssub.go +++ b/cmd/lotus-chainwatch/blockssub.go @@ -21,7 +21,7 @@ func subBlocks(ctx context.Context, api aapi.FullNode, st *storage) { bh.Cid(): bh, }, false) if err != nil { - log.Errorf("%+v", err) + //log.Errorf("%+v", err) } } } diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index c0bed9f81..2376c6d64 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -2,6 +2,7 @@ package main import ( "database/sql" + "golang.org/x/xerrors" "sync" "time" @@ -64,16 +65,16 @@ create table if not exists blocks primary key, parentWeight numeric not null, parentStateRoot text not null, - height int not null, + height bigint not null, miner text not null, - timestamp int not null, + timestamp bigint not null, vrfproof bytea, - tickets int not null, + tickets bigint not null, eprof bytea, prand bytea, ep0partial bytea, - ep0sector int not null, - ep0challangei int not null + ep0sector bigint not null, + ep0challangei bigint not null ); create unique index if not exists block_cid_uindex @@ -119,12 +120,8 @@ create table if not exists messages cid text not null constraint messages_pk primary key, - "from" text not null - constraint messages_id_address_map_from_fk - references id_address_map (address), - "to" text not null - constraint messages_id_address_map_to_fk - references id_address_map (address), + "from" text not null, + "to" text not null, nonce int not null, value text not null, gasprice int not null, @@ -138,12 +135,8 @@ create unique index if not exists messages_cid_uindex create table if not exists block_messages ( - block text not null - constraint block_messages_blk_fk - references blocks (cid), - message text not null - constraint block_messages_msg_fk - references messages, + block text not null, + message text not null, constraint block_messages_pk primary key (block, message) ); @@ -163,9 +156,7 @@ create unique index if not exists mpool_messages_msg_uindex create table if not exists receipts ( - msg text not null - constraint receipts_messages_cid_fk - references messages, + msg text not null, state text not null, idx int not null, exit int not null, @@ -184,21 +175,17 @@ create table if not exists miner_heads addr text not null, stateroot text not null, sectorset text not null, - setsize int not null, + setsize bigint not null, provingset text not null, - provingsize int not null, + provingsize bigint not null, owner text not null, worker text not null, peerid text not null, - sectorsize int not null, + sectorsize bigint not null, power text not null, - active int, - ppe int not null, - slashed_at int not null, - constraint miner_heads_id_address_map_owner_fk - foreign key (owner) references id_address_map (address), - constraint miner_heads_id_address_map_worker_fk - foreign key (worker) references id_address_map (address), + active bool, + ppe bigint not null, + slashed_at bigint not null, constraint miner_heads_pk primary key (head, addr) ); @@ -242,12 +229,20 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Ci if err != nil { return err } + if _, err := tx.Exec(` - stmt, err := tx.Prepare(`insert into actors (id, code, head, nonce, balance, stateroot) values ($1, $2, $3, $4, $5, $6) on conflict do nothing`) +create temp table a (like actors excluding constraints) on commit drop; + + +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy a (id, code, head, nonce, balance, stateroot) from stdin `) if err != nil { return err } - defer stmt.Close() + for addr, acts := range actors { for act, st := range acts { if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String(), st.String()); err != nil { @@ -256,6 +251,14 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Ci } } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into actors select * from a on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + return tx.Commit() } @@ -265,11 +268,19 @@ func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error { return err } - stmt, err := tx.Prepare(`insert into miner_heads (head, addr, stateroot, sectorset, setsize, provingset, provingsize, owner, worker, peerid, sectorsize, power, active, ppe, slashed_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15) on conflict do nothing`) + if _, err := tx.Exec(` + +create temp table mh (like miner_heads excluding constraints) on commit drop; + + +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy mh (head, addr, stateroot, sectorset, setsize, provingset, provingsize, owner, worker, peerid, sectorsize, power, active, ppe, slashed_at) from STDIN`) if err != nil { return err } - defer stmt.Close() for k, i := range miners { if _, err := stmt.Exec( k.act.Head.String(), @@ -291,6 +302,13 @@ func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error { return err } } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into miner_heads select * from mh on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } return tx.Commit() } @@ -301,14 +319,24 @@ func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) e tx, err := st.db.Begin() if err != nil { - return err + return xerrors.Errorf("begin: %w", err) } - stmt, err := tx.Prepare(`insert into block_parents (block, parent) values ($1, $2) on conflict do nothing`) + if _, err := tx.Exec(` + +create temp table tbp (like block_parents excluding constraints) on commit drop; +create temp table bs (like blocks_synced excluding constraints) on commit drop; +create temp table b (like blocks excluding constraints) on commit drop; + + +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy tbp (block, parent) from STDIN`) if err != nil { return err } - defer stmt.Close() for _, bh := range bhs { for _, parent := range bh.Parents { @@ -318,27 +346,41 @@ func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) e } } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into block_parents select * from tbp on conflict do nothing `); err != nil { + return xerrors.Errorf("parent put: %w", err) + } + if sync { now := time.Now().Unix() - stmt, err := tx.Prepare(`insert into blocks_synced (cid, add_ts) values ($1, $2) on conflict do nothing`) + stmt, err := tx.Prepare(`copy bs (cid, add_ts) from stdin `) if err != nil { return err } - defer stmt.Close() for _, bh := range bhs { - if _, err := tx.Exec(bh.Cid().String(), now); err != nil { + if _, err := stmt.Exec(bh.Cid().String(), now); err != nil { log.Error(err) } } + + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into blocks_synced select * from bs on conflict do nothing `); err != nil { + return xerrors.Errorf("syncd put: %w", err) + } } - stmt2, err := tx.Prepare(`insert into blocks (cid, parentWeight, parentStateRoot, height, miner, "timestamp", vrfproof, tickets, eprof, prand, ep0partial, ep0sector, ep0challangei) values ($1, $2, $3, $4, $5, $6,$7,$8,$9,$10,$11,$12,$13) on conflict do nothing`) + stmt2, err := tx.Prepare(`copy b (cid, parentWeight, parentStateRoot, height, miner, "timestamp", vrfproof, tickets, eprof, prand, ep0partial, ep0sector, ep0challangei) from stdin `) if err != nil { return err } - defer stmt2.Close() for _, bh := range bhs { l := len(bh.EPostProof.Candidates) @@ -364,6 +406,18 @@ func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) e } } + if err := stmt2.Close(); err != nil { + return xerrors.Errorf("s2 close: %w", err) + } + + if _, err := tx.Exec(`insert into blocks select * from b on conflict do nothing `); err != nil { + return xerrors.Errorf("blk put: %w", err) + } + + err = tx.Commit() + if err != nil { + return xerrors.Errorf("commit: %w", err) + } return nil } @@ -373,11 +427,19 @@ func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error { return err } - stmt, err := tx.Prepare(`insert into messages (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) on conflict do nothing`) + if _, err := tx.Exec(` + +create temp table msgs (like messages excluding constraints) on commit drop; + + +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy msgs (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) from stdin `) if err != nil { return err } - defer stmt.Close() for c, m := range msgs { if _, err := stmt.Exec( @@ -394,6 +456,13 @@ func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error { return err } } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into messages select * from msgs on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } return tx.Commit() } @@ -404,11 +473,19 @@ func (st *storage) storeReceipts(recs map[mrec]*types.MessageReceipt) error { return err } - stmt, err := tx.Prepare(`insert into receipts (msg, state, idx, exit, gas_used, return) VALUES ($1,$2,$3,$4,$5,$6) on conflict do nothing`) + if _, err := tx.Exec(` + +create temp table recs (like receipts excluding constraints) on commit drop; + + +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy recs (msg, state, idx, exit, gas_used, return) from stdin `) if err != nil { return err } - defer stmt.Close() for c, m := range recs { if _, err := stmt.Exec( @@ -422,6 +499,13 @@ func (st *storage) storeReceipts(recs map[mrec]*types.MessageReceipt) error { return err } } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into receipts select * from recs on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } return tx.Commit() } @@ -432,11 +516,19 @@ func (st *storage) storeAddressMap(addrs map[address.Address]address.Address) er return err } - stmt, err := tx.Prepare(`insert into id_address_map (id, address) VALUES ($1, $2) on conflict do nothing`) + if _, err := tx.Exec(` + +create temp table iam (like id_address_map excluding constraints) on commit drop; + + +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy iam (id, address) from STDIN `) if err != nil { return err } - defer stmt.Close() for a, i := range addrs { if i == address.Undef { @@ -449,6 +541,13 @@ func (st *storage) storeAddressMap(addrs map[address.Address]address.Address) er return err } } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into id_address_map select * from iam on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } return tx.Commit() } @@ -459,11 +558,19 @@ func (st *storage) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error { return err } - stmt, err := tx.Prepare(`insert into block_messages (block, message) VALUES ($1, $2) on conflict do nothing`) + if _, err := tx.Exec(` + +create temp table mi (like block_messages excluding constraints) on commit drop; + + +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy mi (block, message) from STDIN `) if err != nil { return err } - defer stmt.Close() for b, msgs := range incls { for _, msg := range msgs { @@ -475,6 +582,13 @@ func (st *storage) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error { } } } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into block_messages select * from mi on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } return tx.Commit() } diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 666f82a15..0854cc802 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -101,14 +101,6 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Infof("Syncing %d blocks", len(toSync)) - log.Infof("Persisting headers") - if err := st.storeHeaders(toSync, true); err != nil { - log.Error(err) - return - } - - log.Infof("Persisting actors") - paDone := 0 par(50, maparr(toSync), func(bh *types.BlockHeader) { paDone++ @@ -173,12 +165,29 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS } }) - if err := st.storeActors(actors); err != nil { - log.Error(err) - return + log.Infof("Getting messages") + + msgs, incls := fetchMessages(ctx, api, toSync) + + log.Infof("Resolving addresses") + + for _, message := range msgs { + addresses[message.To] = address.Undef + addresses[message.From] = address.Undef } - log.Infof("Persisting miners") + par(50, kmaparr(addresses), func(addr address.Address) { + raddr, err := api.StateLookupID(ctx, addr, nil) + if err != nil { + log.Warn(err) + return + } + alk.Lock() + addresses[addr] = raddr + alk.Unlock() + }) + + log.Infof("Getting miner info") miners := map[minerKey]*minerInfo{} @@ -230,53 +239,53 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS } }) - if err := st.storeMiners(miners); err != nil { - log.Error(err) + log.Info("Getting receipts") + + receipts := fetchParentReceipts(ctx, api, toSync) + + log.Info("Storing headers") + + if err := st.storeHeaders(toSync, true); err != nil { + log.Errorf("%+v", err) return } - log.Infof("Getting messages") - - msgs, incls := fetchMessages(ctx, api, toSync) - - log.Infof("Resolving addresses") - - for _, message := range msgs { - addresses[message.To] = address.Undef - addresses[message.From] = address.Undef - } - - par(50, kmaparr(addresses), func(addr address.Address) { - raddr, err := api.StateLookupID(ctx, addr, nil) - if err != nil { - log.Warn(err) - return - } - alk.Lock() - addresses[addr] = raddr - alk.Unlock() - }) + log.Info("Storing address mapping") if err := st.storeAddressMap(addresses); err != nil { log.Error(err) return } - log.Infof("Persisting messages") + log.Info("Storing actors") + + if err := st.storeActors(actors); err != nil { + log.Error(err) + return + } + + log.Info("Storing miners") + + if err := st.storeMiners(miners); err != nil { + log.Error(err) + return + } + + log.Infof("Storing messages") if err := st.storeMessages(msgs); err != nil { log.Error(err) return } + log.Info("Storing message inclusions") + if err := st.storeMsgInclusions(incls); err != nil { log.Error(err) return } - log.Infof("Getting parent receipts") - - receipts := fetchParentReceipts(ctx, api, toSync) + log.Infof("Storing parent receipts") if err := st.storeReceipts(receipts); err != nil { log.Error(err) From 10189990967b9fce80c3f8328d0fdd7a51365995 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 13 Dec 2019 10:30:51 +0100 Subject: [PATCH 05/15] WIP fixing queries --- cmd/lotus-chainwatch/dot.go | 2 +- cmd/lotus-chainwatch/mpool.go | 24 +++++++++++++----------- cmd/lotus-chainwatch/storage.go | 4 +++- cmd/lotus-chainwatch/templates.go | 8 ++++---- 4 files changed, 21 insertions(+), 17 deletions(-) diff --git a/cmd/lotus-chainwatch/dot.go b/cmd/lotus-chainwatch/dot.go index 4cd415b12..bf81e817c 100644 --- a/cmd/lotus-chainwatch/dot.go +++ b/cmd/lotus-chainwatch/dot.go @@ -23,7 +23,7 @@ var dotCmd = &cli.Command{ tosee, err := strconv.ParseInt(cctx.Args().Get(1), 10, 32) maxH := minH + tosee - res, err := st.db.Query("select block, parent, b.miner, b.height from block_parents inner join blocks b on block_parents.block = b.cid where b.height > ? and b.height < ?", minH, maxH) + res, err := st.db.Query("select block, parent, b.miner, b.height from block_parents inner join blocks b on block_parents.block = b.cid where b.height > $1 and b.height < $2", minH, maxH) if err != nil { return err } diff --git a/cmd/lotus-chainwatch/mpool.go b/cmd/lotus-chainwatch/mpool.go index aced80324..aaea80c75 100644 --- a/cmd/lotus-chainwatch/mpool.go +++ b/cmd/lotus-chainwatch/mpool.go @@ -22,17 +22,19 @@ func subMpool(ctx context.Context, api aapi.FullNode, st *storage) { log.Info("mpool message") - err := st.storeMessages(map[cid.Cid]*types.Message{ - change.Message.Message.Cid(): &change.Message.Message, - }) - if err != nil { - //log.Error(err) - continue - } + go func() { + err := st.storeMessages(map[cid.Cid]*types.Message{ + change.Message.Message.Cid(): &change.Message.Message, + }) + if err != nil { + //log.Error(err) + return + } + + if err := st.storeMpoolInclusion(change.Message.Message.Cid()); err != nil { + log.Error(err) + } + }() - if err := st.storeMpoolInclusion(change.Message.Message.Cid()); err != nil { - log.Error(err) - continue - } } } diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 2376c6d64..477eb50a6 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -25,6 +25,8 @@ func openStorage(dbSource string) (*storage, error) { return nil, err } + db.SetMaxOpenConns(1350) + st := &storage{db: db} return st, st.setup() @@ -182,7 +184,7 @@ create table if not exists miner_heads worker text not null, peerid text not null, sectorsize bigint not null, - power text not null, + power bigint not null, active bool, ppe bigint not null, slashed_at bigint not null, diff --git a/cmd/lotus-chainwatch/templates.go b/cmd/lotus-chainwatch/templates.go index 02e85d6b0..b336953c5 100644 --- a/cmd/lotus-chainwatch/templates.go +++ b/cmd/lotus-chainwatch/templates.go @@ -153,10 +153,10 @@ func (h *handler) netPower(slashFilt string) (types.BigInt, error) { if slashFilt != "" { slashFilt = " where " + slashFilt } - return h.queryNum(`select sum(power) from ( - select miner_heads.power, miner_heads.slashed_at, max(height) from miner_heads - inner join blocks b on miner_heads.stateroot = b.parentStateRoot - group by miner_heads.addr)` + slashFilt) + return h.queryNum(`select sum(a.power) from ( + select max(miner_heads.power), miner_heads.slashed_at, height as power from miner_heads + inner join blocks b on miner_heads.stateroot = b.parentStateRoot + group by miner_heads.addr) a` + slashFilt) } func (h *handler) queryNum(q string, p ...interface{}) (types.BigInt, error) { From f2ea12571fdba34ba0509aee0edae7d32d10c72c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 13 Dec 2019 12:04:24 +0100 Subject: [PATCH 06/15] Optimise mpool update processing --- cmd/lotus-chainwatch/mpool.go | 50 +++++++++++++++++++++--------- cmd/lotus-chainwatch/site/key.html | 4 +-- cmd/lotus-chainwatch/storage.go | 38 ++++++++++++++++++----- 3 files changed, 68 insertions(+), 24 deletions(-) diff --git a/cmd/lotus-chainwatch/mpool.go b/cmd/lotus-chainwatch/mpool.go index aaea80c75..ea45380b7 100644 --- a/cmd/lotus-chainwatch/mpool.go +++ b/cmd/lotus-chainwatch/mpool.go @@ -2,6 +2,7 @@ package main import ( "context" + "time" "github.com/ipfs/go-cid" @@ -15,26 +16,45 @@ func subMpool(ctx context.Context, api aapi.FullNode, st *storage) { return } - for change := range sub { - if change.Type != aapi.MpoolAdd { - continue + for { + var updates []aapi.MpoolUpdate + + select { + case update := <-sub: + updates = append(updates, update) + case <-ctx.Done(): + return } - log.Info("mpool message") + loop: + for { + time.Sleep(10 * time.Millisecond) + select { + case update := <-sub: + updates = append(updates, update) + default: + break loop + } + } - go func() { - err := st.storeMessages(map[cid.Cid]*types.Message{ - change.Message.Message.Cid(): &change.Message.Message, - }) - if err != nil { - //log.Error(err) - return + msgs := map[cid.Cid]*types.Message{} + for _, v := range updates { + if v.Type != aapi.MpoolAdd { + continue } - if err := st.storeMpoolInclusion(change.Message.Message.Cid()); err != nil { - log.Error(err) - } - }() + msgs[v.Message.Message.Cid()] = &v.Message.Message + } + log.Infof("Processing %d mpool updates", len(msgs)) + + err := st.storeMessages(msgs) + if err != nil { + log.Error(err) + } + + if err := st.storeMpoolInclusions(updates); err != nil { + log.Error(err) + } } } diff --git a/cmd/lotus-chainwatch/site/key.html b/cmd/lotus-chainwatch/site/key.html index 697838ed2..40ec02cb6 100644 --- a/cmd/lotus-chainwatch/site/key.html +++ b/cmd/lotus-chainwatch/site/key.html @@ -15,13 +15,13 @@
- Balance: {{queryNum "select balance from actors inner join main.id_address_map m on m.address = ? where actors.id = m.id order by nonce desc limit 1" $wallet }} + Balance: {{queryNum "select balance from actors inner join id_address_map m on m.address = $1 where actors.id = m.id order by nonce desc limit 1" $wallet }}
Messages: - {{ range messages "`from` = ? or `to` = ?" $wallet $wallet $wallet}} + {{ range messages "`from` = $1 or `to` = $2" $wallet $wallet $wallet}} {{ if eq .From.String $wallet }} diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 477eb50a6..01c93c231 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -2,6 +2,7 @@ package main import ( "database/sql" + "github.com/filecoin-project/lotus/api" "golang.org/x/xerrors" "sync" "time" @@ -595,24 +596,47 @@ create temp table mi (like block_messages excluding constraints) on commit drop; return tx.Commit() } -func (st *storage) storeMpoolInclusion(msg cid.Cid) error { +func (st *storage) storeMpoolInclusions(msgs []api.MpoolUpdate) error { tx, err := st.db.Begin() if err != nil { return err } - stmt, err := tx.Prepare(`insert into mpool_messages (msg, add_ts) VALUES ($1, $2) on conflict do nothing`) + if _, err := tx.Exec(` + +create temp table mi (like mpool_messages excluding constraints) on commit drop; + + +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy mi (msg, add_ts) from stdin `) if err != nil { return err } - defer stmt.Close() - if _, err := stmt.Exec( - msg.String(), - time.Now().Unix(), - ); err != nil { + for _, msg := range msgs { + if msg.Type != api.MpoolAdd { + continue + } + + if _, err := stmt.Exec( + msg.Message.Message.Cid().String(), + time.Now().Unix(), + ); err != nil { + return err + } + } + + if err := stmt.Close(); err != nil { return err } + + if _, err := tx.Exec(`insert into mpool_messages select * from mi on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + return tx.Commit() } From a246be587e37a62675d0837edb3deb5da3bb8f4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 13 Dec 2019 12:48:42 +0100 Subject: [PATCH 07/15] chainwatch: fix ui queries --- cmd/lotus-chainwatch/site/key.html | 2 +- cmd/lotus-chainwatch/templates.go | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/cmd/lotus-chainwatch/site/key.html b/cmd/lotus-chainwatch/site/key.html index 40ec02cb6..e6e5297e1 100644 --- a/cmd/lotus-chainwatch/site/key.html +++ b/cmd/lotus-chainwatch/site/key.html @@ -21,7 +21,7 @@ Messages:
DirPeerNonceValueBlockMpool Wait
To{{.To.String}}
- {{ range messages "`from` = $1 or `to` = $2" $wallet $wallet $wallet}} + {{ range messages "\"from\" = $1 or \"to\" = $1" $wallet}} {{ if eq .From.String $wallet }} diff --git a/cmd/lotus-chainwatch/templates.go b/cmd/lotus-chainwatch/templates.go index b336953c5..4873bbcb1 100644 --- a/cmd/lotus-chainwatch/templates.go +++ b/cmd/lotus-chainwatch/templates.go @@ -153,10 +153,9 @@ func (h *handler) netPower(slashFilt string) (types.BigInt, error) { if slashFilt != "" { slashFilt = " where " + slashFilt } - return h.queryNum(`select sum(a.power) from ( - select max(miner_heads.power), miner_heads.slashed_at, height as power from miner_heads + return h.queryNum(`select sum(power) from (select distinct on (addr) power, slashed_at from miner_heads inner join blocks b on miner_heads.stateroot = b.parentStateRoot - group by miner_heads.addr) a` + slashFilt) +order by addr, height desc) as p` + slashFilt) } func (h *handler) queryNum(q string, p ...interface{}) (types.BigInt, error) { @@ -214,6 +213,8 @@ func (h *handler) messages(filter string, args ...interface{}) (out []types.Mess filter = " where " + filter } + log.Info("select * from messages "+filter) + rws, err := h.st.db.Query("select * from messages "+filter, args...) if err != nil { return nil, err From 2e9655a4e044e27221f601a1bbedc39774309730 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 13 Dec 2019 14:42:01 +0100 Subject: [PATCH 08/15] More fixes --- cmd/lotus-chainwatch/site/block.html | 16 ++++++++-------- cmd/lotus-chainwatch/site/blocks.html | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/cmd/lotus-chainwatch/site/block.html b/cmd/lotus-chainwatch/site/block.html index b9aa09c32..9d247b88b 100644 --- a/cmd/lotus-chainwatch/site/block.html +++ b/cmd/lotus-chainwatch/site/block.html @@ -15,32 +15,32 @@
-
Miner: {{index (strings "blocks" "miner" "cid=?" $cid) 0}}
+
Miner: {{index (strings "blocks" "miner" "cid=$1" $cid) 0}}
Parents:
- {{range strings "block_parents" "parent" "block=?" $cid}} + {{range strings "block_parents" "parent" "block=$1" $cid}} {{$parent := .}} {{. | substr 54 62}} {{end}}
Messages:
DirPeerNonceValueBlockMpool Wait
To{{.To.String}}
- {{range strings "block_messages" "message" "block=?" $cid}} + {{range strings "block_messages" "message" "block=$1" $cid}} {{$msg := .}} {{$rec := qstrs `select r.exit, r.gas_used from messages inner join block_messages bm on messages.cid = bm.message @@ -48,7 +48,7 @@ inner join block_parents bp on b.cid = bp.parent inner join blocks chd on bp.block = chd.cid inner join receipts r on messages.cid = r.msg and chd.parentStateRoot = r.state - where messages.cid=? and b.cid=?` 2 $msg $cid}} + where messages.cid=$1 and b.cid=$2` 2 $msg $cid}} diff --git a/cmd/lotus-chainwatch/site/blocks.html b/cmd/lotus-chainwatch/site/blocks.html index 6e38d9e2d..296d0d460 100644 --- a/cmd/lotus-chainwatch/site/blocks.html +++ b/cmd/lotus-chainwatch/site/blocks.html @@ -25,10 +25,10 @@ From 97cf3f70a59665d37b4163dd456733af2dffd93b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 13 Dec 2019 15:47:51 +0100 Subject: [PATCH 09/15] chainwatch: sub to mpool/blocks afetr initial sync --- cmd/lotus-chainwatch/main.go | 2 -- cmd/lotus-chainwatch/sync.go | 6 ++++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/cmd/lotus-chainwatch/main.go b/cmd/lotus-chainwatch/main.go index f0b0fd0a3..7de753a7a 100644 --- a/cmd/lotus-chainwatch/main.go +++ b/cmd/lotus-chainwatch/main.go @@ -82,8 +82,6 @@ var runCmd = &cli.Command{ defer st.close() runSyncer(ctx, api, st) - go subMpool(ctx, api, st) - go subBlocks(ctx, api, st) h, err := newHandler(api, st) if err != nil { diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 0854cc802..496903084 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -32,6 +32,12 @@ func runSyncer(ctx context.Context, api api.FullNode, st *storage) { case store.HCRevert: log.Warnf("revert todo") } + + if change.Type == store.HCCurrent { + go subMpool(ctx, api, st) + go subBlocks(ctx, api, st) + } + } } }() From 855fb26a81762b6ba994815c8f2b800345e203ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 8 Jan 2020 15:18:13 +0100 Subject: [PATCH 10/15] mod tidy --- go.mod | 2 -- go.sum | 14 +++++++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 008983b18..6046a9bcf 100644 --- a/go.mod +++ b/go.mod @@ -51,7 +51,6 @@ require ( github.com/ipfs/go-path v0.0.7 github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb github.com/ipld/go-ipld-prime v0.0.2-0.20191025154717-8dff1cbec43b - github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 github.com/lib/pq v1.2.0 github.com/libp2p/go-libp2p v0.4.2 github.com/libp2p/go-libp2p-circuit v0.1.4 @@ -71,7 +70,6 @@ require ( github.com/libp2p/go-libp2p-tls v0.1.0 github.com/libp2p/go-libp2p-yamux v0.2.1 github.com/libp2p/go-maddr-filter v0.0.5 - github.com/mattn/go-sqlite3 v1.12.0 github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 github.com/minio/sha256-simd v0.1.1 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index aeae4ee03..a552dcd4f 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,7 @@ github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9 h1:HD8gA2tkBy github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/GeertJohan/go.incremental v1.0.0 h1:7AH+pY1XUgQE4Y1HcXYaMqAI0m9yrFqo/jt0CW30vsg= github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0= github.com/GeertJohan/go.rice v1.0.0 h1:KkI6O9uMaQU3VEKaj01ulavtF7o1fWT7+pk/4voiMLQ= github.com/GeertJohan/go.rice v1.0.0/go.mod h1:eH6gbSOAUv07dQuZVnBmoDP8mgsM1rtixis4Tib9if0= @@ -20,6 +21,7 @@ github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrU github.com/Stebalien/go-bitfield v0.0.1 h1:X3kbSSPUaJK60wV2hjOPZwmpljr6VGCqdq4cBLhbQBo= github.com/Stebalien/go-bitfield v0.0.1/go.mod h1:GNjFpasyUVkHMsfEOk8EFLJ9syQ6SI+XWrX9Wf2XH0s= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= +github.com/akavel/rsrc v0.8.0 h1:zjWn7ukO9Kc5Q62DOJCcxGpXC18RawVtYAGdz2aLlfw= github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc h1:cAKDfWh5VpdgMhJosfJnn5/FoN2SRZ4p7fJNX58YPaU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -88,12 +90,12 @@ github.com/filecoin-project/go-amt-ipld v0.0.0-20191205011053-79efc22d6cdc/go.mo github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8= github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= -github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= -github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA= -github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus= github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= +github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA= +github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878 h1:YicJT9xhPzZ1SBGiJFNUCkfwqK/G9vFyY1ytKBSjNJA= +github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU= github.com/filecoin-project/go-paramfetch v0.0.0-20200102181131-b20d579f2878/go.mod h1:40kI2Gv16mwcRsHptI3OAV4nlOEU7wVDc4RgMylNFjU= github.com/filecoin-project/go-sectorbuilder v0.0.0-20200107152336-0cbb2c483013 h1:OGpRq3HRxyrxZJtbNKCOsb5YTmc+RBLLwdAgwZfkRnY= github.com/filecoin-project/go-sectorbuilder v0.0.0-20200107152336-0cbb2c483013/go.mod h1:3OZ4E3B2OuwhJjtxR4r7hPU9bCfB+A+hm4alLEsaeDc= @@ -285,6 +287,7 @@ github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsj github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10= github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= @@ -496,8 +499,6 @@ github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+twI54= github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= -github.com/mattn/go-sqlite3 v1.12.0 h1:u/x3mp++qUxvYfulZ4HKOvVO0JWhk7HtE8lWhbGz/Do= -github.com/mattn/go-sqlite3 v1.12.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/miekg/dns v1.1.12/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= @@ -552,6 +553,7 @@ github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXS github.com/multiformats/go-varint v0.0.2 h1:6sUvyh2YHpJCb8RZ6eYzj6iJQ4+chWYmyIHxszqlPTA= github.com/multiformats/go-varint v0.0.2/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229 h1:E2B8qYyeSgv5MXpmzZXRNp8IAQ4vjxIjhpAf5hv/tAg= github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -630,7 +632,9 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830 h1:8kxMKmKzXXL4Ru1nyhvdms/JjWt+3YLpvRb/bAjO/y0= From 1b5e1c47537420018f00fe4bc13c04ca311710ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 8 Jan 2020 15:18:30 +0100 Subject: [PATCH 11/15] gofmt --- cmd/lotus-chainwatch/templates.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/lotus-chainwatch/templates.go b/cmd/lotus-chainwatch/templates.go index 40020da78..32b9d398c 100644 --- a/cmd/lotus-chainwatch/templates.go +++ b/cmd/lotus-chainwatch/templates.go @@ -43,14 +43,14 @@ func newHandler(api api.FullNode, st *storage) (*handler, error) { "queryNum": h.queryNum, "sizeStr": sizeStr, "strings": h.strings, - "qstr": h.qstr, - "qstrs": h.qstrs, + "qstr": h.qstr, + "qstrs": h.qstrs, "messages": h.messages, "pageDown": pageDown, - "parseInt": func(s string) (int, error) {i, e := strconv.ParseInt(s, 10, 64); return int(i), e}, - "substr": func(i, j int, s string,) string {return s[i:j]}, - "sub": func(a, b int) int {return a - b}, // TODO: really not builtin? + "parseInt": func(s string) (int, error) { i, e := strconv.ParseInt(s, 10, 64); return int(i), e }, + "substr": func(i, j int, s string) string { return s[i:j] }, + "sub": func(a, b int) int { return a - b }, // TODO: really not builtin? "param": func(string) string { return "" }, // replaced in request handler } @@ -248,7 +248,7 @@ func (h *handler) messages(filter string, args ...interface{}) (out []types.Mess filter = " where " + filter } - log.Info("select * from messages "+filter) + log.Info("select * from messages " + filter) rws, err := h.st.db.Query("select * from messages "+filter, args...) if err != nil { From 79028397ada0c005939a67c78e7630ad00624226 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 8 Jan 2020 17:29:46 +0100 Subject: [PATCH 12/15] chainwatch: sync in batches --- cmd/lotus-chainwatch/sync.go | 304 +++++++++++++++++++---------------- 1 file changed, 164 insertions(+), 140 deletions(-) diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index dd525249b..9464edc81 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -4,6 +4,7 @@ import ( "bytes" "container/list" "context" + "math" "sync" "github.com/filecoin-project/go-address" @@ -16,6 +17,8 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) +const maxBatch = 10000 + func runSyncer(ctx context.Context, api api.FullNode, st *storage) { notifs, err := api.ChainNotify(ctx) if err != nil { @@ -68,7 +71,7 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Infof("Getting headers / actors") - toSync := map[cid.Cid]*types.BlockHeader{} + allToSync := map[cid.Cid]*types.BlockHeader{} toVisit := list.New() for _, header := range ts.Blocks() { @@ -79,15 +82,15 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader) _, has := hazlist[bh.Cid()] - if _, seen := toSync[bh.Cid()]; seen || has { + if _, seen := allToSync[bh.Cid()]; seen || has { continue } - toSync[bh.Cid()] = bh + allToSync[bh.Cid()] = bh addresses[bh.Miner] = address.Undef - if len(toSync)%500 == 10 { - log.Infof("todo: (%d) %s", len(toSync), bh.Cid()) + if len(allToSync)%500 == 10 { + log.Infof("todo: (%d) %s @%d", len(allToSync), bh.Cid(), bh.Height) } if len(bh.Parents) == 0 { @@ -105,197 +108,218 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS } } - log.Infof("Syncing %d blocks", len(toSync)) + for len(allToSync) > 0 { + minH := uint64(math.MaxUint64) - paDone := 0 - par(50, maparr(toSync), func(bh *types.BlockHeader) { - paDone++ - if paDone%100 == 0 { - log.Infof("pa: %d %d%%", paDone, (paDone*100)/len(toSync)) + for _, header := range allToSync { + if header.Height < minH { + minH = header.Height + } } - if len(bh.Parents) == 0 { // genesis case - ts, err := types.NewTipSet([]*types.BlockHeader{bh}) - aadrs, err := api.StateListActors(ctx, ts) + toSync := map[cid.Cid]*types.BlockHeader{} + for c, header := range allToSync { + if header.Height < minH+maxBatch { + toSync[c] = header + } + } + for c := range toSync { + delete(allToSync, c) + } + + log.Infof("Syncing %d blocks", len(toSync)) + + paDone := 0 + par(50, maparr(toSync), func(bh *types.BlockHeader) { + paDone++ + if paDone%100 == 0 { + log.Infof("pa: %d %d%%", paDone, (paDone*100)/len(toSync)) + } + + if len(bh.Parents) == 0 { // genesis case + ts, err := types.NewTipSet([]*types.BlockHeader{bh}) + aadrs, err := api.StateListActors(ctx, ts) + if err != nil { + log.Error(err) + return + } + + par(50, aadrs, func(addr address.Address) { + act, err := api.StateGetActor(ctx, addr, ts) + if err != nil { + log.Error(err) + return + } + alk.Lock() + _, ok := actors[addr] + if !ok { + actors[addr] = map[types.Actor]cid.Cid{} + } + actors[addr][*act] = bh.ParentStateRoot + addresses[addr] = address.Undef + alk.Unlock() + }) + + return + } + + pts, err := api.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...)) if err != nil { log.Error(err) return } - par(50, aadrs, func(addr address.Address) { - act, err := api.StateGetActor(ctx, addr, ts) + changes, err := api.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot) + if err != nil { + log.Error(err) + return + } + + for a, act := range changes { + addr, err := address.NewFromString(a) if err != nil { log.Error(err) return } + alk.Lock() _, ok := actors[addr] if !ok { actors[addr] = map[types.Actor]cid.Cid{} } - actors[addr][*act] = bh.ParentStateRoot + actors[addr][act] = bh.ParentStateRoot addresses[addr] = address.Undef alk.Unlock() - }) + } + }) - return + log.Infof("Getting messages") + + msgs, incls := fetchMessages(ctx, api, toSync) + + log.Infof("Resolving addresses") + + for _, message := range msgs { + addresses[message.To] = address.Undef + addresses[message.From] = address.Undef } - pts, err := api.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...)) - if err != nil { - log.Error(err) - return + par(50, kmaparr(addresses), func(addr address.Address) { + raddr, err := api.StateLookupID(ctx, addr, nil) + if err != nil { + log.Warn(err) + return + } + alk.Lock() + addresses[addr] = raddr + alk.Unlock() + }) + + log.Infof("Getting miner info") + + miners := map[minerKey]*minerInfo{} + + for addr, m := range actors { + for actor, c := range m { + if actor.Code != actors2.StorageMinerCodeCid { + continue + } + + miners[minerKey{ + addr: addr, + act: actor, + stateroot: c, + }] = &minerInfo{} + } } - changes, err := api.StateChangedActors(ctx, pts.ParentState(), bh.ParentStateRoot) - if err != nil { - log.Error(err) - return - } + par(50, kvmaparr(miners), func(it func() (minerKey, *minerInfo)) { + k, info := it() - for a, act := range changes { - addr, err := address.NewFromString(a) + sszs, err := api.StateMinerSectorCount(ctx, k.addr, nil) + if err != nil { + log.Error(err) + return + } + info.psize = sszs.Pset + info.ssize = sszs.Sset + + astb, err := api.ChainReadObj(ctx, k.act.Head) if err != nil { log.Error(err) return } - alk.Lock() - _, ok := actors[addr] - if !ok { - actors[addr] = map[types.Actor]cid.Cid{} - } - actors[addr][act] = bh.ParentStateRoot - addresses[addr] = address.Undef - alk.Unlock() - } - }) - - log.Infof("Getting messages") - - msgs, incls := fetchMessages(ctx, api, toSync) - - log.Infof("Resolving addresses") - - for _, message := range msgs { - addresses[message.To] = address.Undef - addresses[message.From] = address.Undef - } - - par(50, kmaparr(addresses), func(addr address.Address) { - raddr, err := api.StateLookupID(ctx, addr, nil) - if err != nil { - log.Warn(err) - return - } - alk.Lock() - addresses[addr] = raddr - alk.Unlock() - }) - - log.Infof("Getting miner info") - - miners := map[minerKey]*minerInfo{} - - for addr, m := range actors { - for actor, c := range m { - if actor.Code != actors2.StorageMinerCodeCid { - continue + if err := info.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil { + log.Error(err) + return } - miners[minerKey{ - addr: addr, - act: actor, - stateroot: c, - }] = &minerInfo{} - } - } + ib, err := api.ChainReadObj(ctx, info.state.Info) + if err != nil { + log.Error(err) + return + } - par(50, kvmaparr(miners), func(it func() (minerKey, *minerInfo)) { - k, info := it() + if err := info.info.UnmarshalCBOR(bytes.NewReader(ib)); err != nil { + log.Error(err) + return + } + }) - sszs, err := api.StateMinerSectorCount(ctx, k.addr, nil) - if err != nil { - log.Error(err) + log.Info("Getting receipts") + + receipts := fetchParentReceipts(ctx, api, toSync) + + log.Info("Storing headers") + + if err := st.storeHeaders(toSync, true); err != nil { + log.Errorf("%+v", err) return } - info.psize = sszs.Pset - info.ssize = sszs.Sset - astb, err := api.ChainReadObj(ctx, k.act.Head) - if err != nil { + log.Info("Storing address mapping") + + if err := st.storeAddressMap(addresses); err != nil { log.Error(err) return } - if err := info.state.UnmarshalCBOR(bytes.NewReader(astb)); err != nil { + log.Info("Storing actors") + + if err := st.storeActors(actors); err != nil { log.Error(err) return } - ib, err := api.ChainReadObj(ctx, info.state.Info) - if err != nil { + log.Info("Storing miners") + + if err := st.storeMiners(miners); err != nil { log.Error(err) return } - if err := info.info.UnmarshalCBOR(bytes.NewReader(ib)); err != nil { + log.Infof("Storing messages") + + if err := st.storeMessages(msgs); err != nil { log.Error(err) return } - }) - log.Info("Getting receipts") + log.Info("Storing message inclusions") - receipts := fetchParentReceipts(ctx, api, toSync) + if err := st.storeMsgInclusions(incls); err != nil { + log.Error(err) + return + } - log.Info("Storing headers") + log.Infof("Storing parent receipts") - if err := st.storeHeaders(toSync, true); err != nil { - log.Errorf("%+v", err) - return - } - - log.Info("Storing address mapping") - - if err := st.storeAddressMap(addresses); err != nil { - log.Error(err) - return - } - - log.Info("Storing actors") - - if err := st.storeActors(actors); err != nil { - log.Error(err) - return - } - - log.Info("Storing miners") - - if err := st.storeMiners(miners); err != nil { - log.Error(err) - return - } - - log.Infof("Storing messages") - - if err := st.storeMessages(msgs); err != nil { - log.Error(err) - return - } - - log.Info("Storing message inclusions") - - if err := st.storeMsgInclusions(incls); err != nil { - log.Error(err) - return - } - - log.Infof("Storing parent receipts") - - if err := st.storeReceipts(receipts); err != nil { - log.Error(err) - return + if err := st.storeReceipts(receipts); err != nil { + log.Error(err) + return + } + log.Infof("Sync stage done") } log.Infof("Sync done") From 719171faa3d7e288fd32ec87d61e6c9349ab003f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 13 Jan 2020 00:46:54 +0100 Subject: [PATCH 13/15] Set smaller batch size --- cmd/lotus-chainwatch/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 9464edc81..e851dbf9f 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -17,7 +17,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -const maxBatch = 10000 +const maxBatch = 3000 func runSyncer(ctx context.Context, api api.FullNode, st *storage) { notifs, err := api.ChainNotify(ctx) From d78a5e0a9f5bbc50f1f6c595702dbe2fd4b7a128 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 13 Jan 2020 14:01:11 +0100 Subject: [PATCH 14/15] Decimal types for some miner columns --- cmd/lotus-chainwatch/storage.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index cede11e84..af74f2765 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -2,6 +2,7 @@ package main import ( "database/sql" + "fmt" "github.com/filecoin-project/lotus/api" "golang.org/x/xerrors" "sync" @@ -178,9 +179,9 @@ create table if not exists miner_heads addr text not null, stateroot text not null, sectorset text not null, - setsize bigint not null, + setsize decimal not null, provingset text not null, - provingsize bigint not null, + provingsize decimal not null, owner text not null, worker text not null, peerid text not null, @@ -290,9 +291,9 @@ create temp table mh (like miner_heads excluding constraints) on commit drop; k.addr.String(), k.stateroot.String(), i.state.Sectors.String(), - i.ssize, + fmt.Sprint(i.ssize), i.state.ProvingSet.String(), - i.psize, + fmt.Sprint(i.psize), i.info.Owner.String(), i.info.Worker.String(), i.info.PeerID.String(), From 14128819dee03d30b88d49425ee226c6809414f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 14 Jan 2020 06:17:31 +0100 Subject: [PATCH 15/15] Correctly handle multi parent blocks --- cmd/lotus-chainwatch/storage.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index af74f2765..9c8e7e7a2 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -53,9 +53,7 @@ create unique index if not exists blocks_synced_cid_uindex create table if not exists block_parents ( - block text not null - constraint block_parents_pk - primary key, + block text not null, parent text not null );
{{$msg | substr 54 62}} - {{$from := qstr "select \"from\" from messages where cid=?" $msg}} - {{$nonce := qstr "select nonce from messages where cid=?" $msg}} + {{$from := qstr "select \"from\" from messages where cid=$1" $msg}} + {{$nonce := qstr "select nonce from messages where cid=$1" $msg}} {{$from}} (N:{{$nonce}}) -> - {{$to := qstr "select \"to\" from messages where cid=?" $msg}} + {{$to := qstr "select \"to\" from messages where cid=$1" $msg}} {{$to}} - Method:{{qstr "select method from messages where cid=?" $msg}} + Method:{{qstr "select method from messages where cid=$1" $msg}} exit:{{index $rec 0}} gasUsed:{{index $rec 1}}
{{qstr `select count(distinct block_messages.message) from block_messages inner join blocks b on block_messages.block = b.cid - where b.height = ?` $h}} Msgs + where b.height = $1` $h}} Msgs - {{range strings "blocks" "cid" "height = ?" $h}} + {{range strings "blocks" "cid" "height = $1" $h}} {{. | substr 54 62}} {{end}}