ancient interfaces

This commit is contained in:
Ian Norden 2020-09-14 08:52:02 -05:00
parent 531bc62eff
commit 95178107c0
2 changed files with 230 additions and 44 deletions

227
postgres/ancient.go Normal file
View File

@ -0,0 +1,227 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb
import (
"fmt"
"github.com/sirupsen/logrus"
)
const (
// freezerHeaderTable indicates the name of the freezer header table.
freezerHeaderTable = "headers"
// freezerHashTable indicates the name of the freezer canonical hash table.
freezerHashTable = "hashes"
// freezerBodiesTable indicates the name of the freezer block body table.
freezerBodiesTable = "bodies"
// freezerReceiptTable indicates the name of the freezer receipts table.
freezerReceiptTable = "receipts"
// freezerDifficultyTable indicates the name of the freezer total difficulty table.
freezerDifficultyTable = "diffs"
// ancient append Postgres statements
appendAncientHeaderPgStr = "INSERT INTO eth.ancient_headers (block_number, header) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET header = $2"
appendAncientHashPgStr = "INSERT INTO eth.ancient_hashes (block_number, hash) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET hash = $2"
appendAncientBodyPgStr = "INSERT INTO eth.ancient_bodies (block_number, body) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET body = $2"
appendAncientReceiptsPgStr = "INSERT INTO eth.ancient_receipts (block_number, receipts) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET receipts = $2"
appendAncientTDPgStr = "INSERT INTO eth.ancient_tds (block_number, td) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET td = $2"
// ancient truncate Postgres statements
truncateAncientHeaderPgStr = "DELETE FROM eth.ancient_headers WHERE block_number > $1"
truncateAncientHashPgStr = "DELETE FROM eth.ancient_hashes WHERE block_number > $1"
truncateAncientBodiesPgStr = "DELETE FROM eth.ancient_bodies WHERE block_number > $1"
truncateAncientReceiptsPgStr = "DELETE FROM eth.ancient_receipts WHERE block_number > $1"
truncateAncientTDPgStr = "DELETE FROM eth.ancient_tds WHERE block_number > $1"
// ancient size Postgres statement
ancientSizePgStr = "SELECT pg_total_relation_size($1)"
// ancients Postgres statement
ancientsPgStr = "SELECT block_number FROM eth.ancient_headers ORDER BY block_number DESC LIMIT 1"
// ancient has Postgres statements
hasAncientHeaderPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_headers WHERE block_number = $1)"
hasAncientHashPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_hashes WHERE block_number = $1)"
hasAncientBodyPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_bodies WHERE block_number = $1)"
hasAncientReceiptsPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_receipts WHERE block_number = $1)"
hasAncientTDPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_tds WHERE block_number = $1)"
// ancient get Postgres statements
getAncientHeaderPgStr = "SELECT header FROM eth.ancient_headers WHERE block_number = $1"
getAncientHashPgStr = "SELECT hash FROM eth.ancient_hashes WHERE block_number = $1"
getAncientBodyPgStr = "SELECT body FROM eth.ancient_bodies WHERE block_number = $1"
getAncientReceiptsPgStr = "SELECT receipts FROM eth.ancient_receipts WHERE block_number = $1"
getAncientTDPgStr = "SELECT td FROM eth.ancient_tds WHERE block_number = $1"
)
// HasAncient satisfies the ethdb.AncientReader interface
// HasAncient returns an indicator whether the specified data exists in the ancient store
func (d *Database) HasAncient(kind string, number uint64) (bool, error) {
var pgStr string
switch kind {
case freezerHeaderTable:
pgStr = hasAncientHeaderPgStr
case freezerHashTable:
pgStr = hasAncientHashPgStr
case freezerBodiesTable:
pgStr = hasAncientBodyPgStr
case freezerReceiptTable:
pgStr = hasAncientReceiptsPgStr
case freezerDifficultyTable:
pgStr = hasAncientTDPgStr
default:
return false, fmt.Errorf("unexpected ancient kind: %s", kind)
}
has := new(bool)
return *has, d.db.Get(has, pgStr, number)
}
// Ancient satisfies the ethdb.AncientReader interface
// Ancient retrieves an ancient binary blob from the append-only immutable files
func (d *Database) Ancient(kind string, number uint64) ([]byte, error) {
var pgStr string
switch kind {
case freezerHeaderTable:
pgStr = getAncientHeaderPgStr
case freezerHashTable:
pgStr = getAncientHashPgStr
case freezerBodiesTable:
pgStr = getAncientBodyPgStr
case freezerReceiptTable:
pgStr = getAncientReceiptsPgStr
case freezerDifficultyTable:
pgStr = getAncientTDPgStr
default:
return nil, fmt.Errorf("unexpected ancient kind: %s", kind)
}
data := new([]byte)
return *data, d.db.Get(data, pgStr, number)
}
// Ancients satisfies the ethdb.AncientReader interface
// Ancients returns the ancient item numbers in the ancient store
func (d *Database) Ancients() (uint64, error) {
num := new(uint64)
return *num, d.db.Get(num, ancientsPgStr)
}
// AncientSize satisfies the ethdb.AncientReader interface
// AncientSize returns the ancient size of the specified category
func (d *Database) AncientSize(kind string) (uint64, error) {
var tableName string
switch kind {
case freezerHeaderTable:
tableName = "eth.ancient_headers"
case freezerHashTable:
tableName = "eth.ancient_hashes"
case freezerBodiesTable:
tableName = "eth.ancient_bodies"
case freezerReceiptTable:
tableName = "eth.ancient_receipts"
case freezerDifficultyTable:
tableName = "eth.ancient_tds"
default:
return 0, fmt.Errorf("unexpected ancient kind: %s", kind)
}
size := new(uint64)
return *size, d.db.Get(size, ancientSizePgStr, tableName)
}
// AppendAncient satisfies the ethdb.AncientWriter interface
// AppendAncient injects all binary blobs belong to block at the end of the append-only immutable table files
func (d *Database) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error {
// append in batch
var err error
if d.ancientTx == nil {
d.ancientTx, err = d.db.Beginx()
if err != nil {
return err
}
}
defer func() {
if err != nil {
if err := d.ancientTx.Rollback(); err != nil {
logrus.Error(err)
d.ancientTx = nil
}
}
}()
if _, err := d.ancientTx.Exec(appendAncientHashPgStr, number, hash); err != nil {
return err
}
if _, err := d.ancientTx.Exec(appendAncientHeaderPgStr, number, header); err != nil {
return err
}
if _, err := d.ancientTx.Exec(appendAncientBodyPgStr, number, body); err != nil {
return err
}
if _, err := d.ancientTx.Exec(appendAncientReceiptsPgStr, number, receipts); err != nil {
return err
}
_, err = d.ancientTx.Exec(appendAncientTDPgStr, number, td)
return err
}
// TruncateAncients satisfies the ethdb.AncientWriter interface
// TruncateAncients discards all but the first n ancient data from the ancient store
func (d *Database) TruncateAncients(n uint64) error {
// truncate in batch
var err error
if d.ancientTx == nil {
d.ancientTx, err = d.db.Beginx()
if err != nil {
return err
}
}
defer func() {
if err != nil {
if err := d.ancientTx.Rollback(); err != nil {
logrus.Error(err)
d.ancientTx = nil
}
}
}()
if _, err := d.ancientTx.Exec(truncateAncientHeaderPgStr, n); err != nil {
return err
}
if _, err := d.ancientTx.Exec(truncateAncientHashPgStr, n); err != nil {
return err
}
if _, err := d.ancientTx.Exec(truncateAncientBodiesPgStr, n); err != nil {
return err
}
if _, err := d.ancientTx.Exec(truncateAncientReceiptsPgStr, n); err != nil {
return err
}
_, err = d.ancientTx.Exec(truncateAncientTDPgStr, n)
return err
}
// Sync satisfies the ethdb.AncientWriter interface
// Sync flushes all in-memory ancient store data to disk
func (d *Database) Sync() error {
if d.ancientTx == nil {
return nil
}
return d.ancientTx.Commit()
}

View File

@ -30,7 +30,7 @@ import (
var errNotSupported = errors.New("this operation is not supported")
const (
hasPgStr = "SELECT exists(select 1 from eth.key_preimages WHERE eth_key = $1)"
hasPgStr = "SELECT exists(SELECT 1 FROM eth.key_preimages WHERE eth_key = $1)"
getPgStr = "SELECT data FROM public.blocks INNER JOIN eth.key_preimages ON (ipfs_key = blocks.key) WHERE eth_key = $1"
putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING"
putPreimagePgStr = "INSERT INTO eth.key_preimages (eth_key, ipfs_key, prefix) VALUES ($1, $2, $3) ON CONFLICT (eth_key) DO UPDATE SET (ipfs_key, prefix) = ($2, $3)"
@ -41,6 +41,7 @@ const (
// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
type Database struct {
db *sqlx.DB
ancientTx *sqlx.Tx
}
// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS
@ -221,45 +222,3 @@ func (d *Database) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator {
func (d *Database) Close() error {
return d.db.DB.Close()
}
// HasAncient satisfies the ethdb.AncientReader interface
// HasAncient returns an indicator whether the specified data exists in the ancient store
func (d *Database) HasAncient(kind string, number uint64) (bool, error) {
return false, errNotSupported
}
// Ancient satisfies the ethdb.AncientReader interface
// Ancient retrieves an ancient binary blob from the append-only immutable files
func (d *Database) Ancient(kind string, number uint64) ([]byte, error) {
return nil, errNotSupported
}
// Ancients satisfies the ethdb.AncientReader interface
// Ancients returns the ancient item numbers in the ancient store
func (d *Database) Ancients() (uint64, error) {
return 0, errNotSupported
}
// AncientSize satisfies the ethdb.AncientReader interface
// AncientSize returns the ancient size of the specified category
func (d *Database) AncientSize(kind string) (uint64, error) {
return 0, errNotSupported
}
// AppendAncient satisfies the ethdb.AncientWriter interface
// AppendAncient injects all binary blobs belong to block at the end of the append-only immutable table files
func (d *Database) AppendAncient(number uint64, hash, header, body, receipt, td []byte) error {
return errNotSupported
}
// TruncateAncients satisfies the ethdb.AncientWriter interface
// TruncateAncients discards all but the first n ancient data from the ancient store
func (d *Database) TruncateAncients(n uint64) error {
return errNotSupported
}
// Sync satisfies the ethdb.AncientWriter interface
// Sync flushes all in-memory ancient store data to disk
func (d *Database) Sync() error {
return errNotSupported
}