diff --git a/postgres/batch.go b/postgres/batch.go index 838f857..8e38292 100644 --- a/postgres/batch.go +++ b/postgres/batch.go @@ -44,14 +44,14 @@ func NewBatch(db *sqlx.DB, tx *sqlx.Tx) ethdb.Batch { // Put inserts the given value into the key-value data store // Key is expected to be the keccak256 hash of value func (b *Batch) Put(key []byte, value []byte) (err error) { - dsKey, err := DatastoreKeyFromGethKey(key) + dsKey, prefix, err := DatastoreKeyFromGethKey(key) if err != nil { return err } if _, err = b.tx.Exec(putPgStr, dsKey, value); err != nil { return err } - if _, err = b.tx.Exec(putPreimagePgStr, key, dsKey); err != nil { + if _, err = b.tx.Exec(putPreimagePgStr, key, dsKey, prefix); err != nil { return err } b.valueSize += len(value) @@ -74,6 +74,7 @@ func (b *Batch) ValueSize() int { // Write satisfies the ethdb.Batch interface // Write flushes any accumulated data to disk +// Reset should be called after every write func (b *Batch) Write() error { if b.tx == nil { return nil diff --git a/postgres/database.go b/postgres/database.go index 0774bd3..6e661db 100644 --- a/postgres/database.go +++ b/postgres/database.go @@ -33,7 +33,7 @@ const ( hasPgStr = "SELECT exists(select 1 from eth.key_preimages WHERE eth_key = $1)" getPgStr = "SELECT data FROM public.blocks INNER JOIN eth.key_preimages ON (ipfs_key = blocks.key) WHERE eth_key = $1" putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING" - putPreimagePgStr = "INSERT INTO eth.key_preimages (eth_key, ipfs_key) VALUES ($1, $2) ON CONFLICT (eth_key) DO UPDATE SET ipfs_key = $2" + putPreimagePgStr = "INSERT INTO eth.key_preimages (eth_key, ipfs_key, prefix) VALUES ($1, $2, $3) ON CONFLICT (eth_key) DO UPDATE SET (ipfs_key, prefix) = ($2, $3)" deletePgStr = "DELETE FROM public.blocks USING eth.key_preimages WHERE ipfs_key = blocks.key AND eth_key = $1" dbSizePgStr = "SELECT pg_database_size(current_database())" ) @@ -78,7 +78,7 @@ func (d *Database) Get(key []byte) ([]byte, error) { // Key is expected to be the keccak256 hash of value // Put inserts the keccak256 key into the eth.key_preimages table func (d *Database) Put(key []byte, value []byte) error { - dsKey, err := DatastoreKeyFromGethKey(key) + dsKey, prefix, err := DatastoreKeyFromGethKey(key) if err != nil { return err } @@ -98,7 +98,7 @@ func (d *Database) Put(key []byte, value []byte) error { if _, err = tx.Exec(putPgStr, dsKey, value); err != nil { return err } - _, err = tx.Exec(putPreimagePgStr, key, dsKey) + _, err = tx.Exec(putPreimagePgStr, key, dsKey, prefix) return err } diff --git a/postgres/db/migrations/00003_create_eth_key_preimages_table.sql b/postgres/db/migrations/00003_create_eth_key_preimages_table.sql index e949506..52974d5 100644 --- a/postgres/db/migrations/00003_create_eth_key_preimages_table.sql +++ b/postgres/db/migrations/00003_create_eth_key_preimages_table.sql @@ -1,6 +1,7 @@ -- +goose Up CREATE TABLE IF NOT EXISTS eth.key_preimages ( eth_key BYTEA UNIQUE NOT NULL, + prefix BYTEA, ipfs_key TEXT NOT NULL REFERENCES public.blocks (key) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED ); diff --git a/postgres/iterator.go b/postgres/iterator.go index 656a56e..a5cafb1 100644 --- a/postgres/iterator.go +++ b/postgres/iterator.go @@ -25,7 +25,12 @@ import ( const ( nextPgStr = `SELECT key, data FROM public.blocks INNER JOIN eth.key_preimages ON (ipfs_key = key) - WHERE eth_key > $1 ORDER BY eth_key LIMIT 1` + WHERE eth_key > $1 + ORDER BY eth_key LIMIT 1` + nextPgStrWithPrefix = `SELECT key, data FROM public.blocks + INNER JOIN eth.key_preimages ON (ipfs_key = key) + WHERE eth_key > $1 AND prefix = $2 + ORDER BY eth_key LIMIT 1` ) type nextModel struct { @@ -54,6 +59,15 @@ func NewIterator(start, prefix []byte, db *sqlx.DB) ethdb.Iterator { // It returns whether the iterator is exhausted func (i *Iterator) Next() bool { next := new(nextModel) + if i.prefix != nil { + if err := i.db.Get(next, nextPgStrWithPrefix, i.currentKey, i.prefix); err != nil { + logrus.Errorf("iterator.Next() error: %v", err) + i.currentKey, i.currentValue = nil, nil + return false + } + i.currentKey, i.currentValue = next.Key, next.Value + return true + } if err := i.db.Get(next, nextPgStr, i.currentKey); err != nil { logrus.Errorf("iterator.Next() error: %v", err) i.currentKey, i.currentValue = nil, nil diff --git a/postgres/util.go b/postgres/util.go index 19d02b3..92838d3 100644 --- a/postgres/util.go +++ b/postgres/util.go @@ -37,21 +37,25 @@ func MultihashKeyFromKeccak256(h []byte) (string, error) { } // DatastoreKeyFromGethKey returns the public.blocks key from the provided geth key -func DatastoreKeyFromGethKey(h []byte) (string, error) { +// It also returns the key's prefix, if it has one +func DatastoreKeyFromGethKey(h []byte) (string, []byte, error) { keyType, keyComponents := ResolveKeyType(h) switch keyType { case Keccak: - return MultihashKeyFromKeccak256(h) + mhKey, err := MultihashKeyFromKeccak256(h) + return mhKey, nil, err case Header: - return MultihashKeyFromKeccak256(keyComponents[1]) + mhKey, err := MultihashKeyFromKeccak256(keyComponents[1]) + return mhKey, keyComponents[0], err case Preimage: - return MultihashKeyFromKeccak256(keyComponents[1]) + mhKey, err := MultihashKeyFromKeccak256(keyComponents[1]) + return mhKey, keyComponents[0], err case Prefixed, Suffixed: // This data is not mapped by hash => content by geth, store it using the prefixed/suffixed key directly // I.e. the public.blocks datastore key == the hex representation of the geth key // Alternatively, decompose the data and derive the hash - return common.Bytes2Hex(h), nil + return common.Bytes2Hex(h), keyComponents[0], nil default: - return "", fmt.Errorf("invalid formatting of database key: %x", h) + return "", nil, fmt.Errorf("invalid formatting of database key: %x", h) } }