Compare commits
1 Commits
v5
...
iterator_s
Author | SHA1 | Date | |
---|---|---|---|
|
0b26f2cc8b |
1
go.mod
1
go.mod
@ -16,4 +16,5 @@ require (
|
||||
github.com/multiformats/go-multihash v0.0.13
|
||||
github.com/onsi/ginkgo v1.8.0
|
||||
github.com/onsi/gomega v1.5.0
|
||||
github.com/sirupsen/logrus v1.6.0
|
||||
)
|
||||
|
@ -51,6 +51,9 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
|
||||
if _, err = b.tx.Exec(putPgStr, mhKey, value); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = b.tx.Exec(putPreimagePgStr, key, mhKey); err != nil {
|
||||
return err
|
||||
}
|
||||
b.valueSize += len(value)
|
||||
return nil
|
||||
}
|
||||
@ -58,11 +61,7 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
|
||||
// Delete satisfies the ethdb.Batch interface
|
||||
// Delete removes the key from the key-value data store
|
||||
func (b *Batch) Delete(key []byte) (err error) {
|
||||
mhKey, err := MultihashKeyFromKeccak256(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = b.tx.Exec(deletePgStr, mhKey)
|
||||
_, err = b.tx.Exec(deletePgStr, key)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,8 @@ import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
@ -28,11 +30,12 @@ import (
|
||||
var errNotSupported = errors.New("this operation is not supported")
|
||||
|
||||
var (
|
||||
hasPgStr = "SELECT exists(select 1 from public.blocks WHERE key = $1)"
|
||||
getPgStr = "SELECT data FROM public.blocks WHERE key = $1"
|
||||
putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING"
|
||||
deletePgStr = "DELETE FROM public.blocks WHERE key = $1"
|
||||
dbSizePgStr = "SELECT pg_database_size(current_database())"
|
||||
hasPgStr = "SELECT exists(select 1 from eth.key_preimages WHERE eth_key = $1)"
|
||||
getPgStr = "SELECT data FROM public.blocks INNER JOIN eth.key_preimages ON (ipfs_key = blocks.key) WHERE eth_key = $1"
|
||||
putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING"
|
||||
putPreimagePgStr = "INSERT INTO eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2) ON CONFLICT (eth_key) DO NOTHING"
|
||||
deletePgStr = "DELETE FROM public.blocks USING eth.key_preimages WHERE ipfs_key = blocks.key AND eth_key = $1"
|
||||
dbSizePgStr = "SELECT pg_database_size(current_database())"
|
||||
)
|
||||
|
||||
// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
|
||||
@ -56,46 +59,54 @@ func NewDatabase(db *sqlx.DB) ethdb.Database {
|
||||
|
||||
// Has satisfies the ethdb.KeyValueReader interface
|
||||
// Has retrieves if a key is present in the key-value data store
|
||||
// Has uses the eth.key_preimages table
|
||||
func (d *Database) Has(key []byte) (bool, error) {
|
||||
mhKey, err := MultihashKeyFromKeccak256(key)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
var exists bool
|
||||
return exists, d.db.Get(&exists, hasPgStr, mhKey)
|
||||
return exists, d.db.Get(&exists, hasPgStr, key)
|
||||
}
|
||||
|
||||
// Get satisfies the ethdb.KeyValueReader interface
|
||||
// Get retrieves the given key if it's present in the key-value data store
|
||||
// Get uses the eth.key_preimages table
|
||||
func (d *Database) Get(key []byte) ([]byte, error) {
|
||||
mhKey, err := MultihashKeyFromKeccak256(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var data []byte
|
||||
return data, d.db.Get(&data, getPgStr, mhKey)
|
||||
return data, d.db.Get(&data, getPgStr, key)
|
||||
}
|
||||
|
||||
// Put satisfies the ethdb.KeyValueWriter interface
|
||||
// Put inserts the given value into the key-value data store
|
||||
// Key is expected to be the keccak256 hash of value
|
||||
// Put inserts the keccak256 key into the eth.key_preimages table
|
||||
func (d *Database) Put(key []byte, value []byte) error {
|
||||
mhKey, err := MultihashKeyFromKeccak256(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = d.db.Exec(putPgStr, mhKey, value)
|
||||
tx, err := d.db.Beginx()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
if err := tx.Rollback(); err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
} else {
|
||||
err = tx.Commit()
|
||||
}
|
||||
}()
|
||||
if _, err = tx.Exec(putPgStr, mhKey, value); err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = tx.Exec(putPreimagePgStr, key, mhKey)
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete satisfies the ethdb.KeyValueWriter interface
|
||||
// Delete removes the key from the key-value data store
|
||||
// Delete uses the eth.key_preimages table
|
||||
func (d *Database) Delete(key []byte) error {
|
||||
mhKey, err := MultihashKeyFromKeccak256(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = d.db.Exec(deletePgStr, mhKey)
|
||||
_, err := d.db.Exec(deletePgStr, key)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -59,6 +59,8 @@ var _ = Describe("Database", func() {
|
||||
It("returns true if a key-pair exists in the db", func() {
|
||||
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
_, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testEthKey, testMhKey)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
has, err := database.Has(testEthKey)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(has).To(BeTrue())
|
||||
@ -74,6 +76,8 @@ var _ = Describe("Database", func() {
|
||||
It("returns the value associated with the key, if the pair exists", func() {
|
||||
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
_, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testEthKey, testMhKey)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
val, err := database.Get(testEthKey)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(val).To(Equal(testValue))
|
||||
|
@ -0,0 +1,8 @@
|
||||
-- +goose Up
|
||||
CREATE TABLE IF NOT EXISTS public.blocks (
|
||||
key TEXT UNIQUE NOT NULL,
|
||||
data BYTEA NOT NULL
|
||||
);
|
||||
|
||||
-- +goose Down
|
||||
DROP TABLE public.blocks;
|
5
postgres/db/migrations/00002_create_eth_schema.sql
Normal file
5
postgres/db/migrations/00002_create_eth_schema.sql
Normal file
@ -0,0 +1,5 @@
|
||||
-- +goose Up
|
||||
CREATE SCHEMA eth;
|
||||
|
||||
-- +goose Down
|
||||
DROP SCHEMA eth;
|
@ -0,0 +1,8 @@
|
||||
-- +goose Up
|
||||
CREATE TABLE IF NOT EXISTS eth.key_preimages (
|
||||
eth_key BYTEA UNIQUE NOT NULL,
|
||||
ipfs_key TEXT NOT NULL REFERENCES public.blocks (key) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED
|
||||
);
|
||||
|
||||
-- +goose Down
|
||||
DROP TABLE eth.key_preimages;
|
@ -44,6 +44,6 @@ func TestDB() (*sqlx.DB, error) {
|
||||
|
||||
// ResetTestDB drops all rows in the test db public.blocks table
|
||||
func ResetTestDB(db *sqlx.DB) error {
|
||||
_, err := db.Exec("TRUNCATE public.blocks")
|
||||
_, err := db.Exec("TRUNCATE public.blocks CASCADE")
|
||||
return err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user