chainwatch: Postgres

This commit is contained in:
Łukasz Magiera 2019-12-11 00:42:36 +01:00
parent ce6912047d
commit b6b06f67dc
6 changed files with 113 additions and 120 deletions

View File

@ -21,7 +21,7 @@ func subBlocks(ctx context.Context, api aapi.FullNode, st *storage) {
bh.Cid(): bh,
}, false)
if err != nil {
log.Error(err)
log.Errorf("%+v", err)
}
}
}

View File

@ -38,7 +38,7 @@ func main() {
&cli.StringFlag{
Name: "db",
EnvVars: []string{"LOTUS_DB"},
Value: "./chainwatch.db",
Value: "",
},
},

View File

@ -6,7 +6,7 @@ import (
"time"
"github.com/ipfs/go-cid"
_ "github.com/mattn/go-sqlite3"
_ "github.com/lib/pq"
"github.com/filecoin-project/lotus/chain/address"
"github.com/filecoin-project/lotus/chain/types"
@ -19,7 +19,7 @@ type storage struct {
}
func openStorage(dbSource string) (*storage, error) {
db, err := sql.Open("sqlite3", dbSource)
db, err := sql.Open("postgres", dbSource)
if err != nil {
return nil, err
}
@ -35,33 +35,76 @@ func (st *storage) setup() error {
return err
}
_, err = tx.Exec(`
create table if not exists blocks_synced
(
cid text not null
constraint blocks_synced_pk
primary key,
add_ts int not null
);
create unique index if not exists blocks_synced_cid_uindex
on blocks_synced (cid);
create table if not exists block_parents
(
block text not null
constraint block_parents_pk
primary key,
parent text not null
);
create unique index if not exists block_parents_block_parent_uindex
on block_parents (block, parent);
create table if not exists blocks
(
cid text not null
constraint blocks_pk
primary key
constraint blocks_synced_blocks_cid_fk
references blocks_synced (cid)
constraint block_parents_blocks_cid_fk
references block_parents (block),
parentWeight numeric not null,
parentStateRoot text not null,
height int not null,
miner text not null,
timestamp int not null
);
create unique index if not exists block_cid_uindex
on blocks (cid);
create table if not exists id_address_map
(
id text not null,
address text not null,
constraint id_address_map_pk
primary key (id, address)
);
create unique index if not exists id_address_map_id_uindex
on id_address_map (id);
create unique index if not exists id_address_map_address_uindex
on id_address_map (address);
create table if not exists actors
(
id text not null,
id text not null
constraint id_address_map_actors_id_fk
references id_address_map (id),
code text not null,
head text not null,
nonce int not null,
balance text not null,
stateroot text
constraint actors_blocks_stateroot_fk
references blocks (parentStateRoot),
constraint actors_pk
primary key (id, nonce, balance, stateroot)
);
create index if not exists actors_id_index
on actors (id);
create table if not exists id_address_map
(
id text not null
constraint id_address_map_actors_id_fk
references actors (id),
address text not null,
constraint id_address_map_pk
primary key (id, address)
);
create index if not exists id_address_map_address_index
on id_address_map (address);
@ -84,57 +127,11 @@ create table if not exists messages
gasprice int not null,
gaslimit int not null,
method int,
params blob
params bytea
);
create unique index if not exists messages_cid_uindex
on messages (cid);
create table if not exists blocks
(
cid text not null
constraint blocks_pk
primary key,
parentWeight numeric not null,
parentStateRoot text not null,
height int not null,
miner text not null
constraint blocks_id_address_map_miner_fk
references id_address_map (address),
timestamp int not null
);
create unique index if not exists block_cid_uindex
on blocks (cid);
create table if not exists blocks_synced
(
cid text not null
constraint blocks_synced_pk
primary key
constraint blocks_synced_blocks_cid_fk
references blocks,
add_ts int not null
);
create unique index if not exists blocks_synced_cid_uindex
on blocks_synced (cid);
create table if not exists block_parents
(
block text not null
constraint block_parents_blocks_cid_fk
references blocks,
parent text not null
constraint block_parents_blocks_cid_fk_2
references blocks
);
create unique index if not exists block_parents_block_parent_uindex
on block_parents (block, parent);
create unique index if not exists blocks_cid_uindex
on blocks (cid);
create table if not exists block_messages
(
@ -166,13 +163,11 @@ create table if not exists receipts
msg text not null
constraint receipts_messages_cid_fk
references messages,
state text not null
constraint receipts_blocks_parentStateRoot_fk
references blocks (parentStateRoot),
state text not null,
idx int not null,
exit int not null,
gas_used int not null,
return blob,
return bytea,
constraint receipts_pk
primary key (msg, state)
);
@ -180,18 +175,11 @@ create table if not exists receipts
create index if not exists receipts_msg_state_index
on receipts (msg, state);
create table if not exists miner_heads
(
head text not null
constraint miner_heads_actors_head_fk
references actors (head),
addr text not null
constraint miner_heads_actors_id_fk
references actors (id),
stateroot text not null
constraint miner_heads_blocks_stateroot_fk
references blocks (parentStateRoot),
head text not null,
addr text not null,
stateroot text not null,
sectorset text not null,
provingset text not null,
owner text not null,
@ -219,7 +207,7 @@ create table if not exists miner_heads
func (st *storage) hasBlock(bh cid.Cid) bool {
var exitsts bool
err := st.db.QueryRow(`select exists (select 1 FROM blocks_synced where cid=?)`, bh.String()).Scan(&exitsts)
err := st.db.QueryRow(`select exists (select 1 FROM blocks_synced where cid=$1)`, bh.String()).Scan(&exitsts)
if err != nil {
log.Error(err)
return false
@ -233,7 +221,7 @@ func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Ci
return err
}
stmt, err := tx.Prepare(`insert into actors (id, code, head, nonce, balance, stateroot) values (?, ?, ?, ?, ?, ?) on conflict do nothing`)
stmt, err := tx.Prepare(`insert into actors (id, code, head, nonce, balance, stateroot) values ($1, $2, $3, $4, $5, $6) on conflict do nothing`)
if err != nil {
return err
}
@ -255,7 +243,7 @@ func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error {
return err
}
stmt, err := tx.Prepare(`insert into miner_heads (head, addr, stateroot, sectorset, provingset, owner, worker, peerid, sectorsize, power, active, ppe, slashed_at) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict do nothing`)
stmt, err := tx.Prepare(`insert into miner_heads (head, addr, stateroot, sectorset, provingset, owner, worker, peerid, sectorsize, power, active, ppe, slashed_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13) on conflict do nothing`)
if err != nil {
return err
}
@ -292,18 +280,7 @@ func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) e
return err
}
stmt, err := tx.Prepare(`insert into blocks (cid, parentWeight, parentStateRoot, height, miner, "timestamp") values (?, ?, ?, ?, ?, ?) on conflict do nothing`)
if err != nil {
return err
}
defer stmt.Close()
for _, bh := range bhs {
if _, err := stmt.Exec(bh.Cid().String(), bh.ParentWeight.String(), bh.ParentStateRoot.String(), bh.Height, bh.Miner.String(), bh.Timestamp); err != nil {
return err
}
}
stmt2, err := tx.Prepare(`insert into block_parents (block, parent) values (?, ?) on conflict do nothing`)
stmt2, err := tx.Prepare(`insert into block_parents (block, parent) values ($1, $2) on conflict do nothing`)
if err != nil {
return err
}
@ -317,7 +294,7 @@ func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) e
}
if sync {
stmt, err := tx.Prepare(`insert into blocks_synced (cid, add_ts) values (?, ?) on conflict do nothing`)
stmt, err := tx.Prepare(`insert into blocks_synced (cid, add_ts) values ($1, $2) on conflict do nothing`)
if err != nil {
return err
}
@ -331,6 +308,17 @@ func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) e
}
}
stmt, err := tx.Prepare(`insert into blocks (cid, parentWeight, parentStateRoot, height, miner, "timestamp") values ($1, $2, $3, $4, $5, $6) on conflict do nothing`)
if err != nil {
return err
}
defer stmt.Close()
for _, bh := range bhs {
if _, err := stmt.Exec(bh.Cid().String(), bh.ParentWeight.String(), bh.ParentStateRoot.String(), bh.Height, bh.Miner.String(), bh.Timestamp); err != nil {
return err
}
}
return tx.Commit()
}
@ -340,7 +328,7 @@ func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error {
return err
}
stmt, err := tx.Prepare(`insert into messages (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) on conflict do nothing`)
stmt, err := tx.Prepare(`insert into messages (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9) on conflict do nothing`)
if err != nil {
return err
}
@ -371,7 +359,7 @@ func (st *storage) storeReceipts(recs map[mrec]*types.MessageReceipt) error {
return err
}
stmt, err := tx.Prepare(`insert into receipts (msg, state, idx, exit, gas_used, return) VALUES (?, ?, ?, ?, ?, ?) on conflict do nothing`)
stmt, err := tx.Prepare(`insert into receipts (msg, state, idx, exit, gas_used, return) VALUES ($1,$2,$3,$4,$5,$6) on conflict do nothing`)
if err != nil {
return err
}
@ -399,7 +387,7 @@ func (st *storage) storeAddressMap(addrs map[address.Address]address.Address) er
return err
}
stmt, err := tx.Prepare(`insert into id_address_map (id, address) VALUES (?, ?) on conflict do nothing`)
stmt, err := tx.Prepare(`insert into id_address_map (id, address) VALUES ($1, $2) on conflict do nothing`)
if err != nil {
return err
}
@ -426,7 +414,7 @@ func (st *storage) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error {
return err
}
stmt, err := tx.Prepare(`insert into block_messages (block, message) VALUES (?, ?) on conflict do nothing`)
stmt, err := tx.Prepare(`insert into block_messages (block, message) VALUES ($1, $2) on conflict do nothing`)
if err != nil {
return err
}
@ -452,7 +440,7 @@ func (st *storage) storeMpoolInclusion(msg cid.Cid) error {
return err
}
stmt, err := tx.Prepare(`insert into mpool_messages (msg, add_ts) VALUES (?, ?) on conflict do nothing`)
stmt, err := tx.Prepare(`insert into mpool_messages (msg, add_ts) VALUES ($1, $2) on conflict do nothing`)
if err != nil {
return err
}

View File

@ -223,25 +223,6 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
msgs, incls := fetchMessages(ctx, api, toSync)
if err := st.storeMessages(msgs); err != nil {
log.Error(err)
return
}
if err := st.storeMsgInclusions(incls); err != nil {
log.Error(err)
return
}
log.Infof("Getting parent receipts")
receipts := fetchParentReceipts(ctx, api, toSync)
if err := st.storeReceipts(receipts); err != nil {
log.Error(err)
return
}
log.Infof("Resolving addresses")
for _, message := range msgs {
@ -265,6 +246,27 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
return
}
log.Infof("Persisting messages")
if err := st.storeMessages(msgs); err != nil {
log.Error(err)
return
}
if err := st.storeMsgInclusions(incls); err != nil {
log.Error(err)
return
}
log.Infof("Getting parent receipts")
receipts := fetchParentReceipts(ctx, api, toSync)
if err := st.storeReceipts(receipts); err != nil {
log.Error(err)
return
}
log.Infof("Sync done")
}

1
go.mod
View File

@ -46,6 +46,7 @@ require (
github.com/ipfs/go-unixfs v0.2.2-0.20190827150610-868af2e9e5cb
github.com/ipld/go-ipld-prime v0.0.2-0.20191025154717-8dff1cbec43b
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52
github.com/lib/pq v1.2.0
github.com/libp2p/go-libp2p v0.3.0
github.com/libp2p/go-libp2p-circuit v0.1.1
github.com/libp2p/go-libp2p-connmgr v0.1.0

2
go.sum
View File

@ -282,6 +282,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/libp2p/go-addr-util v0.0.1 h1:TpTQm9cXVRVSKsYbgQ7GKc3KbbHVTnbostgGaDEP+88=
github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ=
github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg0a8KiIvDp3TzQ=