- When chainwatch is ran it will first start a Syncer that continuously collects blocks from the ChainNotify channel and persists them to the blocks_synced table. Once the Syncer has caught the blocks_synced table up to the lotus daemons current head a Processor is started. The Processor selects a batch of contiguous blocks and extracts and stores their data. It attempts to do as much work as it can in parallel. When the blocks are done being processed their corresponding processed_at and is_processed fields in the blocks_synced table are filled out.
317 lines
6.7 KiB
317 lines
6.7 KiB
package processor
import (
func (p *Processor) setupMessages() error {
tx, err := p.db.Begin()
if err != nil {
return err
if _, err := tx.Exec(`
create table if not exists messages
cid text not null
constraint messages_pk
primary key,
"from" text not null,
"to" text not null,
nonce bigint not null,
value text not null,
gasprice bigint not null,
gaslimit bigint not null,
method bigint,
params bytea
create unique index if not exists messages_cid_uindex
on messages (cid);
create index if not exists messages_from_index
on messages ("from");
create index if not exists messages_to_index
on messages ("to");
create table if not exists block_messages
block text not null
constraint blocks_block_cids_cid_fk
references block_cids (cid),
message text not null,
constraint block_messages_pk
primary key (block, message)
create table if not exists mpool_messages
msg text not null
constraint mpool_messages_pk
primary key
constraint mpool_messages_messages_cid_fk
references messages,
add_ts int not null
create unique index if not exists mpool_messages_msg_uindex
on mpool_messages (msg);
create table if not exists receipts
msg text not null,
state text not null,
idx int not null,
exit int not null,
gas_used int not null,
return bytea,
constraint receipts_pk
primary key (msg, state)
create index if not exists receipts_msg_state_index
on receipts (msg, state);
`); err != nil {
return err
return tx.Commit()
func (p *Processor) HandleMessageChanges(ctx context.Context, blocks map[cid.Cid]*types.BlockHeader) error {
if err := p.persistMessagesAndReceipts(ctx, blocks); err != nil {
return err
return nil
func (p *Processor) persistMessagesAndReceipts(ctx context.Context, blocks map[cid.Cid]*types.BlockHeader) error {
messages, inclusions := p.fetchMessages(ctx, blocks)
receipts := p.fetchParentReceipts(ctx, blocks)
grp, _ := errgroup.WithContext(ctx)
grp.Go(func() error {
return p.storeMessages(messages)
grp.Go(func() error {
return p.storeMsgInclusions(inclusions)
grp.Go(func() error {
return p.storeReceipts(receipts)
return grp.Wait()
func (p *Processor) storeReceipts(recs map[mrec]*types.MessageReceipt) error {
start := time.Now()
defer func() {
log.Infow("Persisted Receipts", "duration", time.Since(start).String())
tx, err := p.db.Begin()
if err != nil {
return err
if _, err := tx.Exec(`
create temp table recs (like receipts excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
stmt, err := tx.Prepare(`copy recs (msg, state, idx, exit, gas_used, return) from stdin `)
if err != nil {
return err
for c, m := range recs {
if _, err := stmt.Exec(
); err != nil {
return err
if err := stmt.Close(); err != nil {
return err
if _, err := tx.Exec(`insert into receipts select * from recs on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
return tx.Commit()
func (p *Processor) storeMsgInclusions(incls map[cid.Cid][]cid.Cid) error {
start := time.Now()
defer func() {
log.Infow("Persisted Message Inclusions", "duration", time.Since(start).String())
tx, err := p.db.Begin()
if err != nil {
return err
if _, err := tx.Exec(`
create temp table mi (like block_messages excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
stmt, err := tx.Prepare(`copy mi (block, message) from STDIN `)
if err != nil {
return err
for b, msgs := range incls {
for _, msg := range msgs {
if _, err := stmt.Exec(
); err != nil {
return err
if err := stmt.Close(); err != nil {
return err
if _, err := tx.Exec(`insert into block_messages select * from mi on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
return tx.Commit()
func (p *Processor) storeMessages(msgs map[cid.Cid]*types.Message) error {
start := time.Now()
defer func() {
log.Debugw("Persisted Messages", "duration", time.Since(start).String())
tx, err := p.db.Begin()
if err != nil {
return err
if _, err := tx.Exec(`
create temp table msgs (like messages excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
stmt, err := tx.Prepare(`copy msgs (cid, "from", "to", nonce, "value", gasprice, gaslimit, method, params) from stdin `)
if err != nil {
return err
for c, m := range msgs {
if _, err := stmt.Exec(
); err != nil {
return err
if err := stmt.Close(); err != nil {
return err
if _, err := tx.Exec(`insert into messages select * from msgs on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
return tx.Commit()
func (p *Processor) fetchMessages(ctx context.Context, blocks map[cid.Cid]*types.BlockHeader) (map[cid.Cid]*types.Message, map[cid.Cid][]cid.Cid) {
var lk sync.Mutex
messages := map[cid.Cid]*types.Message{}
inclusions := map[cid.Cid][]cid.Cid{} // block -> msgs
parmap.Par(50, parmap.MapArr(blocks), func(header *types.BlockHeader) {
msgs, err := p.node.ChainGetBlockMessages(ctx, header.Cid())
if err != nil {
vmm := make([]*types.Message, 0, len(msgs.Cids))
for _, m := range msgs.BlsMessages {
vmm = append(vmm, m)
for _, m := range msgs.SecpkMessages {
vmm = append(vmm, &m.Message)
for _, message := range vmm {
messages[message.Cid()] = message
inclusions[header.Cid()] = append(inclusions[header.Cid()], message.Cid())
return messages, inclusions
type mrec struct {
msg cid.Cid
state cid.Cid
idx int
func (p *Processor) fetchParentReceipts(ctx context.Context, toSync map[cid.Cid]*types.BlockHeader) map[mrec]*types.MessageReceipt {
var lk sync.Mutex
out := map[mrec]*types.MessageReceipt{}
parmap.Par(50, parmap.MapArr(toSync), func(header *types.BlockHeader) {
recs, err := p.node.ChainGetParentReceipts(ctx, header.Cid())
if err != nil {
msgs, err := p.node.ChainGetParentMessages(ctx, header.Cid())
if err != nil {
for i, r := range recs {
msg: msgs[i].Cid,
state: header.ParentStateRoot,
idx: i,
}] = r
return out