chainwatch: Fix postgres data import

This commit is contained in:
Łukasz Magiera 2019-12-12 19:34:28 +01:00
parent e34e5b27ba
commit 04691a13da
3 changed files with 213 additions and 90 deletions

View File

@ -21,7 +21,7 @@ func subBlocks(ctx context.Context, api aapi.FullNode, st *storage) {
bh.Cid(): bh, bh.Cid(): bh,
}, false) }, false)
if err != nil { if err != nil {
log.Errorf("%+v", err) //log.Errorf("%+v", err)
} }
} }
} }

View File

@ -2,6 +2,7 @@ package main
import ( import (
"database/sql" "database/sql"
"golang.org/x/xerrors"
"sync" "sync"
"time" "time"
@ -64,16 +65,16 @@ create table if not exists blocks
primary key, primary key,
parentWeight numeric not null, parentWeight numeric not null,
parentStateRoot text not null, parentStateRoot text not null,
height int not null, height bigint not null,
miner text not null, miner text not null,
timestamp int not null, timestamp bigint not null,
vrfproof bytea, vrfproof bytea,
tickets int not null, tickets bigint not null,
eprof bytea, eprof bytea,
prand bytea, prand bytea,
ep0partial bytea, ep0partial bytea,
ep0sector int not null, ep0sector bigint not null,
ep0challangei int not null ep0challangei bigint not null
); );
create unique index if not exists block_cid_uindex create unique index if not exists block_cid_uindex
@ -119,12 +120,8 @@ create table if not exists messages
cid text not null cid text not null
constraint messages_pk constraint messages_pk
primary key, primary key,
"from" text not null "from" text not null,
constraint messages_id_address_map_from_fk "to" text not null,
references id_address_map (address),
"to" text not null
constraint messages_id_address_map_to_fk
references id_address_map (address),
nonce int not null, nonce int not null,
value text not null, value text not null,
gasprice int not null, gasprice int not null,
@ -138,12 +135,8 @@ create unique index if not exists messages_cid_uindex
create table if not exists block_messages create table if not exists block_messages
( (
block text not null block text not null,
constraint block_messages_blk_fk message text not null,
references blocks (cid),
message text not null
constraint block_messages_msg_fk
references messages,
constraint block_messages_pk constraint block_messages_pk
primary key (block, message) primary key (block, message)
); );
@ -163,9 +156,7 @@ create unique index if not exists mpool_messages_msg_uindex
create table if not exists receipts create table if not exists receipts
( (
msg text not null msg text not null,
constraint receipts_messages_cid_fk
references messages,
state text not null, state text not null,
idx int not null, idx int not null,
exit int not null, exit int not null,
@ -184,21 +175,17 @@ create table if not exists miner_heads
addr text not null, addr text not null,
stateroot text not null, stateroot text not null,
sectorset text not null, sectorset text not null,
setsize int not null, setsize bigint not null,
provingset text not null, provingset text not null,
provingsize int not null, provingsize bigint not null,
owner text not null, owner text not null,
worker text not null, worker text not null,
peerid text not null, peerid text not null,
sectorsize int not null, sectorsize bigint not null,
power text not null, power text not null,
active int, active bool,
ppe int not null, ppe bigint not null,
slashed_at int not null, slashed_at bigint not null,
constraint miner_heads_id_address_map_owner_fk
foreign key (owner) references id_address_map (address),
constraint miner_heads_id_address_map_worker_fk
foreign key (worker) references id_address_map (address),
constraint miner_heads_pk constraint miner_heads_pk
primary key (head, addr) primary key (head, addr)
); );
@ -242,12 +229,20 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Ci
if err != nil { if err != nil {
return err return err
} }
if _, err := tx.Exec(`
stmt, err := tx.Prepare(`insert into actors (id, code, head, nonce, balance, stateroot) values ($1, $2, $3, $4, $5, $6) on conflict do nothing`) create temp table a (like actors excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy a (id, code, head, nonce, balance, stateroot) from stdin `)
if err != nil { if err != nil {
return err return err
} }
defer stmt.Close()
for addr, acts := range actors { for addr, acts := range actors {
for act, st := range acts { for act, st := range acts {
if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String(), st.String()); err != nil { if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String(), st.String()); err != nil {
@ -256,6 +251,14 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Ci
} }
} }
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into actors select * from a on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit() return tx.Commit()
} }
@ -265,11 +268,19 @@ func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error {
return err return err
} }
stmt, err := tx.Prepare(`insert into miner_heads (head, addr, stateroot, sectorset, setsize, provingset, provingsize, owner, worker, peerid, sectorsize, power, active, ppe, slashed_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15) on conflict do nothing`) if _, err := tx.Exec(`
create temp table mh (like miner_heads excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy mh (head, addr, stateroot, sectorset, setsize, provingset, provingsize, owner, worker, peerid, sectorsize, power, active, ppe, slashed_at) from STDIN`)
if err != nil { if err != nil {
return err return err
} }
defer stmt.Close()
for k, i := range miners { for k, i := range miners {
if _, err := stmt.Exec( if _, err := stmt.Exec(
k.act.Head.String(), k.act.Head.String(),
@ -291,6 +302,13 @@ func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error {
return err return err
} }
} }
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into miner_heads select * from mh on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit() return tx.Commit()
} }
@ -301,14 +319,24 @@ func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) e
tx, err := st.db.Begin() tx, err := st.db.Begin()
if err != nil { if err != nil {
return err return xerrors.Errorf("begin: %w", err)
} }
stmt, err := tx.Prepare(`insert into block_parents (block, parent) values ($1, $2) on conflict do nothing`) if _, err := tx.Exec(`
create temp table tbp (like block_parents excluding constraints) on commit drop;
create temp table bs (like blocks_synced excluding constraints) on commit drop;
create temp table b (like blocks excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy tbp (block, parent) from STDIN`)
if err != nil { if err != nil {
return err return err
} }
defer stmt.Close()
for _, bh := range bhs { for _, bh := range bhs {
for _, parent := range bh.Parents { for _, parent := range bh.Parents {
@ -318,27 +346,41 @@ func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) e
} }
} }
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into block_parents select * from tbp on conflict do nothing `); err != nil {
return xerrors.Errorf("parent put: %w", err)
}
if sync { if sync {
now := time.Now().Unix() now := time.Now().Unix()
stmt, err := tx.Prepare(`insert into blocks_synced (cid, add_ts) values ($1, $2) on conflict do nothing`) stmt, err := tx.Prepare(`copy bs (cid, add_ts) from stdin `)
if err != nil { if err != nil {
return err return err
} }
defer stmt.Close()
for _, bh := range bhs { for _, bh := range bhs {
if _, err := tx.Exec(bh.Cid().String(), now); err != nil { if _, err := stmt.Exec(bh.Cid().String(), now); err != nil {
log.Error(err) log.Error(err)
} }
} }
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into blocks_synced select * from bs on conflict do nothing `); err != nil {
return xerrors.Errorf("syncd put: %w", err)
}
} }
stmt2, err := tx.Prepare(`insert into blocks (cid, parentWeight, parentStateRoot, height, miner, "timestamp", vrfproof, tickets, eprof, prand, ep0partial, ep0sector, ep0challangei) values ($1, $2, $3, $4, $5, $6,$7,$8,$9,$10,$11,$12,$13) on conflict do nothing`) stmt2, err := tx.Prepare(`copy b (cid, parentWeight, parentStateRoot, height, miner, "timestamp", vrfproof, tickets, eprof, prand, ep0partial, ep0sector, ep0challangei) from stdin `)
if err != nil { if err != nil {
return err return err
} }
defer stmt2.Close()
for _, bh := range bhs { for _, bh := range bhs {
l := len(bh.EPostProof.Candidates) l := len(bh.EPostProof.Candidates)
@ -364,6 +406,18 @@ func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) e
} }
} }
if err := stmt2.Close(); err != nil {
return xerrors.Errorf("s2 close: %w", err)
}
if _, err := tx.Exec(`insert into blocks select * from b on conflict do nothing `); err != nil {
return xerrors.Errorf("blk put: %w", err)
}
err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit: %w", err)
}
return nil return nil
} }
@ -373,11 +427,19 @@ func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error {
return err return err
} }
stmt, err := tx.Prepare(`insert into messages (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) on conflict do nothing`) if _, err := tx.Exec(`
create temp table msgs (like messages excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy msgs (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) from stdin `)
if err != nil { if err != nil {
return err return err
} }
defer stmt.Close()
for c, m := range msgs { for c, m := range msgs {
if _, err := stmt.Exec( if _, err := stmt.Exec(
@ -394,6 +456,13 @@ func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error {
return err return err
} }
} }
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into messages select * from msgs on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit() return tx.Commit()
} }
@ -404,11 +473,19 @@ func (st *storage) storeReceipts(recs map[mrec]*types.MessageReceipt) error {
return err return err
} }
stmt, err := tx.Prepare(`insert into receipts (msg, state, idx, exit, gas_used, return) VALUES ($1,$2,$3,$4,$5,$6) on conflict do nothing`) if _, err := tx.Exec(`
create temp table recs (like receipts excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy recs (msg, state, idx, exit, gas_used, return) from stdin `)
if err != nil { if err != nil {
return err return err
} }
defer stmt.Close()
for c, m := range recs { for c, m := range recs {
if _, err := stmt.Exec( if _, err := stmt.Exec(
@ -422,6 +499,13 @@ func (st *storage) storeReceipts(recs map[mrec]*types.MessageReceipt) error {
return err return err
} }
} }
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into receipts select * from recs on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit() return tx.Commit()
} }
@ -432,11 +516,19 @@ func (st *storage) storeAddressMap(addrs map[address.Address]address.Address) er
return err return err
} }
stmt, err := tx.Prepare(`insert into id_address_map (id, address) VALUES ($1, $2) on conflict do nothing`) if _, err := tx.Exec(`
create temp table iam (like id_address_map excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy iam (id, address) from STDIN `)
if err != nil { if err != nil {
return err return err
} }
defer stmt.Close()
for a, i := range addrs { for a, i := range addrs {
if i == address.Undef { if i == address.Undef {
@ -449,6 +541,13 @@ func (st *storage) storeAddressMap(addrs map[address.Address]address.Address) er
return err return err
} }
} }
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into id_address_map select * from iam on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit() return tx.Commit()
} }
@ -459,11 +558,19 @@ func (st *storage) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error {
return err return err
} }
stmt, err := tx.Prepare(`insert into block_messages (block, message) VALUES ($1, $2) on conflict do nothing`) if _, err := tx.Exec(`
create temp table mi (like block_messages excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy mi (block, message) from STDIN `)
if err != nil { if err != nil {
return err return err
} }
defer stmt.Close()
for b, msgs := range incls { for b, msgs := range incls {
for _, msg := range msgs { for _, msg := range msgs {
@ -475,6 +582,13 @@ func (st *storage) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error {
} }
} }
} }
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into block_messages select * from mi on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit() return tx.Commit()
} }

View File

@ -101,14 +101,6 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
log.Infof("Syncing %d blocks", len(toSync)) log.Infof("Syncing %d blocks", len(toSync))
log.Infof("Persisting headers")
if err := st.storeHeaders(toSync, true); err != nil {
log.Error(err)
return
}
log.Infof("Persisting actors")
paDone := 0 paDone := 0
par(50, maparr(toSync), func(bh *types.BlockHeader) { par(50, maparr(toSync), func(bh *types.BlockHeader) {
paDone++ paDone++
@ -173,12 +165,29 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
} }
}) })
if err := st.storeActors(actors); err != nil { log.Infof("Getting messages")
log.Error(err)
return msgs, incls := fetchMessages(ctx, api, toSync)
log.Infof("Resolving addresses")
for _, message := range msgs {
addresses[message.To] = address.Undef
addresses[message.From] = address.Undef
} }
log.Infof("Persisting miners") par(50, kmaparr(addresses), func(addr address.Address) {
raddr, err := api.StateLookupID(ctx, addr, nil)
if err != nil {
log.Warn(err)
return
}
alk.Lock()
addresses[addr] = raddr
alk.Unlock()
})
log.Infof("Getting miner info")
miners := map[minerKey]*minerInfo{} miners := map[minerKey]*minerInfo{}
@ -230,53 +239,53 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
} }
}) })
if err := st.storeMiners(miners); err != nil { log.Info("Getting receipts")
log.Error(err)
receipts := fetchParentReceipts(ctx, api, toSync)
log.Info("Storing headers")
if err := st.storeHeaders(toSync, true); err != nil {
log.Errorf("%+v", err)
return return
} }
log.Infof("Getting messages") log.Info("Storing address mapping")
msgs, incls := fetchMessages(ctx, api, toSync)
log.Infof("Resolving addresses")
for _, message := range msgs {
addresses[message.To] = address.Undef
addresses[message.From] = address.Undef
}
par(50, kmaparr(addresses), func(addr address.Address) {
raddr, err := api.StateLookupID(ctx, addr, nil)
if err != nil {
log.Warn(err)
return
}
alk.Lock()
addresses[addr] = raddr
alk.Unlock()
})
if err := st.storeAddressMap(addresses); err != nil { if err := st.storeAddressMap(addresses); err != nil {
log.Error(err) log.Error(err)
return return
} }
log.Infof("Persisting messages") log.Info("Storing actors")
if err := st.storeActors(actors); err != nil {
log.Error(err)
return
}
log.Info("Storing miners")
if err := st.storeMiners(miners); err != nil {
log.Error(err)
return
}
log.Infof("Storing messages")
if err := st.storeMessages(msgs); err != nil { if err := st.storeMessages(msgs); err != nil {
log.Error(err) log.Error(err)
return return
} }
log.Info("Storing message inclusions")
if err := st.storeMsgInclusions(incls); err != nil { if err := st.storeMsgInclusions(incls); err != nil {
log.Error(err) log.Error(err)
return return
} }
log.Infof("Getting parent receipts") log.Infof("Storing parent receipts")
receipts := fetchParentReceipts(ctx, api, toSync)
if err := st.storeReceipts(receipts); err != nil { if err := st.storeReceipts(receipts); err != nil {
log.Error(err) log.Error(err)