cache prefixes for prefix-based iteration
This commit is contained in:
parent
f5a12a8d3f
commit
147186e529
@ -44,14 +44,14 @@ func NewBatch(db *sqlx.DB, tx *sqlx.Tx) ethdb.Batch {
|
|||||||
// Put inserts the given value into the key-value data store
|
// Put inserts the given value into the key-value data store
|
||||||
// Key is expected to be the keccak256 hash of value
|
// Key is expected to be the keccak256 hash of value
|
||||||
func (b *Batch) Put(key []byte, value []byte) (err error) {
|
func (b *Batch) Put(key []byte, value []byte) (err error) {
|
||||||
dsKey, err := DatastoreKeyFromGethKey(key)
|
dsKey, prefix, err := DatastoreKeyFromGethKey(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if _, err = b.tx.Exec(putPgStr, dsKey, value); err != nil {
|
if _, err = b.tx.Exec(putPgStr, dsKey, value); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if _, err = b.tx.Exec(putPreimagePgStr, key, dsKey); err != nil {
|
if _, err = b.tx.Exec(putPreimagePgStr, key, dsKey, prefix); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
b.valueSize += len(value)
|
b.valueSize += len(value)
|
||||||
@ -74,6 +74,7 @@ func (b *Batch) ValueSize() int {
|
|||||||
|
|
||||||
// Write satisfies the ethdb.Batch interface
|
// Write satisfies the ethdb.Batch interface
|
||||||
// Write flushes any accumulated data to disk
|
// Write flushes any accumulated data to disk
|
||||||
|
// Reset should be called after every write
|
||||||
func (b *Batch) Write() error {
|
func (b *Batch) Write() error {
|
||||||
if b.tx == nil {
|
if b.tx == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -33,7 +33,7 @@ const (
|
|||||||
hasPgStr = "SELECT exists(select 1 from eth.key_preimages WHERE eth_key = $1)"
|
hasPgStr = "SELECT exists(select 1 from eth.key_preimages WHERE eth_key = $1)"
|
||||||
getPgStr = "SELECT data FROM public.blocks INNER JOIN eth.key_preimages ON (ipfs_key = blocks.key) WHERE eth_key = $1"
|
getPgStr = "SELECT data FROM public.blocks INNER JOIN eth.key_preimages ON (ipfs_key = blocks.key) WHERE eth_key = $1"
|
||||||
putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING"
|
putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING"
|
||||||
putPreimagePgStr = "INSERT INTO eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2) ON CONFLICT (eth_key) DO UPDATE SET ipfs_key = $2"
|
putPreimagePgStr = "INSERT INTO eth.key_preimages (eth_key, ipfs_key, prefix) VALUES ($1, $2, $3) ON CONFLICT (eth_key) DO UPDATE SET (ipfs_key, prefix) = ($2, $3)"
|
||||||
deletePgStr = "DELETE FROM public.blocks USING eth.key_preimages WHERE ipfs_key = blocks.key AND eth_key = $1"
|
deletePgStr = "DELETE FROM public.blocks USING eth.key_preimages WHERE ipfs_key = blocks.key AND eth_key = $1"
|
||||||
dbSizePgStr = "SELECT pg_database_size(current_database())"
|
dbSizePgStr = "SELECT pg_database_size(current_database())"
|
||||||
)
|
)
|
||||||
@ -78,7 +78,7 @@ func (d *Database) Get(key []byte) ([]byte, error) {
|
|||||||
// Key is expected to be the keccak256 hash of value
|
// Key is expected to be the keccak256 hash of value
|
||||||
// Put inserts the keccak256 key into the eth.key_preimages table
|
// Put inserts the keccak256 key into the eth.key_preimages table
|
||||||
func (d *Database) Put(key []byte, value []byte) error {
|
func (d *Database) Put(key []byte, value []byte) error {
|
||||||
dsKey, err := DatastoreKeyFromGethKey(key)
|
dsKey, prefix, err := DatastoreKeyFromGethKey(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -98,7 +98,7 @@ func (d *Database) Put(key []byte, value []byte) error {
|
|||||||
if _, err = tx.Exec(putPgStr, dsKey, value); err != nil {
|
if _, err = tx.Exec(putPgStr, dsKey, value); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = tx.Exec(putPreimagePgStr, key, dsKey)
|
_, err = tx.Exec(putPreimagePgStr, key, dsKey, prefix)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
-- +goose Up
|
-- +goose Up
|
||||||
CREATE TABLE IF NOT EXISTS eth.key_preimages (
|
CREATE TABLE IF NOT EXISTS eth.key_preimages (
|
||||||
eth_key BYTEA UNIQUE NOT NULL,
|
eth_key BYTEA UNIQUE NOT NULL,
|
||||||
|
prefix BYTEA,
|
||||||
ipfs_key TEXT NOT NULL REFERENCES public.blocks (key) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED
|
ipfs_key TEXT NOT NULL REFERENCES public.blocks (key) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -25,7 +25,12 @@ import (
|
|||||||
const (
|
const (
|
||||||
nextPgStr = `SELECT key, data FROM public.blocks
|
nextPgStr = `SELECT key, data FROM public.blocks
|
||||||
INNER JOIN eth.key_preimages ON (ipfs_key = key)
|
INNER JOIN eth.key_preimages ON (ipfs_key = key)
|
||||||
WHERE eth_key > $1 ORDER BY eth_key LIMIT 1`
|
WHERE eth_key > $1
|
||||||
|
ORDER BY eth_key LIMIT 1`
|
||||||
|
nextPgStrWithPrefix = `SELECT key, data FROM public.blocks
|
||||||
|
INNER JOIN eth.key_preimages ON (ipfs_key = key)
|
||||||
|
WHERE eth_key > $1 AND prefix = $2
|
||||||
|
ORDER BY eth_key LIMIT 1`
|
||||||
)
|
)
|
||||||
|
|
||||||
type nextModel struct {
|
type nextModel struct {
|
||||||
@ -54,6 +59,15 @@ func NewIterator(start, prefix []byte, db *sqlx.DB) ethdb.Iterator {
|
|||||||
// It returns whether the iterator is exhausted
|
// It returns whether the iterator is exhausted
|
||||||
func (i *Iterator) Next() bool {
|
func (i *Iterator) Next() bool {
|
||||||
next := new(nextModel)
|
next := new(nextModel)
|
||||||
|
if i.prefix != nil {
|
||||||
|
if err := i.db.Get(next, nextPgStrWithPrefix, i.currentKey, i.prefix); err != nil {
|
||||||
|
logrus.Errorf("iterator.Next() error: %v", err)
|
||||||
|
i.currentKey, i.currentValue = nil, nil
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
i.currentKey, i.currentValue = next.Key, next.Value
|
||||||
|
return true
|
||||||
|
}
|
||||||
if err := i.db.Get(next, nextPgStr, i.currentKey); err != nil {
|
if err := i.db.Get(next, nextPgStr, i.currentKey); err != nil {
|
||||||
logrus.Errorf("iterator.Next() error: %v", err)
|
logrus.Errorf("iterator.Next() error: %v", err)
|
||||||
i.currentKey, i.currentValue = nil, nil
|
i.currentKey, i.currentValue = nil, nil
|
||||||
|
@ -37,21 +37,25 @@ func MultihashKeyFromKeccak256(h []byte) (string, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DatastoreKeyFromGethKey returns the public.blocks key from the provided geth key
|
// DatastoreKeyFromGethKey returns the public.blocks key from the provided geth key
|
||||||
func DatastoreKeyFromGethKey(h []byte) (string, error) {
|
// It also returns the key's prefix, if it has one
|
||||||
|
func DatastoreKeyFromGethKey(h []byte) (string, []byte, error) {
|
||||||
keyType, keyComponents := ResolveKeyType(h)
|
keyType, keyComponents := ResolveKeyType(h)
|
||||||
switch keyType {
|
switch keyType {
|
||||||
case Keccak:
|
case Keccak:
|
||||||
return MultihashKeyFromKeccak256(h)
|
mhKey, err := MultihashKeyFromKeccak256(h)
|
||||||
|
return mhKey, nil, err
|
||||||
case Header:
|
case Header:
|
||||||
return MultihashKeyFromKeccak256(keyComponents[1])
|
mhKey, err := MultihashKeyFromKeccak256(keyComponents[1])
|
||||||
|
return mhKey, keyComponents[0], err
|
||||||
case Preimage:
|
case Preimage:
|
||||||
return MultihashKeyFromKeccak256(keyComponents[1])
|
mhKey, err := MultihashKeyFromKeccak256(keyComponents[1])
|
||||||
|
return mhKey, keyComponents[0], err
|
||||||
case Prefixed, Suffixed:
|
case Prefixed, Suffixed:
|
||||||
// This data is not mapped by hash => content by geth, store it using the prefixed/suffixed key directly
|
// This data is not mapped by hash => content by geth, store it using the prefixed/suffixed key directly
|
||||||
// I.e. the public.blocks datastore key == the hex representation of the geth key
|
// I.e. the public.blocks datastore key == the hex representation of the geth key
|
||||||
// Alternatively, decompose the data and derive the hash
|
// Alternatively, decompose the data and derive the hash
|
||||||
return common.Bytes2Hex(h), nil
|
return common.Bytes2Hex(h), keyComponents[0], nil
|
||||||
default:
|
default:
|
||||||
return "", fmt.Errorf("invalid formatting of database key: %x", h)
|
return "", nil, fmt.Errorf("invalid formatting of database key: %x", h)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user