Add block number while making inserts

This commit is contained in:
Prathamesh Musale 2022-05-09 14:50:14 +05:30
parent a6b49f89f4
commit 08bc2f87ab
4 changed files with 36 additions and 24 deletions

View File

@ -17,6 +17,8 @@
package pgipfsethdb package pgipfsethdb
import ( import (
"math/big"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
) )
@ -28,13 +30,16 @@ type Batch struct {
db *sqlx.DB db *sqlx.DB
tx *sqlx.Tx tx *sqlx.Tx
valueSize int valueSize int
blockNumber *big.Int
} }
// NewBatch returns a ethdb.Batch interface for PG-IPFS // NewBatch returns a ethdb.Batch interface for PG-IPFS
func NewBatch(db *sqlx.DB, tx *sqlx.Tx) ethdb.Batch { func NewBatch(db *sqlx.DB, tx *sqlx.Tx, blockNumber *big.Int) ethdb.Batch {
b := &Batch{ b := &Batch{
db: db, db: db,
tx: tx, tx: tx,
blockNumber: blockNumber,
} }
if tx == nil { if tx == nil {
b.Reset() b.Reset()
@ -50,7 +55,7 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
if err != nil { if err != nil {
return err return err
} }
if _, err = b.tx.Exec(putPgStr, mhKey, value); err != nil { if _, err = b.tx.Exec(putPgStr, mhKey, value, b.blockNumber.Uint64()); err != nil {
return err return err
} }
b.valueSize += len(value) b.valueSize += len(value)

View File

@ -49,6 +49,11 @@ var _ = Describe("Batch", func() {
} }
database = pgipfsethdb.NewDatabase(db, cacheConfig) database = pgipfsethdb.NewDatabase(db, cacheConfig)
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
Expect(ok).To(BeTrue())
(*databaseWithBlock).BlockNumber = testBlockNumber
batch = database.NewBatch() batch = database.NewBatch()
}) })
AfterEach(func() { AfterEach(func() {

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"math/big"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -33,14 +34,8 @@ var errNotSupported = errors.New("this operation is not supported")
var ( var (
hasPgStr = "SELECT exists(select 1 from public.blocks WHERE key = $1)" hasPgStr = "SELECT exists(select 1 from public.blocks WHERE key = $1)"
// TODO Fix the get query to accomodate block_number field
// Using LIMIT 1 for now
getPgStr = "SELECT data FROM public.blocks WHERE key = $1 LIMIT 1" getPgStr = "SELECT data FROM public.blocks WHERE key = $1 LIMIT 1"
putPgStr = "INSERT INTO public.blocks (key, data, block_number) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"
// TODO Fix the put query to accomodate block_number field
// block_number has been put as 1 always for now.
putPgStr = "INSERT INTO public.blocks (key, data, block_number) VALUES ($1, $2, 1) ON CONFLICT DO NOTHING"
deletePgStr = "DELETE FROM public.blocks WHERE key = $1" deletePgStr = "DELETE FROM public.blocks WHERE key = $1"
dbSizePgStr = "SELECT pg_database_size(current_database())" dbSizePgStr = "SELECT pg_database_size(current_database())"
) )
@ -51,6 +46,8 @@ var _ ethdb.Database = &Database{}
type Database struct { type Database struct {
db *sqlx.DB db *sqlx.DB
cache *groupcache.Group cache *groupcache.Group
BlockNumber *big.Int
} }
func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) { func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) {
@ -142,7 +139,7 @@ func (d *Database) Put(key []byte, value []byte) error {
if err != nil { if err != nil {
return err return err
} }
_, err = d.db.Exec(putPgStr, mhKey, value) _, err = d.db.Exec(putPgStr, mhKey, value, d.BlockNumber.Uint64())
return err return err
} }
@ -251,7 +248,7 @@ func (d *Database) Compact(start []byte, limit []byte) error {
// NewBatch creates a write-only database that buffers changes to its host db // NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called // until a final write is called
func (d *Database) NewBatch() ethdb.Batch { func (d *Database) NewBatch() ethdb.Batch {
return NewBatch(d.db, nil) return NewBatch(d.db, nil, d.BlockNumber)
} }
// NewBatchWithSize satisfies the ethdb.Batcher interface. // NewBatchWithSize satisfies the ethdb.Batcher interface.

View File

@ -35,7 +35,8 @@ var (
database ethdb.Database database ethdb.Database
db *sqlx.DB db *sqlx.DB
err error err error
testHeader = types.Header{Number: big.NewInt(1337)} testBlockNumber = big.NewInt(1337)
testHeader = types.Header{Number: testBlockNumber}
testValue, _ = rlp.EncodeToBytes(testHeader) testValue, _ = rlp.EncodeToBytes(testHeader)
testEthKey = testHeader.Hash().Bytes() testEthKey = testHeader.Hash().Bytes()
testMhKey, _ = pgipfsethdb.MultihashKeyFromKeccak256(testEthKey) testMhKey, _ = pgipfsethdb.MultihashKeyFromKeccak256(testEthKey)
@ -53,6 +54,10 @@ var _ = Describe("Database", func() {
} }
database = pgipfsethdb.NewDatabase(db, cacheConfig) database = pgipfsethdb.NewDatabase(db, cacheConfig)
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
Expect(ok).To(BeTrue())
(*databaseWithBlock).BlockNumber = testBlockNumber
}) })
AfterEach(func() { AfterEach(func() {
groupcache.DeregisterGroup("db") groupcache.DeregisterGroup("db")
@ -67,7 +72,7 @@ var _ = Describe("Database", func() {
Expect(has).ToNot(BeTrue()) Expect(has).ToNot(BeTrue())
}) })
It("returns true if a key-pair exists in the db", func() { It("returns true if a key-pair exists in the db", func() {
_, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, 1)", testMhKey, testValue) _, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
has, err := database.Has(testEthKey) has, err := database.Has(testEthKey)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
@ -82,7 +87,7 @@ var _ = Describe("Database", func() {
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
}) })
It("returns the value associated with the key, if the pair exists", func() { It("returns the value associated with the key, if the pair exists", func() {
_, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, 1)", testMhKey, testValue) _, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testEthKey) val, err := database.Get(testEthKey)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())