diff --git a/ethdb/postgres/ancient.go b/ethdb/postgres/ancient.go new file mode 100644 index 000000000..660dc6fe9 --- /dev/null +++ b/ethdb/postgres/ancient.go @@ -0,0 +1,238 @@ +// VulcanizeDB +// Copyright © 2020 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package postgres + +import ( + "database/sql" + "fmt" + + "github.com/sirupsen/logrus" +) + +const ( + // FreezerHeaderTable indicates the name of the freezer header table. + FreezerHeaderTable = "headers" + + // FreezerHashTable indicates the name of the freezer canonical hash table. + FreezerHashTable = "hashes" + + // FreezerBodiesTable indicates the name of the freezer block body table. + FreezerBodiesTable = "bodies" + + // FreezerReceiptTable indicates the name of the freezer receipts table. + FreezerReceiptTable = "receipts" + + // FreezerDifficultyTable indicates the name of the freezer total difficulty table. + FreezerDifficultyTable = "diffs" + + // ancient append Postgres statements + appendAncientHeaderPgStr = "INSERT INTO eth.ancient_headers (block_number, header) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET header = $2" + appendAncientHashPgStr = "INSERT INTO eth.ancient_hashes (block_number, hash) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET hash = $2" + appendAncientBodyPgStr = "INSERT INTO eth.ancient_bodies (block_number, body) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET body = $2" + appendAncientReceiptsPgStr = "INSERT INTO eth.ancient_receipts (block_number, receipts) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET receipts = $2" + appendAncientTDPgStr = "INSERT INTO eth.ancient_tds (block_number, td) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET td = $2" + + // ancient truncate Postgres statements + truncateAncientHeaderPgStr = "DELETE FROM eth.ancient_headers WHERE block_number > $1" + truncateAncientHashPgStr = "DELETE FROM eth.ancient_hashes WHERE block_number > $1" + truncateAncientBodiesPgStr = "DELETE FROM eth.ancient_bodies WHERE block_number > $1" + truncateAncientReceiptsPgStr = "DELETE FROM eth.ancient_receipts WHERE block_number > $1" + truncateAncientTDPgStr = "DELETE FROM eth.ancient_tds WHERE block_number > $1" + + // ancient size Postgres statement + ancientSizePgStr = "SELECT pg_total_relation_size($1)" + + // ancients Postgres statement + ancientsPgStr = "SELECT block_number FROM eth.ancient_headers ORDER BY block_number DESC LIMIT 1" + + // ancient has Postgres statements + hasAncientHeaderPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_headers WHERE block_number = $1)" + hasAncientHashPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_hashes WHERE block_number = $1)" + hasAncientBodyPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_bodies WHERE block_number = $1)" + hasAncientReceiptsPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_receipts WHERE block_number = $1)" + hasAncientTDPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_tds WHERE block_number = $1)" + + // ancient get Postgres statements + getAncientHeaderPgStr = "SELECT header FROM eth.ancient_headers WHERE block_number = $1" + getAncientHashPgStr = "SELECT hash FROM eth.ancient_hashes WHERE block_number = $1" + getAncientBodyPgStr = "SELECT body FROM eth.ancient_bodies WHERE block_number = $1" + getAncientReceiptsPgStr = "SELECT receipts FROM eth.ancient_receipts WHERE block_number = $1" + getAncientTDPgStr = "SELECT td FROM eth.ancient_tds WHERE block_number = $1" +) + +// HasAncient satisfies the ethdb.AncientReader interface +// HasAncient returns an indicator whether the specified data exists in the ancient store +func (d *Database) HasAncient(kind string, number uint64) (bool, error) { + var pgStr string + switch kind { + case FreezerHeaderTable: + pgStr = hasAncientHeaderPgStr + case FreezerHashTable: + pgStr = hasAncientHashPgStr + case FreezerBodiesTable: + pgStr = hasAncientBodyPgStr + case FreezerReceiptTable: + pgStr = hasAncientReceiptsPgStr + case FreezerDifficultyTable: + pgStr = hasAncientTDPgStr + default: + return false, fmt.Errorf("unexpected ancient kind: %s", kind) + } + has := new(bool) + return *has, d.db.Get(has, pgStr, number) +} + +// Ancient satisfies the ethdb.AncientReader interface +// Ancient retrieves an ancient binary blob from the append-only immutable files +func (d *Database) Ancient(kind string, number uint64) ([]byte, error) { + var pgStr string + switch kind { + case FreezerHeaderTable: + pgStr = getAncientHeaderPgStr + case FreezerHashTable: + pgStr = getAncientHashPgStr + case FreezerBodiesTable: + pgStr = getAncientBodyPgStr + case FreezerReceiptTable: + pgStr = getAncientReceiptsPgStr + case FreezerDifficultyTable: + pgStr = getAncientTDPgStr + default: + return nil, fmt.Errorf("unexpected ancient kind: %s", kind) + } + data := new([]byte) + return *data, d.db.Get(data, pgStr, number) +} + +// Ancients satisfies the ethdb.AncientReader interface +// Ancients returns the ancient item numbers in the ancient store +func (d *Database) Ancients() (uint64, error) { + num := new(uint64) + if err := d.db.Get(num, ancientsPgStr); err != nil { + if err == sql.ErrNoRows { + return 0, nil + } + return 0, err + } + return *num, nil +} + +// AncientSize satisfies the ethdb.AncientReader interface +// AncientSize returns the ancient size of the specified category +func (d *Database) AncientSize(kind string) (uint64, error) { + var tableName string + switch kind { + case FreezerHeaderTable: + tableName = "eth.ancient_headers" + case FreezerHashTable: + tableName = "eth.ancient_hashes" + case FreezerBodiesTable: + tableName = "eth.ancient_bodies" + case FreezerReceiptTable: + tableName = "eth.ancient_receipts" + case FreezerDifficultyTable: + tableName = "eth.ancient_tds" + default: + return 0, fmt.Errorf("unexpected ancient kind: %s", kind) + } + size := new(uint64) + return *size, d.db.Get(size, ancientSizePgStr, tableName) +} + +// AppendAncient satisfies the ethdb.AncientWriter interface +// AppendAncient injects all binary blobs belong to block at the end of the append-only immutable table files +func (d *Database) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error { + // append in batch + var err error + if d.ancientTx == nil { + d.ancientTx, err = d.db.Beginx() + if err != nil { + return err + } + } + defer func() { + if err != nil { + if err := d.ancientTx.Rollback(); err != nil { + logrus.Error(err) + d.ancientTx = nil + } + } + }() + + if _, err := d.ancientTx.Exec(appendAncientHashPgStr, number, hash); err != nil { + return err + } + if _, err := d.ancientTx.Exec(appendAncientHeaderPgStr, number, header); err != nil { + return err + } + if _, err := d.ancientTx.Exec(appendAncientBodyPgStr, number, body); err != nil { + return err + } + if _, err := d.ancientTx.Exec(appendAncientReceiptsPgStr, number, receipts); err != nil { + return err + } + _, err = d.ancientTx.Exec(appendAncientTDPgStr, number, td) + return err +} + +// TruncateAncients satisfies the ethdb.AncientWriter interface +// TruncateAncients discards all but the first n ancient data from the ancient store +func (d *Database) TruncateAncients(n uint64) error { + // truncate in batch + var err error + if d.ancientTx == nil { + d.ancientTx, err = d.db.Beginx() + if err != nil { + return err + } + } + defer func() { + if err != nil { + if err := d.ancientTx.Rollback(); err != nil { + logrus.Error(err) + d.ancientTx = nil + } + } + }() + if _, err := d.ancientTx.Exec(truncateAncientHeaderPgStr, n); err != nil { + return err + } + if _, err := d.ancientTx.Exec(truncateAncientHashPgStr, n); err != nil { + return err + } + if _, err := d.ancientTx.Exec(truncateAncientBodiesPgStr, n); err != nil { + return err + } + if _, err := d.ancientTx.Exec(truncateAncientReceiptsPgStr, n); err != nil { + return err + } + _, err = d.ancientTx.Exec(truncateAncientTDPgStr, n) + return err +} + +// Sync satisfies the ethdb.AncientWriter interface +// Sync flushes all in-memory ancient store data to disk +func (d *Database) Sync() error { + if d.ancientTx == nil { + return nil + } + if err := d.ancientTx.Commit(); err != nil { + return err + } + d.ancientTx = nil + return nil +} diff --git a/ethdb/postgres/ancient_test.go b/ethdb/postgres/ancient_test.go new file mode 100644 index 000000000..c2a916691 --- /dev/null +++ b/ethdb/postgres/ancient_test.go @@ -0,0 +1,231 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package postgres_test + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + pgipfsethdb "github.com/ethereum/go-ethereum/ethdb/postgres" + "github.com/ethereum/go-ethereum/rlp" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var ( + ancientDB ethdb.Database + testBlockNumber uint64 = 1 + testAncientHeader = types.Header{Number: big.NewInt(2)} + testAncientHeaderRLP, _ = rlp.EncodeToBytes(testHeader2) + testAncientHash = testAncientHeader.Hash().Bytes() + testAncientBodyBytes = make([]byte, 10000) + testAncientReceiptsBytes = make([]byte, 5000) + testAncientTD, _ = new(big.Int).SetString("1000000000000000000000", 10) + testAncientTDBytes = testAncientTD.Bytes() +) + +var _ = Describe("Ancient", func() { + BeforeEach(func() { + db, err = pgipfsethdb.TestDB() + Expect(err).ToNot(HaveOccurred()) + ancientDB = pgipfsethdb.NewDatabase(db) + + }) + AfterEach(func() { + err = pgipfsethdb.ResetTestDB(db) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("AppendAncient/Sync/Has", func() { + It("adds eth objects to the Ancient database and returns whether or not an ancient record exists", func() { + hasAncient(testBlockNumber, false) + + err = ancientDB.AppendAncient(testBlockNumber, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes) + Expect(err).ToNot(HaveOccurred()) + + hasAncient(testBlockNumber, false) + + err = ancientDB.Sync() + Expect(err).ToNot(HaveOccurred()) + + hasAncient(testBlockNumber, true) + }) + }) + + Describe("AppendAncient/Sync/Ancient", func() { + It("adds the eth objects to the Ancient database and returns the ancient objects on request", func() { + hasAncient(testBlockNumber, false) + + _, err := ancientDB.Ancient(pgipfsethdb.FreezerHeaderTable, testBlockNumber) + Expect(err).To(HaveOccurred()) + _, err = ancientDB.Ancient(pgipfsethdb.FreezerHashTable, testBlockNumber) + Expect(err).To(HaveOccurred()) + _, err = ancientDB.Ancient(pgipfsethdb.FreezerBodiesTable, testBlockNumber) + Expect(err).To(HaveOccurred()) + _, err = ancientDB.Ancient(pgipfsethdb.FreezerReceiptTable, testBlockNumber) + Expect(err).To(HaveOccurred()) + _, err = ancientDB.Ancient(pgipfsethdb.FreezerDifficultyTable, testBlockNumber) + Expect(err).To(HaveOccurred()) + + err = ancientDB.AppendAncient(testBlockNumber, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes) + Expect(err).ToNot(HaveOccurred()) + err = ancientDB.Sync() + Expect(err).ToNot(HaveOccurred()) + + hasAncient(testBlockNumber, true) + + ancientHeader, err := ancientDB.Ancient(pgipfsethdb.FreezerHeaderTable, testBlockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientHeader).To(Equal(testAncientHeaderRLP)) + + ancientHash, err := ancientDB.Ancient(pgipfsethdb.FreezerHashTable, testBlockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientHash).To(Equal(testAncientHash)) + + ancientBody, err := ancientDB.Ancient(pgipfsethdb.FreezerBodiesTable, testBlockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientBody).To(Equal(testAncientBodyBytes)) + + ancientReceipts, err := ancientDB.Ancient(pgipfsethdb.FreezerReceiptTable, testBlockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientReceipts).To(Equal(testAncientReceiptsBytes)) + + ancientTD, err := ancientDB.Ancient(pgipfsethdb.FreezerDifficultyTable, testBlockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientTD).To(Equal(testAncientTDBytes)) + }) + }) + + Describe("AppendAncient/Sync/Ancients", func() { + It("returns the height of the ancient database", func() { + ancients, err := ancientDB.Ancients() + Expect(err).ToNot(HaveOccurred()) + Expect(ancients).To(Equal(uint64(0))) + + for i := uint64(0); i <= 100; i++ { + hasAncient(i, false) + err = ancientDB.AppendAncient(i, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes) + Expect(err).ToNot(HaveOccurred()) + } + + err = ancientDB.Sync() + Expect(err).ToNot(HaveOccurred()) + + for i := uint64(0); i <= 100; i++ { + hasAncient(i, true) + } + ancients, err = ancientDB.Ancients() + Expect(err).ToNot(HaveOccurred()) + Expect(ancients).To(Equal(uint64(100))) + }) + }) + + Describe("AppendAncient/Truncate/Sync", func() { + It("truncates the ancient database to the provided height", func() { + for i := uint64(0); i <= 100; i++ { + hasAncient(i, false) + err = ancientDB.AppendAncient(i, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes) + Expect(err).ToNot(HaveOccurred()) + } + + err = ancientDB.Sync() + Expect(err).ToNot(HaveOccurred()) + + err = ancientDB.TruncateAncients(50) + Expect(err).ToNot(HaveOccurred()) + + for i := uint64(0); i <= 100; i++ { + hasAncient(i, true) + } + + ancients, err := ancientDB.Ancients() + Expect(err).ToNot(HaveOccurred()) + Expect(ancients).To(Equal(uint64(100))) + + err = ancientDB.Sync() + Expect(err).ToNot(HaveOccurred()) + + for i := uint64(0); i <= 100; i++ { + if i <= 50 { + hasAncient(i, true) + } else { + hasAncient(i, false) + } + } + + ancients, err = ancientDB.Ancients() + Expect(err).ToNot(HaveOccurred()) + Expect(ancients).To(Equal(uint64(50))) + }) + }) + + Describe("AppendAncient/Sync/AncientSize", func() { + It("adds the eth objects to the Ancient database and returns the ancient objects on request", func() { + for i := uint64(0); i <= 100; i++ { + hasAncient(i, false) + err = ancientDB.AppendAncient(i, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes) + Expect(err).ToNot(HaveOccurred()) + } + + err = ancientDB.Sync() + Expect(err).ToNot(HaveOccurred()) + + for i := uint64(0); i <= 100; i++ { + hasAncient(i, true) + } + + ancientHeaderSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerHeaderTable) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientHeaderSize).To(Equal(uint64(106496))) + + ancientHashSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerHashTable) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientHashSize).To(Equal(uint64(32768))) + + ancientBodySize, err := ancientDB.AncientSize(pgipfsethdb.FreezerBodiesTable) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientBodySize).To(Equal(uint64(73728))) + + ancientReceiptsSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerReceiptTable) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientReceiptsSize).To(Equal(uint64(65536))) + + ancientTDSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerDifficultyTable) + Expect(err).ToNot(HaveOccurred()) + Expect(ancientTDSize).To(Equal(uint64(32768))) + }) + }) +}) + +func hasAncient(blockNumber uint64, shouldHave bool) { + has, err := ancientDB.HasAncient(pgipfsethdb.FreezerHeaderTable, blockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(has).To(Equal(shouldHave)) + has, err = ancientDB.HasAncient(pgipfsethdb.FreezerHashTable, blockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(has).To(Equal(shouldHave)) + has, err = ancientDB.HasAncient(pgipfsethdb.FreezerBodiesTable, blockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(has).To(Equal(shouldHave)) + has, err = ancientDB.HasAncient(pgipfsethdb.FreezerReceiptTable, blockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(has).To(Equal(shouldHave)) + has, err = ancientDB.HasAncient(pgipfsethdb.FreezerDifficultyTable, blockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(has).To(Equal(shouldHave)) +} diff --git a/ethdb/postgres/batch.go b/ethdb/postgres/batch.go new file mode 100644 index 000000000..a56e835bf --- /dev/null +++ b/ethdb/postgres/batch.go @@ -0,0 +1,124 @@ +// VulcanizeDB +// Copyright © 2020 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package postgres + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/jmoiron/sqlx" +) + +// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection +type Batch struct { + db *sqlx.DB + tx *sqlx.Tx + valueSize int + replayCache map[string][]byte +} + +// NewBatch returns a ethdb.Batch interface for PG-IPFS +func NewBatch(db *sqlx.DB, tx *sqlx.Tx) ethdb.Batch { + b := &Batch{ + db: db, + tx: tx, + replayCache: make(map[string][]byte), + } + if tx == nil { + b.Reset() + } + return b +} + +// Put satisfies the ethdb.Batch interface +// Put inserts the given value into the key-value data store +// Key is expected to be the keccak256 hash of value +func (b *Batch) Put(key []byte, value []byte) (err error) { + dsKey, prefix, err := DatastoreKeyFromGethKey(key) + if err != nil { + return err + } + if _, err = b.tx.Exec(putPgStr, dsKey, value); err != nil { + return err + } + if _, err = b.tx.Exec(putPreimagePgStr, key, dsKey, prefix); err != nil { + return err + } + b.valueSize += len(value) + b.replayCache[common.Bytes2Hex(key)] = value + return nil +} + +// Delete satisfies the ethdb.Batch interface +// Delete removes the key from the key-value data store +func (b *Batch) Delete(key []byte) (err error) { + _, err = b.tx.Exec(deletePgStr, key) + if err != nil { + return err + } + delete(b.replayCache, common.Bytes2Hex(key)) + return nil +} + +// ValueSize satisfies the ethdb.Batch interface +// ValueSize retrieves the amount of data queued up for writing +// The returned value is the total byte length of all data queued to write +func (b *Batch) ValueSize() int { + return b.valueSize +} + +// Write satisfies the ethdb.Batch interface +// Write flushes any accumulated data to disk +// Reset should be called after every write +func (b *Batch) Write() error { + if b.tx == nil { + return nil + } + if err := b.tx.Commit(); err != nil { + return err + } + b.replayCache = nil + return nil +} + +// Replay satisfies the ethdb.Batch interface +// Replay replays the batch contents +func (b *Batch) Replay(w ethdb.KeyValueWriter) error { + if b.tx != nil { + b.tx.Rollback() + b.tx = nil + } + for key, value := range b.replayCache { + if err := w.Put(common.Hex2Bytes(key), value); err != nil { + return err + } + } + b.replayCache = nil + return nil +} + +// Reset satisfies the ethdb.Batch interface +// Reset resets the batch for reuse +// This should be called after every write +func (b *Batch) Reset() { + var err error + b.tx, err = b.db.Beginx() + if err != nil { + panic(err) + } + b.replayCache = make(map[string][]byte) + b.valueSize = 0 +} diff --git a/ethdb/postgres/batch_test.go b/ethdb/postgres/batch_test.go new file mode 100644 index 000000000..aeda60330 --- /dev/null +++ b/ethdb/postgres/batch_test.go @@ -0,0 +1,143 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package postgres_test + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + pgipfsethdb "github.com/ethereum/go-ethereum/ethdb/postgres" + "github.com/ethereum/go-ethereum/rlp" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var ( + batch ethdb.Batch + testHeader2 = types.Header{Number: big.NewInt(2)} + testValue2, _ = rlp.EncodeToBytes(testHeader2) + testKeccakEthKey2 = testHeader2.Hash().Bytes() +) + +var _ = Describe("Batch", func() { + BeforeEach(func() { + db, err = pgipfsethdb.TestDB() + Expect(err).ToNot(HaveOccurred()) + database = pgipfsethdb.NewDatabase(db) + batch = database.NewBatch() + }) + AfterEach(func() { + err = pgipfsethdb.ResetTestDB(db) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("Put/Write", func() { + It("adds the key-value pair to the batch", func() { + _, err = database.Get(testKeccakEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + _, err = database.Get(testKeccakEthKey2) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + + err = batch.Put(testKeccakEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + err = batch.Put(testKeccakEthKey2, testValue2) + Expect(err).ToNot(HaveOccurred()) + err = batch.Write() + Expect(err).ToNot(HaveOccurred()) + + val, err := database.Get(testKeccakEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + val2, err := database.Get(testKeccakEthKey2) + Expect(err).ToNot(HaveOccurred()) + Expect(val2).To(Equal(testValue2)) + }) + }) + + Describe("Delete/Reset/Write", func() { + It("deletes the key-value pair in the batch", func() { + err = batch.Put(testKeccakEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + err = batch.Put(testKeccakEthKey2, testValue2) + Expect(err).ToNot(HaveOccurred()) + err = batch.Write() + Expect(err).ToNot(HaveOccurred()) + + batch.Reset() + err = batch.Delete(testKeccakEthKey) + Expect(err).ToNot(HaveOccurred()) + err = batch.Delete(testKeccakEthKey2) + Expect(err).ToNot(HaveOccurred()) + err = batch.Write() + Expect(err).ToNot(HaveOccurred()) + + _, err = database.Get(testKeccakEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + _, err = database.Get(testKeccakEthKey2) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + }) + }) + + Describe("ValueSize/Reset", func() { + It("returns the size of data in the batch queued for write", func() { + err = batch.Put(testKeccakEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + err = batch.Put(testKeccakEthKey2, testValue2) + Expect(err).ToNot(HaveOccurred()) + err = batch.Write() + Expect(err).ToNot(HaveOccurred()) + + size := batch.ValueSize() + Expect(size).To(Equal(len(testValue) + len(testValue2))) + + batch.Reset() + size = batch.ValueSize() + Expect(size).To(Equal(0)) + }) + }) + + Describe("Replay", func() { + It("returns the size of data in the batch queued for write", func() { + err = batch.Put(testKeccakEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + err = batch.Put(testKeccakEthKey2, testValue2) + Expect(err).ToNot(HaveOccurred()) + + _, err = database.Get(testKeccakEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + _, err = database.Get(testKeccakEthKey2) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + + err = batch.Replay(database) + Expect(err).ToNot(HaveOccurred()) + + val, err := database.Get(testKeccakEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + val2, err := database.Get(testKeccakEthKey2) + Expect(err).ToNot(HaveOccurred()) + Expect(val2).To(Equal(testValue2)) + }) + }) +}) diff --git a/ethdb/postgres/config.go b/ethdb/postgres/config.go new file mode 100644 index 000000000..065d11c18 --- /dev/null +++ b/ethdb/postgres/config.go @@ -0,0 +1,91 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package postgres + +import ( + "fmt" + "time" + + "github.com/jmoiron/sqlx" +) + +var ( + defaultMaxDBConnections = 1024 + defaultMaxIdleConnections = 16 +) + +// Config holds Postgres connection pool configuration params +type Config struct { + Database string + Hostname string + Port int + User string + Password string + + // Optimization parameters + MaxOpen int + MaxIdle int + MaxLifetime time.Duration +} + +// NewConfig returns a new config struct from provided params +func NewConfig(database, hostname, password, user string, port, maxOpen, maxIdle int, maxLifetime time.Duration) *Config { + return &Config{ + Database: database, + Hostname: hostname, + Port: port, + User: user, + Password: password, + MaxOpen: maxOpen, + MaxLifetime: maxLifetime, + MaxIdle: maxIdle, + } +} + +// DbConnectionString resolves Postgres config params to a connection string +func DbConnectionString(config *Config) string { + if len(config.User) > 0 && len(config.Password) > 0 { + return fmt.Sprintf("postgresql://%s:%s@%s:%d/%s?sslmode=disable", + config.User, config.Password, config.Hostname, config.Port, config.Database) + } + if len(config.User) > 0 && len(config.Password) == 0 { + return fmt.Sprintf("postgresql://%s@%s:%d/%s?sslmode=disable", + config.User, config.Hostname, config.Port, config.Database) + } + return fmt.Sprintf("postgresql://%s:%d/%s?sslmode=disable", config.Hostname, config.Port, config.Database) +} + +// NewDB opens and returns a new Postgres connection pool using the provided config +func NewDB(c *Config) (*sqlx.DB, error) { + connectStr := DbConnectionString(c) + db, err := sqlx.Connect("postgres", connectStr) + if err != nil { + return nil, err + } + if c.MaxIdle > 0 { + db.SetMaxIdleConns(c.MaxIdle) + } else { + db.SetMaxIdleConns(defaultMaxIdleConnections) + } + if c.MaxOpen > 0 { + db.SetMaxOpenConns(c.MaxOpen) + } else { + db.SetMaxOpenConns(defaultMaxDBConnections) + } + db.SetConnMaxLifetime(c.MaxLifetime) + return db, nil +} diff --git a/ethdb/postgres/database.go b/ethdb/postgres/database.go new file mode 100644 index 000000000..dc5360acf --- /dev/null +++ b/ethdb/postgres/database.go @@ -0,0 +1,221 @@ +// VulcanizeDB +// Copyright © 2020 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package postgres + +import ( + "fmt" + "strings" + + "github.com/sirupsen/logrus" + + "github.com/ethereum/go-ethereum/ethdb" + "github.com/jmoiron/sqlx" +) + +const ( + hasPgStr = "SELECT exists(SELECT 1 FROM eth.key_preimages WHERE eth_key = $1)" + getPgStr = "SELECT data FROM public.blocks INNER JOIN eth.key_preimages ON (ipfs_key = blocks.key) WHERE eth_key = $1" + putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING" + putPreimagePgStr = "INSERT INTO eth.key_preimages (eth_key, ipfs_key, prefix) VALUES ($1, $2, $3) ON CONFLICT (eth_key) DO UPDATE SET (ipfs_key, prefix) = ($2, $3)" + deletePgStr = "DELETE FROM public.blocks USING eth.key_preimages WHERE ipfs_key = blocks.key AND eth_key = $1" + dbSizePgStr = "SELECT pg_database_size(current_database())" +) + +// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection +type Database struct { + db *sqlx.DB + ancientTx *sqlx.Tx +} + +// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS +func NewKeyValueStore(db *sqlx.DB) ethdb.KeyValueStore { + return &Database{ + db: db, + } +} + +// NewDatabase returns a ethdb.Database interface for PG-IPFS +func NewDatabase(db *sqlx.DB) ethdb.Database { + return &Database{ + db: db, + } +} + +// Has satisfies the ethdb.KeyValueReader interface +// Has retrieves if a key is present in the key-value data store +// Has uses the eth.key_preimages table +func (d *Database) Has(key []byte) (bool, error) { + var exists bool + return exists, d.db.Get(&exists, hasPgStr, key) +} + +// Get satisfies the ethdb.KeyValueReader interface +// Get retrieves the given key if it's present in the key-value data store +// Get uses the eth.key_preimages table +func (d *Database) Get(key []byte) ([]byte, error) { + var data []byte + return data, d.db.Get(&data, getPgStr, key) +} + +// Put satisfies the ethdb.KeyValueWriter interface +// Put inserts the given value into the key-value data store +// Key is expected to be the keccak256 hash of value +// Put inserts the keccak256 key into the eth.key_preimages table +func (d *Database) Put(key []byte, value []byte) error { + dsKey, prefix, err := DatastoreKeyFromGethKey(key) + if err != nil { + return err + } + tx, err := d.db.Beginx() + if err != nil { + return err + } + defer func() { + if err != nil { + if err := tx.Rollback(); err != nil { + logrus.Error(err) + } + } else { + err = tx.Commit() + } + }() + if _, err = tx.Exec(putPgStr, dsKey, value); err != nil { + return err + } + _, err = tx.Exec(putPreimagePgStr, key, dsKey, prefix) + return err +} + +// Delete satisfies the ethdb.KeyValueWriter interface +// Delete removes the key from the key-value data store +// Delete uses the eth.key_preimages table +func (d *Database) Delete(key []byte) error { + _, err := d.db.Exec(deletePgStr, key) + return err +} + +// DatabaseProperty enum type +type DatabaseProperty int + +const ( + Unknown DatabaseProperty = iota + Size + Idle + InUse + MaxIdleClosed + MaxLifetimeClosed + MaxOpenConnections + OpenConnections + WaitCount + WaitDuration +) + +// DatabasePropertyFromString helper function +func DatabasePropertyFromString(property string) (DatabaseProperty, error) { + switch strings.ToLower(property) { + case "size": + return Size, nil + case "idle": + return Idle, nil + case "inuse": + return InUse, nil + case "maxidleclosed": + return MaxIdleClosed, nil + case "maxlifetimeclosed": + return MaxLifetimeClosed, nil + case "maxopenconnections": + return MaxOpenConnections, nil + case "openconnections": + return OpenConnections, nil + case "waitcount": + return WaitCount, nil + case "waitduration": + return WaitDuration, nil + default: + return Unknown, fmt.Errorf("unknown database property") + } +} + +// Stat satisfies the ethdb.Stater interface +// Stat returns a particular internal stat of the database +func (d *Database) Stat(property string) (string, error) { + prop, err := DatabasePropertyFromString(property) + if err != nil { + return "", err + } + switch prop { + case Size: + var byteSize string + return byteSize, d.db.Get(&byteSize, dbSizePgStr) + case Idle: + return string(d.db.Stats().Idle), nil + case InUse: + return string(d.db.Stats().InUse), nil + case MaxIdleClosed: + return string(d.db.Stats().MaxIdleClosed), nil + case MaxLifetimeClosed: + return string(d.db.Stats().MaxLifetimeClosed), nil + case MaxOpenConnections: + return string(d.db.Stats().MaxOpenConnections), nil + case OpenConnections: + return string(d.db.Stats().OpenConnections), nil + case WaitCount: + return string(d.db.Stats().WaitCount), nil + case WaitDuration: + return d.db.Stats().WaitDuration.String(), nil + default: + return "", fmt.Errorf("unhandled database property") + } +} + +// Compact satisfies the ethdb.Compacter interface +// Compact flattens the underlying data store for the given key range +func (d *Database) Compact(start []byte, limit []byte) error { + return nil +} + +// NewBatch satisfies the ethdb.Batcher interface +// NewBatch creates a write-only database that buffers changes to its host db +// until a final write is called +func (d *Database) NewBatch() ethdb.Batch { + return NewBatch(d.db, nil) +} + +// NewIterator creates a binary-alphabetical iterator over the entire keyspace +// contained within the key-value database. +func (d *Database) NewIterator() ethdb.Iterator { + return NewIterator(nil, nil, d.db) +} + +// NewIteratorWithStart creates a binary-alphabetical iterator over a subset of +// database content starting at a particular initial key (or after, if it does +// not exist). +func (d *Database) NewIteratorWithStart(start []byte) ethdb.Iterator { + return NewIterator(start, nil, d.db) +} + +// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset +// of database content with a particular key prefix. +func (d *Database) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator { + return NewIterator(nil, prefix, d.db) +} + +// Close satisfies the io.Closer interface +// Close closes the db connection +func (d *Database) Close() error { + return d.db.DB.Close() +} diff --git a/ethdb/postgres/database_test.go b/ethdb/postgres/database_test.go new file mode 100644 index 000000000..bdb698ecb --- /dev/null +++ b/ethdb/postgres/database_test.go @@ -0,0 +1,385 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package postgres_test + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + pgipfsethdb "github.com/ethereum/go-ethereum/ethdb/postgres" + "github.com/ethereum/go-ethereum/rlp" + "github.com/jmoiron/sqlx" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var ( + database ethdb.Database + db *sqlx.DB + err error + testHeader = types.Header{Number: big.NewInt(1337)} + testValue, _ = rlp.EncodeToBytes(testHeader) + testKeccakEthKey = testHeader.Hash().Bytes() + testMhKey, _ = pgipfsethdb.MultihashKeyFromKeccak256(testKeccakEthKey) + + testPrefixedEthKey = append(append([]byte("prefix"), pgipfsethdb.KeyDelineation...), testKeccakEthKey...) + testPrefixedDsKey = common.Bytes2Hex(testPrefixedEthKey) + + testSuffixedEthKey = append(append(testPrefixedEthKey, pgipfsethdb.KeyDelineation...), []byte("suffix")...) + testSuffixedDsKey = common.Bytes2Hex(testSuffixedEthKey) + + testHeaderEthKey = append(append(append(append(pgipfsethdb.HeaderPrefix, pgipfsethdb.KeyDelineation...), + []byte("number")...), pgipfsethdb.NumberDelineation...), testKeccakEthKey...) + testHeaderDsKey = testMhKey + + testPreimageEthKey = append(append(pgipfsethdb.PreimagePrefix, pgipfsethdb.KeyDelineation...), testKeccakEthKey...) + testPreimageDsKey = testMhKey +) + +var _ = Describe("Database", func() { + BeforeEach(func() { + db, err = pgipfsethdb.TestDB() + Expect(err).ToNot(HaveOccurred()) + database = pgipfsethdb.NewDatabase(db) + }) + AfterEach(func() { + err = pgipfsethdb.ResetTestDB(db) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("Has - Keccak keys", func() { + It("returns false if a key-pair doesn't exist in the db", func() { + has, err := database.Has(testKeccakEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(has).ToNot(BeTrue()) + }) + It("returns true if a key-pair exists in the db", func() { + _, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue) + Expect(err).ToNot(HaveOccurred()) + _, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testKeccakEthKey, testMhKey) + Expect(err).ToNot(HaveOccurred()) + has, err := database.Has(testKeccakEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(has).To(BeTrue()) + }) + }) + + Describe("Has - Prefixed keys", func() { + It("returns false if a key-pair doesn't exist in the db", func() { + has, err := database.Has(testPrefixedEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(has).ToNot(BeTrue()) + }) + It("returns true if a key-pair exists in the db", func() { + _, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testPrefixedDsKey, testValue) + Expect(err).ToNot(HaveOccurred()) + _, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testPrefixedEthKey, testPrefixedDsKey) + Expect(err).ToNot(HaveOccurred()) + has, err := database.Has(testPrefixedEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(has).To(BeTrue()) + }) + }) + + Describe("Has - Suffixed keys", func() { + It("returns false if a key-pair doesn't exist in the db", func() { + has, err := database.Has(testSuffixedEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(has).ToNot(BeTrue()) + }) + It("returns true if a key-pair exists in the db", func() { + _, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testSuffixedDsKey, testValue) + Expect(err).ToNot(HaveOccurred()) + _, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testSuffixedEthKey, testSuffixedDsKey) + Expect(err).ToNot(HaveOccurred()) + has, err := database.Has(testSuffixedEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(has).To(BeTrue()) + }) + }) + + Describe("Has - Header keys", func() { + It("returns false if a key-pair doesn't exist in the db", func() { + has, err := database.Has(testHeaderEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(has).ToNot(BeTrue()) + }) + It("returns true if a key-pair exists in the db", func() { + _, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testHeaderDsKey, testValue) + Expect(err).ToNot(HaveOccurred()) + _, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testHeaderEthKey, testHeaderDsKey) + Expect(err).ToNot(HaveOccurred()) + has, err := database.Has(testHeaderEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(has).To(BeTrue()) + }) + }) + + Describe("Has - Preimage keys", func() { + It("returns false if a key-pair doesn't exist in the db", func() { + has, err := database.Has(testPreimageEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(has).ToNot(BeTrue()) + }) + It("returns true if a key-pair exists in the db", func() { + _, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testPreimageDsKey, testValue) + Expect(err).ToNot(HaveOccurred()) + _, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testPreimageEthKey, testPreimageDsKey) + Expect(err).ToNot(HaveOccurred()) + has, err := database.Has(testPreimageEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(has).To(BeTrue()) + }) + }) + + Describe("Get - Keccak keys", func() { + It("throws an err if the key-pair doesn't exist in the db", func() { + _, err = database.Get(testKeccakEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + }) + It("returns the value associated with the key, if the pair exists", func() { + _, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue) + Expect(err).ToNot(HaveOccurred()) + _, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testKeccakEthKey, testMhKey) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testKeccakEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + }) + }) + + Describe("Get - Prefixed keys", func() { + It("throws an err if the key-pair doesn't exist in the db", func() { + _, err = database.Get(testPrefixedEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + }) + It("returns the value associated with the key, if the pair exists", func() { + _, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testPrefixedDsKey, testValue) + Expect(err).ToNot(HaveOccurred()) + _, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testPrefixedEthKey, testPrefixedDsKey) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testPrefixedEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + }) + }) + + Describe("Get - Suffixed keys", func() { + It("throws an err if the key-pair doesn't exist in the db", func() { + _, err = database.Get(testSuffixedEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + }) + It("returns the value associated with the key, if the pair exists", func() { + _, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testSuffixedDsKey, testValue) + Expect(err).ToNot(HaveOccurred()) + _, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testSuffixedEthKey, testSuffixedDsKey) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testSuffixedEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + }) + }) + + Describe("Get - Header keys", func() { + It("throws an err if the key-pair doesn't exist in the db", func() { + _, err = database.Get(testHeaderEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + }) + It("returns the value associated with the key, if the pair exists", func() { + _, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testHeaderDsKey, testValue) + Expect(err).ToNot(HaveOccurred()) + _, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testHeaderEthKey, testHeaderDsKey) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testHeaderEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + }) + }) + + Describe("Get - Preimage keys", func() { + It("throws an err if the key-pair doesn't exist in the db", func() { + _, err = database.Get(testPreimageEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + }) + It("returns the value associated with the key, if the pair exists", func() { + _, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testPreimageDsKey, testValue) + Expect(err).ToNot(HaveOccurred()) + _, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testPreimageEthKey, testPreimageDsKey) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testPreimageEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + }) + }) + + Describe("Put - Keccak keys", func() { + It("persists the key-value pair in the database", func() { + _, err = database.Get(testKeccakEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + + err = database.Put(testKeccakEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testKeccakEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + }) + }) + + Describe("Put - Prefixed keys", func() { + It("persists the key-value pair in the database", func() { + _, err = database.Get(testPrefixedEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + + err = database.Put(testPrefixedEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testPrefixedEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + }) + }) + + Describe("Put - Suffixed keys", func() { + It("persists the key-value pair in the database", func() { + _, err = database.Get(testSuffixedEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + + err = database.Put(testSuffixedEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testSuffixedEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + }) + }) + + Describe("Put - Header keys", func() { + It("persists the key-value pair in the database", func() { + _, err = database.Get(testHeaderEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + + err = database.Put(testHeaderEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testHeaderEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + }) + }) + + Describe("Put - Preimage keys", func() { + It("persists the key-value pair in the database", func() { + _, err = database.Get(testPreimageEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + + err = database.Put(testPreimageEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testPreimageEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + }) + }) + + Describe("Delete - Keccak keys", func() { + It("removes the key-value pair from the database", func() { + err = database.Put(testKeccakEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testKeccakEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + + err = database.Delete(testKeccakEthKey) + Expect(err).ToNot(HaveOccurred()) + _, err = database.Get(testKeccakEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + }) + }) + + Describe("Delete - Prefixed keys", func() { + It("removes the key-value pair from the database", func() { + err = database.Put(testPrefixedEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testPrefixedEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + + err = database.Delete(testPrefixedEthKey) + Expect(err).ToNot(HaveOccurred()) + _, err = database.Get(testPrefixedEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + }) + }) + + Describe("Delete - Suffixed keys", func() { + It("removes the key-value pair from the database", func() { + err = database.Put(testSuffixedEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testSuffixedEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + + err = database.Delete(testSuffixedEthKey) + Expect(err).ToNot(HaveOccurred()) + _, err = database.Get(testSuffixedEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + }) + }) + + Describe("Delete - Header keys", func() { + It("removes the key-value pair from the database", func() { + err = database.Put(testHeaderEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testHeaderEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + + err = database.Delete(testHeaderEthKey) + Expect(err).ToNot(HaveOccurred()) + _, err = database.Get(testHeaderEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + }) + }) + + Describe("Delete - Preimage keys", func() { + It("removes the key-value pair from the database", func() { + err = database.Put(testPreimageEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testPreimageEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + + err = database.Delete(testPreimageEthKey) + Expect(err).ToNot(HaveOccurred()) + _, err = database.Get(testPreimageEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + }) + }) +}) diff --git a/ethdb/postgres/iterator.go b/ethdb/postgres/iterator.go new file mode 100644 index 000000000..797691807 --- /dev/null +++ b/ethdb/postgres/iterator.go @@ -0,0 +1,115 @@ +// VulcanizeDB +// Copyright © 2020 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package postgres + +import ( + "github.com/ethereum/go-ethereum/ethdb" + "github.com/jmoiron/sqlx" +) + +const ( + initPgStr = `SELECT eth_key, data FROM public.blocks + INNER JOIN eth.key_preimages ON (ipfs_key = key) + WHERE eth_key = $1` + nextPgStr = `SELECT eth_key, data FROM public.blocks + INNER JOIN eth.key_preimages ON (ipfs_key = key) + WHERE eth_key > $1 + ORDER BY eth_key LIMIT 1` + nextPgStrWithPrefix = `SELECT eth_key, data FROM public.blocks + INNER JOIN eth.key_preimages ON (ipfs_key = key) + WHERE eth_key > $1 AND prefix = $2 + ORDER BY eth_key LIMIT 1` +) + +type nextModel struct { + Key []byte `db:"eth_key"` + Value []byte `db:"data"` +} + +// Iterator is the type that satisfies the ethdb.Iterator interface for PG-IPFS Ethereum data using a direct Postgres connection +type Iterator struct { + db *sqlx.DB + currentKey, prefix, currentValue []byte + err error + init bool +} + +// NewIterator returns an ethdb.Iterator interface for PG-IPFS +func NewIterator(start, prefix []byte, db *sqlx.DB) ethdb.Iterator { + return &Iterator{ + db: db, + prefix: prefix, + currentKey: start, + init: start != nil, + } +} + +// Next satisfies the ethdb.Iterator interface +// Next moves the iterator to the next key/value pair +// It returns whether the iterator is exhausted +func (i *Iterator) Next() bool { + next := new(nextModel) + if i.init { + i.init = false + if err := i.db.Get(next, initPgStr, i.currentKey); err != nil { + i.currentKey, i.currentValue, i.err = nil, nil, err + return false + } + } else if i.prefix != nil { + if err := i.db.Get(next, nextPgStrWithPrefix, i.currentKey, i.prefix); err != nil { + i.currentKey, i.currentValue, i.err = nil, nil, err + return false + } + } else { + if err := i.db.Get(next, nextPgStr, i.currentKey); err != nil { + i.currentKey, i.currentValue, i.err = nil, nil, err + return false + } + } + i.currentKey, i.currentValue, i.err = next.Key, next.Value, nil + return true +} + +// Error satisfies the ethdb.Iterator interface +// Error returns any accumulated error +// Exhausting all the key/value pairs is not considered to be an error +func (i *Iterator) Error() error { + return i.err +} + +// Key satisfies the ethdb.Iterator interface +// Key returns the key of the current key/value pair, or nil if done +// The caller should not modify the contents of the returned slice +// and its contents may change on the next call to Next +func (i *Iterator) Key() []byte { + return i.currentKey +} + +// Value satisfies the ethdb.Iterator interface +// Value returns the value of the current key/value pair, or nil if done +// The caller should not modify the contents of the returned slice +// and its contents may change on the next call to Next +func (i *Iterator) Value() []byte { + return i.currentValue +} + +// Release satisfies the ethdb.Iterator interface +// Release releases associated resources +// Release should always succeed and can be called multiple times without causing error +func (i *Iterator) Release() { + i.db, i.currentKey, i.currentValue, i.err, i.prefix = nil, nil, nil, nil, nil +} diff --git a/ethdb/postgres/iterator_test.go b/ethdb/postgres/iterator_test.go new file mode 100644 index 000000000..ea768fc16 --- /dev/null +++ b/ethdb/postgres/iterator_test.go @@ -0,0 +1,512 @@ +// VulcanizeDB +// Copyright © 2020 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package postgres_test + +import ( + "database/sql" + + "github.com/ethereum/go-ethereum/ethdb" + pgipfsethdb "github.com/ethereum/go-ethereum/ethdb/postgres" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var ( + iterator ethdb.Iterator + testPrefix = []byte("testPrefix") + testEthKey1 = []byte{'\x01'} + testEthKey2 = []byte{'\x01', '\x01'} + testEthKey3 = []byte{'\x01', '\x02'} + testEthKey4 = []byte{'\x01', '\x0e'} + testEthKey5 = []byte{'\x01', '\x02', '\x01'} + testEthKey6 = []byte{'\x01', '\x0e', '\x01'} + prefixedTestEthKey1 = append(append(testPrefix, pgipfsethdb.KeyDelineation...), testEthKey1...) + prefixedTestEthKey2 = append(append(testPrefix, pgipfsethdb.KeyDelineation...), testEthKey2...) + prefixedTestEthKey3 = append(append(testPrefix, pgipfsethdb.KeyDelineation...), testEthKey3...) + prefixedTestEthKey4 = append(append(testPrefix, pgipfsethdb.KeyDelineation...), testEthKey4...) + prefixedTestEthKey5 = append(append(testPrefix, pgipfsethdb.KeyDelineation...), testEthKey5...) + prefixedTestEthKey6 = append(append(testPrefix, pgipfsethdb.KeyDelineation...), testEthKey6...) + mockValue1 = []byte{1} + mockValue2 = []byte{2} + mockValue3 = []byte{3} + mockValue4 = []byte{4} + mockValue5 = []byte{5} + mockValue6 = []byte{6} +) + +var _ = Describe("Iterator", func() { + BeforeEach(func() { + db, err = pgipfsethdb.TestDB() + Expect(err).ToNot(HaveOccurred()) + database = pgipfsethdb.NewDatabase(db) + // non-prefixed entries + err = database.Put(testEthKey1, mockValue1) + Expect(err).ToNot(HaveOccurred()) + err = database.Put(testEthKey2, mockValue2) + Expect(err).ToNot(HaveOccurred()) + err = database.Put(testEthKey3, mockValue3) + Expect(err).ToNot(HaveOccurred()) + err = database.Put(testEthKey4, mockValue4) + Expect(err).ToNot(HaveOccurred()) + err = database.Put(testEthKey5, mockValue5) + Expect(err).ToNot(HaveOccurred()) + err = database.Put(testEthKey6, mockValue6) + Expect(err).ToNot(HaveOccurred()) + // prefixed entries + err = database.Put(prefixedTestEthKey1, mockValue1) + Expect(err).ToNot(HaveOccurred()) + err = database.Put(prefixedTestEthKey2, mockValue2) + Expect(err).ToNot(HaveOccurred()) + err = database.Put(prefixedTestEthKey3, mockValue3) + Expect(err).ToNot(HaveOccurred()) + err = database.Put(prefixedTestEthKey4, mockValue4) + Expect(err).ToNot(HaveOccurred()) + err = database.Put(prefixedTestEthKey5, mockValue5) + Expect(err).ToNot(HaveOccurred()) + err = database.Put(prefixedTestEthKey6, mockValue6) + Expect(err).ToNot(HaveOccurred()) + }) + AfterEach(func() { + err = pgipfsethdb.ResetTestDB(db) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("NewIterator", func() { + It("iterates over the entire key-set (prefixed or not)", func() { + iterator = database.NewIterator() + Expect(iterator.Value()).To(BeNil()) + Expect(iterator.Key()).To(BeNil()) + Expect(iterator.Error()).To(BeNil()) + + more := iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey1)) + Expect(iterator.Value()).To(Equal(mockValue1)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey2)) + Expect(iterator.Value()).To(Equal(mockValue2)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey3)) + Expect(iterator.Value()).To(Equal(mockValue3)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey5)) + Expect(iterator.Value()).To(Equal(mockValue5)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey4)) + Expect(iterator.Value()).To(Equal(mockValue4)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey6)) + Expect(iterator.Value()).To(Equal(mockValue6)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey1)) + Expect(iterator.Value()).To(Equal(mockValue1)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey2)) + Expect(iterator.Value()).To(Equal(mockValue2)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey3)) + Expect(iterator.Value()).To(Equal(mockValue3)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey5)) + Expect(iterator.Value()).To(Equal(mockValue5)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey4)) + Expect(iterator.Value()).To(Equal(mockValue4)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey6)) + Expect(iterator.Value()).To(Equal(mockValue6)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).ToNot(BeTrue()) + Expect(iterator.Value()).To(BeNil()) + Expect(iterator.Key()).To(BeNil()) + Expect(iterator.Error()).To(Equal(sql.ErrNoRows)) + }) + }) + + Describe("NewIteratorWithPrefix", func() { + It("iterates over all db entries that have the provided prefix", func() { + iterator = database.NewIteratorWithPrefix(testPrefix) + Expect(iterator.Value()).To(BeNil()) + Expect(iterator.Key()).To(BeNil()) + Expect(iterator.Error()).To(BeNil()) + + more := iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey1)) + Expect(iterator.Value()).To(Equal(mockValue1)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey2)) + Expect(iterator.Value()).To(Equal(mockValue2)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey3)) + Expect(iterator.Value()).To(Equal(mockValue3)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey5)) + Expect(iterator.Value()).To(Equal(mockValue5)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey4)) + Expect(iterator.Value()).To(Equal(mockValue4)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey6)) + Expect(iterator.Value()).To(Equal(mockValue6)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).ToNot(BeTrue()) + Expect(iterator.Value()).To(BeNil()) + Expect(iterator.Key()).To(BeNil()) + Expect(iterator.Error()).To(Equal(sql.ErrNoRows)) + }) + + It("behaves as no prefix is provided if prefix is nil", func() { + iterator = database.NewIteratorWithPrefix(nil) + Expect(iterator.Value()).To(BeNil()) + Expect(iterator.Key()).To(BeNil()) + Expect(iterator.Error()).To(BeNil()) + + more := iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey1)) + Expect(iterator.Value()).To(Equal(mockValue1)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey2)) + Expect(iterator.Value()).To(Equal(mockValue2)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey3)) + Expect(iterator.Value()).To(Equal(mockValue3)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey5)) + Expect(iterator.Value()).To(Equal(mockValue5)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey4)) + Expect(iterator.Value()).To(Equal(mockValue4)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey6)) + Expect(iterator.Value()).To(Equal(mockValue6)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey1)) + Expect(iterator.Value()).To(Equal(mockValue1)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey2)) + Expect(iterator.Value()).To(Equal(mockValue2)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey3)) + Expect(iterator.Value()).To(Equal(mockValue3)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey5)) + Expect(iterator.Value()).To(Equal(mockValue5)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey4)) + Expect(iterator.Value()).To(Equal(mockValue4)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey6)) + Expect(iterator.Value()).To(Equal(mockValue6)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).ToNot(BeTrue()) + Expect(iterator.Value()).To(BeNil()) + Expect(iterator.Key()).To(BeNil()) + Expect(iterator.Error()).To(Equal(sql.ErrNoRows)) + }) + + It("considers empty but non-nil []byte a valid prefix, which precludes iteration over any other prefixed keys", func() { + iterator = database.NewIteratorWithPrefix([]byte{}) + Expect(iterator.Value()).To(BeNil()) + Expect(iterator.Key()).To(BeNil()) + Expect(iterator.Error()).To(BeNil()) + + more := iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey1)) + Expect(iterator.Value()).To(Equal(mockValue1)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey2)) + Expect(iterator.Value()).To(Equal(mockValue2)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey3)) + Expect(iterator.Value()).To(Equal(mockValue3)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey5)) + Expect(iterator.Value()).To(Equal(mockValue5)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey4)) + Expect(iterator.Value()).To(Equal(mockValue4)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey6)) + Expect(iterator.Value()).To(Equal(mockValue6)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).ToNot(BeTrue()) + Expect(iterator.Value()).To(BeNil()) + Expect(iterator.Key()).To(BeNil()) + Expect(iterator.Error()).To(Equal(sql.ErrNoRows)) + }) + }) + + Describe("NewIteratorWithStart", func() { + It("iterates over the entire key-set (prefixed or not) starting with at the provided path", func() { + iterator = database.NewIteratorWithStart(testEthKey2) + Expect(iterator.Value()).To(BeNil()) + Expect(iterator.Key()).To(Equal(testEthKey2)) + Expect(iterator.Error()).To(BeNil()) + + more := iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey2)) + Expect(iterator.Value()).To(Equal(mockValue2)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey3)) + Expect(iterator.Value()).To(Equal(mockValue3)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey5)) + Expect(iterator.Value()).To(Equal(mockValue5)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey4)) + Expect(iterator.Value()).To(Equal(mockValue4)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey6)) + Expect(iterator.Value()).To(Equal(mockValue6)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey1)) + Expect(iterator.Value()).To(Equal(mockValue1)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey2)) + Expect(iterator.Value()).To(Equal(mockValue2)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey3)) + Expect(iterator.Value()).To(Equal(mockValue3)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey5)) + Expect(iterator.Value()).To(Equal(mockValue5)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey4)) + Expect(iterator.Value()).To(Equal(mockValue4)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey6)) + Expect(iterator.Value()).To(Equal(mockValue6)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).ToNot(BeTrue()) + Expect(iterator.Value()).To(BeNil()) + Expect(iterator.Key()).To(BeNil()) + Expect(iterator.Error()).To(Equal(sql.ErrNoRows)) + }) + + It("iterates over the entire key-set (prefixed or not) starting with at the provided path", func() { + iterator = database.NewIteratorWithStart(prefixedTestEthKey3) + Expect(iterator.Value()).To(BeNil()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey3)) + Expect(iterator.Error()).To(BeNil()) + + more := iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey3)) + Expect(iterator.Value()).To(Equal(mockValue3)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey5)) + Expect(iterator.Value()).To(Equal(mockValue5)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey4)) + Expect(iterator.Value()).To(Equal(mockValue4)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(prefixedTestEthKey6)) + Expect(iterator.Value()).To(Equal(mockValue6)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).ToNot(BeTrue()) + Expect(iterator.Value()).To(BeNil()) + Expect(iterator.Key()).To(BeNil()) + Expect(iterator.Error()).To(Equal(sql.ErrNoRows)) + }) + }) + + Describe("Release", func() { + It("releases resources associated with the Iterator", func() { + iterator = database.NewIteratorWithStart(testEthKey2) + Expect(iterator.Value()).To(BeNil()) + Expect(iterator.Key()).To(Equal(testEthKey2)) + Expect(iterator.Error()).To(BeNil()) + + more := iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey2)) + Expect(iterator.Value()).To(Equal(mockValue2)) + Expect(iterator.Error()).To(BeNil()) + + iterator.Release() + iterator.Release() // check that we don't panic if called multiple times + + Expect(iterator.Value()).To(BeNil()) + Expect(iterator.Key()).To(BeNil()) + Expect(iterator.Error()).To(BeNil()) + Expect(func() { iterator.Next() }).To(Panic()) // check that we panic if we try to use released iterator + + // We can still create a new iterator from the same backing db + iterator = database.NewIteratorWithStart(testEthKey2) + Expect(iterator.Value()).To(BeNil()) + Expect(iterator.Key()).To(Equal(testEthKey2)) + Expect(iterator.Error()).To(BeNil()) + + more = iterator.Next() + Expect(more).To(BeTrue()) + Expect(iterator.Key()).To(Equal(testEthKey2)) + Expect(iterator.Value()).To(Equal(mockValue2)) + Expect(iterator.Error()).To(BeNil()) + }) + }) +}) diff --git a/ethdb/postgres/key_type.go b/ethdb/postgres/key_type.go new file mode 100644 index 000000000..f64f108f0 --- /dev/null +++ b/ethdb/postgres/key_type.go @@ -0,0 +1,59 @@ +// VulcanizeDB +// Copyright © 2020 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package postgres + +import ( + "bytes" + + "github.com/ethereum/go-ethereum/core/rawdb" +) + +type KeyType uint + +const ( + Invalid KeyType = iota + Static + Keccak + Prefixed + Suffixed + Header + Preimage +) + +// ResolveKeyType returns the key type based on the prefix +func ResolveKeyType(key []byte) (KeyType, [][]byte) { + sk := bytes.Split(key, rawdb.KeyDelineation) + switch len(sk) { + case 1: + if len(sk[0]) < 32 { + return Static, sk + } + return Keccak, sk + case 2: + switch prefix := sk[0]; { + case bytes.Equal(prefix, rawdb.HeaderPrefix): + return Header, bytes.Split(sk[1], rawdb.NumberDelineation) + case bytes.Equal(prefix, rawdb.PreimagePrefix): + return Preimage, sk + default: + return Prefixed, sk + } + case 3: + return Suffixed, sk + } + return Invalid, sk +} diff --git a/ethdb/postgres/postgres_suite_test.go b/ethdb/postgres/postgres_suite_test.go new file mode 100644 index 000000000..9b868788c --- /dev/null +++ b/ethdb/postgres/postgres_suite_test.go @@ -0,0 +1,29 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package postgres_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestPGIPFSETHDB(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "PG-IPFS ethdb test") +} diff --git a/ethdb/postgres/test_helpers.go b/ethdb/postgres/test_helpers.go new file mode 100644 index 000000000..c0c26c646 --- /dev/null +++ b/ethdb/postgres/test_helpers.go @@ -0,0 +1,65 @@ +// VulcanizeDB +// Copyright © 2020 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package postgres + +import "github.com/jmoiron/sqlx" + +// TestDB connect to the testing database +// it assumes the database has the IPFS public.blocks table present +// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table +func TestDB() (*sqlx.DB, error) { + connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable" + return sqlx.Connect("postgres", connectStr) +} + +// ResetTestDB drops all rows in the test db public.blocks table +func ResetTestDB(db *sqlx.DB) error { + tx, err := db.Beginx() + if err != nil { + return err + } + defer func() { + if p := recover(); p != nil { + tx.Rollback() + panic(p) + } else if err != nil { + tx.Rollback() + } else { + err = tx.Commit() + } + }() + if _, err := tx.Exec("TRUNCATE public.blocks CASCADE"); err != nil { + return err + } + if _, err := tx.Exec("TRUNCATE eth.key_preimages CASCADE"); err != nil { + return err + } + if _, err := tx.Exec("TRUNCATE eth.ancient_headers CASCADE"); err != nil { + return err + } + if _, err := tx.Exec("TRUNCATE eth.ancient_hashes CASCADE"); err != nil { + return err + } + if _, err := tx.Exec("TRUNCATE eth.ancient_bodies CASCADE"); err != nil { + return err + } + if _, err := tx.Exec("TRUNCATE eth.ancient_receipts CASCADE"); err != nil { + return err + } + _, err = tx.Exec("TRUNCATE eth.ancient_tds CASCADE") + return err +} diff --git a/ethdb/postgres/util.go b/ethdb/postgres/util.go new file mode 100644 index 000000000..530028ec7 --- /dev/null +++ b/ethdb/postgres/util.go @@ -0,0 +1,63 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package postgres + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/common" + "github.com/ipfs/go-ipfs-blockstore" + "github.com/ipfs/go-ipfs-ds-help" + _ "github.com/lib/pq" //postgres driver + "github.com/multiformats/go-multihash" +) + +// MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash db key string +func MultihashKeyFromKeccak256(h []byte) (string, error) { + mh, err := multihash.Encode(h, multihash.KECCAK_256) + if err != nil { + return "", err + } + dbKey := dshelp.MultihashToDsKey(mh) + return blockstore.BlockPrefix.String() + dbKey.String(), nil +} + +// DatastoreKeyFromGethKey returns the public.blocks key from the provided geth key +// It also returns the key's prefix, if it has one +func DatastoreKeyFromGethKey(h []byte) (string, []byte, error) { + keyType, keyComponents := ResolveKeyType(h) + switch keyType { + case Keccak: + mhKey, err := MultihashKeyFromKeccak256(h) + return mhKey, nil, err + case Header: + mhKey, err := MultihashKeyFromKeccak256(keyComponents[1]) + return mhKey, keyComponents[0], err + case Preimage: + mhKey, err := MultihashKeyFromKeccak256(keyComponents[1]) + return mhKey, keyComponents[0], err + case Prefixed, Suffixed: + // This data is not mapped by hash => content by geth, store it using the prefixed/suffixed key directly + // I.e. the public.blocks datastore key == the hex representation of the geth key + // Alternatively, decompose the data and derive the hash + return common.Bytes2Hex(h), keyComponents[0], nil + case Static: + return common.Bytes2Hex(h), nil, nil + default: + return "", nil, fmt.Errorf("invalid formatting of database key: %x", h) + } +}