Merge pull request #705 from filecoin-project/feat/cw-receipts

chainwatch: Gather receipts
This commit is contained in:
Łukasz Magiera 2019-12-04 02:27:20 +01:00 committed by GitHub
commit 0adfff9ae7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 93 additions and 0 deletions

View File

@ -160,6 +160,26 @@ create table if not exists mpool_messages
create unique index if not exists mpool_messages_msg_uindex
on mpool_messages (msg);
create table if not exists receipts
(
msg text not null
constraint receipts_messages_cid_fk
references messages,
state text not null
constraint receipts_blocks_parentStateRoot_fk
references blocks (parentStateRoot),
idx int not null,
exit int not null,
gas_used int not null,
return blob,
constraint receipts_pk
primary key (msg, state)
);
create index if not exists receipts_msg_state_index
on receipts (msg, state);
create table if not exists miner_heads
(
head text not null
@ -342,6 +362,34 @@ func (st *storage) storeMessages(msgs map[cid.Cid]*types.Message) error {
return tx.Commit()
}
func (st *storage) storeReceipts(recs map[mrec]*types.MessageReceipt) error {
tx, err := st.db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare(`insert into receipts (msg, state, idx, exit, gas_used, return) VALUES (?, ?, ?, ?, ?, ?) on conflict do nothing`)
if err != nil {
return err
}
defer stmt.Close()
for c, m := range recs {
if _, err := stmt.Exec(
c.msg.String(),
c.state.String(),
c.idx,
m.ExitCode,
m.GasUsed.String(),
m.Return,
); err != nil {
return err
}
}
return tx.Commit()
}
func (st *storage) storeAddressMap(addrs map[address.Address]address.Address) error {
tx, err := st.db.Begin()
if err != nil {

View File

@ -233,6 +233,15 @@ func syncHead(ctx context.Context, api api.FullNode, st *storage, ts *types.TipS
return
}
log.Infof("Getting parent receipts")
receipts := fetchParentReceipts(ctx, api, toSync)
if err := st.storeReceipts(receipts); err != nil {
log.Error(err)
return
}
log.Infof("Resolving addresses")
for _, message := range msgs {
@ -290,3 +299,39 @@ func fetchMessages(ctx context.Context, api api.FullNode, toSync map[cid.Cid]*ty
return messages, inclusions
}
type mrec struct {
msg cid.Cid
state cid.Cid
idx int
}
func fetchParentReceipts(ctx context.Context, api api.FullNode, toSync map[cid.Cid]*types.BlockHeader) map[mrec]*types.MessageReceipt {
var lk sync.Mutex
out := map[mrec]*types.MessageReceipt{}
par(50, maparr(toSync), func(header *types.BlockHeader) {
recs, err := api.ChainGetParentReceipts(ctx, header.Cid())
if err != nil {
log.Error(err)
return
}
msgs, err := api.ChainGetParentMessages(ctx, header.Cid())
if err != nil {
log.Error(err)
return
}
lk.Lock()
for i, r := range recs {
out[mrec{
msg: msgs[i].Cid,
state: header.ParentStateRoot,
idx: i,
}] = r
}
lk.Unlock()
})
return out
}