From 50edaff3ceb1141b74ed03b9e4d799195398660a Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Thu, 23 Dec 2021 13:22:44 +0530 Subject: [PATCH 1/5] Improve logging. --- go.mod | 3 +-- pkg/snapshot/publisher.go | 35 +++++++++++++++++++++++++++++++++++ pkg/snapshot/service.go | 1 + 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index b3c15c9..4137b19 100644 --- a/go.mod +++ b/go.mod @@ -13,8 +13,7 @@ require ( github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect github.com/ipfs/go-cid v0.1.0 // indirect github.com/ipfs/go-datastore v0.5.1 // indirect - github.com/ipfs/go-ipfs-blockstore v1.1.2 - github.com/ipfs/go-ipfs-ds-help v1.1.0 + github.com/ipfs/go-ipfs-blockstore v1.1.2 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-log/v2 v2.4.0 // indirect github.com/jmoiron/sqlx v1.2.0 diff --git a/pkg/snapshot/publisher.go b/pkg/snapshot/publisher.go index 6b7b56f..b6a4e59 100644 --- a/pkg/snapshot/publisher.go +++ b/pkg/snapshot/publisher.go @@ -18,21 +18,29 @@ package snapshot import ( "bytes" "fmt" + "sync/atomic" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/jmoiron/sqlx" "github.com/multiformats/go-multihash" + "github.com/sirupsen/logrus" "github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/shared" ) +const period = 1 * time.Minute + // Publisher is wrapper around DB. type Publisher struct { db *postgres.DB currBatchSize uint + stateNodeCounter uint64 + storageNodeCounter uint64 + blockNodeCounter uint64 } // NewPublisher creates Publisher @@ -95,11 +103,18 @@ func (p *Publisher) PublishStateNode(node *node, headerID int64, tx *sqlx.Tx) (i return 0, err } + // increment block node counter. + atomic.AddUint64(&p.blockNodeCounter, 1) + err = tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) RETURNING id`, headerID, stateKey, stateCIDStr, node.path, node.nodeType, false, mhKey).Scan(&stateID) + // increment state node counter. + atomic.AddUint64(&p.stateNodeCounter, 1) + + // increment current batch size counter p.currBatchSize += 2 return stateID, err } @@ -116,6 +131,9 @@ func (p *Publisher) PublishStorageNode(node *node, stateID int64, tx *sqlx.Tx) e return err } + // increment block node counter. + atomic.AddUint64(&p.blockNodeCounter, 1) + _, err = tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`, stateID, storageKey, storageCIDStr, node.path, node.nodeType, false, mhKey) @@ -123,6 +141,10 @@ func (p *Publisher) PublishStorageNode(node *node, stateID int64, tx *sqlx.Tx) e return err } + // increment storage node counter. + atomic.AddUint64(&p.storageNodeCounter, 1) + + // increment current batch size counter p.currBatchSize += 2 return nil } @@ -139,6 +161,9 @@ func (p *Publisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx return fmt.Errorf("error publishing code IPLD: %v", err) } + // increment block node counter. + atomic.AddUint64(&p.blockNodeCounter, 1) + p.currBatchSize++ return nil } @@ -161,3 +186,13 @@ func (p *Publisher) checkBatchSize(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, er return tx, nil } + +// printNodeCounters prints number of node processed. +func (p *Publisher) printNodeCounters() { + t := time.NewTicker(period) + for range t.C { + logrus.Infof("processed state nodes %d", atomic.LoadUint64(&p.stateNodeCounter)) + logrus.Infof("processed storage nodes %d", atomic.LoadUint64(&p.storageNodeCounter)) + logrus.Infof("processed block nodes %d", atomic.LoadUint64(&p.blockNodeCounter)) + } +} diff --git a/pkg/snapshot/service.go b/pkg/snapshot/service.go index 0f5ea20..4a14429 100644 --- a/pkg/snapshot/service.go +++ b/pkg/snapshot/service.go @@ -136,6 +136,7 @@ func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, he return err } + go s.ipfsPublisher.printNodeCounters() defer func() { if rec := recover(); rec != nil { shared.Rollback(tx) From c5dfa98ffee48ca4b36f7aaa64bbf4b2993e837d Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Thu, 23 Dec 2021 20:04:34 +0530 Subject: [PATCH 2/5] Refactor. --- pkg/snapshot/publisher.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/snapshot/publisher.go b/pkg/snapshot/publisher.go index b6a4e59..4a680d8 100644 --- a/pkg/snapshot/publisher.go +++ b/pkg/snapshot/publisher.go @@ -32,7 +32,7 @@ import ( "github.com/ethereum/go-ethereum/statediff/indexer/shared" ) -const period = 1 * time.Minute +const logInterval = 1 * time.Minute // Publisher is wrapper around DB. type Publisher struct { @@ -189,7 +189,7 @@ func (p *Publisher) checkBatchSize(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, er // printNodeCounters prints number of node processed. func (p *Publisher) printNodeCounters() { - t := time.NewTicker(period) + t := time.NewTicker(logInterval) for range t.C { logrus.Infof("processed state nodes %d", atomic.LoadUint64(&p.stateNodeCounter)) logrus.Infof("processed storage nodes %d", atomic.LoadUint64(&p.storageNodeCounter)) From 9d9fe231f73bbdee733a1dfbf05714fbc5031477 Mon Sep 17 00:00:00 2001 From: i-norden Date: Tue, 28 Dec 2021 23:22:01 -0600 Subject: [PATCH 3/5] storageNodeCounter; log the total runtime --- pkg/snapshot/publisher.go | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/pkg/snapshot/publisher.go b/pkg/snapshot/publisher.go index 4a680d8..432f7fa 100644 --- a/pkg/snapshot/publisher.go +++ b/pkg/snapshot/publisher.go @@ -36,11 +36,12 @@ const logInterval = 1 * time.Minute // Publisher is wrapper around DB. type Publisher struct { - db *postgres.DB - currBatchSize uint + db *postgres.DB + currBatchSize uint stateNodeCounter uint64 - storageNodeCounter uint64 - blockNodeCounter uint64 + storageNodeCounter uint64 + codeNodeCounter uint64 + startTime time.Time } // NewPublisher creates Publisher @@ -48,6 +49,7 @@ func NewPublisher(db *postgres.DB) *Publisher { return &Publisher{ db: db, currBatchSize: 0, + startTime: time.Now(), } } @@ -103,9 +105,6 @@ func (p *Publisher) PublishStateNode(node *node, headerID int64, tx *sqlx.Tx) (i return 0, err } - // increment block node counter. - atomic.AddUint64(&p.blockNodeCounter, 1) - err = tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) RETURNING id`, @@ -131,9 +130,6 @@ func (p *Publisher) PublishStorageNode(node *node, stateID int64, tx *sqlx.Tx) e return err } - // increment block node counter. - atomic.AddUint64(&p.blockNodeCounter, 1) - _, err = tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`, stateID, storageKey, storageCIDStr, node.path, node.nodeType, false, mhKey) @@ -161,8 +157,8 @@ func (p *Publisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx return fmt.Errorf("error publishing code IPLD: %v", err) } - // increment block node counter. - atomic.AddUint64(&p.blockNodeCounter, 1) + // increment code node counter. + atomic.AddUint64(&p.codeNodeCounter, 1) p.currBatchSize++ return nil @@ -191,8 +187,9 @@ func (p *Publisher) checkBatchSize(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, er func (p *Publisher) printNodeCounters() { t := time.NewTicker(logInterval) for range t.C { - logrus.Infof("processed state nodes %d", atomic.LoadUint64(&p.stateNodeCounter)) - logrus.Infof("processed storage nodes %d", atomic.LoadUint64(&p.storageNodeCounter)) - logrus.Infof("processed block nodes %d", atomic.LoadUint64(&p.blockNodeCounter)) + logrus.Infof("total runtime: %s", time.Now().Sub(p.startTime).String()) + logrus.Infof("processed state nodes: %d", atomic.LoadUint64(&p.stateNodeCounter)) + logrus.Infof("processed storage nodes: %d", atomic.LoadUint64(&p.storageNodeCounter)) + logrus.Infof("processed code nodes: %d", atomic.LoadUint64(&p.codeNodeCounter)) } } From b5d31748e0e924b975a209be05ad1c39f096277c Mon Sep 17 00:00:00 2001 From: i-norden Date: Tue, 28 Dec 2021 23:30:17 -0600 Subject: [PATCH 4/5] bump geth version --- go.mod | 4 ++-- go.sum | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 4137b19..8c5c001 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.15 require ( github.com/btcsuite/btcd v0.22.0-beta // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/ethereum/go-ethereum v1.9.11 + github.com/ethereum/go-ethereum v1.9.14 github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/go-kit/kit v0.10.0 // indirect github.com/google/go-cmp v0.5.6 // indirect @@ -40,4 +40,4 @@ require ( lukechampine.com/blake3 v1.1.7 // indirect ) -replace github.com/ethereum/go-ethereum v1.9.11 => github.com/vulcanize/go-ethereum v1.10.11-statediff-0.0.27 +replace github.com/ethereum/go-ethereum v1.9.14 => github.com/vulcanize/go-ethereum v1.10.14-statediff-0.0.29 diff --git a/go.sum b/go.sum index f32e2df..bb1e5f8 100644 --- a/go.sum +++ b/go.sum @@ -671,8 +671,8 @@ github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/X github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= -github.com/vulcanize/go-ethereum v1.10.11-statediff-0.0.27 h1:ldAVLlKll2WHHKLNu8oKbkYRUVRs9T45z12KiSrRW24= -github.com/vulcanize/go-ethereum v1.10.11-statediff-0.0.27/go.mod h1:9L+QY31AnWnX2/2HDOySCjQoYUdWNGBRMezFJVfH73E= +github.com/vulcanize/go-ethereum v1.10.14-statediff-0.0.29 h1:kjZjteD/6vh9DcixPkrg27XtxKW7ZoV5++1EYAi6FAw= +github.com/vulcanize/go-ethereum v1.10.14-statediff-0.0.29/go.mod h1:9L+QY31AnWnX2/2HDOySCjQoYUdWNGBRMezFJVfH73E= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= From e50bd160fea50e77ffd61b584abe370994e1bb46 Mon Sep 17 00:00:00 2001 From: i-norden Date: Tue, 28 Dec 2021 23:42:19 -0600 Subject: [PATCH 5/5] log final counts when process completes --- pkg/snapshot/publisher.go | 16 ++++++++++------ pkg/snapshot/service.go | 4 +++- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/pkg/snapshot/publisher.go b/pkg/snapshot/publisher.go index 432f7fa..781cea4 100644 --- a/pkg/snapshot/publisher.go +++ b/pkg/snapshot/publisher.go @@ -183,13 +183,17 @@ func (p *Publisher) checkBatchSize(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, er return tx, nil } -// printNodeCounters prints number of node processed. -func (p *Publisher) printNodeCounters() { +// logNodeCounters periodically logs the number of node processed. +func (p *Publisher) logNodeCounters() { t := time.NewTicker(logInterval) for range t.C { - logrus.Infof("total runtime: %s", time.Now().Sub(p.startTime).String()) - logrus.Infof("processed state nodes: %d", atomic.LoadUint64(&p.stateNodeCounter)) - logrus.Infof("processed storage nodes: %d", atomic.LoadUint64(&p.storageNodeCounter)) - logrus.Infof("processed code nodes: %d", atomic.LoadUint64(&p.codeNodeCounter)) + p.printNodeCounters() } } + +func (p *Publisher) printNodeCounters() { + logrus.Infof("runtime: %s", time.Now().Sub(p.startTime).String()) + logrus.Infof("processed state nodes: %d", atomic.LoadUint64(&p.stateNodeCounter)) + logrus.Infof("processed storage nodes: %d", atomic.LoadUint64(&p.storageNodeCounter)) + logrus.Infof("processed code nodes: %d", atomic.LoadUint64(&p.codeNodeCounter)) +} diff --git a/pkg/snapshot/service.go b/pkg/snapshot/service.go index 4a14429..a684741 100644 --- a/pkg/snapshot/service.go +++ b/pkg/snapshot/service.go @@ -136,8 +136,10 @@ func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, he return err } - go s.ipfsPublisher.printNodeCounters() + go s.ipfsPublisher.logNodeCounters() defer func() { + logrus.Info("----- final counts -----") + s.ipfsPublisher.printNodeCounters() if rec := recover(); rec != nil { shared.Rollback(tx) panic(rec)