From f2ea12571fdba34ba0509aee0edae7d32d10c72c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 13 Dec 2019 12:04:24 +0100 Subject: [PATCH] Optimise mpool update processing --- cmd/lotus-chainwatch/mpool.go | 50 +++++++++++++++++++++--------- cmd/lotus-chainwatch/site/key.html | 4 +-- cmd/lotus-chainwatch/storage.go | 38 ++++++++++++++++++----- 3 files changed, 68 insertions(+), 24 deletions(-) diff --git a/cmd/lotus-chainwatch/mpool.go b/cmd/lotus-chainwatch/mpool.go index aaea80c75..ea45380b7 100644 --- a/cmd/lotus-chainwatch/mpool.go +++ b/cmd/lotus-chainwatch/mpool.go @@ -2,6 +2,7 @@ package main import ( "context" + "time" "github.com/ipfs/go-cid" @@ -15,26 +16,45 @@ func subMpool(ctx context.Context, api aapi.FullNode, st *storage) { return } - for change := range sub { - if change.Type != aapi.MpoolAdd { - continue + for { + var updates []aapi.MpoolUpdate + + select { + case update := <-sub: + updates = append(updates, update) + case <-ctx.Done(): + return } - log.Info("mpool message") + loop: + for { + time.Sleep(10 * time.Millisecond) + select { + case update := <-sub: + updates = append(updates, update) + default: + break loop + } + } - go func() { - err := st.storeMessages(map[cid.Cid]*types.Message{ - change.Message.Message.Cid(): &change.Message.Message, - }) - if err != nil { - //log.Error(err) - return + msgs := map[cid.Cid]*types.Message{} + for _, v := range updates { + if v.Type != aapi.MpoolAdd { + continue } - if err := st.storeMpoolInclusion(change.Message.Message.Cid()); err != nil { - log.Error(err) - } - }() + msgs[v.Message.Message.Cid()] = &v.Message.Message + } + log.Infof("Processing %d mpool updates", len(msgs)) + + err := st.storeMessages(msgs) + if err != nil { + log.Error(err) + } + + if err := st.storeMpoolInclusions(updates); err != nil { + log.Error(err) + } } } diff --git a/cmd/lotus-chainwatch/site/key.html b/cmd/lotus-chainwatch/site/key.html index 697838ed2..40ec02cb6 100644 --- a/cmd/lotus-chainwatch/site/key.html +++ b/cmd/lotus-chainwatch/site/key.html @@ -15,13 +15,13 @@
- Balance: {{queryNum "select balance from actors inner join main.id_address_map m on m.address = ? where actors.id = m.id order by nonce desc limit 1" $wallet }} + Balance: {{queryNum "select balance from actors inner join id_address_map m on m.address = $1 where actors.id = m.id order by nonce desc limit 1" $wallet }}
Messages: - {{ range messages "`from` = ? or `to` = ?" $wallet $wallet $wallet}} + {{ range messages "`from` = $1 or `to` = $2" $wallet $wallet $wallet}} {{ if eq .From.String $wallet }} diff --git a/cmd/lotus-chainwatch/storage.go b/cmd/lotus-chainwatch/storage.go index 477eb50a6..01c93c231 100644 --- a/cmd/lotus-chainwatch/storage.go +++ b/cmd/lotus-chainwatch/storage.go @@ -2,6 +2,7 @@ package main import ( "database/sql" + "github.com/filecoin-project/lotus/api" "golang.org/x/xerrors" "sync" "time" @@ -595,24 +596,47 @@ create temp table mi (like block_messages excluding constraints) on commit drop; return tx.Commit() } -func (st *storage) storeMpoolInclusion(msg cid.Cid) error { +func (st *storage) storeMpoolInclusions(msgs []api.MpoolUpdate) error { tx, err := st.db.Begin() if err != nil { return err } - stmt, err := tx.Prepare(`insert into mpool_messages (msg, add_ts) VALUES ($1, $2) on conflict do nothing`) + if _, err := tx.Exec(` + +create temp table mi (like mpool_messages excluding constraints) on commit drop; + + +`); err != nil { + return xerrors.Errorf("prep temp: %w", err) + } + + stmt, err := tx.Prepare(`copy mi (msg, add_ts) from stdin `) if err != nil { return err } - defer stmt.Close() - if _, err := stmt.Exec( - msg.String(), - time.Now().Unix(), - ); err != nil { + for _, msg := range msgs { + if msg.Type != api.MpoolAdd { + continue + } + + if _, err := stmt.Exec( + msg.Message.Message.Cid().String(), + time.Now().Unix(), + ); err != nil { + return err + } + } + + if err := stmt.Close(); err != nil { return err } + + if _, err := tx.Exec(`insert into mpool_messages select * from mi on conflict do nothing `); err != nil { + return xerrors.Errorf("actor put: %w", err) + } + return tx.Commit() }
DirPeerNonceValueBlockMpool Wait
To{{.To.String}}