port in ipfs-ethdb

This commit is contained in:
Ian Norden 2020-09-18 08:04:17 -05:00
parent 50adf39ded
commit 857214d512
13 changed files with 2276 additions and 0 deletions

238
ethdb/postgres/ancient.go Normal file
View File

@ -0,0 +1,238 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package postgres
import (
"database/sql"
"fmt"
"github.com/sirupsen/logrus"
)
const (
// FreezerHeaderTable indicates the name of the freezer header table.
FreezerHeaderTable = "headers"
// FreezerHashTable indicates the name of the freezer canonical hash table.
FreezerHashTable = "hashes"
// FreezerBodiesTable indicates the name of the freezer block body table.
FreezerBodiesTable = "bodies"
// FreezerReceiptTable indicates the name of the freezer receipts table.
FreezerReceiptTable = "receipts"
// FreezerDifficultyTable indicates the name of the freezer total difficulty table.
FreezerDifficultyTable = "diffs"
// ancient append Postgres statements
appendAncientHeaderPgStr = "INSERT INTO eth.ancient_headers (block_number, header) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET header = $2"
appendAncientHashPgStr = "INSERT INTO eth.ancient_hashes (block_number, hash) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET hash = $2"
appendAncientBodyPgStr = "INSERT INTO eth.ancient_bodies (block_number, body) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET body = $2"
appendAncientReceiptsPgStr = "INSERT INTO eth.ancient_receipts (block_number, receipts) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET receipts = $2"
appendAncientTDPgStr = "INSERT INTO eth.ancient_tds (block_number, td) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET td = $2"
// ancient truncate Postgres statements
truncateAncientHeaderPgStr = "DELETE FROM eth.ancient_headers WHERE block_number > $1"
truncateAncientHashPgStr = "DELETE FROM eth.ancient_hashes WHERE block_number > $1"
truncateAncientBodiesPgStr = "DELETE FROM eth.ancient_bodies WHERE block_number > $1"
truncateAncientReceiptsPgStr = "DELETE FROM eth.ancient_receipts WHERE block_number > $1"
truncateAncientTDPgStr = "DELETE FROM eth.ancient_tds WHERE block_number > $1"
// ancient size Postgres statement
ancientSizePgStr = "SELECT pg_total_relation_size($1)"
// ancients Postgres statement
ancientsPgStr = "SELECT block_number FROM eth.ancient_headers ORDER BY block_number DESC LIMIT 1"
// ancient has Postgres statements
hasAncientHeaderPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_headers WHERE block_number = $1)"
hasAncientHashPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_hashes WHERE block_number = $1)"
hasAncientBodyPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_bodies WHERE block_number = $1)"
hasAncientReceiptsPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_receipts WHERE block_number = $1)"
hasAncientTDPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_tds WHERE block_number = $1)"
// ancient get Postgres statements
getAncientHeaderPgStr = "SELECT header FROM eth.ancient_headers WHERE block_number = $1"
getAncientHashPgStr = "SELECT hash FROM eth.ancient_hashes WHERE block_number = $1"
getAncientBodyPgStr = "SELECT body FROM eth.ancient_bodies WHERE block_number = $1"
getAncientReceiptsPgStr = "SELECT receipts FROM eth.ancient_receipts WHERE block_number = $1"
getAncientTDPgStr = "SELECT td FROM eth.ancient_tds WHERE block_number = $1"
)
// HasAncient satisfies the ethdb.AncientReader interface
// HasAncient returns an indicator whether the specified data exists in the ancient store
func (d *Database) HasAncient(kind string, number uint64) (bool, error) {
var pgStr string
switch kind {
case FreezerHeaderTable:
pgStr = hasAncientHeaderPgStr
case FreezerHashTable:
pgStr = hasAncientHashPgStr
case FreezerBodiesTable:
pgStr = hasAncientBodyPgStr
case FreezerReceiptTable:
pgStr = hasAncientReceiptsPgStr
case FreezerDifficultyTable:
pgStr = hasAncientTDPgStr
default:
return false, fmt.Errorf("unexpected ancient kind: %s", kind)
}
has := new(bool)
return *has, d.db.Get(has, pgStr, number)
}
// Ancient satisfies the ethdb.AncientReader interface
// Ancient retrieves an ancient binary blob from the append-only immutable files
func (d *Database) Ancient(kind string, number uint64) ([]byte, error) {
var pgStr string
switch kind {
case FreezerHeaderTable:
pgStr = getAncientHeaderPgStr
case FreezerHashTable:
pgStr = getAncientHashPgStr
case FreezerBodiesTable:
pgStr = getAncientBodyPgStr
case FreezerReceiptTable:
pgStr = getAncientReceiptsPgStr
case FreezerDifficultyTable:
pgStr = getAncientTDPgStr
default:
return nil, fmt.Errorf("unexpected ancient kind: %s", kind)
}
data := new([]byte)
return *data, d.db.Get(data, pgStr, number)
}
// Ancients satisfies the ethdb.AncientReader interface
// Ancients returns the ancient item numbers in the ancient store
func (d *Database) Ancients() (uint64, error) {
num := new(uint64)
if err := d.db.Get(num, ancientsPgStr); err != nil {
if err == sql.ErrNoRows {
return 0, nil
}
return 0, err
}
return *num, nil
}
// AncientSize satisfies the ethdb.AncientReader interface
// AncientSize returns the ancient size of the specified category
func (d *Database) AncientSize(kind string) (uint64, error) {
var tableName string
switch kind {
case FreezerHeaderTable:
tableName = "eth.ancient_headers"
case FreezerHashTable:
tableName = "eth.ancient_hashes"
case FreezerBodiesTable:
tableName = "eth.ancient_bodies"
case FreezerReceiptTable:
tableName = "eth.ancient_receipts"
case FreezerDifficultyTable:
tableName = "eth.ancient_tds"
default:
return 0, fmt.Errorf("unexpected ancient kind: %s", kind)
}
size := new(uint64)
return *size, d.db.Get(size, ancientSizePgStr, tableName)
}
// AppendAncient satisfies the ethdb.AncientWriter interface
// AppendAncient injects all binary blobs belong to block at the end of the append-only immutable table files
func (d *Database) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error {
// append in batch
var err error
if d.ancientTx == nil {
d.ancientTx, err = d.db.Beginx()
if err != nil {
return err
}
}
defer func() {
if err != nil {
if err := d.ancientTx.Rollback(); err != nil {
logrus.Error(err)
d.ancientTx = nil
}
}
}()
if _, err := d.ancientTx.Exec(appendAncientHashPgStr, number, hash); err != nil {
return err
}
if _, err := d.ancientTx.Exec(appendAncientHeaderPgStr, number, header); err != nil {
return err
}
if _, err := d.ancientTx.Exec(appendAncientBodyPgStr, number, body); err != nil {
return err
}
if _, err := d.ancientTx.Exec(appendAncientReceiptsPgStr, number, receipts); err != nil {
return err
}
_, err = d.ancientTx.Exec(appendAncientTDPgStr, number, td)
return err
}
// TruncateAncients satisfies the ethdb.AncientWriter interface
// TruncateAncients discards all but the first n ancient data from the ancient store
func (d *Database) TruncateAncients(n uint64) error {
// truncate in batch
var err error
if d.ancientTx == nil {
d.ancientTx, err = d.db.Beginx()
if err != nil {
return err
}
}
defer func() {
if err != nil {
if err := d.ancientTx.Rollback(); err != nil {
logrus.Error(err)
d.ancientTx = nil
}
}
}()
if _, err := d.ancientTx.Exec(truncateAncientHeaderPgStr, n); err != nil {
return err
}
if _, err := d.ancientTx.Exec(truncateAncientHashPgStr, n); err != nil {
return err
}
if _, err := d.ancientTx.Exec(truncateAncientBodiesPgStr, n); err != nil {
return err
}
if _, err := d.ancientTx.Exec(truncateAncientReceiptsPgStr, n); err != nil {
return err
}
_, err = d.ancientTx.Exec(truncateAncientTDPgStr, n)
return err
}
// Sync satisfies the ethdb.AncientWriter interface
// Sync flushes all in-memory ancient store data to disk
func (d *Database) Sync() error {
if d.ancientTx == nil {
return nil
}
if err := d.ancientTx.Commit(); err != nil {
return err
}
d.ancientTx = nil
return nil
}

View File

@ -0,0 +1,231 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package postgres_test
import (
"math/big"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
pgipfsethdb "github.com/ethereum/go-ethereum/ethdb/postgres"
"github.com/ethereum/go-ethereum/rlp"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var (
ancientDB ethdb.Database
testBlockNumber uint64 = 1
testAncientHeader = types.Header{Number: big.NewInt(2)}
testAncientHeaderRLP, _ = rlp.EncodeToBytes(testHeader2)
testAncientHash = testAncientHeader.Hash().Bytes()
testAncientBodyBytes = make([]byte, 10000)
testAncientReceiptsBytes = make([]byte, 5000)
testAncientTD, _ = new(big.Int).SetString("1000000000000000000000", 10)
testAncientTDBytes = testAncientTD.Bytes()
)
var _ = Describe("Ancient", func() {
BeforeEach(func() {
db, err = pgipfsethdb.TestDB()
Expect(err).ToNot(HaveOccurred())
ancientDB = pgipfsethdb.NewDatabase(db)
})
AfterEach(func() {
err = pgipfsethdb.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
})
Describe("AppendAncient/Sync/Has", func() {
It("adds eth objects to the Ancient database and returns whether or not an ancient record exists", func() {
hasAncient(testBlockNumber, false)
err = ancientDB.AppendAncient(testBlockNumber, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes)
Expect(err).ToNot(HaveOccurred())
hasAncient(testBlockNumber, false)
err = ancientDB.Sync()
Expect(err).ToNot(HaveOccurred())
hasAncient(testBlockNumber, true)
})
})
Describe("AppendAncient/Sync/Ancient", func() {
It("adds the eth objects to the Ancient database and returns the ancient objects on request", func() {
hasAncient(testBlockNumber, false)
_, err := ancientDB.Ancient(pgipfsethdb.FreezerHeaderTable, testBlockNumber)
Expect(err).To(HaveOccurred())
_, err = ancientDB.Ancient(pgipfsethdb.FreezerHashTable, testBlockNumber)
Expect(err).To(HaveOccurred())
_, err = ancientDB.Ancient(pgipfsethdb.FreezerBodiesTable, testBlockNumber)
Expect(err).To(HaveOccurred())
_, err = ancientDB.Ancient(pgipfsethdb.FreezerReceiptTable, testBlockNumber)
Expect(err).To(HaveOccurred())
_, err = ancientDB.Ancient(pgipfsethdb.FreezerDifficultyTable, testBlockNumber)
Expect(err).To(HaveOccurred())
err = ancientDB.AppendAncient(testBlockNumber, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes)
Expect(err).ToNot(HaveOccurred())
err = ancientDB.Sync()
Expect(err).ToNot(HaveOccurred())
hasAncient(testBlockNumber, true)
ancientHeader, err := ancientDB.Ancient(pgipfsethdb.FreezerHeaderTable, testBlockNumber)
Expect(err).ToNot(HaveOccurred())
Expect(ancientHeader).To(Equal(testAncientHeaderRLP))
ancientHash, err := ancientDB.Ancient(pgipfsethdb.FreezerHashTable, testBlockNumber)
Expect(err).ToNot(HaveOccurred())
Expect(ancientHash).To(Equal(testAncientHash))
ancientBody, err := ancientDB.Ancient(pgipfsethdb.FreezerBodiesTable, testBlockNumber)
Expect(err).ToNot(HaveOccurred())
Expect(ancientBody).To(Equal(testAncientBodyBytes))
ancientReceipts, err := ancientDB.Ancient(pgipfsethdb.FreezerReceiptTable, testBlockNumber)
Expect(err).ToNot(HaveOccurred())
Expect(ancientReceipts).To(Equal(testAncientReceiptsBytes))
ancientTD, err := ancientDB.Ancient(pgipfsethdb.FreezerDifficultyTable, testBlockNumber)
Expect(err).ToNot(HaveOccurred())
Expect(ancientTD).To(Equal(testAncientTDBytes))
})
})
Describe("AppendAncient/Sync/Ancients", func() {
It("returns the height of the ancient database", func() {
ancients, err := ancientDB.Ancients()
Expect(err).ToNot(HaveOccurred())
Expect(ancients).To(Equal(uint64(0)))
for i := uint64(0); i <= 100; i++ {
hasAncient(i, false)
err = ancientDB.AppendAncient(i, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes)
Expect(err).ToNot(HaveOccurred())
}
err = ancientDB.Sync()
Expect(err).ToNot(HaveOccurred())
for i := uint64(0); i <= 100; i++ {
hasAncient(i, true)
}
ancients, err = ancientDB.Ancients()
Expect(err).ToNot(HaveOccurred())
Expect(ancients).To(Equal(uint64(100)))
})
})
Describe("AppendAncient/Truncate/Sync", func() {
It("truncates the ancient database to the provided height", func() {
for i := uint64(0); i <= 100; i++ {
hasAncient(i, false)
err = ancientDB.AppendAncient(i, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes)
Expect(err).ToNot(HaveOccurred())
}
err = ancientDB.Sync()
Expect(err).ToNot(HaveOccurred())
err = ancientDB.TruncateAncients(50)
Expect(err).ToNot(HaveOccurred())
for i := uint64(0); i <= 100; i++ {
hasAncient(i, true)
}
ancients, err := ancientDB.Ancients()
Expect(err).ToNot(HaveOccurred())
Expect(ancients).To(Equal(uint64(100)))
err = ancientDB.Sync()
Expect(err).ToNot(HaveOccurred())
for i := uint64(0); i <= 100; i++ {
if i <= 50 {
hasAncient(i, true)
} else {
hasAncient(i, false)
}
}
ancients, err = ancientDB.Ancients()
Expect(err).ToNot(HaveOccurred())
Expect(ancients).To(Equal(uint64(50)))
})
})
Describe("AppendAncient/Sync/AncientSize", func() {
It("adds the eth objects to the Ancient database and returns the ancient objects on request", func() {
for i := uint64(0); i <= 100; i++ {
hasAncient(i, false)
err = ancientDB.AppendAncient(i, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes)
Expect(err).ToNot(HaveOccurred())
}
err = ancientDB.Sync()
Expect(err).ToNot(HaveOccurred())
for i := uint64(0); i <= 100; i++ {
hasAncient(i, true)
}
ancientHeaderSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerHeaderTable)
Expect(err).ToNot(HaveOccurred())
Expect(ancientHeaderSize).To(Equal(uint64(106496)))
ancientHashSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerHashTable)
Expect(err).ToNot(HaveOccurred())
Expect(ancientHashSize).To(Equal(uint64(32768)))
ancientBodySize, err := ancientDB.AncientSize(pgipfsethdb.FreezerBodiesTable)
Expect(err).ToNot(HaveOccurred())
Expect(ancientBodySize).To(Equal(uint64(73728)))
ancientReceiptsSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerReceiptTable)
Expect(err).ToNot(HaveOccurred())
Expect(ancientReceiptsSize).To(Equal(uint64(65536)))
ancientTDSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerDifficultyTable)
Expect(err).ToNot(HaveOccurred())
Expect(ancientTDSize).To(Equal(uint64(32768)))
})
})
})
func hasAncient(blockNumber uint64, shouldHave bool) {
has, err := ancientDB.HasAncient(pgipfsethdb.FreezerHeaderTable, blockNumber)
Expect(err).ToNot(HaveOccurred())
Expect(has).To(Equal(shouldHave))
has, err = ancientDB.HasAncient(pgipfsethdb.FreezerHashTable, blockNumber)
Expect(err).ToNot(HaveOccurred())
Expect(has).To(Equal(shouldHave))
has, err = ancientDB.HasAncient(pgipfsethdb.FreezerBodiesTable, blockNumber)
Expect(err).ToNot(HaveOccurred())
Expect(has).To(Equal(shouldHave))
has, err = ancientDB.HasAncient(pgipfsethdb.FreezerReceiptTable, blockNumber)
Expect(err).ToNot(HaveOccurred())
Expect(has).To(Equal(shouldHave))
has, err = ancientDB.HasAncient(pgipfsethdb.FreezerDifficultyTable, blockNumber)
Expect(err).ToNot(HaveOccurred())
Expect(has).To(Equal(shouldHave))
}

124
ethdb/postgres/batch.go Normal file
View File

@ -0,0 +1,124 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package postgres
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
)
// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection
type Batch struct {
db *sqlx.DB
tx *sqlx.Tx
valueSize int
replayCache map[string][]byte
}
// NewBatch returns a ethdb.Batch interface for PG-IPFS
func NewBatch(db *sqlx.DB, tx *sqlx.Tx) ethdb.Batch {
b := &Batch{
db: db,
tx: tx,
replayCache: make(map[string][]byte),
}
if tx == nil {
b.Reset()
}
return b
}
// Put satisfies the ethdb.Batch interface
// Put inserts the given value into the key-value data store
// Key is expected to be the keccak256 hash of value
func (b *Batch) Put(key []byte, value []byte) (err error) {
dsKey, prefix, err := DatastoreKeyFromGethKey(key)
if err != nil {
return err
}
if _, err = b.tx.Exec(putPgStr, dsKey, value); err != nil {
return err
}
if _, err = b.tx.Exec(putPreimagePgStr, key, dsKey, prefix); err != nil {
return err
}
b.valueSize += len(value)
b.replayCache[common.Bytes2Hex(key)] = value
return nil
}
// Delete satisfies the ethdb.Batch interface
// Delete removes the key from the key-value data store
func (b *Batch) Delete(key []byte) (err error) {
_, err = b.tx.Exec(deletePgStr, key)
if err != nil {
return err
}
delete(b.replayCache, common.Bytes2Hex(key))
return nil
}
// ValueSize satisfies the ethdb.Batch interface
// ValueSize retrieves the amount of data queued up for writing
// The returned value is the total byte length of all data queued to write
func (b *Batch) ValueSize() int {
return b.valueSize
}
// Write satisfies the ethdb.Batch interface
// Write flushes any accumulated data to disk
// Reset should be called after every write
func (b *Batch) Write() error {
if b.tx == nil {
return nil
}
if err := b.tx.Commit(); err != nil {
return err
}
b.replayCache = nil
return nil
}
// Replay satisfies the ethdb.Batch interface
// Replay replays the batch contents
func (b *Batch) Replay(w ethdb.KeyValueWriter) error {
if b.tx != nil {
b.tx.Rollback()
b.tx = nil
}
for key, value := range b.replayCache {
if err := w.Put(common.Hex2Bytes(key), value); err != nil {
return err
}
}
b.replayCache = nil
return nil
}
// Reset satisfies the ethdb.Batch interface
// Reset resets the batch for reuse
// This should be called after every write
func (b *Batch) Reset() {
var err error
b.tx, err = b.db.Beginx()
if err != nil {
panic(err)
}
b.replayCache = make(map[string][]byte)
b.valueSize = 0
}

View File

@ -0,0 +1,143 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package postgres_test
import (
"math/big"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
pgipfsethdb "github.com/ethereum/go-ethereum/ethdb/postgres"
"github.com/ethereum/go-ethereum/rlp"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var (
batch ethdb.Batch
testHeader2 = types.Header{Number: big.NewInt(2)}
testValue2, _ = rlp.EncodeToBytes(testHeader2)
testKeccakEthKey2 = testHeader2.Hash().Bytes()
)
var _ = Describe("Batch", func() {
BeforeEach(func() {
db, err = pgipfsethdb.TestDB()
Expect(err).ToNot(HaveOccurred())
database = pgipfsethdb.NewDatabase(db)
batch = database.NewBatch()
})
AfterEach(func() {
err = pgipfsethdb.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
})
Describe("Put/Write", func() {
It("adds the key-value pair to the batch", func() {
_, err = database.Get(testKeccakEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
_, err = database.Get(testKeccakEthKey2)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
err = batch.Put(testKeccakEthKey, testValue)
Expect(err).ToNot(HaveOccurred())
err = batch.Put(testKeccakEthKey2, testValue2)
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testKeccakEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
val2, err := database.Get(testKeccakEthKey2)
Expect(err).ToNot(HaveOccurred())
Expect(val2).To(Equal(testValue2))
})
})
Describe("Delete/Reset/Write", func() {
It("deletes the key-value pair in the batch", func() {
err = batch.Put(testKeccakEthKey, testValue)
Expect(err).ToNot(HaveOccurred())
err = batch.Put(testKeccakEthKey2, testValue2)
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
batch.Reset()
err = batch.Delete(testKeccakEthKey)
Expect(err).ToNot(HaveOccurred())
err = batch.Delete(testKeccakEthKey2)
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
_, err = database.Get(testKeccakEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
_, err = database.Get(testKeccakEthKey2)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
})
Describe("ValueSize/Reset", func() {
It("returns the size of data in the batch queued for write", func() {
err = batch.Put(testKeccakEthKey, testValue)
Expect(err).ToNot(HaveOccurred())
err = batch.Put(testKeccakEthKey2, testValue2)
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
size := batch.ValueSize()
Expect(size).To(Equal(len(testValue) + len(testValue2)))
batch.Reset()
size = batch.ValueSize()
Expect(size).To(Equal(0))
})
})
Describe("Replay", func() {
It("returns the size of data in the batch queued for write", func() {
err = batch.Put(testKeccakEthKey, testValue)
Expect(err).ToNot(HaveOccurred())
err = batch.Put(testKeccakEthKey2, testValue2)
Expect(err).ToNot(HaveOccurred())
_, err = database.Get(testKeccakEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
_, err = database.Get(testKeccakEthKey2)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
err = batch.Replay(database)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testKeccakEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
val2, err := database.Get(testKeccakEthKey2)
Expect(err).ToNot(HaveOccurred())
Expect(val2).To(Equal(testValue2))
})
})
})

91
ethdb/postgres/config.go Normal file
View File

@ -0,0 +1,91 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package postgres
import (
"fmt"
"time"
"github.com/jmoiron/sqlx"
)
var (
defaultMaxDBConnections = 1024
defaultMaxIdleConnections = 16
)
// Config holds Postgres connection pool configuration params
type Config struct {
Database string
Hostname string
Port int
User string
Password string
// Optimization parameters
MaxOpen int
MaxIdle int
MaxLifetime time.Duration
}
// NewConfig returns a new config struct from provided params
func NewConfig(database, hostname, password, user string, port, maxOpen, maxIdle int, maxLifetime time.Duration) *Config {
return &Config{
Database: database,
Hostname: hostname,
Port: port,
User: user,
Password: password,
MaxOpen: maxOpen,
MaxLifetime: maxLifetime,
MaxIdle: maxIdle,
}
}
// DbConnectionString resolves Postgres config params to a connection string
func DbConnectionString(config *Config) string {
if len(config.User) > 0 && len(config.Password) > 0 {
return fmt.Sprintf("postgresql://%s:%s@%s:%d/%s?sslmode=disable",
config.User, config.Password, config.Hostname, config.Port, config.Database)
}
if len(config.User) > 0 && len(config.Password) == 0 {
return fmt.Sprintf("postgresql://%s@%s:%d/%s?sslmode=disable",
config.User, config.Hostname, config.Port, config.Database)
}
return fmt.Sprintf("postgresql://%s:%d/%s?sslmode=disable", config.Hostname, config.Port, config.Database)
}
// NewDB opens and returns a new Postgres connection pool using the provided config
func NewDB(c *Config) (*sqlx.DB, error) {
connectStr := DbConnectionString(c)
db, err := sqlx.Connect("postgres", connectStr)
if err != nil {
return nil, err
}
if c.MaxIdle > 0 {
db.SetMaxIdleConns(c.MaxIdle)
} else {
db.SetMaxIdleConns(defaultMaxIdleConnections)
}
if c.MaxOpen > 0 {
db.SetMaxOpenConns(c.MaxOpen)
} else {
db.SetMaxOpenConns(defaultMaxDBConnections)
}
db.SetConnMaxLifetime(c.MaxLifetime)
return db, nil
}

221
ethdb/postgres/database.go Normal file
View File

@ -0,0 +1,221 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package postgres
import (
"fmt"
"strings"
"github.com/sirupsen/logrus"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
)
const (
hasPgStr = "SELECT exists(SELECT 1 FROM eth.key_preimages WHERE eth_key = $1)"
getPgStr = "SELECT data FROM public.blocks INNER JOIN eth.key_preimages ON (ipfs_key = blocks.key) WHERE eth_key = $1"
putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING"
putPreimagePgStr = "INSERT INTO eth.key_preimages (eth_key, ipfs_key, prefix) VALUES ($1, $2, $3) ON CONFLICT (eth_key) DO UPDATE SET (ipfs_key, prefix) = ($2, $3)"
deletePgStr = "DELETE FROM public.blocks USING eth.key_preimages WHERE ipfs_key = blocks.key AND eth_key = $1"
dbSizePgStr = "SELECT pg_database_size(current_database())"
)
// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
type Database struct {
db *sqlx.DB
ancientTx *sqlx.Tx
}
// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS
func NewKeyValueStore(db *sqlx.DB) ethdb.KeyValueStore {
return &Database{
db: db,
}
}
// NewDatabase returns a ethdb.Database interface for PG-IPFS
func NewDatabase(db *sqlx.DB) ethdb.Database {
return &Database{
db: db,
}
}
// Has satisfies the ethdb.KeyValueReader interface
// Has retrieves if a key is present in the key-value data store
// Has uses the eth.key_preimages table
func (d *Database) Has(key []byte) (bool, error) {
var exists bool
return exists, d.db.Get(&exists, hasPgStr, key)
}
// Get satisfies the ethdb.KeyValueReader interface
// Get retrieves the given key if it's present in the key-value data store
// Get uses the eth.key_preimages table
func (d *Database) Get(key []byte) ([]byte, error) {
var data []byte
return data, d.db.Get(&data, getPgStr, key)
}
// Put satisfies the ethdb.KeyValueWriter interface
// Put inserts the given value into the key-value data store
// Key is expected to be the keccak256 hash of value
// Put inserts the keccak256 key into the eth.key_preimages table
func (d *Database) Put(key []byte, value []byte) error {
dsKey, prefix, err := DatastoreKeyFromGethKey(key)
if err != nil {
return err
}
tx, err := d.db.Beginx()
if err != nil {
return err
}
defer func() {
if err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
} else {
err = tx.Commit()
}
}()
if _, err = tx.Exec(putPgStr, dsKey, value); err != nil {
return err
}
_, err = tx.Exec(putPreimagePgStr, key, dsKey, prefix)
return err
}
// Delete satisfies the ethdb.KeyValueWriter interface
// Delete removes the key from the key-value data store
// Delete uses the eth.key_preimages table
func (d *Database) Delete(key []byte) error {
_, err := d.db.Exec(deletePgStr, key)
return err
}
// DatabaseProperty enum type
type DatabaseProperty int
const (
Unknown DatabaseProperty = iota
Size
Idle
InUse
MaxIdleClosed
MaxLifetimeClosed
MaxOpenConnections
OpenConnections
WaitCount
WaitDuration
)
// DatabasePropertyFromString helper function
func DatabasePropertyFromString(property string) (DatabaseProperty, error) {
switch strings.ToLower(property) {
case "size":
return Size, nil
case "idle":
return Idle, nil
case "inuse":
return InUse, nil
case "maxidleclosed":
return MaxIdleClosed, nil
case "maxlifetimeclosed":
return MaxLifetimeClosed, nil
case "maxopenconnections":
return MaxOpenConnections, nil
case "openconnections":
return OpenConnections, nil
case "waitcount":
return WaitCount, nil
case "waitduration":
return WaitDuration, nil
default:
return Unknown, fmt.Errorf("unknown database property")
}
}
// Stat satisfies the ethdb.Stater interface
// Stat returns a particular internal stat of the database
func (d *Database) Stat(property string) (string, error) {
prop, err := DatabasePropertyFromString(property)
if err != nil {
return "", err
}
switch prop {
case Size:
var byteSize string
return byteSize, d.db.Get(&byteSize, dbSizePgStr)
case Idle:
return string(d.db.Stats().Idle), nil
case InUse:
return string(d.db.Stats().InUse), nil
case MaxIdleClosed:
return string(d.db.Stats().MaxIdleClosed), nil
case MaxLifetimeClosed:
return string(d.db.Stats().MaxLifetimeClosed), nil
case MaxOpenConnections:
return string(d.db.Stats().MaxOpenConnections), nil
case OpenConnections:
return string(d.db.Stats().OpenConnections), nil
case WaitCount:
return string(d.db.Stats().WaitCount), nil
case WaitDuration:
return d.db.Stats().WaitDuration.String(), nil
default:
return "", fmt.Errorf("unhandled database property")
}
}
// Compact satisfies the ethdb.Compacter interface
// Compact flattens the underlying data store for the given key range
func (d *Database) Compact(start []byte, limit []byte) error {
return nil
}
// NewBatch satisfies the ethdb.Batcher interface
// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called
func (d *Database) NewBatch() ethdb.Batch {
return NewBatch(d.db, nil)
}
// NewIterator creates a binary-alphabetical iterator over the entire keyspace
// contained within the key-value database.
func (d *Database) NewIterator() ethdb.Iterator {
return NewIterator(nil, nil, d.db)
}
// NewIteratorWithStart creates a binary-alphabetical iterator over a subset of
// database content starting at a particular initial key (or after, if it does
// not exist).
func (d *Database) NewIteratorWithStart(start []byte) ethdb.Iterator {
return NewIterator(start, nil, d.db)
}
// NewIteratorWithPrefix creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix.
func (d *Database) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator {
return NewIterator(nil, prefix, d.db)
}
// Close satisfies the io.Closer interface
// Close closes the db connection
func (d *Database) Close() error {
return d.db.DB.Close()
}

View File

@ -0,0 +1,385 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package postgres_test
import (
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
pgipfsethdb "github.com/ethereum/go-ethereum/ethdb/postgres"
"github.com/ethereum/go-ethereum/rlp"
"github.com/jmoiron/sqlx"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var (
database ethdb.Database
db *sqlx.DB
err error
testHeader = types.Header{Number: big.NewInt(1337)}
testValue, _ = rlp.EncodeToBytes(testHeader)
testKeccakEthKey = testHeader.Hash().Bytes()
testMhKey, _ = pgipfsethdb.MultihashKeyFromKeccak256(testKeccakEthKey)
testPrefixedEthKey = append(append([]byte("prefix"), pgipfsethdb.KeyDelineation...), testKeccakEthKey...)
testPrefixedDsKey = common.Bytes2Hex(testPrefixedEthKey)
testSuffixedEthKey = append(append(testPrefixedEthKey, pgipfsethdb.KeyDelineation...), []byte("suffix")...)
testSuffixedDsKey = common.Bytes2Hex(testSuffixedEthKey)
testHeaderEthKey = append(append(append(append(pgipfsethdb.HeaderPrefix, pgipfsethdb.KeyDelineation...),
[]byte("number")...), pgipfsethdb.NumberDelineation...), testKeccakEthKey...)
testHeaderDsKey = testMhKey
testPreimageEthKey = append(append(pgipfsethdb.PreimagePrefix, pgipfsethdb.KeyDelineation...), testKeccakEthKey...)
testPreimageDsKey = testMhKey
)
var _ = Describe("Database", func() {
BeforeEach(func() {
db, err = pgipfsethdb.TestDB()
Expect(err).ToNot(HaveOccurred())
database = pgipfsethdb.NewDatabase(db)
})
AfterEach(func() {
err = pgipfsethdb.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
})
Describe("Has - Keccak keys", func() {
It("returns false if a key-pair doesn't exist in the db", func() {
has, err := database.Has(testKeccakEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(has).ToNot(BeTrue())
})
It("returns true if a key-pair exists in the db", func() {
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue)
Expect(err).ToNot(HaveOccurred())
_, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testKeccakEthKey, testMhKey)
Expect(err).ToNot(HaveOccurred())
has, err := database.Has(testKeccakEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(has).To(BeTrue())
})
})
Describe("Has - Prefixed keys", func() {
It("returns false if a key-pair doesn't exist in the db", func() {
has, err := database.Has(testPrefixedEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(has).ToNot(BeTrue())
})
It("returns true if a key-pair exists in the db", func() {
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testPrefixedDsKey, testValue)
Expect(err).ToNot(HaveOccurred())
_, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testPrefixedEthKey, testPrefixedDsKey)
Expect(err).ToNot(HaveOccurred())
has, err := database.Has(testPrefixedEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(has).To(BeTrue())
})
})
Describe("Has - Suffixed keys", func() {
It("returns false if a key-pair doesn't exist in the db", func() {
has, err := database.Has(testSuffixedEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(has).ToNot(BeTrue())
})
It("returns true if a key-pair exists in the db", func() {
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testSuffixedDsKey, testValue)
Expect(err).ToNot(HaveOccurred())
_, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testSuffixedEthKey, testSuffixedDsKey)
Expect(err).ToNot(HaveOccurred())
has, err := database.Has(testSuffixedEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(has).To(BeTrue())
})
})
Describe("Has - Header keys", func() {
It("returns false if a key-pair doesn't exist in the db", func() {
has, err := database.Has(testHeaderEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(has).ToNot(BeTrue())
})
It("returns true if a key-pair exists in the db", func() {
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testHeaderDsKey, testValue)
Expect(err).ToNot(HaveOccurred())
_, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testHeaderEthKey, testHeaderDsKey)
Expect(err).ToNot(HaveOccurred())
has, err := database.Has(testHeaderEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(has).To(BeTrue())
})
})
Describe("Has - Preimage keys", func() {
It("returns false if a key-pair doesn't exist in the db", func() {
has, err := database.Has(testPreimageEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(has).ToNot(BeTrue())
})
It("returns true if a key-pair exists in the db", func() {
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testPreimageDsKey, testValue)
Expect(err).ToNot(HaveOccurred())
_, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testPreimageEthKey, testPreimageDsKey)
Expect(err).ToNot(HaveOccurred())
has, err := database.Has(testPreimageEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(has).To(BeTrue())
})
})
Describe("Get - Keccak keys", func() {
It("throws an err if the key-pair doesn't exist in the db", func() {
_, err = database.Get(testKeccakEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
It("returns the value associated with the key, if the pair exists", func() {
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue)
Expect(err).ToNot(HaveOccurred())
_, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testKeccakEthKey, testMhKey)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testKeccakEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Get - Prefixed keys", func() {
It("throws an err if the key-pair doesn't exist in the db", func() {
_, err = database.Get(testPrefixedEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
It("returns the value associated with the key, if the pair exists", func() {
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testPrefixedDsKey, testValue)
Expect(err).ToNot(HaveOccurred())
_, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testPrefixedEthKey, testPrefixedDsKey)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testPrefixedEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Get - Suffixed keys", func() {
It("throws an err if the key-pair doesn't exist in the db", func() {
_, err = database.Get(testSuffixedEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
It("returns the value associated with the key, if the pair exists", func() {
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testSuffixedDsKey, testValue)
Expect(err).ToNot(HaveOccurred())
_, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testSuffixedEthKey, testSuffixedDsKey)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testSuffixedEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Get - Header keys", func() {
It("throws an err if the key-pair doesn't exist in the db", func() {
_, err = database.Get(testHeaderEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
It("returns the value associated with the key, if the pair exists", func() {
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testHeaderDsKey, testValue)
Expect(err).ToNot(HaveOccurred())
_, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testHeaderEthKey, testHeaderDsKey)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testHeaderEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Get - Preimage keys", func() {
It("throws an err if the key-pair doesn't exist in the db", func() {
_, err = database.Get(testPreimageEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
It("returns the value associated with the key, if the pair exists", func() {
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testPreimageDsKey, testValue)
Expect(err).ToNot(HaveOccurred())
_, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testPreimageEthKey, testPreimageDsKey)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testPreimageEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Put - Keccak keys", func() {
It("persists the key-value pair in the database", func() {
_, err = database.Get(testKeccakEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
err = database.Put(testKeccakEthKey, testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testKeccakEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Put - Prefixed keys", func() {
It("persists the key-value pair in the database", func() {
_, err = database.Get(testPrefixedEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
err = database.Put(testPrefixedEthKey, testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testPrefixedEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Put - Suffixed keys", func() {
It("persists the key-value pair in the database", func() {
_, err = database.Get(testSuffixedEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
err = database.Put(testSuffixedEthKey, testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testSuffixedEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Put - Header keys", func() {
It("persists the key-value pair in the database", func() {
_, err = database.Get(testHeaderEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
err = database.Put(testHeaderEthKey, testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testHeaderEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Put - Preimage keys", func() {
It("persists the key-value pair in the database", func() {
_, err = database.Get(testPreimageEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
err = database.Put(testPreimageEthKey, testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testPreimageEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Delete - Keccak keys", func() {
It("removes the key-value pair from the database", func() {
err = database.Put(testKeccakEthKey, testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testKeccakEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
err = database.Delete(testKeccakEthKey)
Expect(err).ToNot(HaveOccurred())
_, err = database.Get(testKeccakEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
})
Describe("Delete - Prefixed keys", func() {
It("removes the key-value pair from the database", func() {
err = database.Put(testPrefixedEthKey, testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testPrefixedEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
err = database.Delete(testPrefixedEthKey)
Expect(err).ToNot(HaveOccurred())
_, err = database.Get(testPrefixedEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
})
Describe("Delete - Suffixed keys", func() {
It("removes the key-value pair from the database", func() {
err = database.Put(testSuffixedEthKey, testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testSuffixedEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
err = database.Delete(testSuffixedEthKey)
Expect(err).ToNot(HaveOccurred())
_, err = database.Get(testSuffixedEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
})
Describe("Delete - Header keys", func() {
It("removes the key-value pair from the database", func() {
err = database.Put(testHeaderEthKey, testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testHeaderEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
err = database.Delete(testHeaderEthKey)
Expect(err).ToNot(HaveOccurred())
_, err = database.Get(testHeaderEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
})
Describe("Delete - Preimage keys", func() {
It("removes the key-value pair from the database", func() {
err = database.Put(testPreimageEthKey, testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testPreimageEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
err = database.Delete(testPreimageEthKey)
Expect(err).ToNot(HaveOccurred())
_, err = database.Get(testPreimageEthKey)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
})
})

115
ethdb/postgres/iterator.go Normal file
View File

@ -0,0 +1,115 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package postgres
import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
)
const (
initPgStr = `SELECT eth_key, data FROM public.blocks
INNER JOIN eth.key_preimages ON (ipfs_key = key)
WHERE eth_key = $1`
nextPgStr = `SELECT eth_key, data FROM public.blocks
INNER JOIN eth.key_preimages ON (ipfs_key = key)
WHERE eth_key > $1
ORDER BY eth_key LIMIT 1`
nextPgStrWithPrefix = `SELECT eth_key, data FROM public.blocks
INNER JOIN eth.key_preimages ON (ipfs_key = key)
WHERE eth_key > $1 AND prefix = $2
ORDER BY eth_key LIMIT 1`
)
type nextModel struct {
Key []byte `db:"eth_key"`
Value []byte `db:"data"`
}
// Iterator is the type that satisfies the ethdb.Iterator interface for PG-IPFS Ethereum data using a direct Postgres connection
type Iterator struct {
db *sqlx.DB
currentKey, prefix, currentValue []byte
err error
init bool
}
// NewIterator returns an ethdb.Iterator interface for PG-IPFS
func NewIterator(start, prefix []byte, db *sqlx.DB) ethdb.Iterator {
return &Iterator{
db: db,
prefix: prefix,
currentKey: start,
init: start != nil,
}
}
// Next satisfies the ethdb.Iterator interface
// Next moves the iterator to the next key/value pair
// It returns whether the iterator is exhausted
func (i *Iterator) Next() bool {
next := new(nextModel)
if i.init {
i.init = false
if err := i.db.Get(next, initPgStr, i.currentKey); err != nil {
i.currentKey, i.currentValue, i.err = nil, nil, err
return false
}
} else if i.prefix != nil {
if err := i.db.Get(next, nextPgStrWithPrefix, i.currentKey, i.prefix); err != nil {
i.currentKey, i.currentValue, i.err = nil, nil, err
return false
}
} else {
if err := i.db.Get(next, nextPgStr, i.currentKey); err != nil {
i.currentKey, i.currentValue, i.err = nil, nil, err
return false
}
}
i.currentKey, i.currentValue, i.err = next.Key, next.Value, nil
return true
}
// Error satisfies the ethdb.Iterator interface
// Error returns any accumulated error
// Exhausting all the key/value pairs is not considered to be an error
func (i *Iterator) Error() error {
return i.err
}
// Key satisfies the ethdb.Iterator interface
// Key returns the key of the current key/value pair, or nil if done
// The caller should not modify the contents of the returned slice
// and its contents may change on the next call to Next
func (i *Iterator) Key() []byte {
return i.currentKey
}
// Value satisfies the ethdb.Iterator interface
// Value returns the value of the current key/value pair, or nil if done
// The caller should not modify the contents of the returned slice
// and its contents may change on the next call to Next
func (i *Iterator) Value() []byte {
return i.currentValue
}
// Release satisfies the ethdb.Iterator interface
// Release releases associated resources
// Release should always succeed and can be called multiple times without causing error
func (i *Iterator) Release() {
i.db, i.currentKey, i.currentValue, i.err, i.prefix = nil, nil, nil, nil, nil
}

View File

@ -0,0 +1,512 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package postgres_test
import (
"database/sql"
"github.com/ethereum/go-ethereum/ethdb"
pgipfsethdb "github.com/ethereum/go-ethereum/ethdb/postgres"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var (
iterator ethdb.Iterator
testPrefix = []byte("testPrefix")
testEthKey1 = []byte{'\x01'}
testEthKey2 = []byte{'\x01', '\x01'}
testEthKey3 = []byte{'\x01', '\x02'}
testEthKey4 = []byte{'\x01', '\x0e'}
testEthKey5 = []byte{'\x01', '\x02', '\x01'}
testEthKey6 = []byte{'\x01', '\x0e', '\x01'}
prefixedTestEthKey1 = append(append(testPrefix, pgipfsethdb.KeyDelineation...), testEthKey1...)
prefixedTestEthKey2 = append(append(testPrefix, pgipfsethdb.KeyDelineation...), testEthKey2...)
prefixedTestEthKey3 = append(append(testPrefix, pgipfsethdb.KeyDelineation...), testEthKey3...)
prefixedTestEthKey4 = append(append(testPrefix, pgipfsethdb.KeyDelineation...), testEthKey4...)
prefixedTestEthKey5 = append(append(testPrefix, pgipfsethdb.KeyDelineation...), testEthKey5...)
prefixedTestEthKey6 = append(append(testPrefix, pgipfsethdb.KeyDelineation...), testEthKey6...)
mockValue1 = []byte{1}
mockValue2 = []byte{2}
mockValue3 = []byte{3}
mockValue4 = []byte{4}
mockValue5 = []byte{5}
mockValue6 = []byte{6}
)
var _ = Describe("Iterator", func() {
BeforeEach(func() {
db, err = pgipfsethdb.TestDB()
Expect(err).ToNot(HaveOccurred())
database = pgipfsethdb.NewDatabase(db)
// non-prefixed entries
err = database.Put(testEthKey1, mockValue1)
Expect(err).ToNot(HaveOccurred())
err = database.Put(testEthKey2, mockValue2)
Expect(err).ToNot(HaveOccurred())
err = database.Put(testEthKey3, mockValue3)
Expect(err).ToNot(HaveOccurred())
err = database.Put(testEthKey4, mockValue4)
Expect(err).ToNot(HaveOccurred())
err = database.Put(testEthKey5, mockValue5)
Expect(err).ToNot(HaveOccurred())
err = database.Put(testEthKey6, mockValue6)
Expect(err).ToNot(HaveOccurred())
// prefixed entries
err = database.Put(prefixedTestEthKey1, mockValue1)
Expect(err).ToNot(HaveOccurred())
err = database.Put(prefixedTestEthKey2, mockValue2)
Expect(err).ToNot(HaveOccurred())
err = database.Put(prefixedTestEthKey3, mockValue3)
Expect(err).ToNot(HaveOccurred())
err = database.Put(prefixedTestEthKey4, mockValue4)
Expect(err).ToNot(HaveOccurred())
err = database.Put(prefixedTestEthKey5, mockValue5)
Expect(err).ToNot(HaveOccurred())
err = database.Put(prefixedTestEthKey6, mockValue6)
Expect(err).ToNot(HaveOccurred())
})
AfterEach(func() {
err = pgipfsethdb.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
})
Describe("NewIterator", func() {
It("iterates over the entire key-set (prefixed or not)", func() {
iterator = database.NewIterator()
Expect(iterator.Value()).To(BeNil())
Expect(iterator.Key()).To(BeNil())
Expect(iterator.Error()).To(BeNil())
more := iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey1))
Expect(iterator.Value()).To(Equal(mockValue1))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey2))
Expect(iterator.Value()).To(Equal(mockValue2))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey3))
Expect(iterator.Value()).To(Equal(mockValue3))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey5))
Expect(iterator.Value()).To(Equal(mockValue5))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey4))
Expect(iterator.Value()).To(Equal(mockValue4))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey6))
Expect(iterator.Value()).To(Equal(mockValue6))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey1))
Expect(iterator.Value()).To(Equal(mockValue1))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey2))
Expect(iterator.Value()).To(Equal(mockValue2))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey3))
Expect(iterator.Value()).To(Equal(mockValue3))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey5))
Expect(iterator.Value()).To(Equal(mockValue5))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey4))
Expect(iterator.Value()).To(Equal(mockValue4))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey6))
Expect(iterator.Value()).To(Equal(mockValue6))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).ToNot(BeTrue())
Expect(iterator.Value()).To(BeNil())
Expect(iterator.Key()).To(BeNil())
Expect(iterator.Error()).To(Equal(sql.ErrNoRows))
})
})
Describe("NewIteratorWithPrefix", func() {
It("iterates over all db entries that have the provided prefix", func() {
iterator = database.NewIteratorWithPrefix(testPrefix)
Expect(iterator.Value()).To(BeNil())
Expect(iterator.Key()).To(BeNil())
Expect(iterator.Error()).To(BeNil())
more := iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey1))
Expect(iterator.Value()).To(Equal(mockValue1))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey2))
Expect(iterator.Value()).To(Equal(mockValue2))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey3))
Expect(iterator.Value()).To(Equal(mockValue3))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey5))
Expect(iterator.Value()).To(Equal(mockValue5))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey4))
Expect(iterator.Value()).To(Equal(mockValue4))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey6))
Expect(iterator.Value()).To(Equal(mockValue6))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).ToNot(BeTrue())
Expect(iterator.Value()).To(BeNil())
Expect(iterator.Key()).To(BeNil())
Expect(iterator.Error()).To(Equal(sql.ErrNoRows))
})
It("behaves as no prefix is provided if prefix is nil", func() {
iterator = database.NewIteratorWithPrefix(nil)
Expect(iterator.Value()).To(BeNil())
Expect(iterator.Key()).To(BeNil())
Expect(iterator.Error()).To(BeNil())
more := iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey1))
Expect(iterator.Value()).To(Equal(mockValue1))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey2))
Expect(iterator.Value()).To(Equal(mockValue2))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey3))
Expect(iterator.Value()).To(Equal(mockValue3))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey5))
Expect(iterator.Value()).To(Equal(mockValue5))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey4))
Expect(iterator.Value()).To(Equal(mockValue4))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey6))
Expect(iterator.Value()).To(Equal(mockValue6))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey1))
Expect(iterator.Value()).To(Equal(mockValue1))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey2))
Expect(iterator.Value()).To(Equal(mockValue2))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey3))
Expect(iterator.Value()).To(Equal(mockValue3))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey5))
Expect(iterator.Value()).To(Equal(mockValue5))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey4))
Expect(iterator.Value()).To(Equal(mockValue4))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey6))
Expect(iterator.Value()).To(Equal(mockValue6))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).ToNot(BeTrue())
Expect(iterator.Value()).To(BeNil())
Expect(iterator.Key()).To(BeNil())
Expect(iterator.Error()).To(Equal(sql.ErrNoRows))
})
It("considers empty but non-nil []byte a valid prefix, which precludes iteration over any other prefixed keys", func() {
iterator = database.NewIteratorWithPrefix([]byte{})
Expect(iterator.Value()).To(BeNil())
Expect(iterator.Key()).To(BeNil())
Expect(iterator.Error()).To(BeNil())
more := iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey1))
Expect(iterator.Value()).To(Equal(mockValue1))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey2))
Expect(iterator.Value()).To(Equal(mockValue2))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey3))
Expect(iterator.Value()).To(Equal(mockValue3))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey5))
Expect(iterator.Value()).To(Equal(mockValue5))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey4))
Expect(iterator.Value()).To(Equal(mockValue4))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey6))
Expect(iterator.Value()).To(Equal(mockValue6))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).ToNot(BeTrue())
Expect(iterator.Value()).To(BeNil())
Expect(iterator.Key()).To(BeNil())
Expect(iterator.Error()).To(Equal(sql.ErrNoRows))
})
})
Describe("NewIteratorWithStart", func() {
It("iterates over the entire key-set (prefixed or not) starting with at the provided path", func() {
iterator = database.NewIteratorWithStart(testEthKey2)
Expect(iterator.Value()).To(BeNil())
Expect(iterator.Key()).To(Equal(testEthKey2))
Expect(iterator.Error()).To(BeNil())
more := iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey2))
Expect(iterator.Value()).To(Equal(mockValue2))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey3))
Expect(iterator.Value()).To(Equal(mockValue3))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey5))
Expect(iterator.Value()).To(Equal(mockValue5))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey4))
Expect(iterator.Value()).To(Equal(mockValue4))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey6))
Expect(iterator.Value()).To(Equal(mockValue6))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey1))
Expect(iterator.Value()).To(Equal(mockValue1))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey2))
Expect(iterator.Value()).To(Equal(mockValue2))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey3))
Expect(iterator.Value()).To(Equal(mockValue3))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey5))
Expect(iterator.Value()).To(Equal(mockValue5))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey4))
Expect(iterator.Value()).To(Equal(mockValue4))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey6))
Expect(iterator.Value()).To(Equal(mockValue6))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).ToNot(BeTrue())
Expect(iterator.Value()).To(BeNil())
Expect(iterator.Key()).To(BeNil())
Expect(iterator.Error()).To(Equal(sql.ErrNoRows))
})
It("iterates over the entire key-set (prefixed or not) starting with at the provided path", func() {
iterator = database.NewIteratorWithStart(prefixedTestEthKey3)
Expect(iterator.Value()).To(BeNil())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey3))
Expect(iterator.Error()).To(BeNil())
more := iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey3))
Expect(iterator.Value()).To(Equal(mockValue3))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey5))
Expect(iterator.Value()).To(Equal(mockValue5))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey4))
Expect(iterator.Value()).To(Equal(mockValue4))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(prefixedTestEthKey6))
Expect(iterator.Value()).To(Equal(mockValue6))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).ToNot(BeTrue())
Expect(iterator.Value()).To(BeNil())
Expect(iterator.Key()).To(BeNil())
Expect(iterator.Error()).To(Equal(sql.ErrNoRows))
})
})
Describe("Release", func() {
It("releases resources associated with the Iterator", func() {
iterator = database.NewIteratorWithStart(testEthKey2)
Expect(iterator.Value()).To(BeNil())
Expect(iterator.Key()).To(Equal(testEthKey2))
Expect(iterator.Error()).To(BeNil())
more := iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey2))
Expect(iterator.Value()).To(Equal(mockValue2))
Expect(iterator.Error()).To(BeNil())
iterator.Release()
iterator.Release() // check that we don't panic if called multiple times
Expect(iterator.Value()).To(BeNil())
Expect(iterator.Key()).To(BeNil())
Expect(iterator.Error()).To(BeNil())
Expect(func() { iterator.Next() }).To(Panic()) // check that we panic if we try to use released iterator
// We can still create a new iterator from the same backing db
iterator = database.NewIteratorWithStart(testEthKey2)
Expect(iterator.Value()).To(BeNil())
Expect(iterator.Key()).To(Equal(testEthKey2))
Expect(iterator.Error()).To(BeNil())
more = iterator.Next()
Expect(more).To(BeTrue())
Expect(iterator.Key()).To(Equal(testEthKey2))
Expect(iterator.Value()).To(Equal(mockValue2))
Expect(iterator.Error()).To(BeNil())
})
})
})

View File

@ -0,0 +1,59 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package postgres
import (
"bytes"
"github.com/ethereum/go-ethereum/core/rawdb"
)
type KeyType uint
const (
Invalid KeyType = iota
Static
Keccak
Prefixed
Suffixed
Header
Preimage
)
// ResolveKeyType returns the key type based on the prefix
func ResolveKeyType(key []byte) (KeyType, [][]byte) {
sk := bytes.Split(key, rawdb.KeyDelineation)
switch len(sk) {
case 1:
if len(sk[0]) < 32 {
return Static, sk
}
return Keccak, sk
case 2:
switch prefix := sk[0]; {
case bytes.Equal(prefix, rawdb.HeaderPrefix):
return Header, bytes.Split(sk[1], rawdb.NumberDelineation)
case bytes.Equal(prefix, rawdb.PreimagePrefix):
return Preimage, sk
default:
return Prefixed, sk
}
case 3:
return Suffixed, sk
}
return Invalid, sk
}

View File

@ -0,0 +1,29 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package postgres_test
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestPGIPFSETHDB(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "PG-IPFS ethdb test")
}

View File

@ -0,0 +1,65 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package postgres
import "github.com/jmoiron/sqlx"
// TestDB connect to the testing database
// it assumes the database has the IPFS public.blocks table present
// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table
func TestDB() (*sqlx.DB, error) {
connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable"
return sqlx.Connect("postgres", connectStr)
}
// ResetTestDB drops all rows in the test db public.blocks table
func ResetTestDB(db *sqlx.DB) error {
tx, err := db.Beginx()
if err != nil {
return err
}
defer func() {
if p := recover(); p != nil {
tx.Rollback()
panic(p)
} else if err != nil {
tx.Rollback()
} else {
err = tx.Commit()
}
}()
if _, err := tx.Exec("TRUNCATE public.blocks CASCADE"); err != nil {
return err
}
if _, err := tx.Exec("TRUNCATE eth.key_preimages CASCADE"); err != nil {
return err
}
if _, err := tx.Exec("TRUNCATE eth.ancient_headers CASCADE"); err != nil {
return err
}
if _, err := tx.Exec("TRUNCATE eth.ancient_hashes CASCADE"); err != nil {
return err
}
if _, err := tx.Exec("TRUNCATE eth.ancient_bodies CASCADE"); err != nil {
return err
}
if _, err := tx.Exec("TRUNCATE eth.ancient_receipts CASCADE"); err != nil {
return err
}
_, err = tx.Exec("TRUNCATE eth.ancient_tds CASCADE")
return err
}

63
ethdb/postgres/util.go Normal file
View File

@ -0,0 +1,63 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package postgres
import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-ds-help"
_ "github.com/lib/pq" //postgres driver
"github.com/multiformats/go-multihash"
)
// MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash db key string
func MultihashKeyFromKeccak256(h []byte) (string, error) {
mh, err := multihash.Encode(h, multihash.KECCAK_256)
if err != nil {
return "", err
}
dbKey := dshelp.MultihashToDsKey(mh)
return blockstore.BlockPrefix.String() + dbKey.String(), nil
}
// DatastoreKeyFromGethKey returns the public.blocks key from the provided geth key
// It also returns the key's prefix, if it has one
func DatastoreKeyFromGethKey(h []byte) (string, []byte, error) {
keyType, keyComponents := ResolveKeyType(h)
switch keyType {
case Keccak:
mhKey, err := MultihashKeyFromKeccak256(h)
return mhKey, nil, err
case Header:
mhKey, err := MultihashKeyFromKeccak256(keyComponents[1])
return mhKey, keyComponents[0], err
case Preimage:
mhKey, err := MultihashKeyFromKeccak256(keyComponents[1])
return mhKey, keyComponents[0], err
case Prefixed, Suffixed:
// This data is not mapped by hash => content by geth, store it using the prefixed/suffixed key directly
// I.e. the public.blocks datastore key == the hex representation of the geth key
// Alternatively, decompose the data and derive the hash
return common.Bytes2Hex(h), keyComponents[0], nil
case Static:
return common.Bytes2Hex(h), nil, nil
default:
return "", nil, fmt.Errorf("invalid formatting of database key: %x", h)
}
}