diff --git a/cmd/geth/config.go b/cmd/geth/config.go index bb93509e0..d77e261f9 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -206,11 +206,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { } switch dbType { case shared.FILE: - if !ctx.GlobalIsSet(utils.StateDiffFileNodeRowIDFlag.Name) { - utils.Fatalf("In statediff file writing mode a node row ID must be provided") - } indexerConfig = file.Config{ - NodeID: int64(ctx.GlobalInt(utils.StateDiffFileNodeRowIDFlag.Name)), FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name), } case shared.POSTGRES: diff --git a/cmd/geth/main.go b/cmd/geth/main.go index e027eddf9..990b40a60 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -167,7 +167,6 @@ var ( utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, utils.StateDiffWorkersFlag, - utils.StateDiffFileNodeRowIDFlag, utils.StateDiffFilePath, configFileFlag, utils.CatalystFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 7f768c11a..885cc2c16 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -243,7 +243,6 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, utils.StateDiffWorkersFlag, - utils.StateDiffFileNodeRowIDFlag, utils.StateDiffFilePath, }, }, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 26f904aaa..ccc9ac89e 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -852,10 +852,6 @@ var ( Name: "statediff.db.nodeid", Usage: "Node ID to use when writing state diffs to database", } - StateDiffFileNodeRowIDFlag = cli.IntFlag{ - Name: "statediff.file.nodeid", - Usage: "Node row ID to use as FK when writing state diffs to database", - } StateDiffFilePath = cli.StringFlag{ Name: "statediff.file.path", Usage: "Full path (including filename) to write statediff data out to when operating in file mode", diff --git a/statediff/README.md b/statediff/README.md index 97666d50a..7170363ae 100644 --- a/statediff/README.md +++ 
b/statediff/README.md @@ -79,7 +79,7 @@ This service introduces a CLI flag namespace `statediff` `--statediff` flag is used to turn on the service `--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database `--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database -`--statediff.db.type` is the type of database we write out to (current options: postgres and dump) +`--statediff.db.type` is the type of database we write out to (current options: postgres, dump, file) `--statediff.dump.dst` is the destination to write to when operating in database dump mode (stdout, stderr, discard) `--statediff.db.driver` is the specific driver to use for the database (current options for postgres: pgx and sqlx) `--statediff.db.host` is the hostname/ip to dial to connect to the database @@ -95,6 +95,7 @@ This service introduces a CLI flag namespace `statediff` `--statediff.db.maxconnlifetime` is the maximum lifetime for a connection (in seconds) `--statediff.db.nodeid` is the node id to use in the Postgres database `--statediff.db.clientname` is the client name to use in the Postgres database +`--statediff.file.path` is the full path (including filename) to write statediff data out to when operating in file mode The service can only operate in full sync mode (`--syncmode=full`), but only the historical RPC endpoints require an archive node (`--gcmode=archive`) diff --git a/statediff/indexer/constructor.go b/statediff/indexer/constructor.go index 60e6dadb7..bfb746080 100644 --- a/statediff/indexer/constructor.go +++ b/statediff/indexer/constructor.go @@ -38,6 +38,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n if !ok { return nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{}) } + fc.NodeInfo = nodeInfo return file.NewStateDiffIndexer(ctx, 
chainConfig, fc) case shared.POSTGRES: pgc, ok := config.(postgres.Config) diff --git a/statediff/indexer/database/file/config.go b/statediff/indexer/database/file/config.go index fd9e488c9..2553174a3 100644 --- a/statediff/indexer/database/file/config.go +++ b/statediff/indexer/database/file/config.go @@ -17,13 +17,14 @@ package file import ( + "github.com/ethereum/go-ethereum/statediff/indexer/node" "github.com/ethereum/go-ethereum/statediff/indexer/shared" ) // Config holds params for writing sql statements out to a file type Config struct { - NodeID int64 // this is the nodeID used as FK in eth.header_cids FilePath string + NodeInfo node.Info } // Type satisfies interfaces.Config diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index 57c5c2e1d..1cc19480a 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -55,7 +55,7 @@ var ( type StateDiffIndexer struct { writer *SQLWriter chainConfig *params.ChainConfig - nodeID int64 + nodeID string wg *sync.WaitGroup } @@ -78,7 +78,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c return &StateDiffIndexer{ writer: w, chainConfig: chainConfig, - nodeID: config.NodeID, + nodeID: config.NodeInfo.ID, wg: wg, }, nil } diff --git a/statediff/indexer/database/file/writer.go b/statediff/indexer/database/file/writer.go index d95fba328..5ee169229 100644 --- a/statediff/indexer/database/file/writer.go +++ b/statediff/indexer/database/file/writer.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" + nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" ) var ( @@ -102,15 +103,18 @@ func (sqw *SQLWriter) flush() error { } const ( + nodeInsert = `INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES (%s, %s, %s, %s, %d) + ON 
CONFLICT (node_id) DO NOTHING;\n` + ipldInsert = `INSERT INTO public.blocks (key, data) VALUES (%s, %x) ON CONFLICT (key) DO NOTHING;\n` headerInsert = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) -VALUES (%s, %s, %s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %d, %s, %d, %d) -ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, %d);\n` +VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, %d, %d) +ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, %d);\n` headerInsertWithoutBaseFee = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) -VALUES (%s, %s, %s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %d, %s, %d, NULL) -ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, NULL);\n` +VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, %d, NULL) +ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, NULL);\n` uncleInsert = `INSERT 
INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT (block_hash) DO NOTHING;\n` @@ -137,6 +141,12 @@ ON CONFLICT (header_id, state_path) DO NOTHING;\n` ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = (%s, %s, %d, %t, %s);\n` ) +// ON CONFLICT (node_id) DO UPDATE SET genesis_block = %s, network_id = %s, client_name = %s, chain_id = %s;\n` + +func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) { + sqw.stmts <- []byte(fmt.Sprintf(nodeInsert, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID)) +} + func (sqw *SQLWriter) upsertIPLD(ipld models.IPLDModel) { sqw.stmts <- []byte(fmt.Sprintf(ipldInsert, ipld.Key, ipld.Data)) } diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go index 3ed1a11e7..445b35d9b 100644 --- a/statediff/indexer/database/sql/interfaces.go +++ b/statediff/indexer/database/sql/interfaces.go @@ -36,7 +36,7 @@ type Driver interface { Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error Begin(ctx context.Context) (Tx, error) Stats() Stats - NodeID() int64 + NodeID() string Context() context.Context io.Closer } diff --git a/statediff/indexer/database/sql/pgx_indexer_test.go b/statediff/indexer/database/sql/pgx_indexer_test.go index 26cc660ec..a86927341 100644 --- a/statediff/indexer/database/sql/pgx_indexer_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_test.go @@ -26,7 +26,6 @@ import ( "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" - "github.com/jmoiron/sqlx" "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" @@ -153,7 +152,7 @@ func TestPGXIndexer(t *testing.T) { t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) { setupPGX(t) defer tearDown(t) - pgStr := `SELECT cid, td, reward, 
block_hash, base_fee + pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, base_fee FROM eth.header_cids WHERE block_number = $1` // check header was properly indexed @@ -165,7 +164,12 @@ func TestPGXIndexer(t *testing.T) { BaseFee *int64 `db:"base_fee"` } header := new(res) - err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header) + err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).Scan( + &header.CID, + &header.TD, + &header.Reward, + &header.BlockHash, + &header.BaseFee) if err != nil { t.Fatal(err) } @@ -221,43 +225,43 @@ func TestPGXIndexer(t *testing.T) { switch c { case trx1CID.String(): test_helpers.ExpectEqual(t, data, tx1) - var txType *uint8 + var txType uint8 err = db.Get(context.Background(), &txType, txTypePgStr, c) if err != nil { t.Fatal(err) } - if txType != nil { - t.Fatalf("expected nil tx_type, got %d", *txType) + if txType != 0 { + t.Fatalf("expected tx_type 0, got %d", txType) } case trx2CID.String(): test_helpers.ExpectEqual(t, data, tx2) - var txType *uint8 + var txType uint8 err = db.Get(context.Background(), &txType, txTypePgStr, c) if err != nil { t.Fatal(err) } - if txType != nil { - t.Fatalf("expected nil tx_type, got %d", *txType) + if txType != 0 { + t.Fatalf("expected tx_type 0, got %d", txType) } case trx3CID.String(): test_helpers.ExpectEqual(t, data, tx3) - var txType *uint8 + var txType uint8 err = db.Get(context.Background(), &txType, txTypePgStr, c) if err != nil { t.Fatal(err) } - if txType != nil { - t.Fatalf("expected nil tx_type, got %d", *txType) + if txType != 0 { + t.Fatalf("expected tx_type 0, got %d", txType) } case trx4CID.String(): test_helpers.ExpectEqual(t, data, tx4) - var txType *uint8 + var txType uint8 err = db.Get(context.Background(), &txType, txTypePgStr, c) if err != nil { t.Fatal(err) } - if *txType != types.AccessListTxType { - t.Fatalf("expected AccessListTxType (1), got %d", *txType) + if txType != 
types.AccessListTxType { + t.Fatalf("expected AccessListTxType (1), got %d", txType) } accessListElementModels := make([]models.AccessListElementModel, 0) pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_element.index ASC` @@ -467,7 +471,7 @@ func TestPGXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - pgStr = `SELECT * from eth.state_accounts WHERE header_id = $1 AND state_path = $2` + pgStr = `SELECT header_id, state_path, cast(balance AS TEXT), nonce, code_hash, storage_root from eth.state_accounts WHERE header_id = $1 AND state_path = $2` var account models.StateAccountModel err = db.Get(context.Background(), &account, pgStr, stateNode.HeaderID, stateNode.Path) if err != nil { diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go index 825e50163..5794bd0af 100644 --- a/statediff/indexer/database/sql/postgres/config.go +++ b/statediff/indexer/database/sql/postgres/config.go @@ -30,7 +30,6 @@ type DriverType string const ( PGX DriverType = "PGX" SQLX DriverType = "SQLX" - FILE DriverType = "File" Unknown DriverType = "Unknown" ) @@ -41,8 +40,6 @@ func ResolveDriverType(str string) (DriverType, error) { return PGX, nil case "sqlx": return SQLX, nil - case "file": - return FILE, nil default: return Unknown, fmt.Errorf("unrecognized driver type string: %s", str) } @@ -52,9 +49,9 @@ func ResolveDriverType(str string) (DriverType, error) { var DefaultConfig = Config{ Hostname: "localhost", Port: 5432, - DatabaseName: "vulcanize_test", + DatabaseName: "vulcanize_testing", Username: "postgres", - Password: "", + Password: "password", } // Config holds params for a Postgres db diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index ef091760d..213638017 100644 --- 
a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -22,14 +22,7 @@ var _ sql.Database = &DB{} const ( createNodeStm = `INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (genesis_block, network_id, node_id, chain_id) - DO UPDATE - SET genesis_block = $1, - network_id = $2, - node_id = $3, - client_name = $4, - chain_id = $5 - RETURNING id` + ON CONFLICT (node_id) DO NOTHING` ) // NewPostgresDB returns a postgres.DB using the provided driver @@ -37,7 +30,7 @@ func NewPostgresDB(driver sql.Driver) *DB { return &DB{driver} } -// DB implements sql.Databse using a configured driver and Postgres statement syntax +// DB implements sql.Database using a configured driver and Postgres statement syntax type DB struct { sql.Driver } diff --git a/statediff/indexer/database/sql/postgres/pgx.go b/statediff/indexer/database/sql/postgres/pgx.go index fa9b84dd0..936a3765d 100644 --- a/statediff/indexer/database/sql/postgres/pgx.go +++ b/statediff/indexer/database/sql/postgres/pgx.go @@ -34,7 +34,7 @@ type PGXDriver struct { ctx context.Context pool *pgxpool.Pool nodeInfo node.Info - nodeID int64 + nodeID string } // NewPGXDriver returns a new pgx driver @@ -89,17 +89,16 @@ func MakeConfig(config Config) (*pgxpool.Config, error) { } func (pgx *PGXDriver) createNode() error { - var nodeID int64 - err := pgx.pool.QueryRow( + _, err := pgx.pool.Exec( pgx.ctx, createNodeStm, pgx.nodeInfo.GenesisBlock, pgx.nodeInfo.NetworkID, pgx.nodeInfo.ID, pgx.nodeInfo.ClientName, - pgx.nodeInfo.ChainID).Scan(&nodeID) + pgx.nodeInfo.ChainID) if err != nil { return ErrUnableToSetNode(err) } - pgx.nodeID = nodeID + pgx.nodeID = pgx.nodeInfo.ID return nil } @@ -139,7 +138,7 @@ func (pgx *PGXDriver) Stats() sql.Stats { } // NodeID satisfies sql.Database -func (pgx *PGXDriver) NodeID() int64 { +func (pgx *PGXDriver) NodeID() string { return pgx.nodeID } diff --git 
a/statediff/indexer/database/sql/postgres/sqlx.go b/statediff/indexer/database/sql/postgres/sqlx.go index 0bbd0d9e9..406b44a19 100644 --- a/statediff/indexer/database/sql/postgres/sqlx.go +++ b/statediff/indexer/database/sql/postgres/sqlx.go @@ -32,7 +32,7 @@ type SQLXDriver struct { ctx context.Context db *sqlx.DB nodeInfo node.Info - nodeID int64 + nodeID string } // NewSQLXDriver returns a new sqlx driver for Postgres @@ -60,16 +60,15 @@ func NewSQLXDriver(ctx context.Context, config Config, node node.Info) (*SQLXDri } func (driver *SQLXDriver) createNode() error { - var nodeID int64 - err := driver.db.QueryRowx( + _, err := driver.db.Exec( createNodeStm, driver.nodeInfo.GenesisBlock, driver.nodeInfo.NetworkID, driver.nodeInfo.ID, driver.nodeInfo.ClientName, - driver.nodeInfo.ChainID).Scan(&nodeID) + driver.nodeInfo.ChainID) if err != nil { return ErrUnableToSetNode(err) } - driver.nodeID = nodeID + driver.nodeID = driver.nodeInfo.ID return nil } @@ -108,7 +107,7 @@ func (driver *SQLXDriver) Stats() sql.Stats { } // NodeID satisfies sql.Database -func (driver *SQLXDriver) NodeID() int64 { +func (driver *SQLXDriver) NodeID() string { return driver.nodeID } diff --git a/statediff/indexer/models/models.go b/statediff/indexer/models/models.go index 60d83d96e..d37aa5449 100644 --- a/statediff/indexer/models/models.go +++ b/statediff/indexer/models/models.go @@ -32,7 +32,7 @@ type HeaderModel struct { CID string `db:"cid"` MhKey string `db:"mh_key"` TotalDifficulty string `db:"td"` - NodeID int64 `db:"node_id"` + NodeID string `db:"node_id"` Reward string `db:"reward"` StateRoot string `db:"state_root"` UncleRoot string `db:"uncle_root"` diff --git a/statediff/test_helpers/mocks/service_test.go b/statediff/test_helpers/mocks/service_test.go index b3b77d4bf..dde784316 100644 --- a/statediff/test_helpers/mocks/service_test.go +++ b/statediff/test_helpers/mocks/service_test.go @@ -24,6 +24,7 @@ import ( "sort" "sync" "testing" + "time" 
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -150,29 +151,35 @@ func testSubscriptionAPI(t *testing.T) { id := rpc.NewID() payloadChan := make(chan statediff.Payload) quitChan := make(chan bool) + wg := new(sync.WaitGroup) + go func() { + wg.Add(1) + defer wg.Done() + sort.Slice(expectedStateDiffBytes, func(i, j int) bool { return expectedStateDiffBytes[i] < expectedStateDiffBytes[j] }) + select { + case payload := <-payloadChan: + if !bytes.Equal(payload.BlockRlp, expectedBlockRlp) { + t.Errorf("payload does not have expected block\r\nactual block rlp: %v\r\nexpected block rlp: %v", payload.BlockRlp, expectedBlockRlp) + } + sort.Slice(payload.StateObjectRlp, func(i, j int) bool { return payload.StateObjectRlp[i] < payload.StateObjectRlp[j] }) + if !bytes.Equal(payload.StateObjectRlp, expectedStateDiffBytes) { + t.Errorf("payload does not have expected state diff\r\nactual state diff rlp: %v\r\nexpected state diff rlp: %v", payload.StateObjectRlp, expectedStateDiffBytes) + } + if !bytes.Equal(expectedReceiptBytes, payload.ReceiptsRlp) { + t.Errorf("payload does not have expected receipts\r\nactual receipt rlp: %v\r\nexpected receipt rlp: %v", payload.ReceiptsRlp, expectedReceiptBytes) + } + if !bytes.Equal(payload.TotalDifficulty.Bytes(), mockTotalDifficulty.Bytes()) { + t.Errorf("payload does not have expected total difficulty\r\nactual td: %d\r\nexpected td: %d", payload.TotalDifficulty.Int64(), mockTotalDifficulty.Int64()) + } + case <-quitChan: + t.Errorf("channel quit before delivering payload") + } + }() + time.Sleep(1) mockService.Subscribe(id, payloadChan, quitChan, params) blockChan <- block1 parentBlockChain <- block0 - - sort.Slice(expectedStateDiffBytes, func(i, j int) bool { return expectedStateDiffBytes[i] < expectedStateDiffBytes[j] }) - select { - case payload := <-payloadChan: - if !bytes.Equal(payload.BlockRlp, expectedBlockRlp) { - t.Errorf("payload does not have expected block\r\nactual block rlp: 
%v\r\nexpected block rlp: %v", payload.BlockRlp, expectedBlockRlp) - } - sort.Slice(payload.StateObjectRlp, func(i, j int) bool { return payload.StateObjectRlp[i] < payload.StateObjectRlp[j] }) - if !bytes.Equal(payload.StateObjectRlp, expectedStateDiffBytes) { - t.Errorf("payload does not have expected state diff\r\nactual state diff rlp: %v\r\nexpected state diff rlp: %v", payload.StateObjectRlp, expectedStateDiffBytes) - } - if !bytes.Equal(expectedReceiptBytes, payload.ReceiptsRlp) { - t.Errorf("payload does not have expected receipts\r\nactual receipt rlp: %v\r\nexpected receipt rlp: %v", payload.ReceiptsRlp, expectedReceiptBytes) - } - if !bytes.Equal(payload.TotalDifficulty.Bytes(), mockTotalDifficulty.Bytes()) { - t.Errorf("payload does not have expected total difficulty\r\nactual td: %d\r\nexpected td: %d", payload.TotalDifficulty.Int64(), mockTotalDifficulty.Int64()) - } - case <-quitChan: - t.Errorf("channel quit before delivering payload") - } + wg.Wait() } func testHTTPAPI(t *testing.T) {