Optimise mpool update processing

This commit is contained in:
Łukasz Magiera 2019-12-13 12:04:24 +01:00
parent 165ac1d556
commit f2ea12571f
3 changed files with 68 additions and 24 deletions

View File

@ -2,6 +2,7 @@ package main
import ( import (
"context" "context"
"time"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
@ -15,26 +16,45 @@ func subMpool(ctx context.Context, api aapi.FullNode, st *storage) {
return return
} }
for change := range sub { for {
if change.Type != aapi.MpoolAdd { var updates []aapi.MpoolUpdate
continue
}
log.Info("mpool message") select {
case update := <-sub:
go func() { updates = append(updates, update)
err := st.storeMessages(map[cid.Cid]*types.Message{ case <-ctx.Done():
change.Message.Message.Cid(): &change.Message.Message,
})
if err != nil {
//log.Error(err)
return return
} }
if err := st.storeMpoolInclusion(change.Message.Message.Cid()); err != nil { loop:
for {
time.Sleep(10 * time.Millisecond)
select {
case update := <-sub:
updates = append(updates, update)
default:
break loop
}
}
msgs := map[cid.Cid]*types.Message{}
for _, v := range updates {
if v.Type != aapi.MpoolAdd {
continue
}
msgs[v.Message.Message.Cid()] = &v.Message.Message
}
log.Infof("Processing %d mpool updates", len(msgs))
err := st.storeMessages(msgs)
if err != nil {
log.Error(err) log.Error(err)
} }
}()
if err := st.storeMpoolInclusions(updates); err != nil {
log.Error(err)
}
} }
} }

View File

@ -15,13 +15,13 @@
</div> </div>
<div class="Index-nodes"> <div class="Index-nodes">
<div class="Index-node"> <div class="Index-node">
Balance: {{queryNum "select balance from actors inner join main.id_address_map m on m.address = ? where actors.id = m.id order by nonce desc limit 1" $wallet }} Balance: {{queryNum "select balance from actors inner join id_address_map m on m.address = $1 where actors.id = m.id order by nonce desc limit 1" $wallet }}
</div> </div>
<div class="Index-node"> <div class="Index-node">
Messages: Messages:
<table> <table>
<tr><td>Dir</td><td>Peer</td><td>Nonce</td><td>Value</td><td>Block</td><td>Mpool Wait</td></tr> <tr><td>Dir</td><td>Peer</td><td>Nonce</td><td>Value</td><td>Block</td><td>Mpool Wait</td></tr>
{{ range messages "`from` = ? or `to` = ?" $wallet $wallet $wallet}} {{ range messages "`from` = $1 or `to` = $2" $wallet $wallet $wallet}}
<tr> <tr>
{{ if eq .From.String $wallet }} {{ if eq .From.String $wallet }}
<td>To</td><td>{{.To.String}}</td> <td>To</td><td>{{.To.String}}</td>

View File

@ -2,6 +2,7 @@ package main
import ( import (
"database/sql" "database/sql"
"github.com/filecoin-project/lotus/api"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"sync" "sync"
"time" "time"
@ -595,24 +596,47 @@ create temp table mi (like block_messages excluding constraints) on commit drop;
return tx.Commit() return tx.Commit()
} }
func (st *storage) storeMpoolInclusion(msg cid.Cid) error { func (st *storage) storeMpoolInclusions(msgs []api.MpoolUpdate) error {
tx, err := st.db.Begin() tx, err := st.db.Begin()
if err != nil { if err != nil {
return err return err
} }
stmt, err := tx.Prepare(`insert into mpool_messages (msg, add_ts) VALUES ($1, $2) on conflict do nothing`) if _, err := tx.Exec(`
create temp table mi (like mpool_messages excluding constraints) on commit drop;
`); err != nil {
return xerrors.Errorf("prep temp: %w", err)
}
stmt, err := tx.Prepare(`copy mi (msg, add_ts) from stdin `)
if err != nil { if err != nil {
return err return err
} }
defer stmt.Close()
for _, msg := range msgs {
if msg.Type != api.MpoolAdd {
continue
}
if _, err := stmt.Exec( if _, err := stmt.Exec(
msg.String(), msg.Message.Message.Cid().String(),
time.Now().Unix(), time.Now().Unix(),
); err != nil { ); err != nil {
return err return err
} }
}
if err := stmt.Close(); err != nil {
return err
}
if _, err := tx.Exec(`insert into mpool_messages select * from mi on conflict do nothing `); err != nil {
return xerrors.Errorf("actor put: %w", err)
}
return tx.Commit() return tx.Commit()
} }