From 0aa16344859a0c1b219923f9d1b0230f292d58b2 Mon Sep 17 00:00:00 2001 From: Arijit Das Date: Thu, 25 Nov 2021 11:38:29 +0530 Subject: [PATCH] Update geth to postgres-refactor. --- go.mod | 2 ++ postgres/batch.go | 18 ++++++++------ postgres/batch_test.go | 2 +- postgres/database.go | 51 ++++++++++++++++++++------------------- postgres/database_test.go | 8 +++--- postgres/iterator.go | 14 ++++++----- postgres/util.go | 6 ++--- 7 files changed, 54 insertions(+), 47 deletions(-) diff --git a/go.mod b/go.mod index 91c6896..22a19e4 100644 --- a/go.mod +++ b/go.mod @@ -19,3 +19,5 @@ require ( github.com/onsi/gomega v1.10.1 google.golang.org/appengine v1.6.7 // indirect ) + +replace github.com/ethereum/go-ethereum v1.10.9 => github.com/vulcanize/go-ethereum v0.0.0-20211120204023-ac6ef33f2ad1 \ No newline at end of file diff --git a/postgres/batch.go b/postgres/batch.go index d023868..4e540d2 100644 --- a/postgres/batch.go +++ b/postgres/batch.go @@ -17,19 +17,21 @@ package pgipfsethdb import ( + "context" + "github.com/ethereum/go-ethereum/ethdb" - "github.com/jmoiron/sqlx" + "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" ) // Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection type Batch struct { - db *sqlx.DB - tx *sqlx.Tx + db sql.Database + tx sql.Tx valueSize int } // NewBatch returns a ethdb.Batch interface for PG-IPFS -func NewBatch(db *sqlx.DB, tx *sqlx.Tx) ethdb.Batch { +func NewBatch(db sql.Database, tx sql.Tx) ethdb.Batch { b := &Batch{ db: db, tx: tx, @@ -48,7 +50,7 @@ func (b *Batch) Put(key []byte, value []byte) (err error) { if err != nil { return err } - if _, err = b.tx.Exec(putPgStr, mhKey, value); err != nil { + if _, err = b.tx.Exec(context.Background(),putPgStr, mhKey, value); err != nil { return err } b.valueSize += len(value) @@ -62,7 +64,7 @@ func (b *Batch) Delete(key []byte) (err error) { if err != nil { return err } - _, err = b.tx.Exec(deletePgStr, mhKey) + _, err = b.tx.Exec(context.Background(),deletePgStr, mhKey) return err } @@ -79,7 +81,7 @@ func (b *Batch) Write() error { if b.tx == nil { return nil } - return b.tx.Commit() + return b.tx.Commit(context.Background()) } // Replay satisfies the ethdb.Batch interface @@ -93,7 +95,7 @@ func (b *Batch) Replay(w ethdb.KeyValueWriter) error { // This should be called after every write func (b *Batch) Reset() { var err error - b.tx, err = b.db.Beginx() + b.tx, err = b.db.Begin(context.Background()) if err != nil { panic(err) } diff --git a/postgres/batch_test.go b/postgres/batch_test.go index 54bbd9e..a99422f 100644 --- a/postgres/batch_test.go +++ b/postgres/batch_test.go @@ -42,7 +42,7 @@ var _ = Describe("Batch", func() { Expect(err).ToNot(HaveOccurred()) cacheConfig := pgipfsethdb.CacheConfig{ - Name: "db", + Name: "driver", Size: 3000000, // 3MB ExpiryDuration: time.Hour, } diff --git a/postgres/database.go b/postgres/database.go index d7a861b..3f58218 100644 --- a/postgres/database.go +++ b/postgres/database.go @@ -20,11 +20,12 @@ import ( "context" "errors" "fmt" + "strconv" "strings" "time" "github.com/ethereum/go-ethereum/ethdb" - "github.com/jmoiron/sqlx" + "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/mailgun/groupcache/v2" ) @@ -40,8 +41,8 @@ var ( // Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection type Database struct { - db *sqlx.DB - cache *groupcache.Group + driver sql.Database + cache *groupcache.Group } func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) { @@ -55,16 +56,16 @@ type CacheConfig struct { } // NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS -func NewKeyValueStore(db *sqlx.DB, cacheConfig CacheConfig) ethdb.KeyValueStore { - database := Database{db: db} +func NewKeyValueStore(db sql.Database, cacheConfig CacheConfig) ethdb.KeyValueStore { + database := Database{driver: db} database.InitCache(cacheConfig) return &database } // NewDatabase returns a ethdb.Database interface for PG-IPFS -func NewDatabase(db *sqlx.DB, cacheConfig CacheConfig) *Database { - database := Database{db: db} +func NewDatabase(db sql.Database, cacheConfig CacheConfig) *Database { + database := Database{driver: db} database.InitCache(cacheConfig) return &database @@ -101,13 +102,13 @@ func (d *Database) Has(key []byte) (bool, error) { return false, err } var exists bool - return exists, d.db.Get(&exists, hasPgStr, mhKey) + return exists, d.driver.Get(context.Background(), &exists, hasPgStr, mhKey) } // Get retrieves the given key if it's present in the key-value data store func (d *Database) dbGet(key string) ([]byte, error) { var data []byte - return data, d.db.Get(&data, getPgStr, key) + return data, d.driver.Get(context.Background(),&data, getPgStr, key) } // Get satisfies the ethdb.KeyValueReader interface @@ -133,7 +134,7 @@ func (d *Database) Put(key []byte, value []byte) error { if err != nil { return err } - _, err = d.db.Exec(putPgStr, mhKey, value) + _, err = d.driver.Exec(context.Background(),putPgStr, mhKey, value) return err } @@ -145,7 +146,7 @@ func (d *Database) Delete(key []byte) error { return err } - _, err = d.db.Exec(deletePgStr, mhKey) + _, err = d.driver.Exec(context.Background(),deletePgStr, mhKey) if err != nil { return err } @@ -210,23 +211,23 @@ func (d *Database) Stat(property string) (string, error) { switch prop { case Size: var byteSize string - return byteSize, d.db.Get(&byteSize, dbSizePgStr) + return byteSize, d.driver.Get(context.Background(), &byteSize, dbSizePgStr) case Idle: - return string(d.db.Stats().Idle), nil + return strconv.FormatInt(d.driver.Stats().Idle(),10), nil case InUse: - return string(d.db.Stats().InUse), nil + return strconv.FormatInt(d.driver.Stats().InUse(),10), nil case MaxIdleClosed: - return string(d.db.Stats().MaxIdleClosed), nil + return strconv.FormatInt(d.driver.Stats().MaxIdleClosed(),10), nil case MaxLifetimeClosed: - return string(d.db.Stats().MaxLifetimeClosed), nil + return strconv.FormatInt(d.driver.Stats().MaxLifetimeClosed(),10), nil case MaxOpenConnections: - return string(d.db.Stats().MaxOpenConnections), nil + return strconv.FormatInt(d.driver.Stats().MaxOpen(), 10), nil case OpenConnections: - return string(d.db.Stats().OpenConnections), nil + return strconv.FormatInt(d.driver.Stats().Open(), 10), nil case WaitCount: - return string(d.db.Stats().WaitCount), nil + return strconv.FormatInt(d.driver.Stats().WaitCount(),10), nil case WaitDuration: - return d.db.Stats().WaitDuration.String(), nil + return d.driver.Stats().WaitDuration().String(), nil default: return "", fmt.Errorf("unhandled database property") } @@ -239,10 +240,10 @@ func (d *Database) Compact(start []byte, limit []byte) error { } // NewBatch satisfies the ethdb.Batcher interface -// NewBatch creates a write-only database that buffers changes to its host db +// NewBatch creates a write-only database that buffers changes to its host driver // until a final write is called func (d *Database) NewBatch() ethdb.Batch { - return NewBatch(d.db, nil) + return NewBatch(d.driver, nil) } // NewIterator satisfies the ethdb.Iteratee interface @@ -253,13 +254,13 @@ func (d *Database) NewBatch() ethdb.Batch { // Note: This method assumes that the prefix is NOT part of the start, so there's // no need for the caller to prepend the prefix to the start func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator { - return NewIterator(start, prefix, d.db) + return NewIterator(start, prefix, d.driver) } // Close satisfies the io.Closer interface -// Close closes the db connection +// Close closes the driver connection func (d *Database) Close() error { - return d.db.DB.Close() + return d.driver.Close() } // HasAncient satisfies the ethdb.AncientReader interface diff --git a/postgres/database_test.go b/postgres/database_test.go index 9e407e7..253a1b5 100644 --- a/postgres/database_test.go +++ b/postgres/database_test.go @@ -46,7 +46,7 @@ var _ = Describe("Database", func() { Expect(err).ToNot(HaveOccurred()) cacheConfig := pgipfsethdb.CacheConfig{ - Name: "db", + Name: "driver", Size: 3000000, // 3MB ExpiryDuration: time.Hour, } @@ -59,12 +59,12 @@ var _ = Describe("Database", func() { }) Describe("Has", func() { - It("returns false if a key-pair doesn't exist in the db", func() { + It("returns false if a key-pair doesn't exist in the driver", func() { has, err := database.Has(testEthKey) Expect(err).ToNot(HaveOccurred()) Expect(has).ToNot(BeTrue()) }) - It("returns true if a key-pair exists in the db", func() { + It("returns true if a key-pair exists in the driver", func() { _, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue) Expect(err).ToNot(HaveOccurred()) has, err := database.Has(testEthKey) @@ -74,7 +74,7 @@ var _ = Describe("Database", func() { }) Describe("Get", func() { - It("throws an err if the key-pair doesn't exist in the db", func() { + It("throws an err if the key-pair doesn't exist in the driver", func() { _, err = database.Get(testEthKey) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) diff --git a/postgres/iterator.go b/postgres/iterator.go index f673729..d5f8c5a 100644 --- a/postgres/iterator.go +++ b/postgres/iterator.go @@ -17,8 +17,10 @@ package pgipfsethdb import ( + "context" + "github.com/ethereum/go-ethereum/ethdb" - "github.com/jmoiron/sqlx" + "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" ) // Iterator is the type that satisfies the ethdb.Iterator interface for PG-IPFS Ethereum data using a direct Postgres connection @@ -27,15 +29,15 @@ import ( // This should not be confused with trie.NodeIterator or state.NodeIteraor (which can be constructed // from the ethdb.KeyValueStoreand ethdb.Database interfaces) type Iterator struct { - db *sqlx.DB + db sql.Database currentKey, prefix []byte err error } // NewIterator returns an ethdb.Iterator interface for PG-IPFS -func NewIterator(start, prefix []byte, db *sqlx.DB) ethdb.Iterator { +func NewIterator(start, prefix []byte, driver sql.Database) ethdb.Iterator { return &Iterator{ - db: db, + db: driver, prefix: prefix, currentKey: start, } @@ -45,7 +47,7 @@ func NewIterator(start, prefix []byte, db *sqlx.DB) ethdb.Iterator { // Next moves the iterator to the next key/value pair // It returns whether the iterator is exhausted func (i *Iterator) Next() bool { - // this is complicated by the ipfs db keys not being the keccak256 hashes + // this is complicated by the ipfs driver keys not being the keccak256 hashes // go-ethereum usage of this method expects the iteration to occur over keccak256 keys panic("implement me: Next") } @@ -76,7 +78,7 @@ func (i *Iterator) Value() []byte { return nil } var data []byte - i.err = i.db.Get(&data, getPgStr, mhKey) + i.err = i.db.Get(context.Background(),&data, getPgStr, mhKey) return data } diff --git a/postgres/util.go b/postgres/util.go index cdd818f..c533b45 100644 --- a/postgres/util.go +++ b/postgres/util.go @@ -24,7 +24,7 @@ import ( "github.com/multiformats/go-multihash" ) -// MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash db key string +// MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash driver key string func MultihashKeyFromKeccak256(h []byte) (string, error) { mh, err := multihash.Encode(h, multihash.KECCAK_256) if err != nil { @@ -36,13 +36,13 @@ func MultihashKeyFromKeccak256(h []byte) (string, error) { // TestDB connect to the testing database // it assumes the database has the IPFS public.blocks table present -// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table +// DO NOT use a production driver for the test driver, as it will remove all contents of the public.blocks table func TestDB() (*sqlx.DB, error) { connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable" return sqlx.Connect("postgres", connectStr) } -// ResetTestDB drops all rows in the test db public.blocks table +// ResetTestDB drops all rows in the test driver public.blocks table func ResetTestDB(db *sqlx.DB) error { _, err := db.Exec("TRUNCATE public.blocks") return err