forked from cerc-io/ipfs-ethdb
v0 and v1 subdirectories for postgres package; for working with the corresponding version of blockstore format
This commit is contained in:
parent
68a24ab952
commit
285b0f0db4
108
postgres/v1/batch.go
Normal file
108
postgres/v1/batch.go
Normal file
@ -0,0 +1,108 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2020 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package pgipfsethdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/big"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
|
"github.com/jmoiron/sqlx"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ ethdb.Batch = &Batch{}
|
||||||
|
|
||||||
|
// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection
|
||||||
|
type Batch struct {
|
||||||
|
db *sqlx.DB
|
||||||
|
tx *sqlx.Tx
|
||||||
|
valueSize int
|
||||||
|
|
||||||
|
blockNumber *big.Int
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBatch returns a ethdb.Batch interface for PG-IPFS
|
||||||
|
func NewBatch(db *sqlx.DB, tx *sqlx.Tx, blockNumber *big.Int) ethdb.Batch {
|
||||||
|
b := &Batch{
|
||||||
|
db: db,
|
||||||
|
tx: tx,
|
||||||
|
blockNumber: blockNumber,
|
||||||
|
}
|
||||||
|
if tx == nil {
|
||||||
|
b.Reset()
|
||||||
|
}
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put satisfies the ethdb.Batch interface
|
||||||
|
// Put inserts the given value into the key-value data store
|
||||||
|
// Key is expected to be the keccak256 hash of value
|
||||||
|
func (b *Batch) Put(key []byte, value []byte) (err error) {
|
||||||
|
mhKey, err := MultihashKeyFromKeccak256(key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err = b.tx.Exec(putPgStr, mhKey, value, b.blockNumber.Uint64()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
b.valueSize += len(value)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete satisfies the ethdb.Batch interface
|
||||||
|
// Delete removes the key from the key-value data store
|
||||||
|
func (b *Batch) Delete(key []byte) (err error) {
|
||||||
|
mhKey, err := MultihashKeyFromKeccak256(key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = b.tx.Exec(deletePgStr, mhKey)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// ValueSize satisfies the ethdb.Batch interface
|
||||||
|
// ValueSize retrieves the amount of data queued up for writing
|
||||||
|
// The returned value is the total byte length of all data queued to write
|
||||||
|
func (b *Batch) ValueSize() int {
|
||||||
|
return b.valueSize
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write satisfies the ethdb.Batch interface
|
||||||
|
// Write flushes any accumulated data to disk
|
||||||
|
func (b *Batch) Write() error {
|
||||||
|
if b.tx == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return b.tx.Commit()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Replay satisfies the ethdb.Batch interface
|
||||||
|
// Replay replays the batch contents
|
||||||
|
func (b *Batch) Replay(w ethdb.KeyValueWriter) error {
|
||||||
|
return errNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset satisfies the ethdb.Batch interface
|
||||||
|
// Reset resets the batch for reuse
|
||||||
|
// This should be called after every write
|
||||||
|
func (b *Batch) Reset() {
|
||||||
|
var err error
|
||||||
|
b.tx, err = b.db.Beginx()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
b.valueSize = 0
|
||||||
|
}
|
133
postgres/v1/batch_test.go
Normal file
133
postgres/v1/batch_test.go
Normal file
@ -0,0 +1,133 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package pgipfsethdb_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/big"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
|
"github.com/mailgun/groupcache/v2"
|
||||||
|
. "github.com/onsi/ginkgo"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
|
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
batch ethdb.Batch
|
||||||
|
testHeader2 = types.Header{Number: big.NewInt(2)}
|
||||||
|
testValue2, _ = rlp.EncodeToBytes(testHeader2)
|
||||||
|
testEthKey2 = testHeader2.Hash().Bytes()
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("Batch", func() {
|
||||||
|
BeforeEach(func() {
|
||||||
|
db, err = pgipfsethdb.TestDB()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
cacheConfig := pgipfsethdb.CacheConfig{
|
||||||
|
Name: "db",
|
||||||
|
Size: 3000000, // 3MB
|
||||||
|
ExpiryDuration: time.Hour,
|
||||||
|
}
|
||||||
|
|
||||||
|
database = pgipfsethdb.NewDatabase(db, cacheConfig)
|
||||||
|
|
||||||
|
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
|
||||||
|
Expect(ok).To(BeTrue())
|
||||||
|
(*databaseWithBlock).BlockNumber = testBlockNumber
|
||||||
|
|
||||||
|
batch = database.NewBatch()
|
||||||
|
})
|
||||||
|
AfterEach(func() {
|
||||||
|
groupcache.DeregisterGroup("db")
|
||||||
|
err = pgipfsethdb.ResetTestDB(db)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
Describe("Put/Write", func() {
|
||||||
|
It("adds the key-value pair to the batch", func() {
|
||||||
|
_, err = database.Get(testEthKey)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
|
||||||
|
_, err = database.Get(testEthKey2)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
|
||||||
|
|
||||||
|
err = batch.Put(testEthKey, testValue)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
err = batch.Put(testEthKey2, testValue2)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
err = batch.Write()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
val, err := database.Get(testEthKey)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(val).To(Equal(testValue))
|
||||||
|
val2, err := database.Get(testEthKey2)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(val2).To(Equal(testValue2))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Describe("Delete/Reset/Write", func() {
|
||||||
|
It("deletes the key-value pair in the batch", func() {
|
||||||
|
err = batch.Put(testEthKey, testValue)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
err = batch.Put(testEthKey2, testValue2)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
err = batch.Write()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
batch.Reset()
|
||||||
|
err = batch.Delete(testEthKey)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
err = batch.Delete(testEthKey2)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
err = batch.Write()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
_, err = database.Get(testEthKey)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
|
||||||
|
_, err = database.Get(testEthKey2)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Describe("ValueSize/Reset", func() {
|
||||||
|
It("returns the size of data in the batch queued for write", func() {
|
||||||
|
err = batch.Put(testEthKey, testValue)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
err = batch.Put(testEthKey2, testValue2)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
err = batch.Write()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
size := batch.ValueSize()
|
||||||
|
Expect(size).To(Equal(len(testValue) + len(testValue2)))
|
||||||
|
|
||||||
|
batch.Reset()
|
||||||
|
size = batch.ValueSize()
|
||||||
|
Expect(size).To(Equal(0))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
361
postgres/v1/database.go
Normal file
361
postgres/v1/database.go
Normal file
@ -0,0 +1,361 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2020 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package pgipfsethdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"math/big"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
|
"github.com/jmoiron/sqlx"
|
||||||
|
"github.com/mailgun/groupcache/v2"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errNotSupported = errors.New("this operation is not supported")
|
||||||
|
|
||||||
|
var (
|
||||||
|
hasPgStr = "SELECT exists(select 1 from public.blocks WHERE key = $1 LIMIT 1)"
|
||||||
|
getPgStr = "SELECT data FROM public.blocks WHERE key = $1 LIMIT 1"
|
||||||
|
putPgStr = "INSERT INTO public.blocks (key, data, block_number) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"
|
||||||
|
deletePgStr = "DELETE FROM public.blocks WHERE key = $1"
|
||||||
|
dbSizePgStr = "SELECT pg_database_size(current_database())"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ ethdb.Database = &Database{}
|
||||||
|
|
||||||
|
// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
|
||||||
|
type Database struct {
|
||||||
|
db *sqlx.DB
|
||||||
|
cache *groupcache.Group
|
||||||
|
|
||||||
|
BlockNumber *big.Int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) {
|
||||||
|
return 0, errNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
|
type CacheConfig struct {
|
||||||
|
Name string
|
||||||
|
Size int
|
||||||
|
ExpiryDuration time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS
|
||||||
|
func NewKeyValueStore(db *sqlx.DB, cacheConfig CacheConfig) ethdb.KeyValueStore {
|
||||||
|
database := Database{db: db}
|
||||||
|
database.InitCache(cacheConfig)
|
||||||
|
|
||||||
|
return &database
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDatabase returns a ethdb.Database interface for PG-IPFS
|
||||||
|
func NewDatabase(db *sqlx.DB, cacheConfig CacheConfig) ethdb.Database {
|
||||||
|
database := Database{db: db}
|
||||||
|
database.InitCache(cacheConfig)
|
||||||
|
|
||||||
|
return &database
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Database) InitCache(cacheConfig CacheConfig) {
|
||||||
|
d.cache = groupcache.NewGroup(cacheConfig.Name, int64(cacheConfig.Size), groupcache.GetterFunc(
|
||||||
|
func(_ context.Context, id string, dest groupcache.Sink) error {
|
||||||
|
val, err := d.dbGet(id)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the value in the groupcache, with expiry
|
||||||
|
if err := dest.SetBytes(val, time.Now().Add(cacheConfig.ExpiryDuration)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Database) GetCacheStats() groupcache.Stats {
|
||||||
|
return d.cache.Stats
|
||||||
|
}
|
||||||
|
|
||||||
|
// Has satisfies the ethdb.KeyValueReader interface
|
||||||
|
// Has retrieves if a key is present in the key-value data store
|
||||||
|
func (d *Database) Has(key []byte) (bool, error) {
|
||||||
|
mhKey, err := MultihashKeyFromKeccak256(key)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
var exists bool
|
||||||
|
return exists, d.db.Get(&exists, hasPgStr, mhKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get retrieves the given key if it's present in the key-value data store
|
||||||
|
func (d *Database) dbGet(key string) ([]byte, error) {
|
||||||
|
var data []byte
|
||||||
|
err := d.db.Get(&data, getPgStr, key)
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
log.Warn("Database miss for key", key)
|
||||||
|
}
|
||||||
|
|
||||||
|
return data, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get satisfies the ethdb.KeyValueReader interface
|
||||||
|
// Get retrieves the given key if it's present in the key-value data store
|
||||||
|
func (d *Database) Get(key []byte) ([]byte, error) {
|
||||||
|
mhKey, err := MultihashKeyFromKeccak256(key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
var data []byte
|
||||||
|
return data, d.cache.Get(ctx, mhKey, groupcache.AllocatingByteSliceSink(&data))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put satisfies the ethdb.KeyValueWriter interface
|
||||||
|
// Put inserts the given value into the key-value data store
|
||||||
|
// Key is expected to be the keccak256 hash of value
|
||||||
|
func (d *Database) Put(key []byte, value []byte) error {
|
||||||
|
mhKey, err := MultihashKeyFromKeccak256(key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = d.db.Exec(putPgStr, mhKey, value, d.BlockNumber.Uint64())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete satisfies the ethdb.KeyValueWriter interface
|
||||||
|
// Delete removes the key from the key-value data store
|
||||||
|
func (d *Database) Delete(key []byte) error {
|
||||||
|
mhKey, err := MultihashKeyFromKeccak256(key)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = d.db.Exec(deletePgStr, mhKey)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove from cache.
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
|
||||||
|
defer cancel()
|
||||||
|
err = d.cache.Remove(ctx, mhKey)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// DatabaseProperty enum type
|
||||||
|
type DatabaseProperty int
|
||||||
|
|
||||||
|
const (
|
||||||
|
Unknown DatabaseProperty = iota
|
||||||
|
Size
|
||||||
|
Idle
|
||||||
|
InUse
|
||||||
|
MaxIdleClosed
|
||||||
|
MaxLifetimeClosed
|
||||||
|
MaxOpenConnections
|
||||||
|
OpenConnections
|
||||||
|
WaitCount
|
||||||
|
WaitDuration
|
||||||
|
)
|
||||||
|
|
||||||
|
// DatabasePropertyFromString helper function
|
||||||
|
func DatabasePropertyFromString(property string) (DatabaseProperty, error) {
|
||||||
|
switch strings.ToLower(property) {
|
||||||
|
case "size":
|
||||||
|
return Size, nil
|
||||||
|
case "idle":
|
||||||
|
return Idle, nil
|
||||||
|
case "inuse":
|
||||||
|
return InUse, nil
|
||||||
|
case "maxidleclosed":
|
||||||
|
return MaxIdleClosed, nil
|
||||||
|
case "maxlifetimeclosed":
|
||||||
|
return MaxLifetimeClosed, nil
|
||||||
|
case "maxopenconnections":
|
||||||
|
return MaxOpenConnections, nil
|
||||||
|
case "openconnections":
|
||||||
|
return OpenConnections, nil
|
||||||
|
case "waitcount":
|
||||||
|
return WaitCount, nil
|
||||||
|
case "waitduration":
|
||||||
|
return WaitDuration, nil
|
||||||
|
default:
|
||||||
|
return Unknown, fmt.Errorf("unknown database property")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stat satisfies the ethdb.Stater interface
|
||||||
|
// Stat returns a particular internal stat of the database
|
||||||
|
func (d *Database) Stat(property string) (string, error) {
|
||||||
|
prop, err := DatabasePropertyFromString(property)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
switch prop {
|
||||||
|
case Size:
|
||||||
|
var byteSize string
|
||||||
|
return byteSize, d.db.Get(&byteSize, dbSizePgStr)
|
||||||
|
case Idle:
|
||||||
|
return strconv.Itoa(d.db.Stats().Idle), nil
|
||||||
|
case InUse:
|
||||||
|
return strconv.Itoa(d.db.Stats().InUse), nil
|
||||||
|
case MaxIdleClosed:
|
||||||
|
return strconv.FormatInt(d.db.Stats().MaxIdleClosed, 10), nil
|
||||||
|
case MaxLifetimeClosed:
|
||||||
|
return strconv.FormatInt(d.db.Stats().MaxLifetimeClosed, 10), nil
|
||||||
|
case MaxOpenConnections:
|
||||||
|
return strconv.Itoa(d.db.Stats().MaxOpenConnections), nil
|
||||||
|
case OpenConnections:
|
||||||
|
return strconv.Itoa(d.db.Stats().OpenConnections), nil
|
||||||
|
case WaitCount:
|
||||||
|
return strconv.FormatInt(d.db.Stats().WaitCount, 10), nil
|
||||||
|
case WaitDuration:
|
||||||
|
return d.db.Stats().WaitDuration.String(), nil
|
||||||
|
default:
|
||||||
|
return "", fmt.Errorf("unhandled database property")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compact satisfies the ethdb.Compacter interface
|
||||||
|
// Compact flattens the underlying data store for the given key range
|
||||||
|
func (d *Database) Compact(start []byte, limit []byte) error {
|
||||||
|
return errNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBatch satisfies the ethdb.Batcher interface
|
||||||
|
// NewBatch creates a write-only database that buffers changes to its host db
|
||||||
|
// until a final write is called
|
||||||
|
func (d *Database) NewBatch() ethdb.Batch {
|
||||||
|
return NewBatch(d.db, nil, d.BlockNumber)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBatchWithSize satisfies the ethdb.Batcher interface.
|
||||||
|
// NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
|
||||||
|
func (d *Database) NewBatchWithSize(size int) ethdb.Batch {
|
||||||
|
return NewBatch(d.db, nil, d.BlockNumber)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewIterator satisfies the ethdb.Iteratee interface
|
||||||
|
// it creates a binary-alphabetical iterator over a subset
|
||||||
|
// of database content with a particular key prefix, starting at a particular
|
||||||
|
// initial key (or after, if it does not exist).
|
||||||
|
//
|
||||||
|
// Note: This method assumes that the prefix is NOT part of the start, so there's
|
||||||
|
// no need for the caller to prepend the prefix to the start
|
||||||
|
func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
|
||||||
|
return NewIterator(start, prefix, d.db)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close satisfies the io.Closer interface
|
||||||
|
// Close closes the db connection
|
||||||
|
func (d *Database) Close() error {
|
||||||
|
return d.db.DB.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasAncient satisfies the ethdb.AncientReader interface
|
||||||
|
// HasAncient returns an indicator whether the specified data exists in the ancient store
|
||||||
|
func (d *Database) HasAncient(kind string, number uint64) (bool, error) {
|
||||||
|
return false, errNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ancient satisfies the ethdb.AncientReader interface
|
||||||
|
// Ancient retrieves an ancient binary blob from the append-only immutable files
|
||||||
|
func (d *Database) Ancient(kind string, number uint64) ([]byte, error) {
|
||||||
|
return nil, errNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ancients satisfies the ethdb.AncientReader interface
|
||||||
|
// Ancients returns the ancient item numbers in the ancient store
|
||||||
|
func (d *Database) Ancients() (uint64, error) {
|
||||||
|
return 0, errNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tail satisfies the ethdb.AncientReader interface.
|
||||||
|
// Tail returns the number of first stored item in the freezer.
|
||||||
|
func (d *Database) Tail() (uint64, error) {
|
||||||
|
return 0, errNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
|
// AncientSize satisfies the ethdb.AncientReader interface
|
||||||
|
// AncientSize returns the ancient size of the specified category
|
||||||
|
func (d *Database) AncientSize(kind string) (uint64, error) {
|
||||||
|
return 0, errNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
|
// AncientRange retrieves all the items in a range, starting from the index 'start'.
|
||||||
|
// It will return
|
||||||
|
// - at most 'count' items,
|
||||||
|
// - at least 1 item (even if exceeding the maxBytes), but will otherwise
|
||||||
|
// return as many items as fit into maxBytes.
|
||||||
|
func (d *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
|
||||||
|
return nil, errNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadAncients applies the provided AncientReader function
|
||||||
|
func (d *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
|
||||||
|
return errNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
|
// TruncateHead satisfies the ethdb.AncientWriter interface.
|
||||||
|
// TruncateHead discards all but the first n ancient data from the ancient store.
|
||||||
|
func (d *Database) TruncateHead(n uint64) error {
|
||||||
|
return errNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
|
// TruncateTail satisfies the ethdb.AncientWriter interface.
|
||||||
|
// TruncateTail discards the first n ancient data from the ancient store.
|
||||||
|
func (d *Database) TruncateTail(n uint64) error {
|
||||||
|
return errNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sync satisfies the ethdb.AncientWriter interface
|
||||||
|
// Sync flushes all in-memory ancient store data to disk
|
||||||
|
func (d *Database) Sync() error {
|
||||||
|
return errNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
|
// MigrateTable satisfies the ethdb.AncientWriter interface.
|
||||||
|
// MigrateTable processes and migrates entries of a given table to a new format.
|
||||||
|
func (d *Database) MigrateTable(string, func([]byte) ([]byte, error)) error {
|
||||||
|
return errNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewSnapshot satisfies the ethdb.Snapshotter interface.
|
||||||
|
// NewSnapshot creates a database snapshot based on the current state.
|
||||||
|
func (d *Database) NewSnapshot() (ethdb.Snapshot, error) {
|
||||||
|
return nil, errNotSupported
|
||||||
|
}
|
||||||
|
|
||||||
|
// AncientDatadir returns an error as we don't have a backing chain freezer.
|
||||||
|
func (d *Database) AncientDatadir() (string, error) {
|
||||||
|
return "", errNotSupported
|
||||||
|
}
|
127
postgres/v1/database_test.go
Normal file
127
postgres/v1/database_test.go
Normal file
@ -0,0 +1,127 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package pgipfsethdb_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/big"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
|
"github.com/jmoiron/sqlx"
|
||||||
|
"github.com/mailgun/groupcache/v2"
|
||||||
|
. "github.com/onsi/ginkgo"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
|
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
database ethdb.Database
|
||||||
|
db *sqlx.DB
|
||||||
|
err error
|
||||||
|
testBlockNumber = big.NewInt(1337)
|
||||||
|
testHeader = types.Header{Number: testBlockNumber}
|
||||||
|
testValue, _ = rlp.EncodeToBytes(testHeader)
|
||||||
|
testEthKey = testHeader.Hash().Bytes()
|
||||||
|
testMhKey, _ = pgipfsethdb.MultihashKeyFromKeccak256(testEthKey)
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("Database", func() {
|
||||||
|
BeforeEach(func() {
|
||||||
|
db, err = pgipfsethdb.TestDB()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
cacheConfig := pgipfsethdb.CacheConfig{
|
||||||
|
Name: "db",
|
||||||
|
Size: 3000000, // 3MB
|
||||||
|
ExpiryDuration: time.Hour,
|
||||||
|
}
|
||||||
|
|
||||||
|
database = pgipfsethdb.NewDatabase(db, cacheConfig)
|
||||||
|
|
||||||
|
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
|
||||||
|
Expect(ok).To(BeTrue())
|
||||||
|
(*databaseWithBlock).BlockNumber = testBlockNumber
|
||||||
|
})
|
||||||
|
AfterEach(func() {
|
||||||
|
groupcache.DeregisterGroup("db")
|
||||||
|
err = pgipfsethdb.ResetTestDB(db)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
Describe("Has", func() {
|
||||||
|
It("returns false if a key-pair doesn't exist in the db", func() {
|
||||||
|
has, err := database.Has(testEthKey)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(has).ToNot(BeTrue())
|
||||||
|
})
|
||||||
|
It("returns true if a key-pair exists in the db", func() {
|
||||||
|
_, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
has, err := database.Has(testEthKey)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(has).To(BeTrue())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Describe("Get", func() {
|
||||||
|
It("throws an err if the key-pair doesn't exist in the db", func() {
|
||||||
|
_, err = database.Get(testEthKey)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
|
||||||
|
})
|
||||||
|
It("returns the value associated with the key, if the pair exists", func() {
|
||||||
|
_, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
val, err := database.Get(testEthKey)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(val).To(Equal(testValue))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Describe("Put", func() {
|
||||||
|
It("persists the key-value pair in the database", func() {
|
||||||
|
_, err = database.Get(testEthKey)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
|
||||||
|
|
||||||
|
err = database.Put(testEthKey, testValue)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
val, err := database.Get(testEthKey)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(val).To(Equal(testValue))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Describe("Delete", func() {
|
||||||
|
It("removes the key-value pair from the database", func() {
|
||||||
|
err = database.Put(testEthKey, testValue)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
val, err := database.Get(testEthKey)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(val).To(Equal(testValue))
|
||||||
|
|
||||||
|
err = database.Delete(testEthKey)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
_, err = database.Get(testEthKey)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
90
postgres/v1/iterator.go
Normal file
90
postgres/v1/iterator.go
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2020 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package pgipfsethdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
|
"github.com/jmoiron/sqlx"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ ethdb.Iterator = &Iterator{}
|
||||||
|
|
||||||
|
// Iterator is the type that satisfies the ethdb.Iterator interface for PG-IPFS Ethereum data using a direct Postgres connection
|
||||||
|
// Iteratee interface is used in Geth for various tests, trie/sync_bloom.go (for fast sync),
|
||||||
|
// rawdb.InspectDatabase, and the new core/state/snapshot features.
|
||||||
|
// This should not be confused with trie.NodeIterator or state.NodeIteraor (which can be constructed
|
||||||
|
// from the ethdb.KeyValueStoreand ethdb.Database interfaces)
|
||||||
|
type Iterator struct {
|
||||||
|
db *sqlx.DB
|
||||||
|
currentKey, prefix []byte
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewIterator returns an ethdb.Iterator interface for PG-IPFS
|
||||||
|
func NewIterator(start, prefix []byte, db *sqlx.DB) ethdb.Iterator {
|
||||||
|
return &Iterator{
|
||||||
|
db: db,
|
||||||
|
prefix: prefix,
|
||||||
|
currentKey: start,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next satisfies the ethdb.Iterator interface
|
||||||
|
// Next moves the iterator to the next key/value pair
|
||||||
|
// It returns whether the iterator is exhausted
|
||||||
|
func (i *Iterator) Next() bool {
|
||||||
|
// this is complicated by the ipfs db keys not being the keccak256 hashes
|
||||||
|
// go-ethereum usage of this method expects the iteration to occur over keccak256 keys
|
||||||
|
panic("implement me: Next")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Error satisfies the ethdb.Iterator interface
|
||||||
|
// Error returns any accumulated error
|
||||||
|
// Exhausting all the key/value pairs is not considered to be an error
|
||||||
|
func (i *Iterator) Error() error {
|
||||||
|
return i.err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Key satisfies the ethdb.Iterator interface
|
||||||
|
// Key returns the key of the current key/value pair, or nil if done
|
||||||
|
// The caller should not modify the contents of the returned slice
|
||||||
|
// and its contents may change on the next call to Next
|
||||||
|
func (i *Iterator) Key() []byte {
|
||||||
|
return i.currentKey
|
||||||
|
}
|
||||||
|
|
||||||
|
// Value satisfies the ethdb.Iterator interface
|
||||||
|
// Value returns the value of the current key/value pair, or nil if done
|
||||||
|
// The caller should not modify the contents of the returned slice
|
||||||
|
// and its contents may change on the next call to Next
|
||||||
|
func (i *Iterator) Value() []byte {
|
||||||
|
mhKey, err := MultihashKeyFromKeccak256(i.currentKey)
|
||||||
|
if err != nil {
|
||||||
|
i.err = err
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var data []byte
|
||||||
|
i.err = i.db.Get(&data, getPgStr, mhKey)
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
|
||||||
|
// Release satisfies the ethdb.Iterator interface
|
||||||
|
// Release releases associated resources
|
||||||
|
// Release should always succeed and can be called multiple times without causing error
|
||||||
|
func (i *Iterator) Release() {
|
||||||
|
i.db.Close()
|
||||||
|
}
|
29
postgres/v1/postgres_suite_test.go
Normal file
29
postgres/v1/postgres_suite_test.go
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package pgipfsethdb_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
. "github.com/onsi/ginkgo"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPGIPFSETHDB(t *testing.T) {
|
||||||
|
RegisterFailHandler(Fail)
|
||||||
|
RunSpecs(t, "PG-IPFS ethdb test")
|
||||||
|
}
|
49
postgres/v1/util.go
Normal file
49
postgres/v1/util.go
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package pgipfsethdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||||
|
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
||||||
|
"github.com/jmoiron/sqlx"
|
||||||
|
_ "github.com/lib/pq" //postgres driver
|
||||||
|
"github.com/multiformats/go-multihash"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash db key string
|
||||||
|
func MultihashKeyFromKeccak256(h []byte) (string, error) {
|
||||||
|
mh, err := multihash.Encode(h, multihash.KECCAK_256)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
dbKey := dshelp.MultihashToDsKey(mh)
|
||||||
|
return blockstore.BlockPrefix.String() + dbKey.String(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDB connect to the testing database
|
||||||
|
// it assumes the database has the IPFS public.blocks table present
|
||||||
|
// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table
|
||||||
|
func TestDB() (*sqlx.DB, error) {
|
||||||
|
connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable"
|
||||||
|
return sqlx.Connect("postgres", connectStr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResetTestDB drops all rows in the test db public.blocks table
|
||||||
|
func ResetTestDB(db *sqlx.DB) error {
|
||||||
|
_, err := db.Exec("TRUNCATE public.blocks CASCADE")
|
||||||
|
return err
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user