// Copyright © 2020 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package pg

import (
	"context"
	"math/big"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/ipfs/go-cid"
	"github.com/lib/pq"
	"github.com/multiformats/go-multihash"
	log "github.com/sirupsen/logrus"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
	"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
	"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
	"github.com/ethereum/go-ethereum/statediff/indexer/models"
	"github.com/ethereum/go-ethereum/statediff/indexer/shared/schema"

	"github.com/cerc-io/ipld-eth-state-snapshot/pkg/prom"
	snapt "github.com/cerc-io/ipld-eth-state-snapshot/pkg/types"
)

// Compile-time check that publisher satisfies the snapt.Publisher interface.
var _ snapt.Publisher = (*publisher)(nil)

const logInterval = 1 * time.Minute

// publisher is a wrapper around the Postgres DB.
type publisher struct {
	db                 *postgres.DB
	currBatchSize      uint
	stateNodeCounter   uint64
	storageNodeCounter uint64
	codeNodeCounter    uint64
	startTime          time.Time
}

// NewPublisher creates a new publisher backed by the given DB.
func NewPublisher(db *postgres.DB) *publisher {
	return &publisher{
		db:        db,
		startTime: time.Now(),
	}
}

// pubTx wraps an sql.Tx with an optional callback invoked after commit.
type pubTx struct {
	sql.Tx
	callback func()
}

func (tx pubTx) Rollback() error { return tx.Tx.Rollback(context.Background()) }

func (tx pubTx) Commit() error {
	if tx.callback != nil {
		defer tx.callback()
	}
	return tx.Tx.Commit(context.Background())
}

func (tx pubTx) Exec(sql string, args ...interface{}) (sql.Result, error) {
	return tx.Tx.Exec(context.Background(), sql, args...)
}

// BeginTx opens a new transaction on the underlying DB and starts the periodic
// node-counter logger; the returned tx prints final stats when it is committed.
func (p *publisher) BeginTx() (snapt.Tx, error) {
	tx, err := p.db.Begin(context.Background())
	if err != nil {
		return nil, err
	}
	go p.logNodeCounters()
	return pubTx{tx, func() { p.printNodeCounters("final stats") }}, nil
}

// publishRaw derives a CID from the raw bytes using the provided codec and a
// KECCAK_256 multihash, writes the IPLD to the db tx, and returns the CID.
func (tx pubTx) publishRaw(codec uint64, raw []byte, height *big.Int) (cid string, err error) {
	c, err := ipld.RawdataToCid(codec, raw, multihash.KECCAK_256)
	if err != nil {
		return
	}
	cid = c.String()
	return tx.publishIPLD(c, raw, height)
}

func (tx pubTx) publishIPLD(c cid.Cid, raw []byte, height *big.Int) (string, error) {
	_, err := tx.Exec(schema.TableIPLDBlock.ToInsertStatement(false), height.Uint64(), c.String(), raw)
	return c.String(), err
}

// PublishIPLD writes an IPLD to the ipld.blocks blockstore
func (p *publisher) PublishIPLD(c cid.Cid, raw []byte, height *big.Int, snapTx snapt.Tx) (string, error) {
	tx := snapTx.(pubTx)
	return tx.publishIPLD(c, raw, height)
}

// PublishHeader writes the header to the ipfs backing pg datastore and adds secondary indexes in the header_cids table
func (p *publisher) PublishHeader(header *types.Header) (err error) {
	headerNode, err := ipld.NewEthHeader(header)
	if err != nil {
		return err
	}

	snapTx, err := p.db.Begin(context.Background())
	if err != nil {
		return err
	}
	tx := pubTx{snapTx, nil}
	defer func() {
		err = snapt.CommitOrRollback(tx, err)
		if err != nil {
			log.Errorf("CommitOrRollback failed: %s", err)
		}
	}()

	if _, err = tx.publishIPLD(headerNode.Cid(), headerNode.RawData(), header.Number); err != nil {
		return err
	}

	_, err = tx.Exec(schema.TableHeader.ToInsertStatement(false),
		header.Number.Uint64(), header.Hash().Hex(), header.ParentHash.Hex(), headerNode.Cid().String(),
		"0", pq.StringArray([]string{p.db.NodeID()}), "0", header.Root.Hex(), header.TxHash.Hex(),
		header.ReceiptHash.Hex(), header.UncleHash.Hex(), header.Bloom.Bytes(),
		strconv.FormatUint(header.Time, 10), header.Coinbase.String())
	return err
}

// PublishStateLeafNode writes the state leaf node to eth.state_cids
func (p *publisher) PublishStateLeafNode(stateNode *models.StateNodeModel, snapTx snapt.Tx) error {
	tx := snapTx.(pubTx)

	_, err := tx.Exec(schema.TableStateNode.ToInsertStatement(false),
		stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID, true,
		stateNode.Balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, false)
	if err != nil {
		return err
	}

	// increment state node counter.
	atomic.AddUint64(&p.stateNodeCounter, 1)
	prom.IncStateNodeCount()

	// increment current batch size counter
	p.currBatchSize += 2
	return err
}

// PublishStorageLeafNode writes the storage leaf node to eth.storage_cids
func (p *publisher) PublishStorageLeafNode(storageNode *models.StorageNodeModel, snapTx snapt.Tx) error {
	tx := snapTx.(pubTx)

	_, err := tx.Exec(schema.TableStorageNode.ToInsertStatement(false),
		storageNode.BlockNumber, storageNode.HeaderID, storageNode.StateKey, storageNode.StorageKey,
		storageNode.CID, true, storageNode.Value, false)
	if err != nil {
		return err
	}

	// increment storage node counter.
	atomic.AddUint64(&p.storageNodeCounter, 1)
	prom.IncStorageNodeCount()

	// increment current batch size counter
	p.currBatchSize += 2
	return err
}

// PublishCode writes code to the ipfs backing pg datastore
func (p *publisher) PublishCode(height *big.Int, codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error {
	c := ipld.Keccak256ToCid(ipld.RawBinary, codeHash.Bytes())

	tx := snapTx.(pubTx)
	if _, err := tx.publishIPLD(c, codeBytes, height); err != nil {
		return err
	}

	// increment code node counter.
	atomic.AddUint64(&p.codeNodeCounter, 1)
	prom.IncCodeNodeCount()

	p.currBatchSize++
	return nil
}

// PrepareTxForBatch commits the current transaction and begins a new one once the
// current batch size reaches maxBatchSize; otherwise it returns tx unchanged.
func (p *publisher) PrepareTxForBatch(tx snapt.Tx, maxBatchSize uint) (snapt.Tx, error) {
	// maximum batch size reached, commit the current transaction and begin a new one
	if maxBatchSize <= p.currBatchSize {
		if err := tx.Commit(); err != nil {
			return nil, err
		}

		snapTx, err := p.db.Begin(context.Background())
		if err != nil {
			return nil, err
		}
		tx = pubTx{Tx: snapTx}

		p.currBatchSize = 0
	}

	return tx, nil
}

// logNodeCounters periodically logs the number of nodes processed.
func (p *publisher) logNodeCounters() {
	t := time.NewTicker(logInterval)
	for range t.C {
		p.printNodeCounters("progress")
	}
}

// printNodeCounters logs the current node counters along with the total runtime.
func (p *publisher) printNodeCounters(msg string) {
	log.WithFields(log.Fields{
		"runtime":       time.Since(p.startTime).String(),
		"state nodes":   atomic.LoadUint64(&p.stateNodeCounter),
		"storage nodes": atomic.LoadUint64(&p.storageNodeCounter),
		"code nodes":    atomic.LoadUint64(&p.codeNodeCounter),
	}).Info(msg)
}
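
// Usage sketch (illustrative only; `db`, `header`, and `maxBatchSize` are assumed
// to be supplied by the caller and are not defined in this package). It shows the
// rough call pattern this publisher is designed for: the header goes into its own
// transaction, then state data is streamed through an explicit snapshot tx:
//
//	p := NewPublisher(db)
//	if err := p.PublishHeader(header); err != nil {
//		return err
//	}
//	tx, err := p.BeginTx()
//	if err != nil {
//		return err
//	}
//	// ... PublishIPLD / PublishStateLeafNode / PublishStorageLeafNode calls,
//	// periodically rotating the tx via PrepareTxForBatch(tx, maxBatchSize) ...
//	return tx.Commit()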