Compare commits

...

1 Commits

Author SHA1 Message Date
Ian Norden
0b26f2cc8b utilize eth key preimage table 2020-09-08 14:15:35 -05:00
8 changed files with 63 additions and 27 deletions

1
go.mod
View File

@ -16,4 +16,5 @@ require (
github.com/multiformats/go-multihash v0.0.13
github.com/onsi/ginkgo v1.8.0
github.com/onsi/gomega v1.5.0
github.com/sirupsen/logrus v1.6.0
)

View File

@ -51,6 +51,9 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
if _, err = b.tx.Exec(putPgStr, mhKey, value); err != nil {
return err
}
if _, err = b.tx.Exec(putPreimagePgStr, key, mhKey); err != nil {
return err
}
b.valueSize += len(value)
return nil
}
@ -58,11 +61,7 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
// Delete satisfies the ethdb.Batch interface
// Delete removes the key from the key-value data store
func (b *Batch) Delete(key []byte) (err error) {
mhKey, err := MultihashKeyFromKeccak256(key)
if err != nil {
return err
}
_, err = b.tx.Exec(deletePgStr, mhKey)
_, err = b.tx.Exec(deletePgStr, key)
return err
}

View File

@ -21,6 +21,8 @@ import (
"fmt"
"strings"
"github.com/sirupsen/logrus"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
)
@ -28,11 +30,12 @@ import (
var errNotSupported = errors.New("this operation is not supported")
var (
hasPgStr = "SELECT exists(select 1 from public.blocks WHERE key = $1)"
getPgStr = "SELECT data FROM public.blocks WHERE key = $1"
putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING"
deletePgStr = "DELETE FROM public.blocks WHERE key = $1"
dbSizePgStr = "SELECT pg_database_size(current_database())"
hasPgStr = "SELECT exists(select 1 from eth.key_preimages WHERE eth_key = $1)"
getPgStr = "SELECT data FROM public.blocks INNER JOIN eth.key_preimages ON (ipfs_key = blocks.key) WHERE eth_key = $1"
putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING"
putPreimagePgStr = "INSERT INTO eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2) ON CONFLICT (eth_key) DO NOTHING"
deletePgStr = "DELETE FROM public.blocks USING eth.key_preimages WHERE ipfs_key = blocks.key AND eth_key = $1"
dbSizePgStr = "SELECT pg_database_size(current_database())"
)
// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
@ -56,46 +59,54 @@ func NewDatabase(db *sqlx.DB) ethdb.Database {
// Has satisfies the ethdb.KeyValueReader interface
// Has retrieves if a key is present in the key-value data store
// Has uses the eth.key_preimages table
func (d *Database) Has(key []byte) (bool, error) {
mhKey, err := MultihashKeyFromKeccak256(key)
if err != nil {
return false, err
}
var exists bool
return exists, d.db.Get(&exists, hasPgStr, mhKey)
return exists, d.db.Get(&exists, hasPgStr, key)
}
// Get satisfies the ethdb.KeyValueReader interface
// Get retrieves the given key if it's present in the key-value data store
// Get uses the eth.key_preimages table
func (d *Database) Get(key []byte) ([]byte, error) {
mhKey, err := MultihashKeyFromKeccak256(key)
if err != nil {
return nil, err
}
var data []byte
return data, d.db.Get(&data, getPgStr, mhKey)
return data, d.db.Get(&data, getPgStr, key)
}
// Put satisfies the ethdb.KeyValueWriter interface
// Put inserts the given value into the key-value data store
// Key is expected to be the keccak256 hash of value
// Put inserts the keccak256 key into the eth.key_preimages table
func (d *Database) Put(key []byte, value []byte) error {
mhKey, err := MultihashKeyFromKeccak256(key)
if err != nil {
return err
}
_, err = d.db.Exec(putPgStr, mhKey, value)
tx, err := d.db.Beginx()
if err != nil {
return err
}
defer func() {
if err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
} else {
err = tx.Commit()
}
}()
if _, err = tx.Exec(putPgStr, mhKey, value); err != nil {
return err
}
_, err = tx.Exec(putPreimagePgStr, key, mhKey)
return err
}
// Delete satisfies the ethdb.KeyValueWriter interface
// Delete removes the key from the key-value data store
// Delete uses the eth.key_preimages table
func (d *Database) Delete(key []byte) error {
mhKey, err := MultihashKeyFromKeccak256(key)
if err != nil {
return err
}
_, err = d.db.Exec(deletePgStr, mhKey)
_, err := d.db.Exec(deletePgStr, key)
return err
}

View File

@ -59,6 +59,8 @@ var _ = Describe("Database", func() {
It("returns true if a key-pair exists in the db", func() {
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue)
Expect(err).ToNot(HaveOccurred())
_, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testEthKey, testMhKey)
Expect(err).ToNot(HaveOccurred())
has, err := database.Has(testEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(has).To(BeTrue())
@ -74,6 +76,8 @@ var _ = Describe("Database", func() {
It("returns the value associated with the key, if the pair exists", func() {
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue)
Expect(err).ToNot(HaveOccurred())
_, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testEthKey, testMhKey)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testEthKey)
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))

View File

@ -0,0 +1,8 @@
-- +goose Up
CREATE TABLE IF NOT EXISTS public.blocks (
key TEXT UNIQUE NOT NULL,
data BYTEA NOT NULL
);
-- +goose Down
DROP TABLE public.blocks;

View File

@ -0,0 +1,5 @@
-- +goose Up
CREATE SCHEMA eth;
-- +goose Down
DROP SCHEMA eth;

View File

@ -0,0 +1,8 @@
-- +goose Up
CREATE TABLE IF NOT EXISTS eth.key_preimages (
eth_key BYTEA UNIQUE NOT NULL,
ipfs_key TEXT NOT NULL REFERENCES public.blocks (key) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED
);
-- +goose Down
DROP TABLE eth.key_preimages;

View File

@ -44,6 +44,6 @@ func TestDB() (*sqlx.DB, error) {
// ResetTestDB drops all rows in the test db public.blocks table
func ResetTestDB(db *sqlx.DB) error {
_, err := db.Exec("TRUNCATE public.blocks")
_, err := db.Exec("TRUNCATE public.blocks CASCADE")
return err
}