ipld-eth-state-snapshot/pkg/snapshot/pg/publisher.go

252 lines
7.6 KiB
Go
Raw Normal View History

2020-07-01 18:44:59 +00:00
// Copyright © 2020 Vulcanize, Inc
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pg
2020-07-01 18:44:59 +00:00
import (
"context"
2021-12-15 07:23:18 +00:00
"fmt"
"math/big"
2021-12-23 07:52:44 +00:00
"sync/atomic"
"time"
2020-07-01 23:07:56 +00:00
2021-12-15 07:23:18 +00:00
"github.com/ethereum/go-ethereum/common"
2020-07-01 18:44:59 +00:00
"github.com/ethereum/go-ethereum/core/types"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
2020-07-01 18:44:59 +00:00
"github.com/multiformats/go-multihash"
"github.com/sirupsen/logrus"
2022-03-09 13:37:33 +00:00
log "github.com/sirupsen/logrus"
2020-07-01 18:44:59 +00:00
"github.com/cerc-io/ipld-eth-state-snapshot/pkg/prom"
snapt "github.com/cerc-io/ipld-eth-state-snapshot/pkg/types"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
2020-07-01 18:44:59 +00:00
)
var _ snapt.Publisher = (*publisher)(nil)
2021-12-23 14:34:34 +00:00
const logInterval = 1 * time.Minute
2021-12-23 07:52:44 +00:00
2021-12-14 06:50:19 +00:00
// Publisher is wrapper around DB.
type publisher struct {
db *postgres.DB
currBatchSize uint
2021-12-23 07:52:44 +00:00
stateNodeCounter uint64
storageNodeCounter uint64
codeNodeCounter uint64
startTime time.Time
2020-07-01 18:44:59 +00:00
}
2021-12-14 06:50:19 +00:00
// NewPublisher creates Publisher
func NewPublisher(db *postgres.DB) *publisher {
return &publisher{
db: db,
startTime: time.Now(),
2020-07-01 18:44:59 +00:00
}
}
type pubTx struct {
sql.Tx
callback func()
}
func (tx pubTx) Rollback() error { return tx.Tx.Rollback(context.Background()) }
func (tx pubTx) Commit() error {
if tx.callback != nil {
defer tx.callback()
}
return tx.Tx.Commit(context.Background())
}
func (tx pubTx) Exec(sql string, args ...interface{}) (sql.Result, error) {
return tx.Tx.Exec(context.Background(), sql, args...)
}
func (p *publisher) BeginTx() (snapt.Tx, error) {
tx, err := p.db.Begin(context.Background())
if err != nil {
return nil, err
}
go p.logNodeCounters()
return pubTx{tx, func() {
2022-03-09 13:37:33 +00:00
p.printNodeCounters("final stats")
}}, nil
}
// PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx
// returns the CID and blockstore prefixed multihash key
func (tx pubTx) publishRaw(codec uint64, raw []byte, height *big.Int) (cid, prefixedKey string, err error) {
c, err := ipld.RawdataToCid(codec, raw, multihash.KECCAK_256)
if err != nil {
return
}
cid = c.String()
2022-04-19 10:19:49 +00:00
prefixedKey, err = tx.publishIPLD(c, raw, height)
return
}
func (tx pubTx) publishIPLD(c cid.Cid, raw []byte, height *big.Int) (string, error) {
dbKey := dshelp.MultihashToDsKey(c.Hash())
prefixedKey := blockstore.BlockPrefix.String() + dbKey.String()
_, err := tx.Exec(snapt.TableIPLDBlock.ToInsertStatement(), height.Uint64(), prefixedKey, raw)
return prefixedKey, err
}
// PublishHeader writes the header to the ipfs backing pg datastore and adds secondary indexes in the header_cids table
func (p *publisher) PublishHeader(header *types.Header) (err error) {
2020-07-01 18:44:59 +00:00
headerNode, err := ipld.NewEthHeader(header)
if err != nil {
return err
2020-07-01 18:44:59 +00:00
}
2022-02-16 11:27:02 +00:00
snapTx, err := p.db.Begin(context.Background())
2020-07-01 18:44:59 +00:00
if err != nil {
return err
2020-07-01 18:44:59 +00:00
}
2022-02-16 11:27:02 +00:00
tx := pubTx{snapTx, nil}
defer func() {
err = snapt.CommitOrRollback(tx, err)
if err != nil {
logrus.Errorf("CommitOrRollback failed: %s", err)
}
}()
if _, err = tx.publishIPLD(headerNode.Cid(), headerNode.RawData(), header.Number); err != nil {
return err
2020-07-01 18:44:59 +00:00
}
mhKey := shared.MultihashKeyFromCID(headerNode.Cid())
_, err = tx.Exec(snapt.TableHeader.ToInsertStatement(), header.Number.Uint64(), header.Hash().Hex(),
header.ParentHash.Hex(), headerNode.Cid().String(), "0", p.db.NodeID(), "0",
header.Root.Hex(), header.TxHash.Hex(), header.ReceiptHash.Hex(), header.UncleHash.Hex(),
header.Bloom.Bytes(), header.Time, mhKey, 0, header.Coinbase.String())
return err
2020-07-01 18:44:59 +00:00
}
// PublishStateNode writes the state node to the ipfs backing datastore and adds secondary indexes in the state_cids table
func (p *publisher) PublishStateNode(node *snapt.Node, headerID string, height *big.Int, snapTx snapt.Tx) error {
2020-07-01 18:44:59 +00:00
var stateKey string
if !snapt.IsNullHash(node.Key) {
stateKey = node.Key.Hex()
2020-07-01 18:44:59 +00:00
}
2022-02-16 11:27:02 +00:00
tx := snapTx.(pubTx)
2022-04-19 10:19:49 +00:00
stateCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStateTrie, node.Value, height)
2020-07-01 18:44:59 +00:00
if err != nil {
return err
2020-07-01 18:44:59 +00:00
}
_, err = tx.Exec(snapt.TableStateNode.ToInsertStatement(),
height.Uint64(), headerID, stateKey, stateCIDStr, node.Path, node.NodeType, false, mhKey)
if err != nil {
return err
}
2021-12-23 07:52:44 +00:00
// increment state node counter.
atomic.AddUint64(&p.stateNodeCounter, 1)
prom.IncStateNodeCount()
2021-12-23 07:52:44 +00:00
// increment current batch size counter
p.currBatchSize += 2
return err
2020-07-01 18:44:59 +00:00
}
// PublishStorageNode writes the storage node to the ipfs backing pg datastore and adds secondary indexes in the storage_cids table
func (p *publisher) PublishStorageNode(node *snapt.Node, headerID string, height *big.Int, statePath []byte, snapTx snapt.Tx) error {
2020-07-01 18:44:59 +00:00
var storageKey string
if !snapt.IsNullHash(node.Key) {
storageKey = node.Key.Hex()
2020-07-01 18:44:59 +00:00
}
2022-02-16 11:27:02 +00:00
tx := snapTx.(pubTx)
2022-04-19 10:19:49 +00:00
storageCIDStr, mhKey, err := tx.publishRaw(ipld.MEthStorageTrie, node.Value, height)
2020-07-01 18:44:59 +00:00
if err != nil {
return err
}
_, err = tx.Exec(snapt.TableStorageNode.ToInsertStatement(),
height.Uint64(), headerID, statePath, storageKey, storageCIDStr, node.Path, node.NodeType, false, mhKey)
if err != nil {
return err
}
2021-12-23 07:52:44 +00:00
// increment storage node counter.
atomic.AddUint64(&p.storageNodeCounter, 1)
prom.IncStorageNodeCount()
2021-12-23 07:52:44 +00:00
// increment current batch size counter
p.currBatchSize += 2
return err
2020-07-01 18:44:59 +00:00
}
// PublishCode writes code to the ipfs backing pg datastore
func (p *publisher) PublishCode(height *big.Int, codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error {
// no codec for code, doesn't matter though since blockstore key is multihash-derived
2021-12-15 07:23:18 +00:00
mhKey, err := shared.MultihashKeyFromKeccak256(codeHash)
if err != nil {
2021-12-15 07:23:18 +00:00
return fmt.Errorf("error deriving multihash key from codehash: %v", err)
}
2022-02-16 11:27:02 +00:00
tx := snapTx.(pubTx)
if _, err = tx.Exec(snapt.TableIPLDBlock.ToInsertStatement(), height.Uint64(), mhKey, codeBytes); err != nil {
2021-12-15 07:23:18 +00:00
return fmt.Errorf("error publishing code IPLD: %v", err)
}
// increment code node counter.
atomic.AddUint64(&p.codeNodeCounter, 1)
prom.IncCodeNodeCount()
2021-12-23 07:52:44 +00:00
p.currBatchSize++
return nil
}
func (p *publisher) PrepareTxForBatch(tx snapt.Tx, maxBatchSize uint) (snapt.Tx, error) {
var err error
// maximum batch size reached, commit the current transaction and begin a new transaction.
if maxBatchSize <= p.currBatchSize {
if err = tx.Commit(); err != nil {
return nil, err
}
2022-02-16 11:27:02 +00:00
snapTx, err := p.db.Begin(context.Background())
tx = pubTx{Tx: snapTx}
if err != nil {
return nil, err
}
p.currBatchSize = 0
}
return tx, nil
}
2021-12-23 07:52:44 +00:00
// logNodeCounters periodically logs the number of node processed.
func (p *publisher) logNodeCounters() {
2021-12-23 14:34:34 +00:00
t := time.NewTicker(logInterval)
2021-12-23 07:52:44 +00:00
for range t.C {
2022-03-09 13:37:33 +00:00
p.printNodeCounters("progress")
2021-12-23 07:52:44 +00:00
}
}
2022-03-09 13:37:33 +00:00
func (p *publisher) printNodeCounters(msg string) {
log.WithFields(log.Fields{
"runtime": time.Now().Sub(p.startTime).String(),
"state nodes": atomic.LoadUint64(&p.stateNodeCounter),
"storage nodes": atomic.LoadUint64(&p.storageNodeCounter),
"code nodes": atomic.LoadUint64(&p.codeNodeCounter),
}).Info(msg)
}