diff --git a/go.mod b/go.mod index b3c15c9..8c5c001 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.15 require ( github.com/btcsuite/btcd v0.22.0-beta // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/ethereum/go-ethereum v1.9.11 + github.com/ethereum/go-ethereum v1.9.14 github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/go-kit/kit v0.10.0 // indirect github.com/google/go-cmp v0.5.6 // indirect @@ -13,8 +13,7 @@ require ( github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect github.com/ipfs/go-cid v0.1.0 // indirect github.com/ipfs/go-datastore v0.5.1 // indirect - github.com/ipfs/go-ipfs-blockstore v1.1.2 - github.com/ipfs/go-ipfs-ds-help v1.1.0 + github.com/ipfs/go-ipfs-blockstore v1.1.2 // indirect github.com/ipfs/go-log v1.0.5 // indirect github.com/ipfs/go-log/v2 v2.4.0 // indirect github.com/jmoiron/sqlx v1.2.0 @@ -41,4 +40,4 @@ require ( lukechampine.com/blake3 v1.1.7 // indirect ) -replace github.com/ethereum/go-ethereum v1.9.11 => github.com/vulcanize/go-ethereum v1.10.11-statediff-0.0.27 +replace github.com/ethereum/go-ethereum v1.9.14 => github.com/vulcanize/go-ethereum v1.10.14-statediff-0.0.29 diff --git a/go.sum b/go.sum index f32e2df..bb1e5f8 100644 --- a/go.sum +++ b/go.sum @@ -671,8 +671,8 @@ github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/X github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= -github.com/vulcanize/go-ethereum v1.10.11-statediff-0.0.27 h1:ldAVLlKll2WHHKLNu8oKbkYRUVRs9T45z12KiSrRW24= -github.com/vulcanize/go-ethereum v1.10.11-statediff-0.0.27/go.mod h1:9L+QY31AnWnX2/2HDOySCjQoYUdWNGBRMezFJVfH73E= +github.com/vulcanize/go-ethereum v1.10.14-statediff-0.0.29 h1:kjZjteD/6vh9DcixPkrg27XtxKW7ZoV5++1EYAi6FAw= +github.com/vulcanize/go-ethereum v1.10.14-statediff-0.0.29/go.mod h1:9L+QY31AnWnX2/2HDOySCjQoYUdWNGBRMezFJVfH73E= github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM= github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= diff --git a/pkg/snapshot/publisher.go b/pkg/snapshot/publisher.go index 6b7b56f..781cea4 100644 --- a/pkg/snapshot/publisher.go +++ b/pkg/snapshot/publisher.go @@ -18,21 +18,30 @@ package snapshot import ( "bytes" "fmt" + "sync/atomic" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/jmoiron/sqlx" "github.com/multiformats/go-multihash" + "github.com/sirupsen/logrus" "github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/shared" ) +const logInterval = 1 * time.Minute + // Publisher is wrapper around DB. type Publisher struct { - db *postgres.DB - currBatchSize uint + db *postgres.DB + currBatchSize uint + stateNodeCounter uint64 + storageNodeCounter uint64 + codeNodeCounter uint64 + startTime time.Time } // NewPublisher creates Publisher @@ -40,6 +49,7 @@ func NewPublisher(db *postgres.DB) *Publisher { return &Publisher{ db: db, currBatchSize: 0, + startTime: time.Now(), } } @@ -100,6 +110,10 @@ func (p *Publisher) PublishStateNode(node *node, headerID int64, tx *sqlx.Tx) (i RETURNING id`, headerID, stateKey, stateCIDStr, node.path, node.nodeType, false, mhKey).Scan(&stateID) + // increment state node counter. + atomic.AddUint64(&p.stateNodeCounter, 1) + + // increment current batch size counter p.currBatchSize += 2 return stateID, err } @@ -123,6 +137,10 @@ func (p *Publisher) PublishStorageNode(node *node, stateID int64, tx *sqlx.Tx) e return err } + // increment storage node counter. + atomic.AddUint64(&p.storageNodeCounter, 1) + + // increment current batch size counter p.currBatchSize += 2 return nil } @@ -139,6 +157,9 @@ func (p *Publisher) PublishCode(codeHash common.Hash, codeBytes []byte, tx *sqlx return fmt.Errorf("error publishing code IPLD: %v", err) } + // increment code node counter. + atomic.AddUint64(&p.codeNodeCounter, 1) + p.currBatchSize++ return nil } @@ -161,3 +182,18 @@ func (p *Publisher) checkBatchSize(tx *sqlx.Tx, maxBatchSize uint) (*sqlx.Tx, er return tx, nil } + +// logNodeCounters periodically logs the number of node processed. +func (p *Publisher) logNodeCounters() { + t := time.NewTicker(logInterval) + for range t.C { + p.printNodeCounters() + } +} + +func (p *Publisher) printNodeCounters() { + logrus.Infof("runtime: %s", time.Now().Sub(p.startTime).String()) + logrus.Infof("processed state nodes: %d", atomic.LoadUint64(&p.stateNodeCounter)) + logrus.Infof("processed storage nodes: %d", atomic.LoadUint64(&p.storageNodeCounter)) + logrus.Infof("processed code nodes: %d", atomic.LoadUint64(&p.codeNodeCounter)) +} diff --git a/pkg/snapshot/service.go b/pkg/snapshot/service.go index 0f5ea20..a684741 100644 --- a/pkg/snapshot/service.go +++ b/pkg/snapshot/service.go @@ -136,7 +136,10 @@ func (s *Service) createSnapshot(it trie.NodeIterator, trieDB *trie.Database, he return err } + go s.ipfsPublisher.logNodeCounters() defer func() { + logrus.Info("----- final counts -----") + s.ipfsPublisher.printNodeCounters() if rec := recover(); rec != nil { shared.Rollback(tx) panic(rec)