update publisher interface

This commit is contained in:
i-norden 2023-05-10 13:11:12 -05:00
parent 0a04baab17
commit 4ee75a3371
8 changed files with 16 additions and 497 deletions

View File

@ -1,63 +0,0 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package cmd
import (
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/cerc-io/ipld-eth-state-snapshot/pkg/snapshot"
)
// inPlaceStateSnapshotCmd represents the inPlaceStateSnapshot command
var inPlaceStateSnapshotCmd = &cobra.Command{
Use: "inPlaceStateSnapshot",
Short: "Take an in-place state snapshot in the database",
Long: `Usage:
./ipld-eth-state-snapshot inPlaceStateSnapshot --config={path to toml config file}`,
Run: func(cmd *cobra.Command, args []string) {
subCommand = cmd.CalledAs()
logWithCommand = *logrus.WithField("SubCommand", subCommand)
inPlaceStateSnapshot()
},
}
func inPlaceStateSnapshot() {
config := snapshot.NewInPlaceSnapshotConfig()
startHeight := viper.GetUint64(snapshot.SNAPSHOT_START_HEIGHT_TOML)
endHeight := viper.GetUint64(snapshot.SNAPSHOT_END_HEIGHT_TOML)
params := snapshot.InPlaceSnapshotParams{StartHeight: uint64(startHeight), EndHeight: uint64(endHeight)}
if err := snapshot.CreateInPlaceSnapshot(config, params); err != nil {
logWithCommand.Fatal(err)
}
logWithCommand.Infof("snapshot taken at height %d starting from height %d", endHeight, startHeight)
}
func init() {
rootCmd.AddCommand(inPlaceStateSnapshotCmd)
inPlaceStateSnapshotCmd.PersistentFlags().String(snapshot.SNAPSHOT_START_HEIGHT_CLI, "", "start block height for in-place snapshot")
inPlaceStateSnapshotCmd.PersistentFlags().String(snapshot.SNAPSHOT_END_HEIGHT_CLI, "", "end block height for in-place snapshot")
viper.BindPFlag(snapshot.SNAPSHOT_START_HEIGHT_TOML, inPlaceStateSnapshotCmd.PersistentFlags().Lookup(snapshot.SNAPSHOT_START_HEIGHT_CLI))
viper.BindPFlag(snapshot.SNAPSHOT_END_HEIGHT_TOML, inPlaceStateSnapshotCmd.PersistentFlags().Lookup(snapshot.SNAPSHOT_END_HEIGHT_CLI))
}

5
fixture/leaf_keys.go Normal file
View File

@ -0,0 +1,5 @@
package fixture
var Block1_StateNodeLeafKeys = [][]byte{
}

View File

@ -1,359 +0,0 @@
package fixture
var Block1_StateNodePaths = [][]byte{
[]byte{},
[]byte{0},
[]byte{0, 0},
[]byte{0, 2},
[]byte{0, 2, 1},
[]byte{0, 2, 8},
[]byte{0, 2, 12},
[]byte{0, 3},
[]byte{0, 4},
[]byte{0, 6},
[]byte{0, 6, 3},
[]byte{0, 6, 13},
[]byte{0, 7},
[]byte{0, 8},
[]byte{0, 8, 7},
[]byte{0, 8, 11},
[]byte{0, 9},
[]byte{0, 9, 9},
[]byte{0, 9, 10},
[]byte{0, 12},
[]byte{0, 13},
[]byte{0, 14},
[]byte{1},
[]byte{1, 2},
[]byte{1, 2, 5},
[]byte{1, 2, 7},
[]byte{1, 3},
[]byte{1, 3, 1},
[]byte{1, 3, 11},
[]byte{1, 4},
[]byte{1, 5},
[]byte{1, 5, 11},
[]byte{1, 5, 12},
[]byte{1, 5, 15},
[]byte{1, 6},
[]byte{1, 8},
[]byte{1, 10},
[]byte{1, 13},
[]byte{1, 14},
[]byte{1, 14, 2},
[]byte{1, 14, 11},
[]byte{1, 15},
[]byte{1, 15, 9},
[]byte{1, 15, 15},
[]byte{2},
[]byte{2, 0},
[]byte{2, 0, 9},
[]byte{2, 0, 14},
[]byte{2, 1},
[]byte{2, 1, 1},
[]byte{2, 1, 3},
[]byte{2, 1, 14},
[]byte{2, 5},
[]byte{2, 6},
[]byte{2, 9},
[]byte{2, 9, 1},
[]byte{2, 9, 7},
[]byte{2, 11},
[]byte{2, 11, 7},
[]byte{2, 11, 13},
[]byte{2, 13},
[]byte{2, 13, 1},
[]byte{2, 13, 15},
[]byte{2, 15},
[]byte{3},
[]byte{3, 0},
[]byte{3, 0, 0},
[]byte{3, 0, 1},
[]byte{3, 2},
[]byte{3, 2, 3},
[]byte{3, 2, 15},
[]byte{3, 3},
[]byte{3, 4},
[]byte{3, 4, 2},
[]byte{3, 4, 4},
[]byte{3, 4, 5},
[]byte{3, 6},
[]byte{3, 8},
[]byte{3, 9},
[]byte{3, 10},
[]byte{3, 10, 2},
[]byte{3, 10, 8},
[]byte{3, 10, 12},
[]byte{3, 11},
[]byte{3, 12},
[]byte{3, 13},
[]byte{3, 14},
[]byte{3, 14, 4},
[]byte{3, 14, 9},
[]byte{3, 14, 14},
[]byte{3, 14, 14, 10},
[]byte{3, 14, 14, 15},
[]byte{4},
[]byte{4, 0},
[]byte{4, 0, 6},
[]byte{4, 0, 15},
[]byte{4, 1},
[]byte{4, 2},
[]byte{4, 2, 1},
[]byte{4, 2, 11},
[]byte{4, 3},
[]byte{4, 5},
[]byte{4, 6},
[]byte{4, 7},
[]byte{4, 8},
[]byte{4, 11},
[]byte{4, 11, 6},
[]byte{4, 11, 9},
[]byte{4, 11, 12},
[]byte{4, 14},
[]byte{5},
[]byte{5, 0},
[]byte{5, 0, 3},
[]byte{5, 0, 9},
[]byte{5, 0, 15},
[]byte{5, 1},
[]byte{5, 1, 14},
[]byte{5, 1, 15},
[]byte{5, 2},
[]byte{5, 2, 8},
[]byte{5, 2, 10},
[]byte{5, 3},
[]byte{5, 4},
[]byte{5, 4, 6},
[]byte{5, 4, 12},
[]byte{5, 6},
[]byte{5, 8},
[]byte{5, 8, 3},
[]byte{5, 8, 11},
[]byte{5, 10},
[]byte{5, 11},
[]byte{5, 12},
[]byte{5, 13},
[]byte{5, 15},
[]byte{6},
[]byte{6, 0},
[]byte{6, 2},
[]byte{6, 2, 3},
[]byte{6, 2, 9},
[]byte{6, 4},
[]byte{6, 4, 0},
[]byte{6, 4, 0, 0},
[]byte{6, 4, 0, 5},
[]byte{6, 5},
[]byte{6, 5, 4},
[]byte{6, 5, 10},
[]byte{6, 5, 12},
[]byte{6, 5, 13},
[]byte{6, 6},
[]byte{6, 6, 0},
[]byte{6, 6, 8},
[]byte{6, 8},
[]byte{6, 8, 4},
[]byte{6, 8, 4, 2},
[]byte{6, 8, 4, 9},
[]byte{6, 8, 9},
[]byte{6, 10},
[]byte{6, 10, 1},
[]byte{6, 10, 14},
[]byte{6, 11},
[]byte{6, 11, 2},
[]byte{6, 11, 12},
[]byte{6, 11, 14},
[]byte{6, 13},
[]byte{6, 13, 2},
[]byte{6, 13, 12},
[]byte{7},
[]byte{7, 1},
[]byte{7, 5},
[]byte{7, 7},
[]byte{7, 8},
[]byte{7, 8, 2},
[]byte{7, 8, 5},
[]byte{7, 9},
[]byte{7, 13},
[]byte{7, 13, 1},
[]byte{7, 13, 1, 0},
[]byte{7, 13, 1, 13},
[]byte{7, 13, 7},
[]byte{7, 14},
[]byte{7, 14, 8},
[]byte{7, 14, 11},
[]byte{8},
[]byte{8, 0},
[]byte{8, 0, 3},
[]byte{8, 0, 11},
[]byte{8, 2},
[]byte{8, 4},
[]byte{8, 8},
[]byte{8, 9},
[]byte{8, 9, 3},
[]byte{8, 9, 13},
[]byte{8, 10},
[]byte{8, 12},
[]byte{8, 12, 3},
[]byte{8, 12, 15},
[]byte{8, 13},
[]byte{8, 15},
[]byte{8, 15, 8},
[]byte{8, 15, 13},
[]byte{9},
[]byte{9, 0},
[]byte{9, 5},
[]byte{9, 6},
[]byte{9, 6, 10},
[]byte{9, 6, 14},
[]byte{9, 7},
[]byte{9, 9},
[]byte{9, 14},
[]byte{9, 15},
[]byte{9, 15, 0},
[]byte{9, 15, 4},
[]byte{9, 15, 10},
[]byte{10},
[]byte{10, 0},
[]byte{10, 0, 9},
[]byte{10, 0, 10},
[]byte{10, 0, 15},
[]byte{10, 2},
[]byte{10, 3},
[]byte{10, 6},
[]byte{10, 8},
[]byte{10, 9},
[]byte{10, 10},
[]byte{10, 10, 5},
[]byte{10, 10, 8},
[]byte{10, 13},
[]byte{10, 13, 0},
[]byte{10, 13, 13},
[]byte{10, 14},
[]byte{10, 14, 4},
[]byte{10, 14, 11},
[]byte{10, 14, 11, 8},
[]byte{10, 14, 11, 14},
[]byte{10, 15},
[]byte{11},
[]byte{11, 0},
[]byte{11, 0, 2},
[]byte{11, 0, 15},
[]byte{11, 1},
[]byte{11, 2},
[]byte{11, 3},
[]byte{11, 4},
[]byte{11, 5},
[]byte{11, 7},
[]byte{11, 7, 12},
[]byte{11, 7, 15},
[]byte{11, 8},
[]byte{11, 8, 8},
[]byte{11, 8, 15},
[]byte{11, 9},
[]byte{11, 11},
[]byte{11, 12},
[]byte{11, 13},
[]byte{11, 14},
[]byte{11, 14, 0},
[]byte{11, 14, 0, 1},
[]byte{11, 14, 0, 3},
[]byte{11, 14, 8},
[]byte{11, 14, 13},
[]byte{12},
[]byte{12, 0},
[]byte{12, 0, 0},
[]byte{12, 0, 1},
[]byte{12, 0, 1, 3},
[]byte{12, 0, 1, 11},
[]byte{12, 0, 15},
[]byte{12, 2},
[]byte{12, 2, 9},
[]byte{12, 2, 12},
[]byte{12, 4},
[]byte{12, 5},
[]byte{12, 6},
[]byte{12, 6, 0},
[]byte{12, 6, 4},
[]byte{12, 6, 14},
[]byte{12, 7},
[]byte{12, 7, 0},
[]byte{12, 7, 12},
[]byte{12, 7, 13},
[]byte{12, 9},
[]byte{12, 11},
[]byte{12, 12},
[]byte{13},
[]byte{13, 2},
[]byte{13, 2, 0},
[]byte{13, 2, 2},
[]byte{13, 2, 4},
[]byte{13, 3},
[]byte{13, 3, 7},
[]byte{13, 3, 10},
[]byte{13, 5},
[]byte{13, 8},
[]byte{13, 8, 1},
[]byte{13, 8, 15},
[]byte{13, 9},
[]byte{13, 9, 0},
[]byte{13, 9, 14},
[]byte{13, 10},
[]byte{13, 12},
[]byte{13, 12, 8},
[]byte{13, 12, 11},
[]byte{13, 13},
[]byte{13, 13, 7},
[]byte{13, 13, 12},
[]byte{13, 14},
[]byte{14},
[]byte{14, 0},
[]byte{14, 1},
[]byte{14, 2},
[]byte{14, 2, 2},
[]byte{14, 2, 12},
[]byte{14, 3},
[]byte{14, 4},
[]byte{14, 5},
[]byte{14, 6},
[]byte{14, 6, 9},
[]byte{14, 6, 12},
[]byte{14, 7},
[]byte{14, 7, 4},
[]byte{14, 7, 12},
[]byte{14, 8},
[]byte{14, 8, 3},
[]byte{14, 8, 12},
[]byte{14, 8, 12, 0},
[]byte{14, 8, 12, 6},
[]byte{14, 10},
[]byte{14, 10, 6},
[]byte{14, 10, 12},
[]byte{14, 11},
[]byte{14, 11, 8},
[]byte{14, 11, 13},
[]byte{14, 12},
[]byte{14, 14},
[]byte{14, 14, 3},
[]byte{14, 14, 9},
[]byte{15},
[]byte{15, 0},
[]byte{15, 5},
[]byte{15, 6},
[]byte{15, 9},
[]byte{15, 9, 0},
[]byte{15, 9, 2},
[]byte{15, 9, 3},
[]byte{15, 11},
[]byte{15, 11, 1},
[]byte{15, 11, 6},
[]byte{15, 12},
[]byte{15, 12, 3},
[]byte{15, 12, 14},
[]byte{15, 12, 14, 7},
[]byte{15, 12, 14, 13},
[]byte{15, 13},
[]byte{15, 14},
[]byte{15, 15},
}

View File

@ -28,10 +28,8 @@ import (
"github.com/lib/pq" "github.com/lib/pq"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/models"
@ -162,23 +160,12 @@ func (p *publisher) BeginTx() (snapt.Tx, error) {
return fileTx{writers}, nil return fileTx{writers}, nil
} }
// PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx func (tx fileWriters) publishIPLD(c cid.Cid, raw []byte, height *big.Int) error {
// returns the CID return tx.write(&schema.TableIPLDBlock, height.String(), c.String(), raw)
func (tx fileWriters) publishRaw(codec uint64, raw []byte, height *big.Int) (cid string, err error) {
c, err := ipld.RawdataToCid(codec, raw, multihash.KECCAK_256)
if err != nil {
return
}
cid = c.String()
return tx.publishIPLD(c, raw, height)
}
func (tx fileWriters) publishIPLD(c cid.Cid, raw []byte, height *big.Int) (string, error) {
return c.String(), tx.write(&schema.TableIPLDBlock, height.String(), c.String(), raw)
} }
// PublishIPLD writes an IPLD to the ipld.blocks blockstore // PublishIPLD writes an IPLD to the ipld.blocks blockstore
func (p *publisher) PublishIPLD(c cid.Cid, raw []byte, height *big.Int, snapTx snapt.Tx) (string, error) { func (p *publisher) PublishIPLD(c cid.Cid, raw []byte, height *big.Int, snapTx snapt.Tx) error {
tx := snapTx.(fileTx) tx := snapTx.(fileTx)
return tx.publishIPLD(c, raw, height) return tx.publishIPLD(c, raw, height)
} }
@ -190,7 +177,7 @@ func (p *publisher) PublishHeader(header *types.Header) error {
if err != nil { if err != nil {
return err return err
} }
if _, err = p.writers.publishIPLD(headerNode.Cid(), headerNode.RawData(), header.Number); err != nil { if err := p.writers.publishIPLD(headerNode.Cid(), headerNode.RawData(), header.Number); err != nil {
return err return err
} }
@ -272,22 +259,6 @@ func (p *publisher) PublishStorageLeafNode(storageNode *models.StorageNodeModel,
return nil return nil
} }
// PublishCode writes code to the ipfs backing pg datastore
func (p *publisher) PublishCode(height *big.Int, codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error {
c := ipld.Keccak256ToCid(ipld.RawBinary, codeHash.Bytes())
tx := snapTx.(fileTx)
if _, err := tx.publishIPLD(c, codeBytes, height); err != nil {
return err
}
// increment code node counter.
atomic.AddUint64(&p.codeNodeCounter, 1)
prom.IncCodeNodeCount()
p.currBatchSize++
return nil
}
func (p *publisher) PrepareTxForBatch(tx snapt.Tx, maxBatchSize uint) (snapt.Tx, error) { func (p *publisher) PrepareTxForBatch(tx snapt.Tx, maxBatchSize uint) (snapt.Tx, error) {
return tx, nil return tx, nil
} }

View File

@ -39,8 +39,7 @@ func writeFiles(t *testing.T, dir string) *publisher {
tx, err := pub.BeginTx() tx, err := pub.BeginTx()
test.NoError(t, err) test.NoError(t, err)
headerID := fixt.Block1_Header.Hash().String() test.NoError(t, pub.PublishStateLeafNode(&fixt.Block1_StateNode0, tx))
test.NoError(t, pub.PublishStateLeafNode(&fixt.Block1_StateNode0, headerID, fixt.Block1_Header.Number, tx))
test.NoError(t, tx.Commit()) test.NoError(t, tx.Commit())
return pub return pub

View File

@ -24,11 +24,9 @@ import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/multiformats/go-multihash"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
@ -89,24 +87,13 @@ func (p *publisher) BeginTx() (snapt.Tx, error) {
}}, nil }}, nil
} }
// PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx func (tx pubTx) publishIPLD(c cid.Cid, raw []byte, height *big.Int) error {
// returns the CID
func (tx pubTx) publishRaw(codec uint64, raw []byte, height *big.Int) (cid string, err error) {
c, err := ipld.RawdataToCid(codec, raw, multihash.KECCAK_256)
if err != nil {
return
}
cid = c.String()
return tx.publishIPLD(c, raw, height)
}
func (tx pubTx) publishIPLD(c cid.Cid, raw []byte, height *big.Int) (string, error) {
_, err := tx.Exec(schema.TableIPLDBlock.ToInsertStatement(false), height.Uint64(), c.String(), raw) _, err := tx.Exec(schema.TableIPLDBlock.ToInsertStatement(false), height.Uint64(), c.String(), raw)
return c.String(), err return err
} }
// PublishIPLD writes an IPLD to the ipld.blocks blockstore // PublishIPLD writes an IPLD to the ipld.blocks blockstore
func (p *publisher) PublishIPLD(c cid.Cid, raw []byte, height *big.Int, snapTx snapt.Tx) (string, error) { func (p *publisher) PublishIPLD(c cid.Cid, raw []byte, height *big.Int, snapTx snapt.Tx) error {
tx := snapTx.(pubTx) tx := snapTx.(pubTx)
return tx.publishIPLD(c, raw, height) return tx.publishIPLD(c, raw, height)
} }
@ -130,7 +117,7 @@ func (p *publisher) PublishHeader(header *types.Header) (err error) {
} }
}() }()
if _, err = tx.publishIPLD(headerNode.Cid(), headerNode.RawData(), header.Number); err != nil { if err := tx.publishIPLD(headerNode.Cid(), headerNode.RawData(), header.Number); err != nil {
return err return err
} }
@ -202,23 +189,6 @@ func (p *publisher) PublishStorageLeafNode(storageNode *models.StorageNodeModel,
return err return err
} }
// PublishCode writes code to the ipfs backing pg datastore
func (p *publisher) PublishCode(height *big.Int, codeHash common.Hash, codeBytes []byte, snapTx snapt.Tx) error {
c := ipld.Keccak256ToCid(ipld.RawBinary, codeHash.Bytes())
tx := snapTx.(pubTx)
if _, err := tx.publishIPLD(c, codeBytes, height); err != nil {
return err
}
// increment code node counter.
atomic.AddUint64(&p.codeNodeCounter, 1)
prom.IncCodeNodeCount()
p.currBatchSize++
return nil
}
func (p *publisher) PrepareTxForBatch(tx snapt.Tx, maxBatchSize uint) (snapt.Tx, error) { func (p *publisher) PrepareTxForBatch(tx snapt.Tx, maxBatchSize uint) (snapt.Tx, error) {
var err error var err error
// maximum batch size reached, commit the current transaction and begin a new transaction. // maximum batch size reached, commit the current transaction and begin a new transaction.

View File

@ -33,9 +33,7 @@ func writeData(t *testing.T, db *postgres.DB) *publisher {
tx, err := pub.BeginTx() tx, err := pub.BeginTx()
test.NoError(t, err) test.NoError(t, err)
headerID := fixt.Block1_Header.Hash().String() test.NoError(t, pub.PublishStateLeafNode(&fixt.Block1_StateNode0, tx))
stateNode := &fixt.Block1_StateNode0
test.NoError(t, pub.PublishStateLeafNode(&fixt.Block1_StateNode0, headerID, fixt.Block1_Header.Number, tx))
test.NoError(t, tx.Commit()) test.NoError(t, tx.Commit())
return pub return pub

View File

@ -6,7 +6,6 @@ import (
"github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
@ -14,8 +13,7 @@ type Publisher interface {
PublishHeader(header *types.Header) error PublishHeader(header *types.Header) error
PublishStateLeafNode(node *models.StateNodeModel, tx Tx) error PublishStateLeafNode(node *models.StateNodeModel, tx Tx) error
PublishStorageLeafNode(node *models.StorageNodeModel, tx Tx) error PublishStorageLeafNode(node *models.StorageNodeModel, tx Tx) error
PublishCode(height *big.Int, codeHash common.Hash, codeBytes []byte, tx Tx) error PublishIPLD(c cid.Cid, raw []byte, height *big.Int, tx Tx) error
PublishIPLD(c cid.Cid, raw []byte, height *big.Int, tx Tx) (string, error)
BeginTx() (Tx, error) BeginTx() (Tx, error)
PrepareTxForBatch(tx Tx, batchSize uint) (Tx, error) PrepareTxForBatch(tx Tx, batchSize uint) (Tx, error)
} }