Compare commits
1 Commits
v5
...
iterator_s
Author | SHA1 | Date | |
---|---|---|---|
|
0b26f2cc8b |
1
go.mod
1
go.mod
@ -16,4 +16,5 @@ require (
|
|||||||
github.com/multiformats/go-multihash v0.0.13
|
github.com/multiformats/go-multihash v0.0.13
|
||||||
github.com/onsi/ginkgo v1.8.0
|
github.com/onsi/ginkgo v1.8.0
|
||||||
github.com/onsi/gomega v1.5.0
|
github.com/onsi/gomega v1.5.0
|
||||||
|
github.com/sirupsen/logrus v1.6.0
|
||||||
)
|
)
|
||||||
|
@ -51,6 +51,9 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
|
|||||||
if _, err = b.tx.Exec(putPgStr, mhKey, value); err != nil {
|
if _, err = b.tx.Exec(putPgStr, mhKey, value); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if _, err = b.tx.Exec(putPreimagePgStr, key, mhKey); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
b.valueSize += len(value)
|
b.valueSize += len(value)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -58,11 +61,7 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
|
|||||||
// Delete satisfies the ethdb.Batch interface
|
// Delete satisfies the ethdb.Batch interface
|
||||||
// Delete removes the key from the key-value data store
|
// Delete removes the key from the key-value data store
|
||||||
func (b *Batch) Delete(key []byte) (err error) {
|
func (b *Batch) Delete(key []byte) (err error) {
|
||||||
mhKey, err := MultihashKeyFromKeccak256(key)
|
_, err = b.tx.Exec(deletePgStr, key)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
_, err = b.tx.Exec(deletePgStr, mhKey)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,6 +21,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
)
|
)
|
||||||
@ -28,11 +30,12 @@ import (
|
|||||||
var errNotSupported = errors.New("this operation is not supported")
|
var errNotSupported = errors.New("this operation is not supported")
|
||||||
|
|
||||||
var (
|
var (
|
||||||
hasPgStr = "SELECT exists(select 1 from public.blocks WHERE key = $1)"
|
hasPgStr = "SELECT exists(select 1 from eth.key_preimages WHERE eth_key = $1)"
|
||||||
getPgStr = "SELECT data FROM public.blocks WHERE key = $1"
|
getPgStr = "SELECT data FROM public.blocks INNER JOIN eth.key_preimages ON (ipfs_key = blocks.key) WHERE eth_key = $1"
|
||||||
putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING"
|
putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING"
|
||||||
deletePgStr = "DELETE FROM public.blocks WHERE key = $1"
|
putPreimagePgStr = "INSERT INTO eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2) ON CONFLICT (eth_key) DO NOTHING"
|
||||||
dbSizePgStr = "SELECT pg_database_size(current_database())"
|
deletePgStr = "DELETE FROM public.blocks USING eth.key_preimages WHERE ipfs_key = blocks.key AND eth_key = $1"
|
||||||
|
dbSizePgStr = "SELECT pg_database_size(current_database())"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
|
// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
|
||||||
@ -56,46 +59,54 @@ func NewDatabase(db *sqlx.DB) ethdb.Database {
|
|||||||
|
|
||||||
// Has satisfies the ethdb.KeyValueReader interface
|
// Has satisfies the ethdb.KeyValueReader interface
|
||||||
// Has retrieves if a key is present in the key-value data store
|
// Has retrieves if a key is present in the key-value data store
|
||||||
|
// Has uses the eth.key_preimages table
|
||||||
func (d *Database) Has(key []byte) (bool, error) {
|
func (d *Database) Has(key []byte) (bool, error) {
|
||||||
mhKey, err := MultihashKeyFromKeccak256(key)
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
var exists bool
|
var exists bool
|
||||||
return exists, d.db.Get(&exists, hasPgStr, mhKey)
|
return exists, d.db.Get(&exists, hasPgStr, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get satisfies the ethdb.KeyValueReader interface
|
// Get satisfies the ethdb.KeyValueReader interface
|
||||||
// Get retrieves the given key if it's present in the key-value data store
|
// Get retrieves the given key if it's present in the key-value data store
|
||||||
|
// Get uses the eth.key_preimages table
|
||||||
func (d *Database) Get(key []byte) ([]byte, error) {
|
func (d *Database) Get(key []byte) ([]byte, error) {
|
||||||
mhKey, err := MultihashKeyFromKeccak256(key)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
var data []byte
|
var data []byte
|
||||||
return data, d.db.Get(&data, getPgStr, mhKey)
|
return data, d.db.Get(&data, getPgStr, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put satisfies the ethdb.KeyValueWriter interface
|
// Put satisfies the ethdb.KeyValueWriter interface
|
||||||
// Put inserts the given value into the key-value data store
|
// Put inserts the given value into the key-value data store
|
||||||
// Key is expected to be the keccak256 hash of value
|
// Key is expected to be the keccak256 hash of value
|
||||||
|
// Put inserts the keccak256 key into the eth.key_preimages table
|
||||||
func (d *Database) Put(key []byte, value []byte) error {
|
func (d *Database) Put(key []byte, value []byte) error {
|
||||||
mhKey, err := MultihashKeyFromKeccak256(key)
|
mhKey, err := MultihashKeyFromKeccak256(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = d.db.Exec(putPgStr, mhKey, value)
|
tx, err := d.db.Beginx()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
if err := tx.Rollback(); err != nil {
|
||||||
|
logrus.Error(err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = tx.Commit()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
if _, err = tx.Exec(putPgStr, mhKey, value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = tx.Exec(putPreimagePgStr, key, mhKey)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete satisfies the ethdb.KeyValueWriter interface
|
// Delete satisfies the ethdb.KeyValueWriter interface
|
||||||
// Delete removes the key from the key-value data store
|
// Delete removes the key from the key-value data store
|
||||||
|
// Delete uses the eth.key_preimages table
|
||||||
func (d *Database) Delete(key []byte) error {
|
func (d *Database) Delete(key []byte) error {
|
||||||
mhKey, err := MultihashKeyFromKeccak256(key)
|
_, err := d.db.Exec(deletePgStr, key)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
_, err = d.db.Exec(deletePgStr, mhKey)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,6 +59,8 @@ var _ = Describe("Database", func() {
|
|||||||
It("returns true if a key-pair exists in the db", func() {
|
It("returns true if a key-pair exists in the db", func() {
|
||||||
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue)
|
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
_, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testEthKey, testMhKey)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
has, err := database.Has(testEthKey)
|
has, err := database.Has(testEthKey)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
Expect(has).To(BeTrue())
|
Expect(has).To(BeTrue())
|
||||||
@ -74,6 +76,8 @@ var _ = Describe("Database", func() {
|
|||||||
It("returns the value associated with the key, if the pair exists", func() {
|
It("returns the value associated with the key, if the pair exists", func() {
|
||||||
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue)
|
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
_, err = db.Exec("INSERT into eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2)", testEthKey, testMhKey)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
val, err := database.Get(testEthKey)
|
val, err := database.Get(testEthKey)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
Expect(val).To(Equal(testValue))
|
Expect(val).To(Equal(testValue))
|
||||||
|
@ -0,0 +1,8 @@
|
|||||||
|
-- +goose Up
|
||||||
|
CREATE TABLE IF NOT EXISTS public.blocks (
|
||||||
|
key TEXT UNIQUE NOT NULL,
|
||||||
|
data BYTEA NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
-- +goose Down
|
||||||
|
DROP TABLE public.blocks;
|
5
postgres/db/migrations/00002_create_eth_schema.sql
Normal file
5
postgres/db/migrations/00002_create_eth_schema.sql
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
-- +goose Up
|
||||||
|
CREATE SCHEMA eth;
|
||||||
|
|
||||||
|
-- +goose Down
|
||||||
|
DROP SCHEMA eth;
|
@ -0,0 +1,8 @@
|
|||||||
|
-- +goose Up
|
||||||
|
CREATE TABLE IF NOT EXISTS eth.key_preimages (
|
||||||
|
eth_key BYTEA UNIQUE NOT NULL,
|
||||||
|
ipfs_key TEXT NOT NULL REFERENCES public.blocks (key) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED
|
||||||
|
);
|
||||||
|
|
||||||
|
-- +goose Down
|
||||||
|
DROP TABLE eth.key_preimages;
|
@ -44,6 +44,6 @@ func TestDB() (*sqlx.DB, error) {
|
|||||||
|
|
||||||
// ResetTestDB drops all rows in the test db public.blocks table
|
// ResetTestDB drops all rows in the test db public.blocks table
|
||||||
func ResetTestDB(db *sqlx.DB) error {
|
func ResetTestDB(db *sqlx.DB) error {
|
||||||
_, err := db.Exec("TRUNCATE public.blocks")
|
_, err := db.Exec("TRUNCATE public.blocks CASCADE")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user