This commit is contained in:
Łukasz Magiera 2019-12-11 23:17:44 +01:00
parent 830f2cbdd3
commit 66ee9d209d
4 changed files with 89 additions and 50 deletions

View File

@ -30,6 +30,8 @@ var dotCmd = &cli.Command{
fmt.Println("digraph D {") fmt.Println("digraph D {")
hl := st.hasList()
for res.Next() { for res.Next() {
var block, parent, miner string var block, parent, miner string
var height uint64 var height uint64
@ -42,7 +44,7 @@ var dotCmd = &cli.Command{
return err return err
} }
has := st.hasBlock(bc) _, has := hl[bc]
col := crc32.Checksum([]byte(miner), crc32.MakeTable(crc32.Castagnoli))&0xc0c0c0c0 + 0x30303030 col := crc32.Checksum([]byte(miner), crc32.MakeTable(crc32.Castagnoli))&0xc0c0c0c0 + 0x30303030

View File

@ -26,7 +26,7 @@ func subMpool(ctx context.Context, api aapi.FullNode, st *storage) {
change.Message.Message.Cid(): &change.Message.Message, change.Message.Message.Cid(): &change.Message.Message,
}) })
if err != nil { if err != nil {
log.Error(err) //log.Error(err)
continue continue
} }

View File

@ -61,11 +61,7 @@ create table if not exists blocks
( (
cid text not null cid text not null
constraint blocks_pk constraint blocks_pk
primary key primary key,
constraint blocks_synced_blocks_cid_fk
references blocks_synced (cid)
constraint block_parents_blocks_cid_fk
references block_parents (block),
parentWeight numeric not null, parentWeight numeric not null,
parentStateRoot text not null, parentStateRoot text not null,
height int not null, height int not null,
@ -205,14 +201,31 @@ create table if not exists miner_heads
return tx.Commit() return tx.Commit()
} }
func (st *storage) hasBlock(bh cid.Cid) bool { func (st *storage) hasList() map[cid.Cid]struct{} {
var exitsts bool rws, err := st.db.Query(`select cid FROM blocks_synced`)
err := st.db.QueryRow(`select exists (select 1 FROM blocks_synced where cid=$1)`, bh.String()).Scan(&exitsts)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
return false return map[cid.Cid]struct{}{}
} }
return exitsts out := map[cid.Cid]struct{}{}
for rws.Next() {
var c string
if err := rws.Scan(&c); err != nil {
log.Error(err)
continue
}
ci, err := cid.Parse(c)
if err != nil {
log.Error(err)
continue
}
out[ci] = struct{}{}
}
return out
} }
func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Cid) error { func (st *storage) storeActors(actors map[address.Address]map[types.Actor]cid.Cid) error {
@ -271,55 +284,74 @@ func (st *storage) storeMiners(miners map[minerKey]*minerInfo) error {
return tx.Commit() return tx.Commit()
} }
func (st *storage) batch(n int) chan *sql.Tx {
out := make(chan *sql.Tx, n)
for i := 0; i < n; i++ {
tx, err := st.db.Begin()
if err != nil {
log.Error(err)
}
out <- tx
}
return out
}
func endbatch(b chan *sql.Tx) {
n := len(b)
for i := 0; i < n; i++ {
tx := <- b
if err := tx.Commit(); err != nil {
log.Error(err)
}
}
}
func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error { func (st *storage) storeHeaders(bhs map[cid.Cid]*types.BlockHeader, sync bool) error {
st.headerLk.Lock() st.headerLk.Lock()
defer st.headerLk.Unlock() defer st.headerLk.Unlock()
tx, err := st.db.Begin() tb := st.batch(50)
if err != nil {
return err
}
stmt2, err := tx.Prepare(`insert into block_parents (block, parent) values ($1, $2) on conflict do nothing`) par(100, maparr(bhs), func(bh *types.BlockHeader) {
if err != nil {
return err
}
defer stmt2.Close()
for _, bh := range bhs {
for _, parent := range bh.Parents { for _, parent := range bh.Parents {
if _, err := stmt2.Exec(bh.Cid().String(), parent.String()); err != nil { log.Info("bps ", bh.Cid())
tx := <-tb
stmt2, err := tx.Prepare(`insert into block_parents (block, parent) values ($1, $2) on conflict do nothing`)
if err != nil {
return err return err
} }
if _, err := tx.Exec(`insert into block_parents (block, parent) values ($1, $2) on conflict do nothing`, bh.Cid().String(), parent.String()); err != nil {
log.Error(err)
}
tb <- tx
} }
} })
if sync { if sync {
stmt, err := tx.Prepare(`insert into blocks_synced (cid, add_ts) values ($1, $2) on conflict do nothing`)
if err != nil {
return err
}
defer stmt.Close()
now := time.Now().Unix() now := time.Now().Unix()
for _, bh := range bhs { par(50, maparr(bhs), func(bh *types.BlockHeader) {
if _, err := stmt.Exec(bh.Cid().String(), now); err != nil { tx := <-tb
return err if _, err := tx.Exec(`insert into blocks_synced (cid, add_ts) values ($1, $2) on conflict do nothing`, bh.Cid().String(), now); err != nil {
log.Error(err)
} }
} tb <- tx
})
} }
stmt, err := tx.Prepare(`insert into blocks (cid, parentWeight, parentStateRoot, height, miner, "timestamp") values ($1, $2, $3, $4, $5, $6) on conflict do nothing`) par(50, maparr(bhs), func(bh *types.BlockHeader) {
if err != nil { log.Info("bh", bh.Cid())
return err tx := <-tb
} if _, err := tx.Exec(`insert into blocks (cid, parentWeight, parentStateRoot, height, miner, "timestamp") values ($1, $2, $3, $4, $5, $6) on conflict do nothing`, bh.Cid().String(), bh.ParentWeight.String(), bh.ParentStateRoot.String(), bh.Height, bh.Miner.String(), bh.Timestamp); err != nil {
defer stmt.Close() log.Error(err)
for _, bh := range bhs {
if _, err := stmt.Exec(bh.Cid().String(), bh.ParentWeight.String(), bh.ParentStateRoot.String(), bh.Height, bh.Miner.String(), bh.Timestamp); err != nil {
return err
} }
} tb <- tx
})
return tx.Commit() endbatch(tb)
return nil
} }
func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error { func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error {

View File

@ -53,6 +53,10 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
actors := map[address.Address]map[types.Actor]cid.Cid{} actors := map[address.Address]map[types.Actor]cid.Cid{}
var alk sync.Mutex var alk sync.Mutex
log.Infof("Getting synced block list")
hazlist := st.hasList()
log.Infof("Getting headers / actors") log.Infof("Getting headers / actors")
toSync := map[cid.Cid]*types.BlockHeader{} toSync := map[cid.Cid]*types.BlockHeader{}
@ -65,7 +69,8 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
for toVisit.Len() > 0 { for toVisit.Len() > 0 {
bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader) bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader)
if _, seen := toSync[bh.Cid()]; seen || st.hasBlock(bh.Cid()) { _, has := hazlist[bh.Cid()]
if _, seen := toSync[bh.Cid()]; seen || has {
continue continue
} }
@ -93,6 +98,12 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
log.Infof("Syncing %d blocks", len(toSync)) log.Infof("Syncing %d blocks", len(toSync))
log.Infof("Persisting headers")
if err := st.storeHeaders(toSync, true); err != nil {
log.Error(err)
return
}
log.Infof("Persisting actors") log.Infof("Persisting actors")
paDone := 0 paDone := 0
@ -213,12 +224,6 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
return return
} }
log.Infof("Persisting headers")
if err := st.storeHeaders(toSync, true); err != nil {
log.Error(err)
return
}
log.Infof("Getting messages") log.Infof("Getting messages")
msgs, incls := fetchMessages(ctx, api, toSync) msgs, incls := fetchMessages(ctx, api, toSync)