lotus/cmd/lotus-chainwatch/storage.go

778 lines
16 KiB
Go
Raw Normal View History

2019-11-15 16:38:56 +00:00
package main
import (
"database/sql"
2020-01-20 00:49:52 +00:00
"encoding/hex"
2020-01-13 13:01:11 +00:00
"fmt"
2019-11-19 12:57:16 +00:00
"sync"
"time"
2019-11-15 18:37:57 +00:00
2020-01-20 00:49:52 +00:00
"github.com/filecoin-project/go-address"
2019-11-15 16:38:56 +00:00
"github.com/ipfs/go-cid"
2019-12-10 23:42:36 +00:00
_ "github.com/lib/pq"
2020-01-20 00:49:52 +00:00
"golang.org/x/xerrors"
2019-11-15 18:37:57 +00:00
2020-01-20 00:49:52 +00:00
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/actors"
2019-11-15 18:37:57 +00:00
"github.com/filecoin-project/lotus/chain/types"
2019-11-15 16:38:56 +00:00
)
type storage struct {
db *sql.DB
2019-11-19 12:57:16 +00:00
headerLk sync.Mutex
2019-11-15 16:38:56 +00:00
}
2019-12-05 11:58:19 +00:00
func openStorage(dbSource string) (*storage, error) {
2019-12-10 23:42:36 +00:00
db, err := sql.Open("postgres", dbSource)
2019-11-15 16:38:56 +00:00
if err != nil {
return nil, err
}
2019-12-13 09:30:51 +00:00
db.SetMaxOpenConns(1350)
2019-11-15 16:38:56 +00:00
st := &storage{db: db}
return st, st.setup()
}
func (st *storage) setup() error {
tx, err := st.db.Begin()
if err != nil {
return err
}
_, err = tx.Exec(`
2019-12-10 23:42:36 +00:00
create table if not exists blocks_synced
(
cid text not null
constraint blocks_synced_pk
primary key,
add_ts int not null
);
create unique index if not exists blocks_synced_cid_uindex
on blocks_synced (cid);
create table if not exists block_parents
(
2020-01-14 05:17:31 +00:00
block text not null,
2019-12-10 23:42:36 +00:00
parent text not null
);
create unique index if not exists block_parents_block_parent_uindex
on block_parents (block, parent);
create table if not exists blocks
(
cid text not null
constraint blocks_pk
2019-12-11 22:17:44 +00:00
primary key,
2019-12-10 23:42:36 +00:00
parentWeight numeric not null,
parentStateRoot text not null,
2019-12-12 18:34:28 +00:00
height bigint not null,
2019-12-10 23:42:36 +00:00
miner text not null,
2019-12-12 18:34:28 +00:00
timestamp bigint not null,
vrfproof bytea,
2019-12-12 18:34:28 +00:00
tickets bigint not null,
eprof bytea,
prand bytea,
ep0partial bytea,
2020-01-19 16:18:47 +00:00
ep0sector numeric not null,
ep0challangei numeric not null
2019-12-10 23:42:36 +00:00
);
create unique index if not exists block_cid_uindex
on blocks (cid);
create table if not exists id_address_map
(
id text not null,
address text not null,
constraint id_address_map_pk
primary key (id, address)
);
create unique index if not exists id_address_map_id_uindex
on id_address_map (id);
create unique index if not exists id_address_map_address_uindex
on id_address_map (address);
create table if not exists actors
2019-11-15 18:37:57 +00:00
(
2019-12-10 23:42:36 +00:00
id text not null
constraint id_address_map_actors_id_fk
references id_address_map (id),
2019-11-15 18:37:57 +00:00
code text not null,
head text not null,
nonce int not null,
balance text not null,
stateroot text
2019-11-15 18:37:57 +00:00
);
create index if not exists actors_id_index
on actors (id);
2019-11-15 18:37:57 +00:00
create index if not exists id_address_map_address_index
on id_address_map (address);
create index if not exists id_address_map_id_index
on id_address_map (id);
2020-01-19 16:18:47 +00:00
create table if not exists actor_states
(
head text not null,
code text not null,
state json not null
);
create unique index if not exists actor_states_head_code_uindex
on actor_states (head, code);
create index if not exists actor_states_head_index
on actor_states (head);
create index if not exists actor_states_code_head_index
on actor_states (head, code);
create table if not exists messages
2019-11-15 16:38:56 +00:00
(
cid text not null
constraint messages_pk
primary key,
2019-12-12 18:34:28 +00:00
"from" text not null,
"to" text not null,
2020-01-19 16:18:47 +00:00
nonce bigint not null,
2019-11-15 16:38:56 +00:00
value text not null,
2020-01-19 16:18:47 +00:00
gasprice bigint not null,
gaslimit bigint not null,
method bigint,
2019-12-10 23:42:36 +00:00
params bytea
2019-11-15 16:38:56 +00:00
);
create unique index if not exists messages_cid_uindex
2019-11-15 16:38:56 +00:00
on messages (cid);
create table if not exists block_messages
2019-11-15 16:38:56 +00:00
(
2019-12-12 18:34:28 +00:00
block text not null,
message text not null,
2019-11-15 16:38:56 +00:00
constraint block_messages_pk
primary key (block, message)
);
create table if not exists mpool_messages
(
msg text not null
constraint mpool_messages_pk
primary key
constraint mpool_messages_messages_cid_fk
references messages,
add_ts int not null
);
create unique index if not exists mpool_messages_msg_uindex
on mpool_messages (msg);
2019-12-03 11:05:12 +00:00
create table if not exists receipts
(
2019-12-12 18:34:28 +00:00
msg text not null,
2019-12-10 23:42:36 +00:00
state text not null,
2019-12-03 11:05:12 +00:00
idx int not null,
exit int not null,
gas_used int not null,
2019-12-10 23:42:36 +00:00
return bytea,
2019-12-03 11:05:12 +00:00
constraint receipts_pk
primary key (msg, state)
);
create index if not exists receipts_msg_state_index
on receipts (msg, state);
create table if not exists miner_heads
(
2019-12-10 23:42:36 +00:00
head text not null,
addr text not null,
stateroot text not null,
sectorset text not null,
2020-01-13 13:01:11 +00:00
setsize decimal not null,
provingset text not null,
2020-01-13 13:01:11 +00:00
provingsize decimal not null,
owner text not null,
worker text not null,
peerid text not null,
2019-12-12 18:34:28 +00:00
sectorsize bigint not null,
2020-01-19 16:18:47 +00:00
power decimal not null,
2019-12-12 18:34:28 +00:00
active bool,
ppe bigint not null,
slashed_at bigint not null,
constraint miner_heads_pk
primary key (head, addr)
2019-11-15 16:38:56 +00:00
);
2020-01-20 00:49:52 +00:00
create table if not exists deals
(
id int not null,
pieceRef text not null,
pieceSize bigint not null,
client text not null,
provider text not null,
expiration decimal not null,
duration decimal not null,
epochPrice decimal not null,
collateral decimal not null,
constraint deals_pk
primary key (id)
);
create index if not exists deals_client_index
on deals (client);
create unique index if not exists deals_id_uindex
on deals (id);
create index if not exists deals_pieceRef_index
on deals (pieceRef);
create index if not exists deals_provider_index
on deals (provider);
2019-11-15 16:38:56 +00:00
`)
if err != nil {
return err
}
return tx.Commit()
}
2019-12-11 22:17:44 +00:00
func (st *storage) hasList() map[cid.Cid]struct{} {
rws, err := st.db.Query(`select cid FROM blocks_synced`)
2019-11-15 16:38:56 +00:00
if err != nil {
log.Error(err)
2019-12-11 22:17:44 +00:00
return map[cid.Cid]struct{}{}
2019-11-15 16:38:56 +00:00
}
2019-12-11 22:17:44 +00:00
out := map[cid.Cid]struct{}{}
for rws.Next() {
var c string
if err := rws.Scan(&c); err != nil {
log.Error(err)
continue
}
ci, err := cid.Parse(c)
if err != nil {
log.Error(err)
continue
}
out[ci] = struct{}{}
}
return out
2019-11-15 16:38:56 +00:00
}
2020-01-19 16:18:47 +00:00
func (st *storage) storeActors(actors map[address.Address]map[types.Actor]actorInfo) error {
// Basic
2019-11-15 16:38:56 +00:00
tx, err := st.db.Begin()
if err != nil {
return err
}
2019-12-12 18:34:28 +00:00
if _, err := tx.Exec(`
2020-01-19 16:18:47 +00:00
create temp table a (like actors excluding constraints) on commit drop;
`); err != nil {
2019-12-12 18:34:28 +00:00
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy a (id, code, head, nonce, balance, stateroot) from stdin `)
2019-11-15 18:37:57 +00:00
if err != nil {
return err
}
2019-12-12 18:34:28 +00:00
2019-11-15 18:37:57 +00:00
for addr, acts := range actors {
for act, st := range acts {
2020-01-19 16:18:47 +00:00
if _, err := stmt.Exec(addr.String(), act.Code.String(), act.Head.String(), act.Nonce, act.Balance.String(), st.stateroot.String()); err != nil {
2019-11-15 18:37:57 +00:00
return err
}
}
}
2019-12-12 18:34:28 +00:00
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into actors select * from a on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
2020-01-19 16:18:47 +00:00
if err := tx.Commit(); err != nil {
return err
}
// States
tx, err = st.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`
create temp table a (like actor_states excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err = tx.Prepare(`copy a (head, code, state) from stdin `)
if err != nil {
return err
}
for _, acts := range actors {
for act, st := range acts {
if _, err := stmt.Exec(act.Head.String(), act.Code.String(), st.state); err != nil {
return err
}
}
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into actor_states select * from a on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
if err := tx.Commit(); err != nil {
return err
}
return nil
2019-11-15 18:37:57 +00:00
}
func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
2019-12-12 18:34:28 +00:00
if _, err := tx.Exec(`
create temp table mh (like miner_heads excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy mh (head, addr, stateroot, sectorset, setsize, provingset, provingsize, owner, worker, peerid, sectorsize, power, active, ppe, slashed_at) from STDIN`)
if err != nil {
return err
}
for k, i := range miners {
if _, err := stmt.Exec(
k.act.Head.String(),
k.addr.String(),
k.stateroot.String(),
i.state.Sectors.String(),
2020-01-13 13:01:11 +00:00
fmt.Sprint(i.ssize),
i.state.ProvingSet.String(),
2020-01-13 13:01:11 +00:00
fmt.Sprint(i.psize),
i.info.Owner.String(),
i.info.Worker.String(),
i.info.PeerID.String(),
i.info.SectorSize,
i.state.Power.String(),
i.state.Active,
i.state.ElectionPeriodStart,
i.state.SlashedAt,
); err != nil {
return err
}
}
2019-12-12 18:34:28 +00:00
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into miner_heads select * from mh on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit()
}
2019-11-19 12:57:16 +00:00
func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error {
st.headerLk.Lock()
defer st.headerLk.Unlock()
2019-11-15 18:37:57 +00:00
tx, err := st.db.Begin()
if err != nil {
2019-12-12 18:34:28 +00:00
return xerrors.Errorf("begin: %w", err)
2019-11-15 18:37:57 +00:00
}
2019-12-12 18:34:28 +00:00
if _, err := tx.Exec(`
create temp table tbp (like block_parents excluding constraints) on commit drop;
create temp table bs (like blocks_synced excluding constraints) on commit drop;
create temp table b (like blocks excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy tbp (block, parent) from STDIN`)
2019-11-15 16:38:56 +00:00
if err != nil {
return err
}
2019-11-15 18:37:57 +00:00
for _, bh := range bhs {
for _, parent := range bh.Parents {
if _, err := stmt.Exec(bh.Cid().String(), parent.String()); err != nil {
2019-12-11 22:17:44 +00:00
log.Error(err)
}
}
}
2019-12-12 18:34:28 +00:00
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into block_parents select * from tbp on conflict do nothing `); err != nil {
return xerrors.Errorf("parent put: %w", err)
}
2019-11-19 12:57:16 +00:00
if sync {
now := time.Now().Unix()
2019-12-12 18:34:28 +00:00
stmt, err := tx.Prepare(`copy bs (cid, add_ts) from stdin `)
if err != nil {
return err
}
for _, bh := range bhs {
2019-12-12 18:34:28 +00:00
if _, err := stmt.Exec(bh.Cid().String(), now); err != nil {
2019-12-11 22:17:44 +00:00
log.Error(err)
2019-11-19 12:57:16 +00:00
}
}
2019-12-12 18:34:28 +00:00
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into blocks_synced select * from bs on conflict do nothing `); err != nil {
return xerrors.Errorf("syncd put: %w", err)
}
2019-11-19 12:57:16 +00:00
}
2019-12-12 18:34:28 +00:00
stmt2, err := tx.Prepare(`copy b (cid, parentWeight, parentStateRoot, height, miner, "timestamp", vrfproof, tickets, eprof, prand, ep0partial, ep0sector, ep0challangei) from stdin `)
if err != nil {
return err
}
2019-11-15 16:38:56 +00:00
for _, bh := range bhs {
2019-12-12 13:53:38 +00:00
l := len(bh.EPostProof.Candidates)
if len(bh.EPostProof.Candidates) == 0 {
bh.EPostProof.Candidates = append(bh.EPostProof.Candidates, types.EPostTicket{})
}
if _, err := stmt2.Exec(
bh.Cid().String(),
2019-12-12 13:53:38 +00:00
bh.ParentWeight.String(),
bh.ParentStateRoot.String(),
bh.Height,
bh.Miner.String(),
bh.Timestamp,
bh.Ticket.VRFProof,
l,
bh.EPostProof.Proof,
bh.EPostProof.PostRand,
bh.EPostProof.Candidates[0].Partial,
bh.EPostProof.Candidates[0].SectorID,
bh.EPostProof.Candidates[0].ChallengeIndex); err != nil {
2019-12-11 22:17:44 +00:00
log.Error(err)
2019-12-10 23:42:36 +00:00
}
2019-11-15 16:38:56 +00:00
}
2019-12-10 23:42:36 +00:00
2019-12-12 18:34:28 +00:00
if err := stmt2.Close(); err != nil {
return xerrors.Errorf("s2 close: %w", err)
}
if _, err := tx.Exec(`insert into blocks select * from b on conflict do nothing `); err != nil {
return xerrors.Errorf("blk put: %w", err)
}
err = tx.Commit()
if err != nil {
return xerrors.Errorf("commit: %w", err)
}
2019-12-11 22:17:44 +00:00
return nil
2019-11-15 16:38:56 +00:00
}
func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
2019-12-12 18:34:28 +00:00
if _, err := tx.Exec(`
create temp table msgs (like messages excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy msgs (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) from stdin `)
2019-11-15 16:38:56 +00:00
if err != nil {
return err
}
for c, m := range msgs {
if _, err := stmt.Exec(
c.String(),
m.From.String(),
m.To.String(),
m.Nonce,
m.Value.String(),
m.GasPrice.String(),
m.GasLimit.String(),
m.Method,
m.Params,
); err != nil {
return err
}
}
2019-12-12 18:34:28 +00:00
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into messages select * from msgs on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
2019-11-15 16:38:56 +00:00
return tx.Commit()
}
2019-12-03 11:05:12 +00:00
func (st *storage) storeReceipts(recs map[mrec]*types.MessageReceipt) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
2019-12-12 18:34:28 +00:00
if _, err := tx.Exec(`
create temp table recs (like receipts excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy recs (msg, state, idx, exit, gas_used, return) from stdin `)
2019-12-03 11:05:12 +00:00
if err != nil {
return err
}
for c, m := range recs {
if _, err := stmt.Exec(
c.msg.String(),
c.state.String(),
c.idx,
m.ExitCode,
m.GasUsed.String(),
m.Return,
); err != nil {
return err
}
}
2019-12-12 18:34:28 +00:00
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into receipts select * from recs on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
2019-12-03 11:05:12 +00:00
return tx.Commit()
}
2019-11-15 18:37:57 +00:00
func (st *storage) storeAddressMap(addrs map[address.Address]address.Address) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
2019-12-12 18:34:28 +00:00
if _, err := tx.Exec(`
create temp table iam (like id_address_map excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy iam (id, address) from STDIN `)
2019-11-15 18:37:57 +00:00
if err != nil {
return err
}
for a, i := range addrs {
if i == address.Undef {
continue
}
if _, err := stmt.Exec(
i.String(),
a.String(),
); err != nil {
return err
}
}
2019-12-12 18:34:28 +00:00
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into id_address_map select * from iam on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
2019-11-15 18:37:57 +00:00
return tx.Commit()
}
2019-11-15 16:38:56 +00:00
func (st *storage) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
2019-12-12 18:34:28 +00:00
if _, err := tx.Exec(`
create temp table mi (like block_messages excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy mi (block, message) from STDIN `)
2019-11-15 16:38:56 +00:00
if err != nil {
return err
}
for b, msgs := range incls {
for _, msg := range msgs {
if _, err := stmt.Exec(
b.String(),
msg.String(),
); err != nil {
return err
}
}
}
2019-12-12 18:34:28 +00:00
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into block_messages select * from mi on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
2019-11-15 16:38:56 +00:00
return tx.Commit()
}
2019-12-13 11:04:24 +00:00
func (st *storage) storeMpoolInclusions(msgs []api.MpoolUpdate) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
2019-12-13 11:04:24 +00:00
if _, err := tx.Exec(`
2020-01-20 00:49:52 +00:00
create temp table mi (like mpool_messages excluding constraints) on commit drop;
`); err != nil {
2019-12-13 11:04:24 +00:00
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy mi (msg, add_ts) from stdin `)
if err != nil {
return err
}
2019-12-13 11:04:24 +00:00
for _, msg := range msgs {
if msg.Type != api.MpoolAdd {
continue
}
if _, err := stmt.Exec(
msg.Message.Message.Cid().String(),
time.Now().Unix(),
); err != nil {
return err
}
}
if err := stmt.Close(); err != nil {
return err
}
2019-12-13 11:04:24 +00:00
if _, err := tx.Exec(`insert into mpool_messages select * from mi on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit()
}
2020-01-20 00:49:52 +00:00
func (st *storage) storeDeals(deals map[string]actors.OnChainDeal) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
if _, err := tx.Exec(`
create temp table d (like deals excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy d (id, pieceref, piecesize, client, "provider", expiration, duration, epochprice, collateral) from stdin `)
if err != nil {
return err
}
var bloat uint64
for id, deal := range deals {
if len(deal.PieceRef) > 40 {
bloat += uint64(len(deal.PieceRef))
continue
}
if _, err := stmt.Exec(
id,
hex.EncodeToString(deal.PieceRef),
deal.PieceSize,
deal.Client.String(),
deal.Provider.String(),
fmt.Sprint(deal.ProposalExpiration),
fmt.Sprint(deal.Duration),
deal.StoragePricePerEpoch.String(),
deal.StorageCollateral.String(),
); err != nil {
return err
}
}
if bloat > 0 {
log.Warnf("deal PieceRefs had %d bytes of garbage", bloat)
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into deals select * from d on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit()
}
2019-11-15 16:38:56 +00:00
func (st *storage) close() error {
return st.db.Close()
}