Update geth to postgres-refactor.
This commit is contained in:
parent
cc193fe2ec
commit
0aa1634485
2
go.mod
2
go.mod
@ -19,3 +19,5 @@ require (
|
|||||||
github.com/onsi/gomega v1.10.1
|
github.com/onsi/gomega v1.10.1
|
||||||
google.golang.org/appengine v1.6.7 // indirect
|
google.golang.org/appengine v1.6.7 // indirect
|
||||||
)
|
)
|
||||||
|
|
||||||
|
replace github.com/ethereum/go-ethereum v1.10.9 => github.com/vulcanize/go-ethereum v0.0.0-20211120204023-ac6ef33f2ad1
|
@ -17,19 +17,21 @@
|
|||||||
package pgipfsethdb
|
package pgipfsethdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection
|
// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection
|
||||||
type Batch struct {
|
type Batch struct {
|
||||||
db *sqlx.DB
|
db sql.Database
|
||||||
tx *sqlx.Tx
|
tx sql.Tx
|
||||||
valueSize int
|
valueSize int
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBatch returns a ethdb.Batch interface for PG-IPFS
|
// NewBatch returns a ethdb.Batch interface for PG-IPFS
|
||||||
func NewBatch(db *sqlx.DB, tx *sqlx.Tx) ethdb.Batch {
|
func NewBatch(db sql.Database, tx sql.Tx) ethdb.Batch {
|
||||||
b := &Batch{
|
b := &Batch{
|
||||||
db: db,
|
db: db,
|
||||||
tx: tx,
|
tx: tx,
|
||||||
@ -48,7 +50,7 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if _, err = b.tx.Exec(putPgStr, mhKey, value); err != nil {
|
if _, err = b.tx.Exec(context.Background(),putPgStr, mhKey, value); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
b.valueSize += len(value)
|
b.valueSize += len(value)
|
||||||
@ -62,7 +64,7 @@ func (b *Batch) Delete(key []byte) (err error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = b.tx.Exec(deletePgStr, mhKey)
|
_, err = b.tx.Exec(context.Background(),deletePgStr, mhKey)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -79,7 +81,7 @@ func (b *Batch) Write() error {
|
|||||||
if b.tx == nil {
|
if b.tx == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return b.tx.Commit()
|
return b.tx.Commit(context.Background())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Replay satisfies the ethdb.Batch interface
|
// Replay satisfies the ethdb.Batch interface
|
||||||
@ -93,7 +95,7 @@ func (b *Batch) Replay(w ethdb.KeyValueWriter) error {
|
|||||||
// This should be called after every write
|
// This should be called after every write
|
||||||
func (b *Batch) Reset() {
|
func (b *Batch) Reset() {
|
||||||
var err error
|
var err error
|
||||||
b.tx, err = b.db.Beginx()
|
b.tx, err = b.db.Begin(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,7 @@ var _ = Describe("Batch", func() {
|
|||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
cacheConfig := pgipfsethdb.CacheConfig{
|
cacheConfig := pgipfsethdb.CacheConfig{
|
||||||
Name: "db",
|
Name: "driver",
|
||||||
Size: 3000000, // 3MB
|
Size: 3000000, // 3MB
|
||||||
ExpiryDuration: time.Hour,
|
ExpiryDuration: time.Hour,
|
||||||
}
|
}
|
||||||
|
@ -20,11 +20,12 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||||
"github.com/mailgun/groupcache/v2"
|
"github.com/mailgun/groupcache/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -40,7 +41,7 @@ var (
|
|||||||
|
|
||||||
// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
|
// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
|
||||||
type Database struct {
|
type Database struct {
|
||||||
db *sqlx.DB
|
driver sql.Database
|
||||||
cache *groupcache.Group
|
cache *groupcache.Group
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -55,16 +56,16 @@ type CacheConfig struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS
|
// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS
|
||||||
func NewKeyValueStore(db *sqlx.DB, cacheConfig CacheConfig) ethdb.KeyValueStore {
|
func NewKeyValueStore(db sql.Database, cacheConfig CacheConfig) ethdb.KeyValueStore {
|
||||||
database := Database{db: db}
|
database := Database{driver: db}
|
||||||
database.InitCache(cacheConfig)
|
database.InitCache(cacheConfig)
|
||||||
|
|
||||||
return &database
|
return &database
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDatabase returns a ethdb.Database interface for PG-IPFS
|
// NewDatabase returns a ethdb.Database interface for PG-IPFS
|
||||||
func NewDatabase(db *sqlx.DB, cacheConfig CacheConfig) *Database {
|
func NewDatabase(db sql.Database, cacheConfig CacheConfig) *Database {
|
||||||
database := Database{db: db}
|
database := Database{driver: db}
|
||||||
database.InitCache(cacheConfig)
|
database.InitCache(cacheConfig)
|
||||||
|
|
||||||
return &database
|
return &database
|
||||||
@ -101,13 +102,13 @@ func (d *Database) Has(key []byte) (bool, error) {
|
|||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
var exists bool
|
var exists bool
|
||||||
return exists, d.db.Get(&exists, hasPgStr, mhKey)
|
return exists, d.driver.Get(context.Background(), &exists, hasPgStr, mhKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get retrieves the given key if it's present in the key-value data store
|
// Get retrieves the given key if it's present in the key-value data store
|
||||||
func (d *Database) dbGet(key string) ([]byte, error) {
|
func (d *Database) dbGet(key string) ([]byte, error) {
|
||||||
var data []byte
|
var data []byte
|
||||||
return data, d.db.Get(&data, getPgStr, key)
|
return data, d.driver.Get(context.Background(),&data, getPgStr, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get satisfies the ethdb.KeyValueReader interface
|
// Get satisfies the ethdb.KeyValueReader interface
|
||||||
@ -133,7 +134,7 @@ func (d *Database) Put(key []byte, value []byte) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = d.db.Exec(putPgStr, mhKey, value)
|
_, err = d.driver.Exec(context.Background(),putPgStr, mhKey, value)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -145,7 +146,7 @@ func (d *Database) Delete(key []byte) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = d.db.Exec(deletePgStr, mhKey)
|
_, err = d.driver.Exec(context.Background(),deletePgStr, mhKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -210,23 +211,23 @@ func (d *Database) Stat(property string) (string, error) {
|
|||||||
switch prop {
|
switch prop {
|
||||||
case Size:
|
case Size:
|
||||||
var byteSize string
|
var byteSize string
|
||||||
return byteSize, d.db.Get(&byteSize, dbSizePgStr)
|
return byteSize, d.driver.Get(context.Background(), &byteSize, dbSizePgStr)
|
||||||
case Idle:
|
case Idle:
|
||||||
return string(d.db.Stats().Idle), nil
|
return strconv.FormatInt(d.driver.Stats().Idle(),10), nil
|
||||||
case InUse:
|
case InUse:
|
||||||
return string(d.db.Stats().InUse), nil
|
return strconv.FormatInt(d.driver.Stats().InUse(),10), nil
|
||||||
case MaxIdleClosed:
|
case MaxIdleClosed:
|
||||||
return string(d.db.Stats().MaxIdleClosed), nil
|
return strconv.FormatInt(d.driver.Stats().MaxIdleClosed(),10), nil
|
||||||
case MaxLifetimeClosed:
|
case MaxLifetimeClosed:
|
||||||
return string(d.db.Stats().MaxLifetimeClosed), nil
|
return strconv.FormatInt(d.driver.Stats().MaxLifetimeClosed(),10), nil
|
||||||
case MaxOpenConnections:
|
case MaxOpenConnections:
|
||||||
return string(d.db.Stats().MaxOpenConnections), nil
|
return strconv.FormatInt(d.driver.Stats().MaxOpen(), 10), nil
|
||||||
case OpenConnections:
|
case OpenConnections:
|
||||||
return string(d.db.Stats().OpenConnections), nil
|
return strconv.FormatInt(d.driver.Stats().Open(), 10), nil
|
||||||
case WaitCount:
|
case WaitCount:
|
||||||
return string(d.db.Stats().WaitCount), nil
|
return strconv.FormatInt(d.driver.Stats().WaitCount(),10), nil
|
||||||
case WaitDuration:
|
case WaitDuration:
|
||||||
return d.db.Stats().WaitDuration.String(), nil
|
return d.driver.Stats().WaitDuration().String(), nil
|
||||||
default:
|
default:
|
||||||
return "", fmt.Errorf("unhandled database property")
|
return "", fmt.Errorf("unhandled database property")
|
||||||
}
|
}
|
||||||
@ -239,10 +240,10 @@ func (d *Database) Compact(start []byte, limit []byte) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewBatch satisfies the ethdb.Batcher interface
|
// NewBatch satisfies the ethdb.Batcher interface
|
||||||
// NewBatch creates a write-only database that buffers changes to its host db
|
// NewBatch creates a write-only database that buffers changes to its host driver
|
||||||
// until a final write is called
|
// until a final write is called
|
||||||
func (d *Database) NewBatch() ethdb.Batch {
|
func (d *Database) NewBatch() ethdb.Batch {
|
||||||
return NewBatch(d.db, nil)
|
return NewBatch(d.driver, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewIterator satisfies the ethdb.Iteratee interface
|
// NewIterator satisfies the ethdb.Iteratee interface
|
||||||
@ -253,13 +254,13 @@ func (d *Database) NewBatch() ethdb.Batch {
|
|||||||
// Note: This method assumes that the prefix is NOT part of the start, so there's
|
// Note: This method assumes that the prefix is NOT part of the start, so there's
|
||||||
// no need for the caller to prepend the prefix to the start
|
// no need for the caller to prepend the prefix to the start
|
||||||
func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
|
func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
|
||||||
return NewIterator(start, prefix, d.db)
|
return NewIterator(start, prefix, d.driver)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close satisfies the io.Closer interface
|
// Close satisfies the io.Closer interface
|
||||||
// Close closes the db connection
|
// Close closes the driver connection
|
||||||
func (d *Database) Close() error {
|
func (d *Database) Close() error {
|
||||||
return d.db.DB.Close()
|
return d.driver.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasAncient satisfies the ethdb.AncientReader interface
|
// HasAncient satisfies the ethdb.AncientReader interface
|
||||||
|
@ -46,7 +46,7 @@ var _ = Describe("Database", func() {
|
|||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
cacheConfig := pgipfsethdb.CacheConfig{
|
cacheConfig := pgipfsethdb.CacheConfig{
|
||||||
Name: "db",
|
Name: "driver",
|
||||||
Size: 3000000, // 3MB
|
Size: 3000000, // 3MB
|
||||||
ExpiryDuration: time.Hour,
|
ExpiryDuration: time.Hour,
|
||||||
}
|
}
|
||||||
@ -59,12 +59,12 @@ var _ = Describe("Database", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
Describe("Has", func() {
|
Describe("Has", func() {
|
||||||
It("returns false if a key-pair doesn't exist in the db", func() {
|
It("returns false if a key-pair doesn't exist in the driver", func() {
|
||||||
has, err := database.Has(testEthKey)
|
has, err := database.Has(testEthKey)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
Expect(has).ToNot(BeTrue())
|
Expect(has).ToNot(BeTrue())
|
||||||
})
|
})
|
||||||
It("returns true if a key-pair exists in the db", func() {
|
It("returns true if a key-pair exists in the driver", func() {
|
||||||
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue)
|
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue)
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
has, err := database.Has(testEthKey)
|
has, err := database.Has(testEthKey)
|
||||||
@ -74,7 +74,7 @@ var _ = Describe("Database", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
Describe("Get", func() {
|
Describe("Get", func() {
|
||||||
It("throws an err if the key-pair doesn't exist in the db", func() {
|
It("throws an err if the key-pair doesn't exist in the driver", func() {
|
||||||
_, err = database.Get(testEthKey)
|
_, err = database.Get(testEthKey)
|
||||||
Expect(err).To(HaveOccurred())
|
Expect(err).To(HaveOccurred())
|
||||||
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
|
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
|
||||||
|
@ -17,8 +17,10 @@
|
|||||||
package pgipfsethdb
|
package pgipfsethdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Iterator is the type that satisfies the ethdb.Iterator interface for PG-IPFS Ethereum data using a direct Postgres connection
|
// Iterator is the type that satisfies the ethdb.Iterator interface for PG-IPFS Ethereum data using a direct Postgres connection
|
||||||
@ -27,15 +29,15 @@ import (
|
|||||||
// This should not be confused with trie.NodeIterator or state.NodeIteraor (which can be constructed
|
// This should not be confused with trie.NodeIterator or state.NodeIteraor (which can be constructed
|
||||||
// from the ethdb.KeyValueStoreand ethdb.Database interfaces)
|
// from the ethdb.KeyValueStoreand ethdb.Database interfaces)
|
||||||
type Iterator struct {
|
type Iterator struct {
|
||||||
db *sqlx.DB
|
db sql.Database
|
||||||
currentKey, prefix []byte
|
currentKey, prefix []byte
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewIterator returns an ethdb.Iterator interface for PG-IPFS
|
// NewIterator returns an ethdb.Iterator interface for PG-IPFS
|
||||||
func NewIterator(start, prefix []byte, db *sqlx.DB) ethdb.Iterator {
|
func NewIterator(start, prefix []byte, driver sql.Database) ethdb.Iterator {
|
||||||
return &Iterator{
|
return &Iterator{
|
||||||
db: db,
|
db: driver,
|
||||||
prefix: prefix,
|
prefix: prefix,
|
||||||
currentKey: start,
|
currentKey: start,
|
||||||
}
|
}
|
||||||
@ -45,7 +47,7 @@ func NewIterator(start, prefix []byte, db *sqlx.DB) ethdb.Iterator {
|
|||||||
// Next moves the iterator to the next key/value pair
|
// Next moves the iterator to the next key/value pair
|
||||||
// It returns whether the iterator is exhausted
|
// It returns whether the iterator is exhausted
|
||||||
func (i *Iterator) Next() bool {
|
func (i *Iterator) Next() bool {
|
||||||
// this is complicated by the ipfs db keys not being the keccak256 hashes
|
// this is complicated by the ipfs driver keys not being the keccak256 hashes
|
||||||
// go-ethereum usage of this method expects the iteration to occur over keccak256 keys
|
// go-ethereum usage of this method expects the iteration to occur over keccak256 keys
|
||||||
panic("implement me: Next")
|
panic("implement me: Next")
|
||||||
}
|
}
|
||||||
@ -76,7 +78,7 @@ func (i *Iterator) Value() []byte {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
var data []byte
|
var data []byte
|
||||||
i.err = i.db.Get(&data, getPgStr, mhKey)
|
i.err = i.db.Get(context.Background(),&data, getPgStr, mhKey)
|
||||||
return data
|
return data
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,7 +24,7 @@ import (
|
|||||||
"github.com/multiformats/go-multihash"
|
"github.com/multiformats/go-multihash"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash db key string
|
// MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash driver key string
|
||||||
func MultihashKeyFromKeccak256(h []byte) (string, error) {
|
func MultihashKeyFromKeccak256(h []byte) (string, error) {
|
||||||
mh, err := multihash.Encode(h, multihash.KECCAK_256)
|
mh, err := multihash.Encode(h, multihash.KECCAK_256)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -36,13 +36,13 @@ func MultihashKeyFromKeccak256(h []byte) (string, error) {
|
|||||||
|
|
||||||
// TestDB connect to the testing database
|
// TestDB connect to the testing database
|
||||||
// it assumes the database has the IPFS public.blocks table present
|
// it assumes the database has the IPFS public.blocks table present
|
||||||
// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table
|
// DO NOT use a production driver for the test driver, as it will remove all contents of the public.blocks table
|
||||||
func TestDB() (*sqlx.DB, error) {
|
func TestDB() (*sqlx.DB, error) {
|
||||||
connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable"
|
connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable"
|
||||||
return sqlx.Connect("postgres", connectStr)
|
return sqlx.Connect("postgres", connectStr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResetTestDB drops all rows in the test db public.blocks table
|
// ResetTestDB drops all rows in the test driver public.blocks table
|
||||||
func ResetTestDB(db *sqlx.DB) error {
|
func ResetTestDB(db *sqlx.DB) error {
|
||||||
_, err := db.Exec("TRUNCATE public.blocks")
|
_, err := db.Exec("TRUNCATE public.blocks")
|
||||||
return err
|
return err
|
||||||
|
Loading…
Reference in New Issue
Block a user