Merge pull request #33 from deep-stack/pm-v4-schema-changes

Updates to use v4 schema
This commit is contained in:
Ashwin Phatak 2022-05-09 16:30:35 +05:30 committed by GitHub
commit a19b47d67c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 46 additions and 28 deletions

View File

@ -34,7 +34,7 @@ Ancient interfaces are used for Ancient/frozen data operations (e.g. rawdb/table
Outside of these primarily auxiliary capabilities, this package satisfies the interfaces required for many state operations using Ethereum data on IPFS. Outside of these primarily auxiliary capabilities, this package satisfies the interfaces required for many state operations using Ethereum data on IPFS.
e.g. e.g.
go-ethereum trie.NodeIterator and state.NodeIterator can be constructed from the ethdb.KeyValueStore and ethdb.Database interfaces, respectively: go-ethereum trie.NodeIterator and state.NodeIterator can be constructed from the ethdb.KeyValueStore and ethdb.Database interfaces, respectively:
```go ```go
@ -42,7 +42,7 @@ package main
import ( import (
"context" "context"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
@ -50,7 +50,7 @@ import (
"github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/repo/fsrepo" "github.com/ipfs/go-ipfs/repo/fsrepo"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/vulcanize/ipfs-ethdb" "github.com/vulcanize/ipfs-ethdb/v4"
) )
func main() { func main() {

View File

@ -25,7 +25,7 @@ import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
ipfsethdb "github.com/vulcanize/ipfs-ethdb/v3" ipfsethdb "github.com/vulcanize/ipfs-ethdb/v4"
) )
var ( var (

View File

@ -26,7 +26,7 @@ import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
ipfsethdb "github.com/vulcanize/ipfs-ethdb/v3" ipfsethdb "github.com/vulcanize/ipfs-ethdb/v4"
) )
var ( var (

2
go.mod
View File

@ -1,4 +1,4 @@
module github.com/vulcanize/ipfs-ethdb/v3 module github.com/vulcanize/ipfs-ethdb/v4
go 1.15 go 1.15

View File

@ -17,6 +17,8 @@
package pgipfsethdb package pgipfsethdb
import ( import (
"math/big"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
) )
@ -28,13 +30,16 @@ type Batch struct {
db *sqlx.DB db *sqlx.DB
tx *sqlx.Tx tx *sqlx.Tx
valueSize int valueSize int
blockNumber *big.Int
} }
// NewBatch returns a ethdb.Batch interface for PG-IPFS // NewBatch returns a ethdb.Batch interface for PG-IPFS
func NewBatch(db *sqlx.DB, tx *sqlx.Tx) ethdb.Batch { func NewBatch(db *sqlx.DB, tx *sqlx.Tx, blockNumber *big.Int) ethdb.Batch {
b := &Batch{ b := &Batch{
db: db, db: db,
tx: tx, tx: tx,
blockNumber: blockNumber,
} }
if tx == nil { if tx == nil {
b.Reset() b.Reset()
@ -50,7 +55,7 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
if err != nil { if err != nil {
return err return err
} }
if _, err = b.tx.Exec(putPgStr, mhKey, value); err != nil { if _, err = b.tx.Exec(putPgStr, mhKey, value, b.blockNumber.Uint64()); err != nil {
return err return err
} }
b.valueSize += len(value) b.valueSize += len(value)

View File

@ -27,7 +27,7 @@ import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/v3/postgres" pgipfsethdb "github.com/vulcanize/ipfs-ethdb/v4/postgres"
) )
var ( var (
@ -49,6 +49,11 @@ var _ = Describe("Batch", func() {
} }
database = pgipfsethdb.NewDatabase(db, cacheConfig) database = pgipfsethdb.NewDatabase(db, cacheConfig)
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
Expect(ok).To(BeTrue())
(*databaseWithBlock).BlockNumber = testBlockNumber
batch = database.NewBatch() batch = database.NewBatch()
}) })
AfterEach(func() { AfterEach(func() {

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"math/big"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -32,9 +33,9 @@ import (
var errNotSupported = errors.New("this operation is not supported") var errNotSupported = errors.New("this operation is not supported")
var ( var (
hasPgStr = "SELECT exists(select 1 from public.blocks WHERE key = $1)" hasPgStr = "SELECT exists(select 1 from public.blocks WHERE key = $1 LIMIT 1)"
getPgStr = "SELECT data FROM public.blocks WHERE key = $1" getPgStr = "SELECT data FROM public.blocks WHERE key = $1 LIMIT 1"
putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING" putPgStr = "INSERT INTO public.blocks (key, data, block_number) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"
deletePgStr = "DELETE FROM public.blocks WHERE key = $1" deletePgStr = "DELETE FROM public.blocks WHERE key = $1"
dbSizePgStr = "SELECT pg_database_size(current_database())" dbSizePgStr = "SELECT pg_database_size(current_database())"
) )
@ -45,6 +46,8 @@ var _ ethdb.Database = &Database{}
type Database struct { type Database struct {
db *sqlx.DB db *sqlx.DB
cache *groupcache.Group cache *groupcache.Group
BlockNumber *big.Int
} }
func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) { func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) {
@ -136,7 +139,7 @@ func (d *Database) Put(key []byte, value []byte) error {
if err != nil { if err != nil {
return err return err
} }
_, err = d.db.Exec(putPgStr, mhKey, value) _, err = d.db.Exec(putPgStr, mhKey, value, d.BlockNumber.Uint64())
return err return err
} }
@ -245,13 +248,13 @@ func (d *Database) Compact(start []byte, limit []byte) error {
// NewBatch creates a write-only database that buffers changes to its host db // NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called // until a final write is called
func (d *Database) NewBatch() ethdb.Batch { func (d *Database) NewBatch() ethdb.Batch {
return NewBatch(d.db, nil) return NewBatch(d.db, nil, d.BlockNumber)
} }
// NewBatchWithSize satisfies the ethdb.Batcher interface. // NewBatchWithSize satisfies the ethdb.Batcher interface.
// NewBatchWithSize creates a write-only database batch with pre-allocated buffer. // NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
func (d *Database) NewBatchWithSize(size int) ethdb.Batch { func (d *Database) NewBatchWithSize(size int) ethdb.Batch {
return NewBatch(d.db, nil) return NewBatch(d.db, nil, d.BlockNumber)
} }
// NewIterator satisfies the ethdb.Iteratee interface // NewIterator satisfies the ethdb.Iteratee interface

View File

@ -28,17 +28,18 @@ import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/v3/postgres" pgipfsethdb "github.com/vulcanize/ipfs-ethdb/v4/postgres"
) )
var ( var (
database ethdb.Database database ethdb.Database
db *sqlx.DB db *sqlx.DB
err error err error
testHeader = types.Header{Number: big.NewInt(1337)} testBlockNumber = big.NewInt(1337)
testValue, _ = rlp.EncodeToBytes(testHeader) testHeader = types.Header{Number: testBlockNumber}
testEthKey = testHeader.Hash().Bytes() testValue, _ = rlp.EncodeToBytes(testHeader)
testMhKey, _ = pgipfsethdb.MultihashKeyFromKeccak256(testEthKey) testEthKey = testHeader.Hash().Bytes()
testMhKey, _ = pgipfsethdb.MultihashKeyFromKeccak256(testEthKey)
) )
var _ = Describe("Database", func() { var _ = Describe("Database", func() {
@ -53,6 +54,10 @@ var _ = Describe("Database", func() {
} }
database = pgipfsethdb.NewDatabase(db, cacheConfig) database = pgipfsethdb.NewDatabase(db, cacheConfig)
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
Expect(ok).To(BeTrue())
(*databaseWithBlock).BlockNumber = testBlockNumber
}) })
AfterEach(func() { AfterEach(func() {
groupcache.DeregisterGroup("db") groupcache.DeregisterGroup("db")
@ -67,7 +72,7 @@ var _ = Describe("Database", func() {
Expect(has).ToNot(BeTrue()) Expect(has).ToNot(BeTrue())
}) })
It("returns true if a key-pair exists in the db", func() { It("returns true if a key-pair exists in the db", func() {
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue) _, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
has, err := database.Has(testEthKey) has, err := database.Has(testEthKey)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
@ -82,7 +87,7 @@ var _ = Describe("Database", func() {
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
}) })
It("returns the value associated with the key, if the pair exists", func() { It("returns the value associated with the key, if the pair exists", func() {
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue) _, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testEthKey) val, err := database.Get(testEthKey)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())

View File

@ -18,7 +18,7 @@ import (
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/vulcanize/ipfs-ethdb/v3/postgres" "github.com/vulcanize/ipfs-ethdb/v4/postgres"
) )
func main() { func main() {