diff --git a/postgres/ancient.go b/postgres/ancient.go
new file mode 100644
index 0000000..e6c4454
--- /dev/null
+++ b/postgres/ancient.go
@@ -0,0 +1,238 @@
+// VulcanizeDB
+// Copyright © 2020 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package pgipfsethdb
+
+import (
+ "database/sql"
+ "fmt"
+
+ "github.com/sirupsen/logrus"
+)
+
+const (
+ // FreezerHeaderTable indicates the name of the freezer header table.
+ FreezerHeaderTable = "headers"
+
+ // FreezerHashTable indicates the name of the freezer canonical hash table.
+ FreezerHashTable = "hashes"
+
+ // FreezerBodiesTable indicates the name of the freezer block body table.
+ FreezerBodiesTable = "bodies"
+
+ // FreezerReceiptTable indicates the name of the freezer receipts table.
+ FreezerReceiptTable = "receipts"
+
+ // FreezerDifficultyTable indicates the name of the freezer total difficulty table.
+ FreezerDifficultyTable = "diffs"
+
+ // ancient append Postgres statements
+ appendAncientHeaderPgStr = "INSERT INTO eth.ancient_headers (block_number, header) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET header = $2"
+ appendAncientHashPgStr = "INSERT INTO eth.ancient_hashes (block_number, hash) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET hash = $2"
+ appendAncientBodyPgStr = "INSERT INTO eth.ancient_bodies (block_number, body) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET body = $2"
+ appendAncientReceiptsPgStr = "INSERT INTO eth.ancient_receipts (block_number, receipts) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET receipts = $2"
+ appendAncientTDPgStr = "INSERT INTO eth.ancient_tds (block_number, td) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET td = $2"
+
+ // ancient truncate Postgres statements
+ truncateAncientHeaderPgStr = "DELETE FROM eth.ancient_headers WHERE block_number > $1"
+ truncateAncientHashPgStr = "DELETE FROM eth.ancient_hashes WHERE block_number > $1"
+ truncateAncientBodiesPgStr = "DELETE FROM eth.ancient_bodies WHERE block_number > $1"
+ truncateAncientReceiptsPgStr = "DELETE FROM eth.ancient_receipts WHERE block_number > $1"
+ truncateAncientTDPgStr = "DELETE FROM eth.ancient_tds WHERE block_number > $1"
+
+ // ancient size Postgres statement
+ ancientSizePgStr = "SELECT pg_total_relation_size($1)"
+
+ // ancients Postgres statement
+ ancientsPgStr = "SELECT block_number FROM eth.ancient_headers ORDER BY block_number DESC LIMIT 1"
+
+ // ancient has Postgres statements
+ hasAncientHeaderPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_headers WHERE block_number = $1)"
+ hasAncientHashPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_hashes WHERE block_number = $1)"
+ hasAncientBodyPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_bodies WHERE block_number = $1)"
+ hasAncientReceiptsPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_receipts WHERE block_number = $1)"
+ hasAncientTDPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_tds WHERE block_number = $1)"
+
+ // ancient get Postgres statements
+ getAncientHeaderPgStr = "SELECT header FROM eth.ancient_headers WHERE block_number = $1"
+ getAncientHashPgStr = "SELECT hash FROM eth.ancient_hashes WHERE block_number = $1"
+ getAncientBodyPgStr = "SELECT body FROM eth.ancient_bodies WHERE block_number = $1"
+ getAncientReceiptsPgStr = "SELECT receipts FROM eth.ancient_receipts WHERE block_number = $1"
+ getAncientTDPgStr = "SELECT td FROM eth.ancient_tds WHERE block_number = $1"
+)
+
+// HasAncient satisfies the ethdb.AncientReader interface
+// HasAncient returns an indicator whether the specified data exists in the ancient store
+func (d *Database) HasAncient(kind string, number uint64) (bool, error) {
+ var pgStr string
+ switch kind {
+ case FreezerHeaderTable:
+ pgStr = hasAncientHeaderPgStr
+ case FreezerHashTable:
+ pgStr = hasAncientHashPgStr
+ case FreezerBodiesTable:
+ pgStr = hasAncientBodyPgStr
+ case FreezerReceiptTable:
+ pgStr = hasAncientReceiptsPgStr
+ case FreezerDifficultyTable:
+ pgStr = hasAncientTDPgStr
+ default:
+ return false, fmt.Errorf("unexpected ancient kind: %s", kind)
+ }
+ has := new(bool)
+ return *has, d.db.Get(has, pgStr, number)
+}
+
+// Ancient satisfies the ethdb.AncientReader interface
+// Ancient retrieves an ancient binary blob from the append-only immutable files
+func (d *Database) Ancient(kind string, number uint64) ([]byte, error) {
+ var pgStr string
+ switch kind {
+ case FreezerHeaderTable:
+ pgStr = getAncientHeaderPgStr
+ case FreezerHashTable:
+ pgStr = getAncientHashPgStr
+ case FreezerBodiesTable:
+ pgStr = getAncientBodyPgStr
+ case FreezerReceiptTable:
+ pgStr = getAncientReceiptsPgStr
+ case FreezerDifficultyTable:
+ pgStr = getAncientTDPgStr
+ default:
+ return nil, fmt.Errorf("unexpected ancient kind: %s", kind)
+ }
+ data := new([]byte)
+ return *data, d.db.Get(data, pgStr, number)
+}
+
+// Ancients satisfies the ethdb.AncientReader interface
+// Ancients returns the ancient item numbers in the ancient store
+func (d *Database) Ancients() (uint64, error) {
+ num := new(uint64)
+ if err := d.db.Get(num, ancientsPgStr); err != nil {
+ if err == sql.ErrNoRows {
+ return 0, nil
+ }
+ return 0, err
+ }
+ return *num, nil
+}
+
+// AncientSize satisfies the ethdb.AncientReader interface
+// AncientSize returns the ancient size of the specified category
+func (d *Database) AncientSize(kind string) (uint64, error) {
+ var tableName string
+ switch kind {
+ case FreezerHeaderTable:
+ tableName = "eth.ancient_headers"
+ case FreezerHashTable:
+ tableName = "eth.ancient_hashes"
+ case FreezerBodiesTable:
+ tableName = "eth.ancient_bodies"
+ case FreezerReceiptTable:
+ tableName = "eth.ancient_receipts"
+ case FreezerDifficultyTable:
+ tableName = "eth.ancient_tds"
+ default:
+ return 0, fmt.Errorf("unexpected ancient kind: %s", kind)
+ }
+ size := new(uint64)
+ return *size, d.db.Get(size, ancientSizePgStr, tableName)
+}
+
+// AppendAncient satisfies the ethdb.AncientWriter interface
+// AppendAncient injects all binary blobs belong to block at the end of the append-only immutable table files
+func (d *Database) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error {
+ // append in batch
+ var err error
+ if d.ancientTx == nil {
+ d.ancientTx, err = d.db.Beginx()
+ if err != nil {
+ return err
+ }
+ }
+ defer func() {
+ if err != nil {
+ if err := d.ancientTx.Rollback(); err != nil {
+ logrus.Error(err)
+ d.ancientTx = nil
+ }
+ }
+ }()
+
+ if _, err := d.ancientTx.Exec(appendAncientHashPgStr, number, hash); err != nil {
+ return err
+ }
+ if _, err := d.ancientTx.Exec(appendAncientHeaderPgStr, number, header); err != nil {
+ return err
+ }
+ if _, err := d.ancientTx.Exec(appendAncientBodyPgStr, number, body); err != nil {
+ return err
+ }
+ if _, err := d.ancientTx.Exec(appendAncientReceiptsPgStr, number, receipts); err != nil {
+ return err
+ }
+ _, err = d.ancientTx.Exec(appendAncientTDPgStr, number, td)
+ return err
+}
+
+// TruncateAncients satisfies the ethdb.AncientWriter interface
+// TruncateAncients discards all but the first n ancient data from the ancient store
+func (d *Database) TruncateAncients(n uint64) error {
+ // truncate in batch
+ var err error
+ if d.ancientTx == nil {
+ d.ancientTx, err = d.db.Beginx()
+ if err != nil {
+ return err
+ }
+ }
+ defer func() {
+ if err != nil {
+ if err := d.ancientTx.Rollback(); err != nil {
+ logrus.Error(err)
+ d.ancientTx = nil
+ }
+ }
+ }()
+ if _, err := d.ancientTx.Exec(truncateAncientHeaderPgStr, n); err != nil {
+ return err
+ }
+ if _, err := d.ancientTx.Exec(truncateAncientHashPgStr, n); err != nil {
+ return err
+ }
+ if _, err := d.ancientTx.Exec(truncateAncientBodiesPgStr, n); err != nil {
+ return err
+ }
+ if _, err := d.ancientTx.Exec(truncateAncientReceiptsPgStr, n); err != nil {
+ return err
+ }
+ _, err = d.ancientTx.Exec(truncateAncientTDPgStr, n)
+ return err
+}
+
+// Sync satisfies the ethdb.AncientWriter interface
+// Sync flushes all in-memory ancient store data to disk
+func (d *Database) Sync() error {
+ if d.ancientTx == nil {
+ return nil
+ }
+ if err := d.ancientTx.Commit(); err != nil {
+ return err
+ }
+ d.ancientTx = nil
+ return nil
+}
diff --git a/postgres/ancient_test.go b/postgres/ancient_test.go
new file mode 100644
index 0000000..8cd85ee
--- /dev/null
+++ b/postgres/ancient_test.go
@@ -0,0 +1,232 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package pgipfsethdb_test
+
+import (
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/rlp"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+
+ "github.com/vulcanize/ipfs-ethdb/postgres"
+)
+
+var (
+ ancientDB ethdb.Database
+ testBlockNumber uint64 = 1
+ testAncientHeader = types.Header{Number: big.NewInt(2)}
+ testAncientHeaderRLP, _ = rlp.EncodeToBytes(testHeader2)
+ testAncientHash = testAncientHeader.Hash().Bytes()
+ testAncientBodyBytes = make([]byte, 10000)
+ testAncientReceiptsBytes = make([]byte, 5000)
+ testAncientTD, _ = new(big.Int).SetString("1000000000000000000000", 10)
+ testAncientTDBytes = testAncientTD.Bytes()
+)
+
+var _ = Describe("Ancient", func() {
+ BeforeEach(func() {
+ db, err = pgipfsethdb.TestDB()
+ Expect(err).ToNot(HaveOccurred())
+ ancientDB = pgipfsethdb.NewDatabase(db)
+
+ })
+ AfterEach(func() {
+ err = pgipfsethdb.ResetTestDB(db)
+ Expect(err).ToNot(HaveOccurred())
+ })
+
+ Describe("AppendAncient/Sync/Has", func() {
+ It("adds eth objects to the Ancient database and returns whether or not an ancient record exists", func() {
+ hasAncient(testBlockNumber, false)
+
+ err = ancientDB.AppendAncient(testBlockNumber, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes)
+ Expect(err).ToNot(HaveOccurred())
+
+ hasAncient(testBlockNumber, false)
+
+ err = ancientDB.Sync()
+ Expect(err).ToNot(HaveOccurred())
+
+ hasAncient(testBlockNumber, true)
+ })
+ })
+
+ Describe("AppendAncient/Sync/Ancient", func() {
+ It("adds the eth objects to the Ancient database and returns the ancient objects on request", func() {
+ hasAncient(testBlockNumber, false)
+
+ _, err := ancientDB.Ancient(pgipfsethdb.FreezerHeaderTable, testBlockNumber)
+ Expect(err).To(HaveOccurred())
+ _, err = ancientDB.Ancient(pgipfsethdb.FreezerHashTable, testBlockNumber)
+ Expect(err).To(HaveOccurred())
+ _, err = ancientDB.Ancient(pgipfsethdb.FreezerBodiesTable, testBlockNumber)
+ Expect(err).To(HaveOccurred())
+ _, err = ancientDB.Ancient(pgipfsethdb.FreezerReceiptTable, testBlockNumber)
+ Expect(err).To(HaveOccurred())
+ _, err = ancientDB.Ancient(pgipfsethdb.FreezerDifficultyTable, testBlockNumber)
+ Expect(err).To(HaveOccurred())
+
+ err = ancientDB.AppendAncient(testBlockNumber, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes)
+ Expect(err).ToNot(HaveOccurred())
+ err = ancientDB.Sync()
+ Expect(err).ToNot(HaveOccurred())
+
+ hasAncient(testBlockNumber, true)
+
+ ancientHeader, err := ancientDB.Ancient(pgipfsethdb.FreezerHeaderTable, testBlockNumber)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(ancientHeader).To(Equal(testAncientHeaderRLP))
+
+ ancientHash, err := ancientDB.Ancient(pgipfsethdb.FreezerHashTable, testBlockNumber)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(ancientHash).To(Equal(testAncientHash))
+
+ ancientBody, err := ancientDB.Ancient(pgipfsethdb.FreezerBodiesTable, testBlockNumber)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(ancientBody).To(Equal(testAncientBodyBytes))
+
+ ancientReceipts, err := ancientDB.Ancient(pgipfsethdb.FreezerReceiptTable, testBlockNumber)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(ancientReceipts).To(Equal(testAncientReceiptsBytes))
+
+ ancientTD, err := ancientDB.Ancient(pgipfsethdb.FreezerDifficultyTable, testBlockNumber)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(ancientTD).To(Equal(testAncientTDBytes))
+ })
+ })
+
+ Describe("AppendAncient/Sync/Ancients", func() {
+ It("returns the height of the ancient database", func() {
+ ancients, err := ancientDB.Ancients()
+ Expect(err).ToNot(HaveOccurred())
+ Expect(ancients).To(Equal(uint64(0)))
+
+ for i := uint64(0); i <= 100; i++ {
+ hasAncient(i, false)
+ err = ancientDB.AppendAncient(i, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes)
+ Expect(err).ToNot(HaveOccurred())
+ }
+
+ err = ancientDB.Sync()
+ Expect(err).ToNot(HaveOccurred())
+
+ for i := uint64(0); i <= 100; i++ {
+ hasAncient(i, true)
+ }
+ ancients, err = ancientDB.Ancients()
+ Expect(err).ToNot(HaveOccurred())
+ Expect(ancients).To(Equal(uint64(100)))
+ })
+ })
+
+ Describe("AppendAncient/Truncate/Sync", func() {
+ It("truncates the ancient database to the provided height", func() {
+ for i := uint64(0); i <= 100; i++ {
+ hasAncient(i, false)
+ err = ancientDB.AppendAncient(i, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes)
+ Expect(err).ToNot(HaveOccurred())
+ }
+
+ err = ancientDB.Sync()
+ Expect(err).ToNot(HaveOccurred())
+
+ err = ancientDB.TruncateAncients(50)
+ Expect(err).ToNot(HaveOccurred())
+
+ for i := uint64(0); i <= 100; i++ {
+ hasAncient(i, true)
+ }
+
+ ancients, err := ancientDB.Ancients()
+ Expect(err).ToNot(HaveOccurred())
+ Expect(ancients).To(Equal(uint64(100)))
+
+ err = ancientDB.Sync()
+ Expect(err).ToNot(HaveOccurred())
+
+ for i := uint64(0); i <= 100; i++ {
+ if i <= 50 {
+ hasAncient(i, true)
+ } else {
+ hasAncient(i, false)
+ }
+ }
+
+ ancients, err = ancientDB.Ancients()
+ Expect(err).ToNot(HaveOccurred())
+ Expect(ancients).To(Equal(uint64(50)))
+ })
+ })
+
+ Describe("AppendAncient/Sync/AncientSize", func() {
+ It("adds the eth objects to the Ancient database and returns the ancient objects on request", func() {
+ for i := uint64(0); i <= 100; i++ {
+ hasAncient(i, false)
+ err = ancientDB.AppendAncient(i, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes)
+ Expect(err).ToNot(HaveOccurred())
+ }
+
+ err = ancientDB.Sync()
+ Expect(err).ToNot(HaveOccurred())
+
+ for i := uint64(0); i <= 100; i++ {
+ hasAncient(i, true)
+ }
+
+ ancientHeaderSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerHeaderTable)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(ancientHeaderSize).To(Equal(uint64(106496)))
+
+ ancientHashSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerHashTable)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(ancientHashSize).To(Equal(uint64(32768)))
+
+ ancientBodySize, err := ancientDB.AncientSize(pgipfsethdb.FreezerBodiesTable)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(ancientBodySize).To(Equal(uint64(73728)))
+
+ ancientReceiptsSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerReceiptTable)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(ancientReceiptsSize).To(Equal(uint64(65536)))
+
+ ancientTDSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerDifficultyTable)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(ancientTDSize).To(Equal(uint64(32768)))
+ })
+ })
+})
+
+func hasAncient(blockNumber uint64, shouldHave bool) {
+ has, err := ancientDB.HasAncient(pgipfsethdb.FreezerHeaderTable, blockNumber)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(has).To(Equal(shouldHave))
+ has, err = ancientDB.HasAncient(pgipfsethdb.FreezerHashTable, blockNumber)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(has).To(Equal(shouldHave))
+ has, err = ancientDB.HasAncient(pgipfsethdb.FreezerBodiesTable, blockNumber)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(has).To(Equal(shouldHave))
+ has, err = ancientDB.HasAncient(pgipfsethdb.FreezerReceiptTable, blockNumber)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(has).To(Equal(shouldHave))
+ has, err = ancientDB.HasAncient(pgipfsethdb.FreezerDifficultyTable, blockNumber)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(has).To(Equal(shouldHave))
+}
diff --git a/postgres/batch.go b/postgres/batch.go
index 8e38292..96b9b32 100644
--- a/postgres/batch.go
+++ b/postgres/batch.go
@@ -17,22 +17,25 @@
package pgipfsethdb
import (
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
)
// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection
type Batch struct {
- db *sqlx.DB
- tx *sqlx.Tx
- valueSize int
+ db *sqlx.DB
+ tx *sqlx.Tx
+ valueSize int
+ replayCache map[string][]byte
}
// NewBatch returns a ethdb.Batch interface for PG-IPFS
func NewBatch(db *sqlx.DB, tx *sqlx.Tx) ethdb.Batch {
b := &Batch{
- db: db,
- tx: tx,
+ db: db,
+ tx: tx,
+ replayCache: make(map[string][]byte),
}
if tx == nil {
b.Reset()
@@ -55,6 +58,7 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
return err
}
b.valueSize += len(value)
+ b.replayCache[common.Bytes2Hex(key)] = value
return nil
}
@@ -62,7 +66,11 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
// Delete removes the key from the key-value data store
func (b *Batch) Delete(key []byte) (err error) {
_, err = b.tx.Exec(deletePgStr, key)
- return err
+ if err != nil {
+ return err
+ }
+ delete(b.replayCache, common.Bytes2Hex(key))
+ return nil
}
// ValueSize satisfies the ethdb.Batch interface
@@ -79,13 +87,27 @@ func (b *Batch) Write() error {
if b.tx == nil {
return nil
}
- return b.tx.Commit()
+ if err := b.tx.Commit(); err != nil {
+ return err
+ }
+ b.replayCache = nil
+ return nil
}
// Replay satisfies the ethdb.Batch interface
// Replay replays the batch contents
func (b *Batch) Replay(w ethdb.KeyValueWriter) error {
- return errNotSupported
+ if b.tx != nil {
+ b.tx.Rollback()
+ b.tx = nil
+ }
+ for key, value := range b.replayCache {
+ if err := w.Put(common.Hex2Bytes(key), value); err != nil {
+ return err
+ }
+ }
+ b.replayCache = nil
+ return nil
}
// Reset satisfies the ethdb.Batch interface
@@ -97,5 +119,6 @@ func (b *Batch) Reset() {
if err != nil {
panic(err)
}
+ b.replayCache = make(map[string][]byte)
b.valueSize = 0
}
diff --git a/postgres/batch_test.go b/postgres/batch_test.go
index 5d89168..f941be6 100644
--- a/postgres/batch_test.go
+++ b/postgres/batch_test.go
@@ -115,4 +115,30 @@ var _ = Describe("Batch", func() {
Expect(size).To(Equal(0))
})
})
+
+ Describe("Replay", func() {
+ It("returns the size of data in the batch queued for write", func() {
+ err = batch.Put(testKeccakEthKey, testValue)
+ Expect(err).ToNot(HaveOccurred())
+ err = batch.Put(testKeccakEthKey2, testValue2)
+ Expect(err).ToNot(HaveOccurred())
+
+ _, err = database.Get(testKeccakEthKey)
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
+ _, err = database.Get(testKeccakEthKey2)
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
+
+ err = batch.Replay(database)
+ Expect(err).ToNot(HaveOccurred())
+
+ val, err := database.Get(testKeccakEthKey)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(val).To(Equal(testValue))
+ val2, err := database.Get(testKeccakEthKey2)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(val2).To(Equal(testValue2))
+ })
+ })
})
diff --git a/postgres/database.go b/postgres/database.go
index 1e5a958..a1253f8 100644
--- a/postgres/database.go
+++ b/postgres/database.go
@@ -17,7 +17,6 @@
package pgipfsethdb
import (
- "errors"
"fmt"
"strings"
@@ -27,10 +26,8 @@ import (
"github.com/jmoiron/sqlx"
)
-var errNotSupported = errors.New("this operation is not supported")
-
const (
- hasPgStr = "SELECT exists(select 1 from eth.key_preimages WHERE eth_key = $1)"
+ hasPgStr = "SELECT exists(SELECT 1 FROM eth.key_preimages WHERE eth_key = $1)"
getPgStr = "SELECT data FROM public.blocks INNER JOIN eth.key_preimages ON (ipfs_key = blocks.key) WHERE eth_key = $1"
putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING"
putPreimagePgStr = "INSERT INTO eth.key_preimages (eth_key, ipfs_key, prefix) VALUES ($1, $2, $3) ON CONFLICT (eth_key) DO UPDATE SET (ipfs_key, prefix) = ($2, $3)"
@@ -40,7 +37,8 @@ const (
// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
type Database struct {
- db *sqlx.DB
+ db *sqlx.DB
+ ancientTx *sqlx.Tx
}
// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS
@@ -187,7 +185,8 @@ func (d *Database) Stat(property string) (string, error) {
// Compact satisfies the ethdb.Compacter interface
// Compact flattens the underlying data store for the given key range
func (d *Database) Compact(start []byte, limit []byte) error {
- return errNotSupported
+ // this leveldb functionality doesn't translate to Postgres
+ return nil
}
// NewBatch satisfies the ethdb.Batcher interface
@@ -221,45 +220,3 @@ func (d *Database) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator {
func (d *Database) Close() error {
return d.db.DB.Close()
}
-
-// HasAncient satisfies the ethdb.AncientReader interface
-// HasAncient returns an indicator whether the specified data exists in the ancient store
-func (d *Database) HasAncient(kind string, number uint64) (bool, error) {
- return false, errNotSupported
-}
-
-// Ancient satisfies the ethdb.AncientReader interface
-// Ancient retrieves an ancient binary blob from the append-only immutable files
-func (d *Database) Ancient(kind string, number uint64) ([]byte, error) {
- return nil, errNotSupported
-}
-
-// Ancients satisfies the ethdb.AncientReader interface
-// Ancients returns the ancient item numbers in the ancient store
-func (d *Database) Ancients() (uint64, error) {
- return 0, errNotSupported
-}
-
-// AncientSize satisfies the ethdb.AncientReader interface
-// AncientSize returns the ancient size of the specified category
-func (d *Database) AncientSize(kind string) (uint64, error) {
- return 0, errNotSupported
-}
-
-// AppendAncient satisfies the ethdb.AncientWriter interface
-// AppendAncient injects all binary blobs belong to block at the end of the append-only immutable table files
-func (d *Database) AppendAncient(number uint64, hash, header, body, receipt, td []byte) error {
- return errNotSupported
-}
-
-// TruncateAncients satisfies the ethdb.AncientWriter interface
-// TruncateAncients discards all but the first n ancient data from the ancient store
-func (d *Database) TruncateAncients(n uint64) error {
- return errNotSupported
-}
-
-// Sync satisfies the ethdb.AncientWriter interface
-// Sync flushes all in-memory ancient store data to disk
-func (d *Database) Sync() error {
- return errNotSupported
-}
diff --git a/postgres/db/migrations/00004_create_eth_ancient_headers_table.sql b/postgres/db/migrations/00004_create_eth_ancient_headers_table.sql
new file mode 100644
index 0000000..832209d
--- /dev/null
+++ b/postgres/db/migrations/00004_create_eth_ancient_headers_table.sql
@@ -0,0 +1,8 @@
+-- +goose Up
+CREATE TABLE IF NOT EXISTS eth.ancient_headers (
+ block_number BIGINT UNIQUE NOT NULL,
+ header BYTEA NOT NULL
+);
+
+-- +goose Down
+DROP TABLE eth.ancient_headers;
\ No newline at end of file
diff --git a/postgres/db/migrations/00005_create_eth_ancient_hashes_table.sql b/postgres/db/migrations/00005_create_eth_ancient_hashes_table.sql
new file mode 100644
index 0000000..b5ec597
--- /dev/null
+++ b/postgres/db/migrations/00005_create_eth_ancient_hashes_table.sql
@@ -0,0 +1,8 @@
+-- +goose Up
+CREATE TABLE IF NOT EXISTS eth.ancient_hashes (
+ block_number BIGINT UNIQUE NOT NULL,
+ hash BYTEA NOT NULL
+);
+
+-- +goose Down
+DROP TABLE eth.ancient_hashes;
\ No newline at end of file
diff --git a/postgres/db/migrations/00006_create_eth_ancient_bodies_table.sql b/postgres/db/migrations/00006_create_eth_ancient_bodies_table.sql
new file mode 100644
index 0000000..2de60cd
--- /dev/null
+++ b/postgres/db/migrations/00006_create_eth_ancient_bodies_table.sql
@@ -0,0 +1,8 @@
+-- +goose Up
+CREATE TABLE IF NOT EXISTS eth.ancient_bodies (
+ block_number BIGINT UNIQUE NOT NULL,
+ body BYTEA NOT NULL
+);
+
+-- +goose Down
+DROP TABLE eth.ancient_bodies;
\ No newline at end of file
diff --git a/postgres/db/migrations/00007_create_eth_ancient_receipts_table.sql b/postgres/db/migrations/00007_create_eth_ancient_receipts_table.sql
new file mode 100644
index 0000000..beec1e7
--- /dev/null
+++ b/postgres/db/migrations/00007_create_eth_ancient_receipts_table.sql
@@ -0,0 +1,8 @@
+-- +goose Up
+CREATE TABLE IF NOT EXISTS eth.ancient_receipts (
+ block_number BIGINT UNIQUE NOT NULL,
+ receipts BYTEA NOT NULL
+);
+
+-- +goose Down
+DROP TABLE eth.ancient_receipts;
\ No newline at end of file
diff --git a/postgres/db/migrations/00008_create_eth_ancient_tds_table.sql b/postgres/db/migrations/00008_create_eth_ancient_tds_table.sql
new file mode 100644
index 0000000..724ecf4
--- /dev/null
+++ b/postgres/db/migrations/00008_create_eth_ancient_tds_table.sql
@@ -0,0 +1,8 @@
+-- +goose Up
+CREATE TABLE IF NOT EXISTS eth.ancient_tds (
+ block_number BIGINT UNIQUE NOT NULL,
+ td BYTEA NOT NULL
+);
+
+-- +goose Down
+DROP TABLE eth.ancient_tds;
\ No newline at end of file
diff --git a/postgres/test_helpers.go b/postgres/test_helpers.go
index 7eb4260..e62f787 100644
--- a/postgres/test_helpers.go
+++ b/postgres/test_helpers.go
@@ -16,7 +16,9 @@
package pgipfsethdb
-import "github.com/jmoiron/sqlx"
+import (
+ "github.com/jmoiron/sqlx"
+)
// TestDB connect to the testing database
// it assumes the database has the IPFS public.blocks table present
@@ -28,6 +30,38 @@ func TestDB() (*sqlx.DB, error) {
// ResetTestDB drops all rows in the test db public.blocks table
func ResetTestDB(db *sqlx.DB) error {
- _, err := db.Exec("TRUNCATE public.blocks CASCADE")
+ tx, err := db.Beginx()
+ if err != nil {
+ return err
+ }
+ defer func() {
+ if p := recover(); p != nil {
+ tx.Rollback()
+ panic(p)
+ } else if err != nil {
+ tx.Rollback()
+ } else {
+ err = tx.Commit()
+ }
+ }()
+ if _, err := tx.Exec("TRUNCATE public.blocks CASCADE"); err != nil {
+ return err
+ }
+ if _, err := tx.Exec("TRUNCATE eth.key_preimages CASCADE"); err != nil {
+ return err
+ }
+ if _, err := tx.Exec("TRUNCATE eth.ancient_headers CASCADE"); err != nil {
+ return err
+ }
+ if _, err := tx.Exec("TRUNCATE eth.ancient_hashes CASCADE"); err != nil {
+ return err
+ }
+ if _, err := tx.Exec("TRUNCATE eth.ancient_bodies CASCADE"); err != nil {
+ return err
+ }
+ if _, err := tx.Exec("TRUNCATE eth.ancient_receipts CASCADE"); err != nil {
+ return err
+ }
+ _, err = tx.Exec("TRUNCATE eth.ancient_tds CASCADE")
return err
}