From 07a7a534da53e0646cdc0e48f05d96173e4a2cdb Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 14 Sep 2020 08:50:51 -0500 Subject: [PATCH 1/5] batch.Replay() method --- postgres/batch.go | 39 +++++++++++++++++++++++++++++++-------- postgres/batch_test.go | 26 ++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 8 deletions(-) diff --git a/postgres/batch.go b/postgres/batch.go index 8e38292..96b9b32 100644 --- a/postgres/batch.go +++ b/postgres/batch.go @@ -17,22 +17,25 @@ package pgipfsethdb import ( + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" "github.com/jmoiron/sqlx" ) // Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection type Batch struct { - db *sqlx.DB - tx *sqlx.Tx - valueSize int + db *sqlx.DB + tx *sqlx.Tx + valueSize int + replayCache map[string][]byte } // NewBatch returns a ethdb.Batch interface for PG-IPFS func NewBatch(db *sqlx.DB, tx *sqlx.Tx) ethdb.Batch { b := &Batch{ - db: db, - tx: tx, + db: db, + tx: tx, + replayCache: make(map[string][]byte), } if tx == nil { b.Reset() @@ -55,6 +58,7 @@ func (b *Batch) Put(key []byte, value []byte) (err error) { return err } b.valueSize += len(value) + b.replayCache[common.Bytes2Hex(key)] = value return nil } @@ -62,7 +66,11 @@ func (b *Batch) Put(key []byte, value []byte) (err error) { // Delete removes the key from the key-value data store func (b *Batch) Delete(key []byte) (err error) { _, err = b.tx.Exec(deletePgStr, key) - return err + if err != nil { + return err + } + delete(b.replayCache, common.Bytes2Hex(key)) + return nil } // ValueSize satisfies the ethdb.Batch interface @@ -79,13 +87,27 @@ func (b *Batch) Write() error { if b.tx == nil { return nil } - return b.tx.Commit() + if err := b.tx.Commit(); err != nil { + return err + } + b.replayCache = nil + return nil } // Replay satisfies the ethdb.Batch interface // Replay replays the batch contents func (b *Batch) Replay(w ethdb.KeyValueWriter) error { - return errNotSupported + if b.tx != nil { + b.tx.Rollback() + b.tx = nil + } + for key, value := range b.replayCache { + if err := w.Put(common.Hex2Bytes(key), value); err != nil { + return err + } + } + b.replayCache = nil + return nil } // Reset satisfies the ethdb.Batch interface @@ -97,5 +119,6 @@ func (b *Batch) Reset() { if err != nil { panic(err) } + b.replayCache = make(map[string][]byte) b.valueSize = 0 } diff --git a/postgres/batch_test.go b/postgres/batch_test.go index 5d89168..f941be6 100644 --- a/postgres/batch_test.go +++ b/postgres/batch_test.go @@ -115,4 +115,30 @@ var _ = Describe("Batch", func() { Expect(size).To(Equal(0)) }) }) + + Describe("Replay", func() { + It("returns the size of data in the batch queued for write", func() { + err = batch.Put(testKeccakEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + err = batch.Put(testKeccakEthKey2, testValue2) + Expect(err).ToNot(HaveOccurred()) + + _, err = database.Get(testKeccakEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + _, err = database.Get(testKeccakEthKey2) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + + err = batch.Replay(database) + Expect(err).ToNot(HaveOccurred()) + + val, err := database.Get(testKeccakEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + val2, err := database.Get(testKeccakEthKey2) + Expect(err).ToNot(HaveOccurred()) + Expect(val2).To(Equal(testValue2)) + }) + }) }) From 531bc62effd7defb7511c2cc1bf81cc964cf5d02 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 14 Sep 2020 08:51:48 -0500 Subject: [PATCH 2/5] ancient tables --- .../db/migrations/00004_create_ancient_headers_table.sql | 8 ++++++++ .../db/migrations/00005_create_ancient_hashes_table.sql | 8 ++++++++ .../db/migrations/00006_create_ancient_bodies_table.sql | 8 ++++++++ .../db/migrations/00007_create_ancient_receipts_table.sql | 8 ++++++++ postgres/db/migrations/00008_create_ancient_tds_table.sql | 8 ++++++++ 5 files changed, 40 insertions(+) create mode 100644 postgres/db/migrations/00004_create_ancient_headers_table.sql create mode 100644 postgres/db/migrations/00005_create_ancient_hashes_table.sql create mode 100644 postgres/db/migrations/00006_create_ancient_bodies_table.sql create mode 100644 postgres/db/migrations/00007_create_ancient_receipts_table.sql create mode 100644 postgres/db/migrations/00008_create_ancient_tds_table.sql diff --git a/postgres/db/migrations/00004_create_ancient_headers_table.sql b/postgres/db/migrations/00004_create_ancient_headers_table.sql new file mode 100644 index 0000000..832209d --- /dev/null +++ b/postgres/db/migrations/00004_create_ancient_headers_table.sql @@ -0,0 +1,8 @@ +-- +goose Up +CREATE TABLE IF NOT EXISTS eth.ancient_headers ( + block_number BIGINT UNIQUE NOT NULL, + header BYTEA NOT NULL +); + +-- +goose Down +DROP TABLE eth.ancient_headers; \ No newline at end of file diff --git a/postgres/db/migrations/00005_create_ancient_hashes_table.sql b/postgres/db/migrations/00005_create_ancient_hashes_table.sql new file mode 100644 index 0000000..b5ec597 --- /dev/null +++ b/postgres/db/migrations/00005_create_ancient_hashes_table.sql @@ -0,0 +1,8 @@ +-- +goose Up +CREATE TABLE IF NOT EXISTS eth.ancient_hashes ( + block_number BIGINT UNIQUE NOT NULL, + hash BYTEA NOT NULL +); + +-- +goose Down +DROP TABLE eth.ancient_hashes; \ No newline at end of file diff --git a/postgres/db/migrations/00006_create_ancient_bodies_table.sql b/postgres/db/migrations/00006_create_ancient_bodies_table.sql new file mode 100644 index 0000000..2de60cd --- /dev/null +++ b/postgres/db/migrations/00006_create_ancient_bodies_table.sql @@ -0,0 +1,8 @@ +-- +goose Up +CREATE TABLE IF NOT EXISTS eth.ancient_bodies ( + block_number BIGINT UNIQUE NOT NULL, + body BYTEA NOT NULL +); + +-- +goose Down +DROP TABLE eth.ancient_bodies; \ No newline at end of file diff --git a/postgres/db/migrations/00007_create_ancient_receipts_table.sql b/postgres/db/migrations/00007_create_ancient_receipts_table.sql new file mode 100644 index 0000000..beec1e7 --- /dev/null +++ b/postgres/db/migrations/00007_create_ancient_receipts_table.sql @@ -0,0 +1,8 @@ +-- +goose Up +CREATE TABLE IF NOT EXISTS eth.ancient_receipts ( + block_number BIGINT UNIQUE NOT NULL, + receipts BYTEA NOT NULL +); + +-- +goose Down +DROP TABLE eth.ancient_receipts; \ No newline at end of file diff --git a/postgres/db/migrations/00008_create_ancient_tds_table.sql b/postgres/db/migrations/00008_create_ancient_tds_table.sql new file mode 100644 index 0000000..724ecf4 --- /dev/null +++ b/postgres/db/migrations/00008_create_ancient_tds_table.sql @@ -0,0 +1,8 @@ +-- +goose Up +CREATE TABLE IF NOT EXISTS eth.ancient_tds ( + block_number BIGINT UNIQUE NOT NULL, + td BYTEA NOT NULL +); + +-- +goose Down +DROP TABLE eth.ancient_tds; \ No newline at end of file From 95178107c0fc41092262828d2f19926f7bf5a1d8 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 14 Sep 2020 08:52:02 -0500 Subject: [PATCH 3/5] ancient interfaces --- postgres/ancient.go | 227 +++++++++++++++++++++++++++++++++++++++++++ postgres/database.go | 47 +-------- 2 files changed, 230 insertions(+), 44 deletions(-) create mode 100644 postgres/ancient.go diff --git a/postgres/ancient.go b/postgres/ancient.go new file mode 100644 index 0000000..ef9c358 --- /dev/null +++ b/postgres/ancient.go @@ -0,0 +1,227 @@ +// VulcanizeDB +// Copyright © 2020 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgipfsethdb + +import ( + "fmt" + + "github.com/sirupsen/logrus" +) + +const ( + // freezerHeaderTable indicates the name of the freezer header table. + freezerHeaderTable = "headers" + + // freezerHashTable indicates the name of the freezer canonical hash table. + freezerHashTable = "hashes" + + // freezerBodiesTable indicates the name of the freezer block body table. + freezerBodiesTable = "bodies" + + // freezerReceiptTable indicates the name of the freezer receipts table. + freezerReceiptTable = "receipts" + + // freezerDifficultyTable indicates the name of the freezer total difficulty table. + freezerDifficultyTable = "diffs" + + // ancient append Postgres statements + appendAncientHeaderPgStr = "INSERT INTO eth.ancient_headers (block_number, header) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET header = $2" + appendAncientHashPgStr = "INSERT INTO eth.ancient_hashes (block_number, hash) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET hash = $2" + appendAncientBodyPgStr = "INSERT INTO eth.ancient_bodies (block_number, body) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET body = $2" + appendAncientReceiptsPgStr = "INSERT INTO eth.ancient_receipts (block_number, receipts) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET receipts = $2" + appendAncientTDPgStr = "INSERT INTO eth.ancient_tds (block_number, td) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET td = $2" + + // ancient truncate Postgres statements + truncateAncientHeaderPgStr = "DELETE FROM eth.ancient_headers WHERE block_number > $1" + truncateAncientHashPgStr = "DELETE FROM eth.ancient_hashes WHERE block_number > $1" + truncateAncientBodiesPgStr = "DELETE FROM eth.ancient_bodies WHERE block_number > $1" + truncateAncientReceiptsPgStr = "DELETE FROM eth.ancient_receipts WHERE block_number > $1" + truncateAncientTDPgStr = "DELETE FROM eth.ancient_tds WHERE block_number > $1" + + // ancient size Postgres statement + ancientSizePgStr = "SELECT pg_total_relation_size($1)" + + // ancients Postgres statement + ancientsPgStr = "SELECT block_number FROM eth.ancient_headers ORDER BY block_number DESC LIMIT 1" + + // ancient has Postgres statements + hasAncientHeaderPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_headers WHERE block_number = $1)" + hasAncientHashPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_hashes WHERE block_number = $1)" + hasAncientBodyPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_bodies WHERE block_number = $1)" + hasAncientReceiptsPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_receipts WHERE block_number = $1)" + hasAncientTDPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_tds WHERE block_number = $1)" + + // ancient get Postgres statements + getAncientHeaderPgStr = "SELECT header FROM eth.ancient_headers WHERE block_number = $1" + getAncientHashPgStr = "SELECT hash FROM eth.ancient_hashes WHERE block_number = $1" + getAncientBodyPgStr = "SELECT body FROM eth.ancient_bodies WHERE block_number = $1" + getAncientReceiptsPgStr = "SELECT receipts FROM eth.ancient_receipts WHERE block_number = $1" + getAncientTDPgStr = "SELECT td FROM eth.ancient_tds WHERE block_number = $1" +) + +// HasAncient satisfies the ethdb.AncientReader interface +// HasAncient returns an indicator whether the specified data exists in the ancient store +func (d *Database) HasAncient(kind string, number uint64) (bool, error) { + var pgStr string + switch kind { + case freezerHeaderTable: + pgStr = hasAncientHeaderPgStr + case freezerHashTable: + pgStr = hasAncientHashPgStr + case freezerBodiesTable: + pgStr = hasAncientBodyPgStr + case freezerReceiptTable: + pgStr = hasAncientReceiptsPgStr + case freezerDifficultyTable: + pgStr = hasAncientTDPgStr + default: + return false, fmt.Errorf("unexpected ancient kind: %s", kind) + } + has := new(bool) + return *has, d.db.Get(has, pgStr, number) +} + +// Ancient satisfies the ethdb.AncientReader interface +// Ancient retrieves an ancient binary blob from the append-only immutable files +func (d *Database) Ancient(kind string, number uint64) ([]byte, error) { + var pgStr string + switch kind { + case freezerHeaderTable: + pgStr = getAncientHeaderPgStr + case freezerHashTable: + pgStr = getAncientHashPgStr + case freezerBodiesTable: + pgStr = getAncientBodyPgStr + case freezerReceiptTable: + pgStr = getAncientReceiptsPgStr + case freezerDifficultyTable: + pgStr = getAncientTDPgStr + default: + return nil, fmt.Errorf("unexpected ancient kind: %s", kind) + } + data := new([]byte) + return *data, d.db.Get(data, pgStr, number) +} + +// Ancients satisfies the ethdb.AncientReader interface +// Ancients returns the ancient item numbers in the ancient store +func (d *Database) Ancients() (uint64, error) { + num := new(uint64) + return *num, d.db.Get(num, ancientsPgStr) +} + +// AncientSize satisfies the ethdb.AncientReader interface +// AncientSize returns the ancient size of the specified category +func (d *Database) AncientSize(kind string) (uint64, error) { + var tableName string + switch kind { + case freezerHeaderTable: + tableName = "eth.ancient_headers" + case freezerHashTable: + tableName = "eth.ancient_hashes" + case freezerBodiesTable: + tableName = "eth.ancient_bodies" + case freezerReceiptTable: + tableName = "eth.ancient_receipts" + case freezerDifficultyTable: + tableName = "eth.ancient_tds" + default: + return 0, fmt.Errorf("unexpected ancient kind: %s", kind) + } + size := new(uint64) + return *size, d.db.Get(size, ancientSizePgStr, tableName) +} + +// AppendAncient satisfies the ethdb.AncientWriter interface +// AppendAncient injects all binary blobs belong to block at the end of the append-only immutable table files +func (d *Database) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error { + // append in batch + var err error + if d.ancientTx == nil { + d.ancientTx, err = d.db.Beginx() + if err != nil { + return err + } + } + defer func() { + if err != nil { + if err := d.ancientTx.Rollback(); err != nil { + logrus.Error(err) + d.ancientTx = nil + } + } + }() + + if _, err := d.ancientTx.Exec(appendAncientHashPgStr, number, hash); err != nil { + return err + } + if _, err := d.ancientTx.Exec(appendAncientHeaderPgStr, number, header); err != nil { + return err + } + if _, err := d.ancientTx.Exec(appendAncientBodyPgStr, number, body); err != nil { + return err + } + if _, err := d.ancientTx.Exec(appendAncientReceiptsPgStr, number, receipts); err != nil { + return err + } + _, err = d.ancientTx.Exec(appendAncientTDPgStr, number, td) + return err +} + +// TruncateAncients satisfies the ethdb.AncientWriter interface +// TruncateAncients discards all but the first n ancient data from the ancient store +func (d *Database) TruncateAncients(n uint64) error { + // truncate in batch + var err error + if d.ancientTx == nil { + d.ancientTx, err = d.db.Beginx() + if err != nil { + return err + } + } + defer func() { + if err != nil { + if err := d.ancientTx.Rollback(); err != nil { + logrus.Error(err) + d.ancientTx = nil + } + } + }() + if _, err := d.ancientTx.Exec(truncateAncientHeaderPgStr, n); err != nil { + return err + } + if _, err := d.ancientTx.Exec(truncateAncientHashPgStr, n); err != nil { + return err + } + if _, err := d.ancientTx.Exec(truncateAncientBodiesPgStr, n); err != nil { + return err + } + if _, err := d.ancientTx.Exec(truncateAncientReceiptsPgStr, n); err != nil { + return err + } + _, err = d.ancientTx.Exec(truncateAncientTDPgStr, n) + return err +} + +// Sync satisfies the ethdb.AncientWriter interface +// Sync flushes all in-memory ancient store data to disk +func (d *Database) Sync() error { + if d.ancientTx == nil { + return nil + } + return d.ancientTx.Commit() +} diff --git a/postgres/database.go b/postgres/database.go index 1e5a958..c00c129 100644 --- a/postgres/database.go +++ b/postgres/database.go @@ -30,7 +30,7 @@ import ( var errNotSupported = errors.New("this operation is not supported") const ( - hasPgStr = "SELECT exists(select 1 from eth.key_preimages WHERE eth_key = $1)" + hasPgStr = "SELECT exists(SELECT 1 FROM eth.key_preimages WHERE eth_key = $1)" getPgStr = "SELECT data FROM public.blocks INNER JOIN eth.key_preimages ON (ipfs_key = blocks.key) WHERE eth_key = $1" putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING" putPreimagePgStr = "INSERT INTO eth.key_preimages (eth_key, ipfs_key, prefix) VALUES ($1, $2, $3) ON CONFLICT (eth_key) DO UPDATE SET (ipfs_key, prefix) = ($2, $3)" @@ -40,7 +40,8 @@ const ( // Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection type Database struct { - db *sqlx.DB + db *sqlx.DB + ancientTx *sqlx.Tx } // NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS @@ -221,45 +222,3 @@ func (d *Database) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator { func (d *Database) Close() error { return d.db.DB.Close() } - -// HasAncient satisfies the ethdb.AncientReader interface -// HasAncient returns an indicator whether the specified data exists in the ancient store -func (d *Database) HasAncient(kind string, number uint64) (bool, error) { - return false, errNotSupported -} - -// Ancient satisfies the ethdb.AncientReader interface -// Ancient retrieves an ancient binary blob from the append-only immutable files -func (d *Database) Ancient(kind string, number uint64) ([]byte, error) { - return nil, errNotSupported -} - -// Ancients satisfies the ethdb.AncientReader interface -// Ancients returns the ancient item numbers in the ancient store -func (d *Database) Ancients() (uint64, error) { - return 0, errNotSupported -} - -// AncientSize satisfies the ethdb.AncientReader interface -// AncientSize returns the ancient size of the specified category -func (d *Database) AncientSize(kind string) (uint64, error) { - return 0, errNotSupported -} - -// AppendAncient satisfies the ethdb.AncientWriter interface -// AppendAncient injects all binary blobs belong to block at the end of the append-only immutable table files -func (d *Database) AppendAncient(number uint64, hash, header, body, receipt, td []byte) error { - return errNotSupported -} - -// TruncateAncients satisfies the ethdb.AncientWriter interface -// TruncateAncients discards all but the first n ancient data from the ancient store -func (d *Database) TruncateAncients(n uint64) error { - return errNotSupported -} - -// Sync satisfies the ethdb.AncientWriter interface -// Sync flushes all in-memory ancient store data to disk -func (d *Database) Sync() error { - return errNotSupported -} From 961e10bfe8e0152d95183c4c4c529db144812448 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 14 Sep 2020 10:15:32 -0500 Subject: [PATCH 4/5] unit tests for ancient interfaces --- postgres/ancient.go | 65 +++-- postgres/ancient_test.go | 232 ++++++++++++++++++ postgres/database.go | 3 +- ...0004_create_eth_ancient_headers_table.sql} | 0 ...00005_create_eth_ancient_hashes_table.sql} | 0 ...00006_create_eth_ancient_bodies_table.sql} | 0 ...007_create_eth_ancient_receipts_table.sql} | 0 ...=> 00008_create_eth_ancient_tds_table.sql} | 0 postgres/test_helpers.go | 38 ++- 9 files changed, 308 insertions(+), 30 deletions(-) create mode 100644 postgres/ancient_test.go rename postgres/db/migrations/{00004_create_ancient_headers_table.sql => 00004_create_eth_ancient_headers_table.sql} (100%) rename postgres/db/migrations/{00005_create_ancient_hashes_table.sql => 00005_create_eth_ancient_hashes_table.sql} (100%) rename postgres/db/migrations/{00006_create_ancient_bodies_table.sql => 00006_create_eth_ancient_bodies_table.sql} (100%) rename postgres/db/migrations/{00007_create_ancient_receipts_table.sql => 00007_create_eth_ancient_receipts_table.sql} (100%) rename postgres/db/migrations/{00008_create_ancient_tds_table.sql => 00008_create_eth_ancient_tds_table.sql} (100%) diff --git a/postgres/ancient.go b/postgres/ancient.go index ef9c358..e6c4454 100644 --- a/postgres/ancient.go +++ b/postgres/ancient.go @@ -17,26 +17,27 @@ package pgipfsethdb import ( + "database/sql" "fmt" "github.com/sirupsen/logrus" ) const ( - // freezerHeaderTable indicates the name of the freezer header table. - freezerHeaderTable = "headers" + // FreezerHeaderTable indicates the name of the freezer header table. + FreezerHeaderTable = "headers" - // freezerHashTable indicates the name of the freezer canonical hash table. - freezerHashTable = "hashes" + // FreezerHashTable indicates the name of the freezer canonical hash table. + FreezerHashTable = "hashes" - // freezerBodiesTable indicates the name of the freezer block body table. - freezerBodiesTable = "bodies" + // FreezerBodiesTable indicates the name of the freezer block body table. + FreezerBodiesTable = "bodies" - // freezerReceiptTable indicates the name of the freezer receipts table. - freezerReceiptTable = "receipts" + // FreezerReceiptTable indicates the name of the freezer receipts table. + FreezerReceiptTable = "receipts" - // freezerDifficultyTable indicates the name of the freezer total difficulty table. - freezerDifficultyTable = "diffs" + // FreezerDifficultyTable indicates the name of the freezer total difficulty table. + FreezerDifficultyTable = "diffs" // ancient append Postgres statements appendAncientHeaderPgStr = "INSERT INTO eth.ancient_headers (block_number, header) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET header = $2" @@ -78,15 +79,15 @@ const ( func (d *Database) HasAncient(kind string, number uint64) (bool, error) { var pgStr string switch kind { - case freezerHeaderTable: + case FreezerHeaderTable: pgStr = hasAncientHeaderPgStr - case freezerHashTable: + case FreezerHashTable: pgStr = hasAncientHashPgStr - case freezerBodiesTable: + case FreezerBodiesTable: pgStr = hasAncientBodyPgStr - case freezerReceiptTable: + case FreezerReceiptTable: pgStr = hasAncientReceiptsPgStr - case freezerDifficultyTable: + case FreezerDifficultyTable: pgStr = hasAncientTDPgStr default: return false, fmt.Errorf("unexpected ancient kind: %s", kind) @@ -100,15 +101,15 @@ func (d *Database) HasAncient(kind string, number uint64) (bool, error) { func (d *Database) Ancient(kind string, number uint64) ([]byte, error) { var pgStr string switch kind { - case freezerHeaderTable: + case FreezerHeaderTable: pgStr = getAncientHeaderPgStr - case freezerHashTable: + case FreezerHashTable: pgStr = getAncientHashPgStr - case freezerBodiesTable: + case FreezerBodiesTable: pgStr = getAncientBodyPgStr - case freezerReceiptTable: + case FreezerReceiptTable: pgStr = getAncientReceiptsPgStr - case freezerDifficultyTable: + case FreezerDifficultyTable: pgStr = getAncientTDPgStr default: return nil, fmt.Errorf("unexpected ancient kind: %s", kind) @@ -121,7 +122,13 @@ func (d *Database) Ancient(kind string, number uint64) ([]byte, error) { // Ancients returns the ancient item numbers in the ancient store func (d *Database) Ancients() (uint64, error) { num := new(uint64) - return *num, d.db.Get(num, ancientsPgStr) + if err := d.db.Get(num, ancientsPgStr); err != nil { + if err == sql.ErrNoRows { + return 0, nil + } + return 0, err + } + return *num, nil } // AncientSize satisfies the ethdb.AncientReader interface @@ -129,15 +136,15 @@ func (d *Database) Ancients() (uint64, error) { func (d *Database) AncientSize(kind string) (uint64, error) { var tableName string switch kind { - case freezerHeaderTable: + case FreezerHeaderTable: tableName = "eth.ancient_headers" - case freezerHashTable: + case FreezerHashTable: tableName = "eth.ancient_hashes" - case freezerBodiesTable: + case FreezerBodiesTable: tableName = "eth.ancient_bodies" - case freezerReceiptTable: + case FreezerReceiptTable: tableName = "eth.ancient_receipts" - case freezerDifficultyTable: + case FreezerDifficultyTable: tableName = "eth.ancient_tds" default: return 0, fmt.Errorf("unexpected ancient kind: %s", kind) @@ -223,5 +230,9 @@ func (d *Database) Sync() error { if d.ancientTx == nil { return nil } - return d.ancientTx.Commit() + if err := d.ancientTx.Commit(); err != nil { + return err + } + d.ancientTx = nil + return nil } diff --git a/postgres/ancient_test.go b/postgres/ancient_test.go new file mode 100644 index 0000000..8cd85ee --- /dev/null +++ b/postgres/ancient_test.go @@ -0,0 +1,232 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgipfsethdb_test + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/rlp" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "github.com/vulcanize/ipfs-ethdb/postgres" +) + +var ( + ancientDB ethdb.Database + testBlockNumber uint64 = 1 + testAncientHeader = types.Header{Number: big.NewInt(2)} + testAncientHeaderRLP, _ = rlp.EncodeToBytes(testHeader2) + testAncientHash = testAncientHeader.Hash().Bytes() + testAncientBodyBytes = make([]byte, 10000) + testAncientReceiptsBytes = make([]byte, 5000) + testAncientTD, _ = new(big.Int).SetString("1000000000000000000000", 10) + testAncientTDBytes = testAncientTD.Bytes() +) + +var _ = Describe("Ancient", func() { + BeforeEach(func() { + db, err = pgipfsethdb.TestDB() + Expect(err).ToNot(HaveOccurred()) + ancientDB = pgipfsethdb.NewDatabase(db) + + }) + AfterEach(func() { + err = pgipfsethdb.ResetTestDB(db) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("AppendAncient/Sync/Has", func() { + It("adds eth objects to the Ancient database and returns whether or not an ancient record exists", func() { + hasAncient(testBlockNumber, false) + + err = ancientDB.AppendAncient(testBlockNumber, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes) + Expect(err).ToNot(HaveOccurred()) + + hasAncient(testBlockNumber, false) + + err = ancientDB.Sync() + Expect(err).ToNot(HaveOccurred()) + + hasAncient(testBlockNumber, true) + }) + }) + + Describe("AppendAncient/Sync/Ancient", func() { + It("adds the eth objects to the Ancient database and returns the ancient objects on request", func() { + hasAncient(testBlockNumber, false) + + _, err := ancientDB.Ancient(pgipfsethdb.FreezerHeaderTable, testBlockNumber) + Expect(err).To(HaveOccurred()) + _, err = ancientDB.Ancient(pgipfsethdb.FreezerHashTable, testBlockNumber) + Expect(err).To(HaveOccurred()) + _, err = ancientDB.Ancient(pgipfsethdb.FreezerBodiesTable, testBlockNumber) + Expect(err).To(HaveOccurred()) + _, err = ancientDB.Ancient(pgipfsethdb.FreezerReceiptTable, testBlockNumber) + Expect(err).To(HaveOccurred()) + _, err = ancientDB.Ancient(pgipfsethdb.FreezerDifficultyTable, testBlockNumber) + Expect(err).To(HaveOccurred()) + + err = ancientDB.AppendAncient(testBlockNumber, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes) + Expect(err).ToNot(HaveOccurred()) + err = ancientDB.Sync() + Expect(err).ToNot(HaveOccurred()) + + hasAncient(testBlockNumber, true) + + ancientHeader, err := ancientDB.Ancient(pgipfsethdb.FreezerHeaderTable, testBlockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientHeader).To(Equal(testAncientHeaderRLP)) + + ancientHash, err := ancientDB.Ancient(pgipfsethdb.FreezerHashTable, testBlockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientHash).To(Equal(testAncientHash)) + + ancientBody, err := ancientDB.Ancient(pgipfsethdb.FreezerBodiesTable, testBlockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientBody).To(Equal(testAncientBodyBytes)) + + ancientReceipts, err := ancientDB.Ancient(pgipfsethdb.FreezerReceiptTable, testBlockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientReceipts).To(Equal(testAncientReceiptsBytes)) + + ancientTD, err := ancientDB.Ancient(pgipfsethdb.FreezerDifficultyTable, testBlockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientTD).To(Equal(testAncientTDBytes)) + }) + }) + + Describe("AppendAncient/Sync/Ancients", func() { + It("returns the height of the ancient database", func() { + ancients, err := ancientDB.Ancients() + Expect(err).ToNot(HaveOccurred()) + Expect(ancients).To(Equal(uint64(0))) + + for i := uint64(0); i <= 100; i++ { + hasAncient(i, false) + err = ancientDB.AppendAncient(i, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes) + Expect(err).ToNot(HaveOccurred()) + } + + err = ancientDB.Sync() + Expect(err).ToNot(HaveOccurred()) + + for i := uint64(0); i <= 100; i++ { + hasAncient(i, true) + } + ancients, err = ancientDB.Ancients() + Expect(err).ToNot(HaveOccurred()) + Expect(ancients).To(Equal(uint64(100))) + }) + }) + + Describe("AppendAncient/Truncate/Sync", func() { + It("truncates the ancient database to the provided height", func() { + for i := uint64(0); i <= 100; i++ { + hasAncient(i, false) + err = ancientDB.AppendAncient(i, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes) + Expect(err).ToNot(HaveOccurred()) + } + + err = ancientDB.Sync() + Expect(err).ToNot(HaveOccurred()) + + err = ancientDB.TruncateAncients(50) + Expect(err).ToNot(HaveOccurred()) + + for i := uint64(0); i <= 100; i++ { + hasAncient(i, true) + } + + ancients, err := ancientDB.Ancients() + Expect(err).ToNot(HaveOccurred()) + Expect(ancients).To(Equal(uint64(100))) + + err = ancientDB.Sync() + Expect(err).ToNot(HaveOccurred()) + + for i := uint64(0); i <= 100; i++ { + if i <= 50 { + hasAncient(i, true) + } else { + hasAncient(i, false) + } + } + + ancients, err = ancientDB.Ancients() + Expect(err).ToNot(HaveOccurred()) + Expect(ancients).To(Equal(uint64(50))) + }) + }) + + Describe("AppendAncient/Sync/AncientSize", func() { + It("adds the eth objects to the Ancient database and returns the ancient objects on request", func() { + for i := uint64(0); i <= 100; i++ { + hasAncient(i, false) + err = ancientDB.AppendAncient(i, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes) + Expect(err).ToNot(HaveOccurred()) + } + + err = ancientDB.Sync() + Expect(err).ToNot(HaveOccurred()) + + for i := uint64(0); i <= 100; i++ { + hasAncient(i, true) + } + + ancientHeaderSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerHeaderTable) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientHeaderSize).To(Equal(uint64(106496))) + + ancientHashSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerHashTable) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientHashSize).To(Equal(uint64(32768))) + + ancientBodySize, err := ancientDB.AncientSize(pgipfsethdb.FreezerBodiesTable) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientBodySize).To(Equal(uint64(73728))) + + ancientReceiptsSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerReceiptTable) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientReceiptsSize).To(Equal(uint64(65536))) + + ancientTDSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerDifficultyTable) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientTDSize).To(Equal(uint64(32768))) + }) + }) +}) + +func hasAncient(blockNumber uint64, shouldHave bool) { + has, err := ancientDB.HasAncient(pgipfsethdb.FreezerHeaderTable, blockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(has).To(Equal(shouldHave)) + has, err = ancientDB.HasAncient(pgipfsethdb.FreezerHashTable, blockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(has).To(Equal(shouldHave)) + has, err = ancientDB.HasAncient(pgipfsethdb.FreezerBodiesTable, blockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(has).To(Equal(shouldHave)) + has, err = ancientDB.HasAncient(pgipfsethdb.FreezerReceiptTable, blockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(has).To(Equal(shouldHave)) + has, err = ancientDB.HasAncient(pgipfsethdb.FreezerDifficultyTable, blockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(has).To(Equal(shouldHave)) +} diff --git a/postgres/database.go b/postgres/database.go index c00c129..3872935 100644 --- a/postgres/database.go +++ b/postgres/database.go @@ -188,7 +188,8 @@ func (d *Database) Stat(property string) (string, error) { // Compact satisfies the ethdb.Compacter interface // Compact flattens the underlying data store for the given key range func (d *Database) Compact(start []byte, limit []byte) error { - return errNotSupported + // this leveldb functionality doesn't translate to Postgres + return nil } // NewBatch satisfies the ethdb.Batcher interface diff --git a/postgres/db/migrations/00004_create_ancient_headers_table.sql b/postgres/db/migrations/00004_create_eth_ancient_headers_table.sql similarity index 100% rename from postgres/db/migrations/00004_create_ancient_headers_table.sql rename to postgres/db/migrations/00004_create_eth_ancient_headers_table.sql diff --git a/postgres/db/migrations/00005_create_ancient_hashes_table.sql b/postgres/db/migrations/00005_create_eth_ancient_hashes_table.sql similarity index 100% rename from postgres/db/migrations/00005_create_ancient_hashes_table.sql rename to postgres/db/migrations/00005_create_eth_ancient_hashes_table.sql diff --git a/postgres/db/migrations/00006_create_ancient_bodies_table.sql b/postgres/db/migrations/00006_create_eth_ancient_bodies_table.sql similarity index 100% rename from postgres/db/migrations/00006_create_ancient_bodies_table.sql rename to postgres/db/migrations/00006_create_eth_ancient_bodies_table.sql diff --git a/postgres/db/migrations/00007_create_ancient_receipts_table.sql b/postgres/db/migrations/00007_create_eth_ancient_receipts_table.sql similarity index 100% rename from postgres/db/migrations/00007_create_ancient_receipts_table.sql rename to postgres/db/migrations/00007_create_eth_ancient_receipts_table.sql diff --git a/postgres/db/migrations/00008_create_ancient_tds_table.sql b/postgres/db/migrations/00008_create_eth_ancient_tds_table.sql similarity index 100% rename from postgres/db/migrations/00008_create_ancient_tds_table.sql rename to postgres/db/migrations/00008_create_eth_ancient_tds_table.sql diff --git a/postgres/test_helpers.go b/postgres/test_helpers.go index 7eb4260..e62f787 100644 --- a/postgres/test_helpers.go +++ b/postgres/test_helpers.go @@ -16,7 +16,9 @@ package pgipfsethdb -import "github.com/jmoiron/sqlx" +import ( + "github.com/jmoiron/sqlx" +) // TestDB connect to the testing database // it assumes the database has the IPFS public.blocks table present @@ -28,6 +30,38 @@ func TestDB() (*sqlx.DB, error) { // ResetTestDB drops all rows in the test db public.blocks table func ResetTestDB(db *sqlx.DB) error { - _, err := db.Exec("TRUNCATE public.blocks CASCADE") + tx, err := db.Beginx() + if err != nil { + return err + } + defer func() { + if p := recover(); p != nil { + tx.Rollback() + panic(p) + } else if err != nil { + tx.Rollback() + } else { + err = tx.Commit() + } + }() + if _, err := tx.Exec("TRUNCATE public.blocks CASCADE"); err != nil { + return err + } + if _, err := tx.Exec("TRUNCATE eth.key_preimages CASCADE"); err != nil { + return err + } + if _, err := tx.Exec("TRUNCATE eth.ancient_headers CASCADE"); err != nil { + return err + } + if _, err := tx.Exec("TRUNCATE eth.ancient_hashes CASCADE"); err != nil { + return err + } + if _, err := tx.Exec("TRUNCATE eth.ancient_bodies CASCADE"); err != nil { + return err + } + if _, err := tx.Exec("TRUNCATE eth.ancient_receipts CASCADE"); err != nil { + return err + } + _, err = tx.Exec("TRUNCATE eth.ancient_tds CASCADE") return err } From 2c4e718a67b7a80380bba2c7a6a5aeb5e948660b Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 14 Sep 2020 10:18:50 -0500 Subject: [PATCH 5/5] no more need for errNotSupported --- postgres/database.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/postgres/database.go b/postgres/database.go index 3872935..a1253f8 100644 --- a/postgres/database.go +++ b/postgres/database.go @@ -17,7 +17,6 @@ package pgipfsethdb import ( - "errors" "fmt" "strings" @@ -27,8 +26,6 @@ import ( "github.com/jmoiron/sqlx" ) -var errNotSupported = errors.New("this operation is not supported") - const ( hasPgStr = "SELECT exists(SELECT 1 FROM eth.key_preimages WHERE eth_key = $1)" getPgStr = "SELECT data FROM public.blocks INNER JOIN eth.key_preimages ON (ipfs_key = blocks.key) WHERE eth_key = $1"