diff --git a/go.mod b/go.mod index d34e42e..fc6d1a0 100644 --- a/go.mod +++ b/go.mod @@ -16,4 +16,5 @@ require ( github.com/multiformats/go-multihash v0.0.13 github.com/onsi/ginkgo v1.8.0 github.com/onsi/gomega v1.5.0 + github.com/sirupsen/logrus v1.6.0 ) diff --git a/postgres/batch.go b/postgres/batch.go index d023868..74a883d 100644 --- a/postgres/batch.go +++ b/postgres/batch.go @@ -51,6 +51,9 @@ func (b *Batch) Put(key []byte, value []byte) (err error) { if _, err = b.tx.Exec(putPgStr, mhKey, value); err != nil { return err } + if _, err = b.tx.Exec(putPreimagePgStr, key, mhKey); err != nil { + return err + } b.valueSize += len(value) return nil } @@ -58,11 +61,7 @@ func (b *Batch) Put(key []byte, value []byte) (err error) { // Delete satisfies the ethdb.Batch interface // Delete removes the key from the key-value data store func (b *Batch) Delete(key []byte) (err error) { - mhKey, err := MultihashKeyFromKeccak256(key) - if err != nil { - return err - } - _, err = b.tx.Exec(deletePgStr, mhKey) + _, err = b.tx.Exec(deletePgStr, key) return err } diff --git a/postgres/database.go b/postgres/database.go index 6c2dc03..aa2b1f1 100644 --- a/postgres/database.go +++ b/postgres/database.go @@ -21,6 +21,8 @@ import ( "fmt" "strings" + "github.com/sirupsen/logrus" + "github.com/ethereum/go-ethereum/ethdb" "github.com/jmoiron/sqlx" ) @@ -28,11 +30,12 @@ import ( var errNotSupported = errors.New("this operation is not supported") var ( - hasPgStr = "SELECT exists(select 1 from public.blocks WHERE key = $1)" - getPgStr = "SELECT data FROM public.blocks WHERE key = $1" - putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING" - deletePgStr = "DELETE FROM public.blocks WHERE key = $1" - dbSizePgStr = "SELECT pg_database_size(current_database())" + hasPgStr = "SELECT exists(select 1 from eth.key_preimages WHERE eth_key = $1)" + getPgStr = "SELECT data FROM public.blocks INNER JOIN eth.key_preimages ON (ipfs_key = blocks.key) WHERE eth_key = $1" + putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING" + putPreimagePgStr = "INSERT INTO eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2) ON CONFLICT (eth_key) DO NOTHING" + deletePgStr = "DELETE FROM public.blocks USING eth.key_preimages WHERE ipfs_key = blocks.key AND eth_key = $1" + dbSizePgStr = "SELECT pg_database_size(current_database())" ) // Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection @@ -56,46 +59,54 @@ func NewDatabase(db *sqlx.DB) ethdb.Database { // Has satisfies the ethdb.KeyValueReader interface // Has retrieves if a key is present in the key-value data store +// Has uses the eth.key_preimages table func (d *Database) Has(key []byte) (bool, error) { - mhKey, err := MultihashKeyFromKeccak256(key) - if err != nil { - return false, err - } var exists bool - return exists, d.db.Get(&exists, hasPgStr, mhKey) + return exists, d.db.Get(&exists, hasPgStr, key) } // Get satisfies the ethdb.KeyValueReader interface // Get retrieves the given key if it's present in the key-value data store +// Get uses the eth.key_preimages table func (d *Database) Get(key []byte) ([]byte, error) { - mhKey, err := MultihashKeyFromKeccak256(key) - if err != nil { - return nil, err - } var data []byte - return data, d.db.Get(&data, getPgStr, mhKey) + return data, d.db.Get(&data, getPgStr, key) } // Put satisfies the ethdb.KeyValueWriter interface // Put inserts the given value into the key-value data store // Key is expected to be the keccak256 hash of value +// Put inserts the keccak256 key into the eth.key_preimages table func (d *Database) Put(key []byte, value []byte) error { mhKey, err := MultihashKeyFromKeccak256(key) if err != nil { return err } - _, err = d.db.Exec(putPgStr, mhKey, value) + tx, err := d.db.Beginx() + if err != nil { + return err + } + defer func() { + if err != nil { + if err := tx.Rollback(); err != nil { + logrus.Error(err) + } + } else { + err = tx.Commit() + } + }() + if _, err = tx.Exec(putPgStr, mhKey, value); err != nil { + return err + } + _, err = tx.Exec(putPreimagePgStr, key, mhKey) return err } // Delete satisfies the ethdb.KeyValueWriter interface // Delete removes the key from the key-value data store +// Delete uses the eth.key_preimages table func (d *Database) Delete(key []byte) error { - mhKey, err := MultihashKeyFromKeccak256(key) - if err != nil { - return err - } - _, err = d.db.Exec(deletePgStr, mhKey) + _, err := d.db.Exec(deletePgStr, key) return err } diff --git a/postgres/database_test.go b/postgres/database_test.go index 93054f4..ca560bf 100644 --- a/postgres/database_test.go +++ b/postgres/database_test.go @@ -59,6 +59,8 @@ var _ = Describe("Database", func() { It("returns true if a key-pair exists in the db", func() { _, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue) Expect(err).ToNot(HaveOccurred()) + _, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testEthKey, testMhKey) + Expect(err).ToNot(HaveOccurred()) has, err := database.Has(testEthKey) Expect(err).ToNot(HaveOccurred()) Expect(has).To(BeTrue()) @@ -74,6 +76,8 @@ var _ = Describe("Database", func() { It("returns the value associated with the key, if the pair exists", func() { _, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue) Expect(err).ToNot(HaveOccurred()) + _, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testEthKey, testMhKey) + Expect(err).ToNot(HaveOccurred()) val, err := database.Get(testEthKey) Expect(err).ToNot(HaveOccurred()) Expect(val).To(Equal(testValue)) diff --git a/postgres/db/migrations/00001_create_ipfs_blocks_table.sql b/postgres/db/migrations/00001_create_ipfs_blocks_table.sql new file mode 100644 index 0000000..6e3941e --- /dev/null +++ b/postgres/db/migrations/00001_create_ipfs_blocks_table.sql @@ -0,0 +1,8 @@ +-- +goose Up +CREATE TABLE IF NOT EXISTS public.blocks ( + key TEXT UNIQUE NOT NULL, + data BYTEA NOT NULL +); + +-- +goose Down +DROP TABLE public.blocks; diff --git a/postgres/db/migrations/00002_create_eth_schema.sql b/postgres/db/migrations/00002_create_eth_schema.sql new file mode 100644 index 0000000..79ed225 --- /dev/null +++ b/postgres/db/migrations/00002_create_eth_schema.sql @@ -0,0 +1,5 @@ +-- +goose Up +CREATE SCHEMA eth; + +-- +goose Down +DROP SCHEMA eth; diff --git a/postgres/db/migrations/00003_create_eth_key_preimages_table.sql b/postgres/db/migrations/00003_create_eth_key_preimages_table.sql new file mode 100644 index 0000000..e949506 --- /dev/null +++ b/postgres/db/migrations/00003_create_eth_key_preimages_table.sql @@ -0,0 +1,8 @@ +-- +goose Up +CREATE TABLE IF NOT EXISTS eth.key_preimages ( + eth_key BYTEA UNIQUE NOT NULL, + ipfs_key TEXT NOT NULL REFERENCES public.blocks (key) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED +); + +-- +goose Down +DROP TABLE eth.key_preimages; diff --git a/postgres/util.go b/postgres/util.go index cdd818f..e12f0cc 100644 --- a/postgres/util.go +++ b/postgres/util.go @@ -44,6 +44,6 @@ func TestDB() (*sqlx.DB, error) { // ResetTestDB drops all rows in the test db public.blocks table func ResetTestDB(db *sqlx.DB) error { - _, err := db.Exec("TRUNCATE public.blocks") + _, err := db.Exec("TRUNCATE public.blocks CASCADE") return err }