Merge pull request #3137 from filecoin-project/fix/chainwatch/calibration-improvements-2

More Chainwatch Improvements
This commit is contained in:
Łukasz Magiera 2020-08-18 15:10:42 +02:00 committed by GitHub
commit 2d73b04f7d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 16 additions and 14 deletions

View File

@ -16,7 +16,6 @@ import (
_init "github.com/filecoin-project/specs-actors/actors/builtin/init" _init "github.com/filecoin-project/specs-actors/actors/builtin/init"
"github.com/filecoin-project/specs-actors/actors/util/adt" "github.com/filecoin-project/specs-actors/actors/util/adt"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
typegen "github.com/whyrusleeping/cbor-gen" typegen "github.com/whyrusleeping/cbor-gen"
) )
@ -232,20 +231,20 @@ func (p *Processor) storeActorHeads(actors map[cid.Cid]ActorTips) error {
return err return err
} }
if _, err := tx.Exec(` if _, err := tx.Exec(`
create temp table a (like actors excluding constraints) on commit drop; create temp table a_tmp (like actors excluding constraints) on commit drop;
`); err != nil { `); err != nil {
return xerrors.Errorf("prep temp: %w", err) return xerrors.Errorf("prep temp: %w", err)
} }
stmt, err := tx.Prepare(`copy a (id, code, head, nonce, balance, stateroot) from stdin `) stmt, err := tx.Prepare(`copy a_tmp (id, code, head, nonce, balance, stateroot) from stdin `)
if err != nil { if err != nil {
return err return err
} }
for code, actTips := range actors { for code, actTips := range actors {
actorName := code.String() actorName := code.String()
if s, err := multihash.Decode(code.Hash()); err != nil { if builtin.IsBuiltinActor(code) {
actorName = string(s.Digest) actorName = builtin.ActorNameByCode(code)
} }
for _, actorInfo := range actTips { for _, actorInfo := range actTips {
for _, a := range actorInfo { for _, a := range actorInfo {
@ -260,7 +259,7 @@ func (p *Processor) storeActorHeads(actors map[cid.Cid]ActorTips) error {
return err return err
} }
if _, err := tx.Exec(`insert into actors select * from a on conflict do nothing `); err != nil { if _, err := tx.Exec(`insert into actors select * from a_tmp on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err) return xerrors.Errorf("actor put: %w", err)
} }
@ -278,20 +277,20 @@ func (p *Processor) storeActorStates(actors map[cid.Cid]ActorTips) error {
return err return err
} }
if _, err := tx.Exec(` if _, err := tx.Exec(`
create temp table a (like actor_states excluding constraints) on commit drop; create temp table as_tmp (like actor_states excluding constraints) on commit drop;
`); err != nil { `); err != nil {
return xerrors.Errorf("prep temp: %w", err) return xerrors.Errorf("prep temp: %w", err)
} }
stmt, err := tx.Prepare(`copy a (head, code, state) from stdin `) stmt, err := tx.Prepare(`copy as_tmp (head, code, state) from stdin `)
if err != nil { if err != nil {
return err return err
} }
for code, actTips := range actors { for code, actTips := range actors {
actorName := code.String() actorName := code.String()
if s, err := multihash.Decode(code.Hash()); err != nil { if builtin.IsBuiltinActor(code) {
actorName = string(s.Digest) actorName = builtin.ActorNameByCode(code)
} }
for _, actorInfo := range actTips { for _, actorInfo := range actTips {
for _, a := range actorInfo { for _, a := range actorInfo {
@ -306,7 +305,7 @@ func (p *Processor) storeActorStates(actors map[cid.Cid]ActorTips) error {
return err return err
} }
if _, err := tx.Exec(`insert into actor_states select * from a on conflict do nothing `); err != nil { if _, err := tx.Exec(`insert into actor_states select * from as_tmp on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err) return xerrors.Errorf("actor put: %w", err)
} }

View File

@ -336,16 +336,19 @@ where rnk <= $1
} }
var c string var c string
if err := rows.Scan(&c); err != nil { if err := rows.Scan(&c); err != nil {
return nil, xerrors.Errorf("Failed to scan unprocessed blocks: %w", err) log.Errorf("Failed to scan unprocessed blocks: %s", err.Error())
continue
} }
ci, err := cid.Parse(c) ci, err := cid.Parse(c)
if err != nil { if err != nil {
return nil, xerrors.Errorf("Failed to parse unprocessed blocks: %w", err) log.Errorf("Failed to parse unprocessed blocks: %s", err.Error())
continue
} }
bh, err := p.node.ChainGetBlock(ctx, ci) bh, err := p.node.ChainGetBlock(ctx, ci)
if err != nil { if err != nil {
// this is a pretty serious issue. // this is a pretty serious issue.
return nil, xerrors.Errorf("Failed to get block header %s: %w", ci.String(), err) log.Errorf("Failed to get block header %s: %s", ci.String(), err.Error())
continue
} }
out[ci] = bh out[ci] = bh
if bh.Height < minBlock { if bh.Height < minBlock {