Improve logging.

This commit is contained in:
Arijit Das 2021-12-23 13:22:44 +05:30
parent 48636d2f1b
commit 50edaff3ce
3 changed files with 37 additions and 2 deletions

3
go.mod
View File

@ -13,8 +13,7 @@ require (
github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect
github.com/ipfs/go-cid v0.1.0 // indirect github.com/ipfs/go-cid v0.1.0 // indirect
github.com/ipfs/go-datastore v0.5.1 // indirect github.com/ipfs/go-datastore v0.5.1 // indirect
github.com/ipfs/go-ipfs-blockstore v1.1.2 github.com/ipfs/go-ipfs-blockstore v1.1.2 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.0
github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-log/v2 v2.4.0 // indirect github.com/ipfs/go-log/v2 v2.4.0 // indirect
github.com/jmoiron/sqlx v1.2.0 github.com/jmoiron/sqlx v1.2.0

View File

@ -18,21 +18,29 @@ package snapshot
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash" "github.com/multiformats/go-multihash"
"github.com/sirupsen/logrus"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/shared" "github.com/ethereum/go-ethereum/statediff/indexer/shared"
) )
const period = 1 * time.Minute
// Publisher is wrapper around DB. // Publisher is wrapper around DB.
type Publisher struct { type Publisher struct {
db *postgres.DB db *postgres.DB
currBatchSize uint currBatchSize uint
stateNodeCounter uint64
storageNodeCounter uint64
blockNodeCounter uint64
} }
// NewPublisher creates Publisher // NewPublisher creates Publisher
@ -95,11 +103,18 @@ func (p *Publisher) PublishStateNode(node *node, headerID int64, tx *sqlx.Tx) (i
return 0, err return 0, err
} }
// increment block node counter.
atomic.AddUint64(&p.blockNodeCounter, 1)
err = tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) err = tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)
RETURNING id`, RETURNING id`,
headerID, stateKey, stateCIDStr, node.path, node.nodeType, false, mhKey).Scan(&stateID) headerID, stateKey, stateCIDStr, node.path, node.nodeType, false, mhKey).Scan(&stateID)
// increment state node counter.
atomic.AddUint64(&p.stateNodeCounter, 1)
// increment current batch size counter
p.currBatchSize += 2 p.currBatchSize += 2
return stateID, err return stateID, err
} }
@ -116,6 +131,9 @@ func (p *Publisher) PublishStorageNode(node *node, stateID int64, tx *sqlx.Tx) e
return err return err
} }
// increment block node counter.
atomic.AddUint64(&p.blockNodeCounter, 1)
_, err = tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) _, err = tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`, ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`,
stateID, storageKey, storageCIDStr, node.path, node.nodeType, false, mhKey) stateID, storageKey, storageCIDStr, node.path, node.nodeType, false, mhKey)
@ -123,6 +141,10 @@ func (p *Publisher) PublishStorageNode(node *node, stateID int64, tx *sqlx.Tx) e
return err return err
} }
// increment storage node counter.
atomic.AddUint64(&p.storageNodeCounter, 1)
// increment current batch size counter
p.currBatchSize += 2 p.currBatchSize += 2
return nil return nil
} }
@ -139,6 +161,9 @@ func (p *Publisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx
return fmt.Errorf("error publishing code IPLD: %v", err) return fmt.Errorf("error publishing code IPLD: %v", err)
} }
// increment block node counter.
atomic.AddUint64(&p.blockNodeCounter, 1)
p.currBatchSize++ p.currBatchSize++
return nil return nil
} }
@ -161,3 +186,13 @@ func (p *Publisher) checkBatchSize(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, er
return tx, nil return tx, nil
} }
// printNodeCounters periodically logs the number of state, storage, and
// block nodes processed so far. It ticks once per `period` and never
// returns, so callers must run it in its own goroutine (createSnapshot
// starts it with `go`). The counters are loaded atomically because the
// Publish* methods increment them concurrently via atomic.AddUint64.
func (p *Publisher) printNodeCounters() {
	t := time.NewTicker(period)
	// Stop the ticker if this loop ever exits; NewTicker's documentation
	// requires Stop to release the ticker's resources.
	defer t.Stop()
	for range t.C {
		logrus.Infof("processed state nodes %d", atomic.LoadUint64(&p.stateNodeCounter))
		logrus.Infof("processed storage nodes %d", atomic.LoadUint64(&p.storageNodeCounter))
		logrus.Infof("processed block nodes %d", atomic.LoadUint64(&p.blockNodeCounter))
	}
}

View File

@ -136,6 +136,7 @@ func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, he
return err return err
} }
go s.ipfsPublisher.printNodeCounters()
defer func() { defer func() {
if rec := recover(); rec != nil { if rec := recover(); rec != nil {
shared.Rollback(tx) shared.Rollback(tx)