diff --git a/postgres/batch.go b/postgres/v0/batch.go
similarity index 100%
rename from postgres/batch.go
rename to postgres/v0/batch.go
diff --git a/postgres/batch_test.go b/postgres/v0/batch_test.go
similarity index 100%
rename from postgres/batch_test.go
rename to postgres/v0/batch_test.go
diff --git a/postgres/database.go b/postgres/v0/database.go
similarity index 100%
rename from postgres/database.go
rename to postgres/v0/database.go
diff --git a/postgres/database_test.go b/postgres/v0/database_test.go
similarity index 100%
rename from postgres/database_test.go
rename to postgres/v0/database_test.go
diff --git a/postgres/iterator.go b/postgres/v0/iterator.go
similarity index 100%
rename from postgres/iterator.go
rename to postgres/v0/iterator.go
diff --git a/postgres/postgres_suite_test.go b/postgres/v0/postgres_suite_test.go
similarity index 100%
rename from postgres/postgres_suite_test.go
rename to postgres/v0/postgres_suite_test.go
diff --git a/postgres/util.go b/postgres/v0/util.go
similarity index 100%
rename from postgres/util.go
rename to postgres/v0/util.go
diff --git a/postgres/v1/batch.go b/postgres/v1/batch.go
new file mode 100644
index 0000000..0099567
--- /dev/null
+++ b/postgres/v1/batch.go
@@ -0,0 +1,108 @@
+// VulcanizeDB
+// Copyright © 2020 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package pgipfsethdb
+
+import (
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/jmoiron/sqlx"
+)
+
+var _ ethdb.Batch = &Batch{}
+
+// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection
+type Batch struct {
+ db *sqlx.DB
+ tx *sqlx.Tx
+ valueSize int
+
+ blockNumber *big.Int
+}
+
+// NewBatch returns a ethdb.Batch interface for PG-IPFS
+func NewBatch(db *sqlx.DB, tx *sqlx.Tx, blockNumber *big.Int) ethdb.Batch {
+ b := &Batch{
+ db: db,
+ tx: tx,
+ blockNumber: blockNumber,
+ }
+ if tx == nil {
+ b.Reset()
+ }
+ return b
+}
+
+// Put satisfies the ethdb.Batch interface
+// Put inserts the given value into the key-value data store
+// Key is expected to be the keccak256 hash of value
+func (b *Batch) Put(key []byte, value []byte) (err error) {
+ mhKey, err := MultihashKeyFromKeccak256(key)
+ if err != nil {
+ return err
+ }
+ if _, err = b.tx.Exec(putPgStr, mhKey, value, b.blockNumber.Uint64()); err != nil {
+ return err
+ }
+ b.valueSize += len(value)
+ return nil
+}
+
+// Delete satisfies the ethdb.Batch interface
+// Delete removes the key from the key-value data store
+func (b *Batch) Delete(key []byte) (err error) {
+ mhKey, err := MultihashKeyFromKeccak256(key)
+ if err != nil {
+ return err
+ }
+ _, err = b.tx.Exec(deletePgStr, mhKey)
+ return err
+}
+
+// ValueSize satisfies the ethdb.Batch interface
+// ValueSize retrieves the amount of data queued up for writing
+// The returned value is the total byte length of all data queued to write
+func (b *Batch) ValueSize() int {
+ return b.valueSize
+}
+
+// Write satisfies the ethdb.Batch interface
+// Write flushes any accumulated data to disk
+func (b *Batch) Write() error {
+ if b.tx == nil {
+ return nil
+ }
+ return b.tx.Commit()
+}
+
+// Replay satisfies the ethdb.Batch interface
+// Replay replays the batch contents
+func (b *Batch) Replay(w ethdb.KeyValueWriter) error {
+ return errNotSupported
+}
+
+// Reset satisfies the ethdb.Batch interface
+// Reset resets the batch for reuse
+// This should be called after every write
+func (b *Batch) Reset() {
+ var err error
+ b.tx, err = b.db.Beginx()
+ if err != nil {
+ panic(err)
+ }
+ b.valueSize = 0
+}
diff --git a/postgres/v1/batch_test.go b/postgres/v1/batch_test.go
new file mode 100644
index 0000000..ba4ca20
--- /dev/null
+++ b/postgres/v1/batch_test.go
@@ -0,0 +1,133 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package pgipfsethdb_test
+
+import (
+ "math/big"
+ "time"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/mailgun/groupcache/v2"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+
+ pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres"
+)
+
+var (
+ batch ethdb.Batch
+ testHeader2 = types.Header{Number: big.NewInt(2)}
+ testValue2, _ = rlp.EncodeToBytes(testHeader2)
+ testEthKey2 = testHeader2.Hash().Bytes()
+)
+
+var _ = Describe("Batch", func() {
+ BeforeEach(func() {
+ db, err = pgipfsethdb.TestDB()
+ Expect(err).ToNot(HaveOccurred())
+
+ cacheConfig := pgipfsethdb.CacheConfig{
+ Name: "db",
+ Size: 3000000, // 3MB
+ ExpiryDuration: time.Hour,
+ }
+
+ database = pgipfsethdb.NewDatabase(db, cacheConfig)
+
+ databaseWithBlock, ok := database.(*pgipfsethdb.Database)
+ Expect(ok).To(BeTrue())
+ (*databaseWithBlock).BlockNumber = testBlockNumber
+
+ batch = database.NewBatch()
+ })
+ AfterEach(func() {
+ groupcache.DeregisterGroup("db")
+ err = pgipfsethdb.ResetTestDB(db)
+ Expect(err).ToNot(HaveOccurred())
+ })
+
+ Describe("Put/Write", func() {
+ It("adds the key-value pair to the batch", func() {
+ _, err = database.Get(testEthKey)
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
+ _, err = database.Get(testEthKey2)
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
+
+ err = batch.Put(testEthKey, testValue)
+ Expect(err).ToNot(HaveOccurred())
+ err = batch.Put(testEthKey2, testValue2)
+ Expect(err).ToNot(HaveOccurred())
+ err = batch.Write()
+ Expect(err).ToNot(HaveOccurred())
+
+ val, err := database.Get(testEthKey)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(val).To(Equal(testValue))
+ val2, err := database.Get(testEthKey2)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(val2).To(Equal(testValue2))
+ })
+ })
+
+ Describe("Delete/Reset/Write", func() {
+ It("deletes the key-value pair in the batch", func() {
+ err = batch.Put(testEthKey, testValue)
+ Expect(err).ToNot(HaveOccurred())
+ err = batch.Put(testEthKey2, testValue2)
+ Expect(err).ToNot(HaveOccurred())
+ err = batch.Write()
+ Expect(err).ToNot(HaveOccurred())
+
+ batch.Reset()
+ err = batch.Delete(testEthKey)
+ Expect(err).ToNot(HaveOccurred())
+ err = batch.Delete(testEthKey2)
+ Expect(err).ToNot(HaveOccurred())
+ err = batch.Write()
+ Expect(err).ToNot(HaveOccurred())
+
+ _, err = database.Get(testEthKey)
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
+ _, err = database.Get(testEthKey2)
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
+ })
+ })
+
+ Describe("ValueSize/Reset", func() {
+ It("returns the size of data in the batch queued for write", func() {
+ err = batch.Put(testEthKey, testValue)
+ Expect(err).ToNot(HaveOccurred())
+ err = batch.Put(testEthKey2, testValue2)
+ Expect(err).ToNot(HaveOccurred())
+ err = batch.Write()
+ Expect(err).ToNot(HaveOccurred())
+
+ size := batch.ValueSize()
+ Expect(size).To(Equal(len(testValue) + len(testValue2)))
+
+ batch.Reset()
+ size = batch.ValueSize()
+ Expect(size).To(Equal(0))
+ })
+ })
+})
diff --git a/postgres/v1/database.go b/postgres/v1/database.go
new file mode 100644
index 0000000..c21da02
--- /dev/null
+++ b/postgres/v1/database.go
@@ -0,0 +1,361 @@
+// VulcanizeDB
+// Copyright © 2020 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package pgipfsethdb
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "fmt"
+ "math/big"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/jmoiron/sqlx"
+ "github.com/mailgun/groupcache/v2"
+ log "github.com/sirupsen/logrus"
+)
+
+var errNotSupported = errors.New("this operation is not supported")
+
+var (
+ hasPgStr = "SELECT exists(select 1 from public.blocks WHERE key = $1 LIMIT 1)"
+ getPgStr = "SELECT data FROM public.blocks WHERE key = $1 LIMIT 1"
+ putPgStr = "INSERT INTO public.blocks (key, data, block_number) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"
+ deletePgStr = "DELETE FROM public.blocks WHERE key = $1"
+ dbSizePgStr = "SELECT pg_database_size(current_database())"
+)
+
+var _ ethdb.Database = &Database{}
+
+// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
+type Database struct {
+ db *sqlx.DB
+ cache *groupcache.Group
+
+ BlockNumber *big.Int
+}
+
+func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) {
+ return 0, errNotSupported
+}
+
+type CacheConfig struct {
+ Name string
+ Size int
+ ExpiryDuration time.Duration
+}
+
+// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS
+func NewKeyValueStore(db *sqlx.DB, cacheConfig CacheConfig) ethdb.KeyValueStore {
+ database := Database{db: db}
+ database.InitCache(cacheConfig)
+
+ return &database
+}
+
+// NewDatabase returns a ethdb.Database interface for PG-IPFS
+func NewDatabase(db *sqlx.DB, cacheConfig CacheConfig) ethdb.Database {
+ database := Database{db: db}
+ database.InitCache(cacheConfig)
+
+ return &database
+}
+
+func (d *Database) InitCache(cacheConfig CacheConfig) {
+ d.cache = groupcache.NewGroup(cacheConfig.Name, int64(cacheConfig.Size), groupcache.GetterFunc(
+ func(_ context.Context, id string, dest groupcache.Sink) error {
+ val, err := d.dbGet(id)
+
+ if err != nil {
+ return err
+ }
+
+ // Set the value in the groupcache, with expiry
+ if err := dest.SetBytes(val, time.Now().Add(cacheConfig.ExpiryDuration)); err != nil {
+ return err
+ }
+
+ return nil
+ },
+ ))
+}
+
+func (d *Database) GetCacheStats() groupcache.Stats {
+ return d.cache.Stats
+}
+
+// Has satisfies the ethdb.KeyValueReader interface
+// Has retrieves if a key is present in the key-value data store
+func (d *Database) Has(key []byte) (bool, error) {
+ mhKey, err := MultihashKeyFromKeccak256(key)
+ if err != nil {
+ return false, err
+ }
+ var exists bool
+ return exists, d.db.Get(&exists, hasPgStr, mhKey)
+}
+
+// Get retrieves the given key if it's present in the key-value data store
+func (d *Database) dbGet(key string) ([]byte, error) {
+ var data []byte
+ err := d.db.Get(&data, getPgStr, key)
+ if err == sql.ErrNoRows {
+ log.Warn("Database miss for key", key)
+ }
+
+ return data, err
+}
+
+// Get satisfies the ethdb.KeyValueReader interface
+// Get retrieves the given key if it's present in the key-value data store
+func (d *Database) Get(key []byte) ([]byte, error) {
+ mhKey, err := MultihashKeyFromKeccak256(key)
+ if err != nil {
+ return nil, err
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
+ defer cancel()
+
+ var data []byte
+ return data, d.cache.Get(ctx, mhKey, groupcache.AllocatingByteSliceSink(&data))
+}
+
+// Put satisfies the ethdb.KeyValueWriter interface
+// Put inserts the given value into the key-value data store
+// Key is expected to be the keccak256 hash of value
+func (d *Database) Put(key []byte, value []byte) error {
+ mhKey, err := MultihashKeyFromKeccak256(key)
+ if err != nil {
+ return err
+ }
+ _, err = d.db.Exec(putPgStr, mhKey, value, d.BlockNumber.Uint64())
+ return err
+}
+
+// Delete satisfies the ethdb.KeyValueWriter interface
+// Delete removes the key from the key-value data store
+func (d *Database) Delete(key []byte) error {
+ mhKey, err := MultihashKeyFromKeccak256(key)
+ if err != nil {
+ return err
+ }
+
+ _, err = d.db.Exec(deletePgStr, mhKey)
+ if err != nil {
+ return err
+ }
+
+ // Remove from cache.
+ ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
+ defer cancel()
+ err = d.cache.Remove(ctx, mhKey)
+
+ return err
+}
+
+// DatabaseProperty enum type
+type DatabaseProperty int
+
+const (
+ Unknown DatabaseProperty = iota
+ Size
+ Idle
+ InUse
+ MaxIdleClosed
+ MaxLifetimeClosed
+ MaxOpenConnections
+ OpenConnections
+ WaitCount
+ WaitDuration
+)
+
+// DatabasePropertyFromString helper function
+func DatabasePropertyFromString(property string) (DatabaseProperty, error) {
+ switch strings.ToLower(property) {
+ case "size":
+ return Size, nil
+ case "idle":
+ return Idle, nil
+ case "inuse":
+ return InUse, nil
+ case "maxidleclosed":
+ return MaxIdleClosed, nil
+ case "maxlifetimeclosed":
+ return MaxLifetimeClosed, nil
+ case "maxopenconnections":
+ return MaxOpenConnections, nil
+ case "openconnections":
+ return OpenConnections, nil
+ case "waitcount":
+ return WaitCount, nil
+ case "waitduration":
+ return WaitDuration, nil
+ default:
+ return Unknown, fmt.Errorf("unknown database property")
+ }
+}
+
+// Stat satisfies the ethdb.Stater interface
+// Stat returns a particular internal stat of the database
+func (d *Database) Stat(property string) (string, error) {
+ prop, err := DatabasePropertyFromString(property)
+ if err != nil {
+ return "", err
+ }
+ switch prop {
+ case Size:
+ var byteSize string
+ return byteSize, d.db.Get(&byteSize, dbSizePgStr)
+ case Idle:
+ return strconv.Itoa(d.db.Stats().Idle), nil
+ case InUse:
+ return strconv.Itoa(d.db.Stats().InUse), nil
+ case MaxIdleClosed:
+ return strconv.FormatInt(d.db.Stats().MaxIdleClosed, 10), nil
+ case MaxLifetimeClosed:
+ return strconv.FormatInt(d.db.Stats().MaxLifetimeClosed, 10), nil
+ case MaxOpenConnections:
+ return strconv.Itoa(d.db.Stats().MaxOpenConnections), nil
+ case OpenConnections:
+ return strconv.Itoa(d.db.Stats().OpenConnections), nil
+ case WaitCount:
+ return strconv.FormatInt(d.db.Stats().WaitCount, 10), nil
+ case WaitDuration:
+ return d.db.Stats().WaitDuration.String(), nil
+ default:
+ return "", fmt.Errorf("unhandled database property")
+ }
+}
+
+// Compact satisfies the ethdb.Compacter interface
+// Compact flattens the underlying data store for the given key range
+func (d *Database) Compact(start []byte, limit []byte) error {
+ return errNotSupported
+}
+
+// NewBatch satisfies the ethdb.Batcher interface
+// NewBatch creates a write-only database that buffers changes to its host db
+// until a final write is called
+func (d *Database) NewBatch() ethdb.Batch {
+ return NewBatch(d.db, nil, d.BlockNumber)
+}
+
+// NewBatchWithSize satisfies the ethdb.Batcher interface.
+// NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
+func (d *Database) NewBatchWithSize(size int) ethdb.Batch {
+ return NewBatch(d.db, nil, d.BlockNumber)
+}
+
+// NewIterator satisfies the ethdb.Iteratee interface
+// it creates a binary-alphabetical iterator over a subset
+// of database content with a particular key prefix, starting at a particular
+// initial key (or after, if it does not exist).
+//
+// Note: This method assumes that the prefix is NOT part of the start, so there's
+// no need for the caller to prepend the prefix to the start
+func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
+ return NewIterator(start, prefix, d.db)
+}
+
+// Close satisfies the io.Closer interface
+// Close closes the db connection
+func (d *Database) Close() error {
+ return d.db.DB.Close()
+}
+
+// HasAncient satisfies the ethdb.AncientReader interface
+// HasAncient returns an indicator whether the specified data exists in the ancient store
+func (d *Database) HasAncient(kind string, number uint64) (bool, error) {
+ return false, errNotSupported
+}
+
+// Ancient satisfies the ethdb.AncientReader interface
+// Ancient retrieves an ancient binary blob from the append-only immutable files
+func (d *Database) Ancient(kind string, number uint64) ([]byte, error) {
+ return nil, errNotSupported
+}
+
+// Ancients satisfies the ethdb.AncientReader interface
+// Ancients returns the ancient item numbers in the ancient store
+func (d *Database) Ancients() (uint64, error) {
+ return 0, errNotSupported
+}
+
+// Tail satisfies the ethdb.AncientReader interface.
+// Tail returns the number of first stored item in the freezer.
+func (d *Database) Tail() (uint64, error) {
+ return 0, errNotSupported
+}
+
+// AncientSize satisfies the ethdb.AncientReader interface
+// AncientSize returns the ancient size of the specified category
+func (d *Database) AncientSize(kind string) (uint64, error) {
+ return 0, errNotSupported
+}
+
+// AncientRange retrieves all the items in a range, starting from the index 'start'.
+// It will return
+// - at most 'count' items,
+// - at least 1 item (even if exceeding the maxBytes), but will otherwise
+// return as many items as fit into maxBytes.
+func (d *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
+ return nil, errNotSupported
+}
+
+// ReadAncients applies the provided AncientReader function
+func (d *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
+ return errNotSupported
+}
+
+// TruncateHead satisfies the ethdb.AncientWriter interface.
+// TruncateHead discards all but the first n ancient data from the ancient store.
+func (d *Database) TruncateHead(n uint64) error {
+ return errNotSupported
+}
+
+// TruncateTail satisfies the ethdb.AncientWriter interface.
+// TruncateTail discards the first n ancient data from the ancient store.
+func (d *Database) TruncateTail(n uint64) error {
+ return errNotSupported
+}
+
+// Sync satisfies the ethdb.AncientWriter interface
+// Sync flushes all in-memory ancient store data to disk
+func (d *Database) Sync() error {
+ return errNotSupported
+}
+
+// MigrateTable satisfies the ethdb.AncientWriter interface.
+// MigrateTable processes and migrates entries of a given table to a new format.
+func (d *Database) MigrateTable(string, func([]byte) ([]byte, error)) error {
+ return errNotSupported
+}
+
+// NewSnapshot satisfies the ethdb.Snapshotter interface.
+// NewSnapshot creates a database snapshot based on the current state.
+func (d *Database) NewSnapshot() (ethdb.Snapshot, error) {
+ return nil, errNotSupported
+}
+
+// AncientDatadir returns an error as we don't have a backing chain freezer.
+func (d *Database) AncientDatadir() (string, error) {
+ return "", errNotSupported
+}
diff --git a/postgres/v1/database_test.go b/postgres/v1/database_test.go
new file mode 100644
index 0000000..b929561
--- /dev/null
+++ b/postgres/v1/database_test.go
@@ -0,0 +1,127 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package pgipfsethdb_test
+
+import (
+ "math/big"
+ "time"
+
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/jmoiron/sqlx"
+ "github.com/mailgun/groupcache/v2"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+
+ pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres"
+)
+
+var (
+ database ethdb.Database
+ db *sqlx.DB
+ err error
+ testBlockNumber = big.NewInt(1337)
+ testHeader = types.Header{Number: testBlockNumber}
+ testValue, _ = rlp.EncodeToBytes(testHeader)
+ testEthKey = testHeader.Hash().Bytes()
+ testMhKey, _ = pgipfsethdb.MultihashKeyFromKeccak256(testEthKey)
+)
+
+var _ = Describe("Database", func() {
+ BeforeEach(func() {
+ db, err = pgipfsethdb.TestDB()
+ Expect(err).ToNot(HaveOccurred())
+
+ cacheConfig := pgipfsethdb.CacheConfig{
+ Name: "db",
+ Size: 3000000, // 3MB
+ ExpiryDuration: time.Hour,
+ }
+
+ database = pgipfsethdb.NewDatabase(db, cacheConfig)
+
+ databaseWithBlock, ok := database.(*pgipfsethdb.Database)
+ Expect(ok).To(BeTrue())
+ (*databaseWithBlock).BlockNumber = testBlockNumber
+ })
+ AfterEach(func() {
+ groupcache.DeregisterGroup("db")
+ err = pgipfsethdb.ResetTestDB(db)
+ Expect(err).ToNot(HaveOccurred())
+ })
+
+ Describe("Has", func() {
+ It("returns false if a key-pair doesn't exist in the db", func() {
+ has, err := database.Has(testEthKey)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(has).ToNot(BeTrue())
+ })
+ It("returns true if a key-pair exists in the db", func() {
+ _, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
+ Expect(err).ToNot(HaveOccurred())
+ has, err := database.Has(testEthKey)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(has).To(BeTrue())
+ })
+ })
+
+ Describe("Get", func() {
+ It("throws an err if the key-pair doesn't exist in the db", func() {
+ _, err = database.Get(testEthKey)
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
+ })
+ It("returns the value associated with the key, if the pair exists", func() {
+ _, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
+ Expect(err).ToNot(HaveOccurred())
+ val, err := database.Get(testEthKey)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(val).To(Equal(testValue))
+ })
+ })
+
+ Describe("Put", func() {
+ It("persists the key-value pair in the database", func() {
+ _, err = database.Get(testEthKey)
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
+
+ err = database.Put(testEthKey, testValue)
+ Expect(err).ToNot(HaveOccurred())
+ val, err := database.Get(testEthKey)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(val).To(Equal(testValue))
+ })
+ })
+
+ Describe("Delete", func() {
+ It("removes the key-value pair from the database", func() {
+ err = database.Put(testEthKey, testValue)
+ Expect(err).ToNot(HaveOccurred())
+ val, err := database.Get(testEthKey)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(val).To(Equal(testValue))
+
+ err = database.Delete(testEthKey)
+ Expect(err).ToNot(HaveOccurred())
+ _, err = database.Get(testEthKey)
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
+ })
+ })
+})
diff --git a/postgres/v1/iterator.go b/postgres/v1/iterator.go
new file mode 100644
index 0000000..0d3a0a4
--- /dev/null
+++ b/postgres/v1/iterator.go
@@ -0,0 +1,90 @@
+// VulcanizeDB
+// Copyright © 2020 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package pgipfsethdb
+
+import (
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/jmoiron/sqlx"
+)
+
+var _ ethdb.Iterator = &Iterator{}
+
+// Iterator is the type that satisfies the ethdb.Iterator interface for PG-IPFS Ethereum data using a direct Postgres connection
+// Iteratee interface is used in Geth for various tests, trie/sync_bloom.go (for fast sync),
+// rawdb.InspectDatabase, and the new core/state/snapshot features.
+// This should not be confused with trie.NodeIterator or state.NodeIteraor (which can be constructed
+// from the ethdb.KeyValueStoreand ethdb.Database interfaces)
+type Iterator struct {
+ db *sqlx.DB
+ currentKey, prefix []byte
+ err error
+}
+
+// NewIterator returns an ethdb.Iterator interface for PG-IPFS
+func NewIterator(start, prefix []byte, db *sqlx.DB) ethdb.Iterator {
+ return &Iterator{
+ db: db,
+ prefix: prefix,
+ currentKey: start,
+ }
+}
+
+// Next satisfies the ethdb.Iterator interface
+// Next moves the iterator to the next key/value pair
+// It returns whether the iterator is exhausted
+func (i *Iterator) Next() bool {
+ // this is complicated by the ipfs db keys not being the keccak256 hashes
+ // go-ethereum usage of this method expects the iteration to occur over keccak256 keys
+ panic("implement me: Next")
+}
+
+// Error satisfies the ethdb.Iterator interface
+// Error returns any accumulated error
+// Exhausting all the key/value pairs is not considered to be an error
+func (i *Iterator) Error() error {
+ return i.err
+}
+
+// Key satisfies the ethdb.Iterator interface
+// Key returns the key of the current key/value pair, or nil if done
+// The caller should not modify the contents of the returned slice
+// and its contents may change on the next call to Next
+func (i *Iterator) Key() []byte {
+ return i.currentKey
+}
+
+// Value satisfies the ethdb.Iterator interface
+// Value returns the value of the current key/value pair, or nil if done
+// The caller should not modify the contents of the returned slice
+// and its contents may change on the next call to Next
+func (i *Iterator) Value() []byte {
+ mhKey, err := MultihashKeyFromKeccak256(i.currentKey)
+ if err != nil {
+ i.err = err
+ return nil
+ }
+ var data []byte
+ i.err = i.db.Get(&data, getPgStr, mhKey)
+ return data
+}
+
+// Release satisfies the ethdb.Iterator interface
+// Release releases associated resources
+// Release should always succeed and can be called multiple times without causing error
+func (i *Iterator) Release() {
+ i.db.Close()
+}
diff --git a/postgres/v1/postgres_suite_test.go b/postgres/v1/postgres_suite_test.go
new file mode 100644
index 0000000..fa1f767
--- /dev/null
+++ b/postgres/v1/postgres_suite_test.go
@@ -0,0 +1,29 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package pgipfsethdb_test
+
+import (
+ "testing"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+func TestPGIPFSETHDB(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "PG-IPFS ethdb test")
+}
diff --git a/postgres/v1/util.go b/postgres/v1/util.go
new file mode 100644
index 0000000..eedd8fd
--- /dev/null
+++ b/postgres/v1/util.go
@@ -0,0 +1,49 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+package pgipfsethdb
+
+import (
+ blockstore "github.com/ipfs/go-ipfs-blockstore"
+ dshelp "github.com/ipfs/go-ipfs-ds-help"
+ "github.com/jmoiron/sqlx"
+ _ "github.com/lib/pq" //postgres driver
+ "github.com/multiformats/go-multihash"
+)
+
+// MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash db key string
+func MultihashKeyFromKeccak256(h []byte) (string, error) {
+ mh, err := multihash.Encode(h, multihash.KECCAK_256)
+ if err != nil {
+ return "", err
+ }
+ dbKey := dshelp.MultihashToDsKey(mh)
+ return blockstore.BlockPrefix.String() + dbKey.String(), nil
+}
+
+// TestDB connect to the testing database
+// it assumes the database has the IPFS public.blocks table present
+// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table
+func TestDB() (*sqlx.DB, error) {
+ connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable"
+ return sqlx.Connect("postgres", connectStr)
+}
+
+// ResetTestDB drops all rows in the test db public.blocks table
+func ResetTestDB(db *sqlx.DB) error {
+ _, err := db.Exec("TRUNCATE public.blocks CASCADE")
+ return err
+}