Small refactor.
This commit is contained in:
parent
0aa1634485
commit
81961e1c63
@ -23,6 +23,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/ethdb"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
@ -38,11 +39,11 @@ var (
|
||||
|
||||
var _ = Describe("Batch", func() {
|
||||
BeforeEach(func() {
|
||||
db, err = pgipfsethdb.TestDB()
|
||||
db, err := postgres.SetupPGXDB()
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
cacheConfig := pgipfsethdb.CacheConfig{
|
||||
Name: "driver",
|
||||
Name: "db",
|
||||
Size: 3000000, // 3MB
|
||||
ExpiryDuration: time.Hour,
|
||||
}
|
||||
|
@ -41,8 +41,8 @@ var (
|
||||
|
||||
// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
|
||||
type Database struct {
|
||||
driver sql.Database
|
||||
cache *groupcache.Group
|
||||
db sql.Database
|
||||
cache *groupcache.Group
|
||||
}
|
||||
|
||||
func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) {
|
||||
@ -57,7 +57,7 @@ type CacheConfig struct {
|
||||
|
||||
// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS
|
||||
func NewKeyValueStore(db sql.Database, cacheConfig CacheConfig) ethdb.KeyValueStore {
|
||||
database := Database{driver: db}
|
||||
database := Database{db: db}
|
||||
database.InitCache(cacheConfig)
|
||||
|
||||
return &database
|
||||
@ -65,7 +65,7 @@ func NewKeyValueStore(db sql.Database, cacheConfig CacheConfig) ethdb.KeyValueSt
|
||||
|
||||
// NewDatabase returns a ethdb.Database interface for PG-IPFS
|
||||
func NewDatabase(db sql.Database, cacheConfig CacheConfig) *Database {
|
||||
database := Database{driver: db}
|
||||
database := Database{db: db}
|
||||
database.InitCache(cacheConfig)
|
||||
|
||||
return &database
|
||||
@ -102,13 +102,13 @@ func (d *Database) Has(key []byte) (bool, error) {
|
||||
return false, err
|
||||
}
|
||||
var exists bool
|
||||
return exists, d.driver.Get(context.Background(), &exists, hasPgStr, mhKey)
|
||||
return exists, d.db.Get(context.Background(), &exists, hasPgStr, mhKey)
|
||||
}
|
||||
|
||||
// Get retrieves the given key if it's present in the key-value data store
|
||||
func (d *Database) dbGet(key string) ([]byte, error) {
|
||||
var data []byte
|
||||
return data, d.driver.Get(context.Background(),&data, getPgStr, key)
|
||||
return data, d.db.Get(context.Background(),&data, getPgStr, key)
|
||||
}
|
||||
|
||||
// Get satisfies the ethdb.KeyValueReader interface
|
||||
@ -134,7 +134,7 @@ func (d *Database) Put(key []byte, value []byte) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = d.driver.Exec(context.Background(),putPgStr, mhKey, value)
|
||||
_, err = d.db.Exec(context.Background(),putPgStr, mhKey, value)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -146,7 +146,7 @@ func (d *Database) Delete(key []byte) error {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = d.driver.Exec(context.Background(),deletePgStr, mhKey)
|
||||
_, err = d.db.Exec(context.Background(),deletePgStr, mhKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -211,23 +211,23 @@ func (d *Database) Stat(property string) (string, error) {
|
||||
switch prop {
|
||||
case Size:
|
||||
var byteSize string
|
||||
return byteSize, d.driver.Get(context.Background(), &byteSize, dbSizePgStr)
|
||||
return byteSize, d.db.Get(context.Background(), &byteSize, dbSizePgStr)
|
||||
case Idle:
|
||||
return strconv.FormatInt(d.driver.Stats().Idle(),10), nil
|
||||
return strconv.FormatInt(d.db.Stats().Idle(),10), nil
|
||||
case InUse:
|
||||
return strconv.FormatInt(d.driver.Stats().InUse(),10), nil
|
||||
return strconv.FormatInt(d.db.Stats().InUse(),10), nil
|
||||
case MaxIdleClosed:
|
||||
return strconv.FormatInt(d.driver.Stats().MaxIdleClosed(),10), nil
|
||||
return strconv.FormatInt(d.db.Stats().MaxIdleClosed(),10), nil
|
||||
case MaxLifetimeClosed:
|
||||
return strconv.FormatInt(d.driver.Stats().MaxLifetimeClosed(),10), nil
|
||||
return strconv.FormatInt(d.db.Stats().MaxLifetimeClosed(),10), nil
|
||||
case MaxOpenConnections:
|
||||
return strconv.FormatInt(d.driver.Stats().MaxOpen(), 10), nil
|
||||
return strconv.FormatInt(d.db.Stats().MaxOpen(), 10), nil
|
||||
case OpenConnections:
|
||||
return strconv.FormatInt(d.driver.Stats().Open(), 10), nil
|
||||
return strconv.FormatInt(d.db.Stats().Open(), 10), nil
|
||||
case WaitCount:
|
||||
return strconv.FormatInt(d.driver.Stats().WaitCount(),10), nil
|
||||
return strconv.FormatInt(d.db.Stats().WaitCount(),10), nil
|
||||
case WaitDuration:
|
||||
return d.driver.Stats().WaitDuration().String(), nil
|
||||
return d.db.Stats().WaitDuration().String(), nil
|
||||
default:
|
||||
return "", fmt.Errorf("unhandled database property")
|
||||
}
|
||||
@ -240,10 +240,10 @@ func (d *Database) Compact(start []byte, limit []byte) error {
|
||||
}
|
||||
|
||||
// NewBatch satisfies the ethdb.Batcher interface
|
||||
// NewBatch creates a write-only database that buffers changes to its host driver
|
||||
// NewBatch creates a write-only database that buffers changes to its host db
|
||||
// until a final write is called
|
||||
func (d *Database) NewBatch() ethdb.Batch {
|
||||
return NewBatch(d.driver, nil)
|
||||
return NewBatch(d.db, nil)
|
||||
}
|
||||
|
||||
// NewIterator satisfies the ethdb.Iteratee interface
|
||||
@ -254,13 +254,13 @@ func (d *Database) NewBatch() ethdb.Batch {
|
||||
// Note: This method assumes that the prefix is NOT part of the start, so there's
|
||||
// no need for the caller to prepend the prefix to the start
|
||||
func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
|
||||
return NewIterator(start, prefix, d.driver)
|
||||
return NewIterator(start, prefix, d.db)
|
||||
}
|
||||
|
||||
// Close satisfies the io.Closer interface
|
||||
// Close closes the driver connection
|
||||
// Close closes the db connection
|
||||
func (d *Database) Close() error {
|
||||
return d.driver.Close()
|
||||
return d.db.Close()
|
||||
}
|
||||
|
||||
// HasAncient satisfies the ethdb.AncientReader interface
|
||||
|
@ -46,7 +46,7 @@ var _ = Describe("Database", func() {
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
cacheConfig := pgipfsethdb.CacheConfig{
|
||||
Name: "driver",
|
||||
Name: "db",
|
||||
Size: 3000000, // 3MB
|
||||
ExpiryDuration: time.Hour,
|
||||
}
|
||||
@ -59,12 +59,12 @@ var _ = Describe("Database", func() {
|
||||
})
|
||||
|
||||
Describe("Has", func() {
|
||||
It("returns false if a key-pair doesn't exist in the driver", func() {
|
||||
It("returns false if a key-pair doesn't exist in the db", func() {
|
||||
has, err := database.Has(testEthKey)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(has).ToNot(BeTrue())
|
||||
})
|
||||
It("returns true if a key-pair exists in the driver", func() {
|
||||
It("returns true if a key-pair exists in the db", func() {
|
||||
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
has, err := database.Has(testEthKey)
|
||||
@ -74,7 +74,7 @@ var _ = Describe("Database", func() {
|
||||
})
|
||||
|
||||
Describe("Get", func() {
|
||||
It("throws an err if the key-pair doesn't exist in the driver", func() {
|
||||
It("throws an err if the key-pair doesn't exist in the db", func() {
|
||||
_, err = database.Get(testEthKey)
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
|
||||
|
@ -47,7 +47,7 @@ func NewIterator(start, prefix []byte, driver sql.Database) ethdb.Iterator {
|
||||
// Next moves the iterator to the next key/value pair
|
||||
// It returns whether the iterator is exhausted
|
||||
func (i *Iterator) Next() bool {
|
||||
// this is complicated by the ipfs driver keys not being the keccak256 hashes
|
||||
// this is complicated by the ipfs db keys not being the keccak256 hashes
|
||||
// go-ethereum usage of this method expects the iteration to occur over keccak256 keys
|
||||
panic("implement me: Next")
|
||||
}
|
||||
|
@ -20,11 +20,11 @@ import (
|
||||
"github.com/ipfs/go-ipfs-blockstore"
|
||||
"github.com/ipfs/go-ipfs-ds-help"
|
||||
"github.com/jmoiron/sqlx"
|
||||
_ "github.com/lib/pq" //postgres driver
|
||||
_ "github.com/lib/pq" //postgres db
|
||||
"github.com/multiformats/go-multihash"
|
||||
)
|
||||
|
||||
// MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash driver key string
|
||||
// MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash db key string
|
||||
func MultihashKeyFromKeccak256(h []byte) (string, error) {
|
||||
mh, err := multihash.Encode(h, multihash.KECCAK_256)
|
||||
if err != nil {
|
||||
@ -36,13 +36,13 @@ func MultihashKeyFromKeccak256(h []byte) (string, error) {
|
||||
|
||||
// TestDB connect to the testing database
|
||||
// it assumes the database has the IPFS public.blocks table present
|
||||
// DO NOT use a production driver for the test driver, as it will remove all contents of the public.blocks table
|
||||
// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table
|
||||
func TestDB() (*sqlx.DB, error) {
|
||||
connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable"
|
||||
return sqlx.Connect("postgres", connectStr)
|
||||
}
|
||||
|
||||
// ResetTestDB drops all rows in the test driver public.blocks table
|
||||
// ResetTestDB drops all rows in the test db public.blocks table
|
||||
func ResetTestDB(db *sqlx.DB) error {
|
||||
_, err := db.Exec("TRUNCATE public.blocks")
|
||||
return err
|
||||
|
Loading…
Reference in New Issue
Block a user