From 04691a13dae09516bb68ee3fabef3663bd90ea51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 12 Dec 2019 19:34:28 +0100 Subject: [PATCH] chainwatch: Fix postgres data import --- cmd/lotus-chainwatch/blockssub.go | 2 +- cmd/lotus-chainwatch/storage.go | 214 +++++++++++++++++++++++------- cmd/lotus-chainwatch/sync.go | 87 ++++++------ 3 files changed, 213 insertions(+), 90 deletions(-) diff --git a/cmd/lotus-chainwatch/blockssub.go b/cmd/lotus-chainwatch/blockssub.go index c569f1885..2147639a3 100644 --- a/cmd/lotus-chainwatch/blockssub.go +++ b/cmd/lotus-chainwatch/blockssub.go @@ -21,7 +21,7 @@ func subBlocks(ctx context.Context, api aapi.FullNode, st *storage) { bh.Cid(): bh, }, false) if err != nil { - log.Errorf("%+v", err) + //log.Errorf("%+v", err) } } } diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index c0bed9f81..2376c6d64 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -2,6 +2,7 @@ package main import ( "database/sql" + "golang.org/x/xerrors" "sync" "time" @@ -64,16 +65,16 @@ create table if not exists blocks primary key, parentWeight numeric not null, parentStateRoot text not null, - height int not null, + height bigint not null, miner text not null, - timestamp int not null, + timestamp bigint not null, vrfproof bytea, - tickets int not null, + tickets bigint not null, eprof bytea, prand bytea, ep0partial bytea, - ep0sector int not null, - ep0challangei int not null + ep0sector bigint not null, + ep0challangei bigint not null ); create unique index if not exists block_cid_uindex @@ -119,12 +120,8 @@ create table if not exists messages cid text not null constraint messages_pk primary key, - "from" text not null - constraint messages_id_address_map_from_fk - references id_address_map (address), - "to" text not null - constraint messages_id_address_map_to_fk - references id_address_map (address), + "from" text not null, + "to" text not null, nonce int not null, value text not null, gasprice int not null, @@ -138,12 +135,8 @@ create unique index if not exists messages_cid_uindex create table if not exists block_messages ( - block text not null - constraint block_messages_blk_fk - references blocks (cid), - message text not null - constraint block_messages_msg_fk - references messages, + block text not null, + message text not null, constraint block_messages_pk primary key (block, message) ); @@ -163,9 +156,7 @@ create unique index if not exists mpool_messages_msg_uindex create table if not exists receipts ( - msg text not null - constraint receipts_messages_cid_fk - references messages, + msg text not null, state text not null, idx int not null, exit int not null, @@ -184,21 +175,17 @@ create table if not exists miner_heads addr text not null, stateroot text not null, sectorset text not null, - setsize int not null, + setsize bigint not null, provingset text not null, - provingsize int not null, + provingsize bigint not null, owner text not null, worker text not null, peerid text not null, - sectorsize int not null, + sectorsize bigint not null, power text not null, - active int, - ppe int not null, - slashed_at int not null, - constraint miner_heads_id_address_map_owner_fk - foreign key (owner) references id_address_map (address), - constraint miner_heads_id_address_map_worker_fk - foreign key (worker) references id_address_map (address), + active bool, + ppe bigint not null, + slashed_at bigint not null, constraint miner_heads_pk primary key (head, addr) ); @@ -242,12 +229,20 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Ci if err != nil { return err } + if _, err := tx.Exec(` - stmt, err := tx.Prepare(`insert into actors (id, code, head, nonce, balance, stateroot) values ($1, $2, $3, $4, $5, $6) on conflict do nothing`) +create temp table a (like actors excluding constraints) on commit drop; + + +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy a (id, code, head, nonce, balance, stateroot) from stdin `) if err != nil { return err } - defer stmt.Close() + for addr, acts := range actors { for act, st := range acts { if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String(), st.String()); err != nil { @@ -256,6 +251,14 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Ci } } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into actors select * from a on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + return tx.Commit() } @@ -265,11 +268,19 @@ func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error { return err } - stmt, err := tx.Prepare(`insert into miner_heads (head, addr, stateroot, sectorset, setsize, provingset, provingsize, owner, worker, peerid, sectorsize, power, active, ppe, slashed_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15) on conflict do nothing`) + if _, err := tx.Exec(` + +create temp table mh (like miner_heads excluding constraints) on commit drop; + + +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy mh (head, addr, stateroot, sectorset, setsize, provingset, provingsize, owner, worker, peerid, sectorsize, power, active, ppe, slashed_at) from STDIN`) if err != nil { return err } - defer stmt.Close() for k, i := range miners { if _, err := stmt.Exec( k.act.Head.String(), @@ -291,6 +302,13 @@ func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error { return err } } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into miner_heads select * from mh on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } return tx.Commit() } @@ -301,14 +319,24 @@ func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) e tx, err := st.db.Begin() if err != nil { - return err + return xerrors.Errorf("begin: %w", err) } - stmt, err := tx.Prepare(`insert into block_parents (block, parent) values ($1, $2) on conflict do nothing`) + if _, err := tx.Exec(` + +create temp table tbp (like block_parents excluding constraints) on commit drop; +create temp table bs (like blocks_synced excluding constraints) on commit drop; +create temp table b (like blocks excluding constraints) on commit drop; + + +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy tbp (block, parent) from STDIN`) if err != nil { return err } - defer stmt.Close() for _, bh := range bhs { for _, parent := range bh.Parents { @@ -318,27 +346,41 @@ func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) e } } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into block_parents select * from tbp on conflict do nothing `); err != nil { + return xerrors.Errorf("parent put: %w", err) + } + if sync { now := time.Now().Unix() - stmt, err := tx.Prepare(`insert into blocks_synced (cid, add_ts) values ($1, $2) on conflict do nothing`) + stmt, err := tx.Prepare(`copy bs (cid, add_ts) from stdin `) if err != nil { return err } - defer stmt.Close() for _, bh := range bhs { - if _, err := tx.Exec(bh.Cid().String(), now); err != nil { + if _, err := stmt.Exec(bh.Cid().String(), now); err != nil { log.Error(err) } } + + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into blocks_synced select * from bs on conflict do nothing `); err != nil { + return xerrors.Errorf("syncd put: %w", err) + } } - stmt2, err := tx.Prepare(`insert into blocks (cid, parentWeight, parentStateRoot, height, miner, "timestamp", vrfproof, tickets, eprof, prand, ep0partial, ep0sector, ep0challangei) values ($1, $2, $3, $4, $5, $6,$7,$8,$9,$10,$11,$12,$13) on conflict do nothing`) + stmt2, err := tx.Prepare(`copy b (cid, parentWeight, parentStateRoot, height, miner, "timestamp", vrfproof, tickets, eprof, prand, ep0partial, ep0sector, ep0challangei) from stdin `) if err != nil { return err } - defer stmt2.Close() for _, bh := range bhs { l := len(bh.EPostProof.Candidates) @@ -364,6 +406,18 @@ func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) e } } + if err := stmt2.Close(); err != nil { + return xerrors.Errorf("s2 close: %w", err) + } + + if _, err := tx.Exec(`insert into blocks select * from b on conflict do nothing `); err != nil { + return xerrors.Errorf("blk put: %w", err) + } + + err = tx.Commit() + if err != nil { + return xerrors.Errorf("commit: %w", err) + } return nil } @@ -373,11 +427,19 @@ func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error { return err } - stmt, err := tx.Prepare(`insert into messages (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) on conflict do nothing`) + if _, err := tx.Exec(` + +create temp table msgs (like messages excluding constraints) on commit drop; + + +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy msgs (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) from stdin `) if err != nil { return err } - defer stmt.Close() for c, m := range msgs { if _, err := stmt.Exec( @@ -394,6 +456,13 @@ func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error { return err } } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into messages select * from msgs on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } return tx.Commit() } @@ -404,11 +473,19 @@ func (st *storage) storeReceipts(recs map[mrec]*types.MessageReceipt) error { return err } - stmt, err := tx.Prepare(`insert into receipts (msg, state, idx, exit, gas_used, return) VALUES ($1,$2,$3,$4,$5,$6) on conflict do nothing`) + if _, err := tx.Exec(` + +create temp table recs (like receipts excluding constraints) on commit drop; + + +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy recs (msg, state, idx, exit, gas_used, return) from stdin `) if err != nil { return err } - defer stmt.Close() for c, m := range recs { if _, err := stmt.Exec( @@ -422,6 +499,13 @@ func (st *storage) storeReceipts(recs map[mrec]*types.MessageReceipt) error { return err } } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into receipts select * from recs on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } return tx.Commit() } @@ -432,11 +516,19 @@ func (st *storage) storeAddressMap(addrs map[address.Address]address.Address) er return err } - stmt, err := tx.Prepare(`insert into id_address_map (id, address) VALUES ($1, $2) on conflict do nothing`) + if _, err := tx.Exec(` + +create temp table iam (like id_address_map excluding constraints) on commit drop; + + +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy iam (id, address) from STDIN `) if err != nil { return err } - defer stmt.Close() for a, i := range addrs { if i == address.Undef { @@ -449,6 +541,13 @@ func (st *storage) storeAddressMap(addrs map[address.Address]address.Address) er return err } } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into id_address_map select * from iam on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } return tx.Commit() } @@ -459,11 +558,19 @@ func (st *storage) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error { return err } - stmt, err := tx.Prepare(`insert into block_messages (block, message) VALUES ($1, $2) on conflict do nothing`) + if _, err := tx.Exec(` + +create temp table mi (like block_messages excluding constraints) on commit drop; + + +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy mi (block, message) from STDIN `) if err != nil { return err } - defer stmt.Close() for b, msgs := range incls { for _, msg := range msgs { @@ -475,6 +582,13 @@ func (st *storage) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error { } } } + if err := stmt.Close(); err != nil { + return err + } + + if _, err := tx.Exec(`insert into block_messages select * from mi on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } return tx.Commit() } diff --git a/cmd/lotus-chainwatch/sync.go b/cmd/lotus-chainwatch/sync.go index 666f82a15..0854cc802 100644 --- a/cmd/lotus-chainwatch/sync.go +++ b/cmd/lotus-chainwatch/sync.go @@ -101,14 +101,6 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS log.Infof("Syncing %d blocks", len(toSync)) - log.Infof("Persisting headers") - if err := st.storeHeaders(toSync, true); err != nil { - log.Error(err) - return - } - - log.Infof("Persisting actors") - paDone := 0 par(50, maparr(toSync), func(bh *types.BlockHeader) { paDone++ @@ -173,12 +165,29 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS } }) - if err := st.storeActors(actors); err != nil { - log.Error(err) - return + log.Infof("Getting messages") + + msgs, incls := fetchMessages(ctx, api, toSync) + + log.Infof("Resolving addresses") + + for _, message := range msgs { + addresses[message.To] = address.Undef + addresses[message.From] = address.Undef } - log.Infof("Persisting miners") + par(50, kmaparr(addresses), func(addr address.Address) { + raddr, err := api.StateLookupID(ctx, addr, nil) + if err != nil { + log.Warn(err) + return + } + alk.Lock() + addresses[addr] = raddr + alk.Unlock() + }) + + log.Infof("Getting miner info") miners := map[minerKey]*minerInfo{} @@ -230,53 +239,53 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS } }) - if err := st.storeMiners(miners); err != nil { - log.Error(err) + log.Info("Getting receipts") + + receipts := fetchParentReceipts(ctx, api, toSync) + + log.Info("Storing headers") + + if err := st.storeHeaders(toSync, true); err != nil { + log.Errorf("%+v", err) return } - log.Infof("Getting messages") - - msgs, incls := fetchMessages(ctx, api, toSync) - - log.Infof("Resolving addresses") - - for _, message := range msgs { - addresses[message.To] = address.Undef - addresses[message.From] = address.Undef - } - - par(50, kmaparr(addresses), func(addr address.Address) { - raddr, err := api.StateLookupID(ctx, addr, nil) - if err != nil { - log.Warn(err) - return - } - alk.Lock() - addresses[addr] = raddr - alk.Unlock() - }) + log.Info("Storing address mapping") if err := st.storeAddressMap(addresses); err != nil { log.Error(err) return } - log.Infof("Persisting messages") + log.Info("Storing actors") + + if err := st.storeActors(actors); err != nil { + log.Error(err) + return + } + + log.Info("Storing miners") + + if err := st.storeMiners(miners); err != nil { + log.Error(err) + return + } + + log.Infof("Storing messages") if err := st.storeMessages(msgs); err != nil { log.Error(err) return } + log.Info("Storing message inclusions") + if err := st.storeMsgInclusions(incls); err != nil { log.Error(err) return } - log.Infof("Getting parent receipts") - - receipts := fetchParentReceipts(ctx, api, toSync) + log.Infof("Storing parent receipts") if err := st.storeReceipts(receipts); err != nil { log.Error(err)