Merge pull request #50 from cerc-io/ian/v5

v0 blockstore compatible version
This commit is contained in:
Ian Norden 2023-03-30 15:51:59 -05:00 committed by GitHub
commit 4a52acb0bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 945 additions and 21 deletions

33
postgres/shared/util.go Normal file
View File

@ -0,0 +1,33 @@
// VulcanizeDB
// Copyright © 2023 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package shared
import "github.com/jmoiron/sqlx"
// TestDB connect to the testing database
// it assumes the database has the IPFS public.blocks table present
// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table
func TestDB() (*sqlx.DB, error) {
connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable"
return sqlx.Connect("postgres", connectStr)
}
// ResetTestDB drops all rows in the test db public.blocks table
func ResetTestDB(db *sqlx.DB) error {
_, err := db.Exec("TRUNCATE public.blocks CASCADE")
return err
}

117
postgres/v0/batch.go Normal file
View File

@ -0,0 +1,117 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb
import (
"math/big"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
)
var _ ethdb.Batch = &Batch{}
// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection
type Batch struct {
db *sqlx.DB
tx *sqlx.Tx
valueSize int
blockNumber *big.Int
}
// NewBatch returns a ethdb.Batch interface for PG-IPFS
func NewBatch(db *sqlx.DB, tx *sqlx.Tx, blockNumber *big.Int) ethdb.Batch {
b := &Batch{
db: db,
tx: tx,
blockNumber: blockNumber,
}
if tx == nil {
b.Reset()
}
return b
}
// Put satisfies the ethdb.Batch interface
// Put inserts the given value into the key-value data store
// Key is expected to be a fully formulated cid key
// TODO: note, now that we expected a cid we could route to the "cids" tables based on prefix instead of to public.blocks
// but is it better to handle this routing here, or use a completely different interface since we already have to refactor
// at levels above this package in order to pass in cids instead of raw keccak256 hashes
func (b *Batch) Put(cidBytes []byte, value []byte) (err error) {
// cast and resolve strings from cid.Cast
// this will assert that we have a correctly formatted CID
// and will handle the different string encodings for v0 and v1 CIDs
// (note that this v0 vs v1 is different from the blockstore v0 vs v1)
c, err := cid.Cast(cidBytes)
if err != nil {
return err
}
if _, err = b.tx.Exec(putPgStr, c.String(), value, b.blockNumber.Uint64()); err != nil {
return err
}
b.valueSize += len(value)
return nil
}
// Delete satisfies the ethdb.Batch interface
// Delete removes the key from the key-value data store
func (b *Batch) Delete(cidBytes []byte) (err error) {
c, err := cid.Cast(cidBytes)
if err != nil {
return err
}
_, err = b.tx.Exec(deletePgStr, c.String())
return err
}
// ValueSize satisfies the ethdb.Batch interface
// ValueSize retrieves the amount of data queued up for writing
// The returned value is the total byte length of all data queued to write
func (b *Batch) ValueSize() int {
return b.valueSize
}
// Write satisfies the ethdb.Batch interface
// Write flushes any accumulated data to disk
func (b *Batch) Write() error {
if b.tx == nil {
return nil
}
return b.tx.Commit()
}
// Replay satisfies the ethdb.Batch interface
// Replay replays the batch contents
func (b *Batch) Replay(w ethdb.KeyValueWriter) error {
return errNotSupported
}
// Reset satisfies the ethdb.Batch interface
// Reset resets the batch for reuse
// This should be called after every write
func (b *Batch) Reset() {
var err error
b.tx, err = b.db.Beginx()
if err != nil {
panic(err)
}
b.valueSize = 0
}

138
postgres/v0/batch_test.go Normal file
View File

@ -0,0 +1,138 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb_test
import (
"math/big"
"time"
"github.com/cerc-io/ipfs-ethdb/v4/postgres/shared"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/mailgun/groupcache/v2"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres/v0"
)
var (
batch ethdb.Batch
testHeader2 = types.Header{Number: big.NewInt(2)}
testValue2, _ = rlp.EncodeToBytes(testHeader2)
testEthKey2 = testHeader2.Hash().Bytes()
testCID2, _ = pgipfsethdb.CIDFromKeccak256(testEthKey2, cid.EthBlock)
)
var _ = Describe("Batch", func() {
BeforeEach(func() {
db, err = shared.TestDB()
Expect(err).ToNot(HaveOccurred())
cacheConfig := pgipfsethdb.CacheConfig{
Name: "db",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
}
database = pgipfsethdb.NewDatabase(db, cacheConfig)
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
Expect(ok).To(BeTrue())
(*databaseWithBlock).BlockNumber = testBlockNumber
batch = database.NewBatch()
})
AfterEach(func() {
groupcache.DeregisterGroup("db")
err = shared.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
})
Describe("Put/Write", func() {
It("adds the key-value pair to the batch", func() {
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
_, err = database.Get(testCID2.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
err = batch.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
err = batch.Put(testCID2.Bytes(), testValue2)
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
val2, err := database.Get(testCID2.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val2).To(Equal(testValue2))
})
})
Describe("Delete/Reset/Write", func() {
It("deletes the key-value pair in the batch", func() {
err = batch.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
err = batch.Put(testCID2.Bytes(), testValue2)
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
batch.Reset()
err = batch.Delete(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
err = batch.Delete(testCID2.Bytes())
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
_, err = database.Get(testCID2.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
})
Describe("ValueSize/Reset", func() {
It("returns the size of data in the batch queued for write", func() {
err = batch.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
err = batch.Put(testCID2.Bytes(), testValue2)
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
size := batch.ValueSize()
Expect(size).To(Equal(len(testValue) + len(testValue2)))
batch.Reset()
size = batch.ValueSize()
Expect(size).To(Equal(0))
})
})
})

364
postgres/v0/database.go Normal file
View File

@ -0,0 +1,364 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb
import (
"context"
"database/sql"
"errors"
"fmt"
"math/big"
"strconv"
"strings"
"time"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
"github.com/mailgun/groupcache/v2"
log "github.com/sirupsen/logrus"
)
var errNotSupported = errors.New("this operation is not supported")
var (
hasPgStr = "SELECT exists(select 1 from public.blocks WHERE key = $1 LIMIT 1)"
getPgStr = "SELECT data FROM public.blocks WHERE key = $1 LIMIT 1"
putPgStr = "INSERT INTO public.blocks (key, data, block_number) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"
deletePgStr = "DELETE FROM public.blocks WHERE key = $1"
dbSizePgStr = "SELECT pg_database_size(current_database())"
)
var _ ethdb.Database = &Database{}
// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
type Database struct {
db *sqlx.DB
cache *groupcache.Group
BlockNumber *big.Int
}
func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) {
return 0, errNotSupported
}
type CacheConfig struct {
Name string
Size int
ExpiryDuration time.Duration
}
// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS
func NewKeyValueStore(db *sqlx.DB, cacheConfig CacheConfig) ethdb.KeyValueStore {
database := Database{db: db}
database.InitCache(cacheConfig)
return &database
}
// NewDatabase returns a ethdb.Database interface for PG-IPFS
func NewDatabase(db *sqlx.DB, cacheConfig CacheConfig) ethdb.Database {
database := Database{db: db}
database.InitCache(cacheConfig)
return &database
}
func (d *Database) InitCache(cacheConfig CacheConfig) {
d.cache = groupcache.NewGroup(cacheConfig.Name, int64(cacheConfig.Size), groupcache.GetterFunc(
func(_ context.Context, id string, dest groupcache.Sink) error {
val, err := d.dbGet(id)
if err != nil {
return err
}
// Set the value in the groupcache, with expiry
if err := dest.SetBytes(val, time.Now().Add(cacheConfig.ExpiryDuration)); err != nil {
return err
}
return nil
},
))
}
func (d *Database) GetCacheStats() groupcache.Stats {
return d.cache.Stats
}
// Has satisfies the ethdb.KeyValueReader interface
// Has retrieves if a cid is present in the key-value data store
func (d *Database) Has(cidBytes []byte) (bool, error) {
c, err := cid.Cast(cidBytes)
if err != nil {
return false, err
}
var exists bool
return exists, d.db.Get(&exists, hasPgStr, c.String())
}
// Get retrieves the given key if it's present in the key-value data store
func (d *Database) dbGet(key string) ([]byte, error) {
var data []byte
err := d.db.Get(&data, getPgStr, key)
if err == sql.ErrNoRows {
log.Warn("Database miss for key", key)
}
return data, err
}
// Get satisfies the ethdb.KeyValueReader interface
// Get retrieves the given cid if it's present in the key-value data store
func (d *Database) Get(cidBytes []byte) ([]byte, error) {
c, err := cid.Cast(cidBytes)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()
var data []byte
return data, d.cache.Get(ctx, c.String(), groupcache.AllocatingByteSliceSink(&data))
}
// Put satisfies the ethdb.KeyValueWriter interface
// Put inserts the given value into the key-value data store
// Key is expected to be a fully formulated cis of value
func (d *Database) Put(cidBytes []byte, value []byte) error {
c, err := cid.Cast(cidBytes)
if err != nil {
return err
}
_, err = d.db.Exec(putPgStr, c.String(), value, d.BlockNumber.Uint64())
return err
}
// Delete satisfies the ethdb.KeyValueWriter interface
// Delete removes the cid from the key-value data store
func (d *Database) Delete(cidBytes []byte) error {
c, err := cid.Cast(cidBytes)
if err != nil {
return err
}
cidString := c.String()
_, err = d.db.Exec(deletePgStr, cidString)
if err != nil {
return err
}
// Remove from cache.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()
err = d.cache.Remove(ctx, cidString)
return err
}
// DatabaseProperty enum type
type DatabaseProperty int
const (
Unknown DatabaseProperty = iota
Size
Idle
InUse
MaxIdleClosed
MaxLifetimeClosed
MaxOpenConnections
OpenConnections
WaitCount
WaitDuration
)
// DatabasePropertyFromString helper function
func DatabasePropertyFromString(property string) (DatabaseProperty, error) {
switch strings.ToLower(property) {
case "size":
return Size, nil
case "idle":
return Idle, nil
case "inuse":
return InUse, nil
case "maxidleclosed":
return MaxIdleClosed, nil
case "maxlifetimeclosed":
return MaxLifetimeClosed, nil
case "maxopenconnections":
return MaxOpenConnections, nil
case "openconnections":
return OpenConnections, nil
case "waitcount":
return WaitCount, nil
case "waitduration":
return WaitDuration, nil
default:
return Unknown, fmt.Errorf("unknown database property")
}
}
// Stat satisfies the ethdb.Stater interface
// Stat returns a particular internal stat of the database
func (d *Database) Stat(property string) (string, error) {
prop, err := DatabasePropertyFromString(property)
if err != nil {
return "", err
}
switch prop {
case Size:
var byteSize string
return byteSize, d.db.Get(&byteSize, dbSizePgStr)
case Idle:
return strconv.Itoa(d.db.Stats().Idle), nil
case InUse:
return strconv.Itoa(d.db.Stats().InUse), nil
case MaxIdleClosed:
return strconv.FormatInt(d.db.Stats().MaxIdleClosed, 10), nil
case MaxLifetimeClosed:
return strconv.FormatInt(d.db.Stats().MaxLifetimeClosed, 10), nil
case MaxOpenConnections:
return strconv.Itoa(d.db.Stats().MaxOpenConnections), nil
case OpenConnections:
return strconv.Itoa(d.db.Stats().OpenConnections), nil
case WaitCount:
return strconv.FormatInt(d.db.Stats().WaitCount, 10), nil
case WaitDuration:
return d.db.Stats().WaitDuration.String(), nil
default:
return "", fmt.Errorf("unhandled database property")
}
}
// Compact satisfies the ethdb.Compacter interface
// Compact flattens the underlying data store for the given key range
func (d *Database) Compact(start []byte, limit []byte) error {
return errNotSupported
}
// NewBatch satisfies the ethdb.Batcher interface
// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called
func (d *Database) NewBatch() ethdb.Batch {
return NewBatch(d.db, nil, d.BlockNumber)
}
// NewBatchWithSize satisfies the ethdb.Batcher interface.
// NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
func (d *Database) NewBatchWithSize(size int) ethdb.Batch {
return NewBatch(d.db, nil, d.BlockNumber)
}
// NewIterator satisfies the ethdb.Iteratee interface
// it creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix, starting at a particular
// initial key (or after, if it does not exist).
//
// Note: This method assumes that the prefix is NOT part of the start, so there's
// no need for the caller to prepend the prefix to the start
func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
return NewIterator(start, prefix, d.db)
}
// Close satisfies the io.Closer interface
// Close closes the db connection
func (d *Database) Close() error {
return d.db.DB.Close()
}
// HasAncient satisfies the ethdb.AncientReader interface
// HasAncient returns an indicator whether the specified data exists in the ancient store
func (d *Database) HasAncient(kind string, number uint64) (bool, error) {
return false, errNotSupported
}
// Ancient satisfies the ethdb.AncientReader interface
// Ancient retrieves an ancient binary blob from the append-only immutable files
func (d *Database) Ancient(kind string, number uint64) ([]byte, error) {
return nil, errNotSupported
}
// Ancients satisfies the ethdb.AncientReader interface
// Ancients returns the ancient item numbers in the ancient store
func (d *Database) Ancients() (uint64, error) {
return 0, errNotSupported
}
// Tail satisfies the ethdb.AncientReader interface.
// Tail returns the number of first stored item in the freezer.
func (d *Database) Tail() (uint64, error) {
return 0, errNotSupported
}
// AncientSize satisfies the ethdb.AncientReader interface
// AncientSize returns the ancient size of the specified category
func (d *Database) AncientSize(kind string) (uint64, error) {
return 0, errNotSupported
}
// AncientRange retrieves all the items in a range, starting from the index 'start'.
// It will return
// - at most 'count' items,
// - at least 1 item (even if exceeding the maxBytes), but will otherwise
// return as many items as fit into maxBytes.
func (d *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
return nil, errNotSupported
}
// ReadAncients applies the provided AncientReader function
func (d *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
return errNotSupported
}
// TruncateHead satisfies the ethdb.AncientWriter interface.
// TruncateHead discards all but the first n ancient data from the ancient store.
func (d *Database) TruncateHead(n uint64) error {
return errNotSupported
}
// TruncateTail satisfies the ethdb.AncientWriter interface.
// TruncateTail discards the first n ancient data from the ancient store.
func (d *Database) TruncateTail(n uint64) error {
return errNotSupported
}
// Sync satisfies the ethdb.AncientWriter interface
// Sync flushes all in-memory ancient store data to disk
func (d *Database) Sync() error {
return errNotSupported
}
// MigrateTable satisfies the ethdb.AncientWriter interface.
// MigrateTable processes and migrates entries of a given table to a new format.
func (d *Database) MigrateTable(string, func([]byte) ([]byte, error)) error {
return errNotSupported
}
// NewSnapshot satisfies the ethdb.Snapshotter interface.
// NewSnapshot creates a database snapshot based on the current state.
func (d *Database) NewSnapshot() (ethdb.Snapshot, error) {
return nil, errNotSupported
}
// AncientDatadir returns an error as we don't have a backing chain freezer.
func (d *Database) AncientDatadir() (string, error) {
return "", errNotSupported
}

View File

@ -0,0 +1,131 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb_test
import (
"math/big"
"time"
"github.com/cerc-io/ipfs-ethdb/v4/postgres/shared"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/jmoiron/sqlx"
"github.com/mailgun/groupcache/v2"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres/v0"
)
var (
database ethdb.Database
db *sqlx.DB
err error
testBlockNumber = big.NewInt(1337)
testHeader = types.Header{Number: testBlockNumber}
testValue, _ = rlp.EncodeToBytes(testHeader)
testEthKey = testHeader.Hash().Bytes()
testCID, _ = pgipfsethdb.CIDFromKeccak256(testEthKey, cid.EthBlock)
)
var _ = Describe("Database", func() {
BeforeEach(func() {
db, err = shared.TestDB()
Expect(err).ToNot(HaveOccurred())
cacheConfig := pgipfsethdb.CacheConfig{
Name: "db",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
}
database = pgipfsethdb.NewDatabase(db, cacheConfig)
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
Expect(ok).To(BeTrue())
(*databaseWithBlock).BlockNumber = testBlockNumber
})
AfterEach(func() {
groupcache.DeregisterGroup("db")
err = shared.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
})
Describe("Has", func() {
It("returns false if a key-pair doesn't exist in the db", func() {
has, err := database.Has(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(has).ToNot(BeTrue())
})
It("returns true if a key-pair exists in the db", func() {
_, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testCID.String(), testValue, testBlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred())
has, err := database.Has(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(has).To(BeTrue())
})
})
Describe("Get", func() {
It("throws an err if the key-pair doesn't exist in the db", func() {
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
It("returns the value associated with the key, if the pair exists", func() {
_, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testCID.String(), testValue, testBlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Put", func() {
It("persists the key-value pair in the database", func() {
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
err = database.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Delete", func() {
It("removes the key-value pair from the database", func() {
err = database.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
err = database.Delete(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
})
})

91
postgres/v0/iterator.go Normal file
View File

@ -0,0 +1,91 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb
import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ipfs/go-cid"
"github.com/jmoiron/sqlx"
)
var _ ethdb.Iterator = &Iterator{}
// Iterator is the type that satisfies the ethdb.Iterator interface for PG-IPFS Ethereum data using a direct Postgres connection
// Iteratee interface is used in Geth for various tests, trie/sync_bloom.go (for fast sync),
// rawdb.InspectDatabase, and the new core/state/snapshot features.
// This should not be confused with trie.NodeIterator or state.NodeIteraor (which can be constructed
// from the ethdb.KeyValueStoreand ethdb.Database interfaces)
type Iterator struct {
db *sqlx.DB
currentKey, prefix []byte
err error
}
// NewIterator returns an ethdb.Iterator interface for PG-IPFS
func NewIterator(start, prefix []byte, db *sqlx.DB) ethdb.Iterator {
return &Iterator{
db: db,
prefix: prefix,
currentKey: start,
}
}
// Next satisfies the ethdb.Iterator interface
// Next moves the iterator to the next key/value pair
// It returns whether the iterator is exhausted
func (i *Iterator) Next() bool {
// this is complicated by the ipfs db keys not being the keccak256 hashes
// go-ethereum usage of this method expects the iteration to occur over keccak256 keys
panic("implement me: Next")
}
// Error satisfies the ethdb.Iterator interface
// Error returns any accumulated error
// Exhausting all the key/value pairs is not considered to be an error
func (i *Iterator) Error() error {
return i.err
}
// Key satisfies the ethdb.Iterator interface
// Key returns the key of the current key/value pair, or nil if done
// The caller should not modify the contents of the returned slice
// and its contents may change on the next call to Next
func (i *Iterator) Key() []byte {
return i.currentKey
}
// Value satisfies the ethdb.Iterator interface
// Value returns the value of the current key/value pair, or nil if done
// The caller should not modify the contents of the returned slice
// and its contents may change on the next call to Next
func (i *Iterator) Value() []byte {
c, err := cid.Cast(i.currentKey)
if err != nil {
i.err = err
return nil
}
var data []byte
i.err = i.db.Get(&data, getPgStr, c)
return data
}
// Release satisfies the ethdb.Iterator interface
// Release releases associated resources
// Release should always succeed and can be called multiple times without causing error
func (i *Iterator) Release() {
i.db.Close()
}

32
postgres/v0/util.go Normal file
View File

@ -0,0 +1,32 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb
import (
"github.com/ipfs/go-cid"
_ "github.com/lib/pq" //postgres driver
"github.com/multiformats/go-multihash"
)
// CIDFromKeccak256 converts keccak256 hash bytes into a v1 cid
func CIDFromKeccak256(hash []byte, codecType uint64) (cid.Cid, error) {
mh, err := multihash.Encode(hash, multihash.KECCAK_256)
if err != nil {
return cid.Cid{}, err
}
return cid.NewCidV1(codecType, mh), nil
}

View File

@ -20,6 +20,8 @@ import (
"math/big" "math/big"
"time" "time"
"github.com/cerc-io/ipfs-ethdb/v4/postgres/shared"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
@ -27,7 +29,7 @@ import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres" pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres/v1"
) )
var ( var (
@ -39,7 +41,7 @@ var (
var _ = Describe("Batch", func() { var _ = Describe("Batch", func() {
BeforeEach(func() { BeforeEach(func() {
db, err = pgipfsethdb.TestDB() db, err = shared.TestDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
cacheConfig := pgipfsethdb.CacheConfig{ cacheConfig := pgipfsethdb.CacheConfig{
@ -58,7 +60,7 @@ var _ = Describe("Batch", func() {
}) })
AfterEach(func() { AfterEach(func() {
groupcache.DeregisterGroup("db") groupcache.DeregisterGroup("db")
err = pgipfsethdb.ResetTestDB(db) err = shared.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })

View File

@ -20,6 +20,8 @@ import (
"math/big" "math/big"
"time" "time"
"github.com/cerc-io/ipfs-ethdb/v4/postgres/shared"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
@ -28,7 +30,7 @@ import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres" pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres/v1"
) )
var ( var (
@ -44,7 +46,7 @@ var (
var _ = Describe("Database", func() { var _ = Describe("Database", func() {
BeforeEach(func() { BeforeEach(func() {
db, err = pgipfsethdb.TestDB() db, err = shared.TestDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
cacheConfig := pgipfsethdb.CacheConfig{ cacheConfig := pgipfsethdb.CacheConfig{
@ -61,7 +63,7 @@ var _ = Describe("Database", func() {
}) })
AfterEach(func() { AfterEach(func() {
groupcache.DeregisterGroup("db") groupcache.DeregisterGroup("db")
err = pgipfsethdb.ResetTestDB(db) err = shared.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })

View File

@ -0,0 +1,29 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb_test
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestPGIPFSETHDB(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "PG-IPFS ethdb test")
}

View File

@ -19,7 +19,6 @@ package pgipfsethdb
import ( import (
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help" dshelp "github.com/ipfs/go-ipfs-ds-help"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq" //postgres driver _ "github.com/lib/pq" //postgres driver
"github.com/multiformats/go-multihash" "github.com/multiformats/go-multihash"
) )
@ -33,17 +32,3 @@ func MultihashKeyFromKeccak256(h []byte) (string, error) {
dbKey := dshelp.MultihashToDsKey(mh) dbKey := dshelp.MultihashToDsKey(mh)
return blockstore.BlockPrefix.String() + dbKey.String(), nil return blockstore.BlockPrefix.String() + dbKey.String(), nil
} }
// TestDB connect to the testing database
// it assumes the database has the IPFS public.blocks table present
// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table
func TestDB() (*sqlx.DB, error) {
connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable"
return sqlx.Connect("postgres", connectStr)
}
// ResetTestDB drops all rows in the test db public.blocks table
func ResetTestDB(db *sqlx.DB) error {
_, err := db.Exec("TRUNCATE public.blocks CASCADE")
return err
}