From 95178107c0fc41092262828d2f19926f7bf5a1d8 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 14 Sep 2020 08:52:02 -0500 Subject: [PATCH] ancient interfaces --- postgres/ancient.go | 227 +++++++++++++++++++++++++++++++++++++++++++ postgres/database.go | 47 +-------- 2 files changed, 230 insertions(+), 44 deletions(-) create mode 100644 postgres/ancient.go diff --git a/postgres/ancient.go b/postgres/ancient.go new file mode 100644 index 0000000..ef9c358 --- /dev/null +++ b/postgres/ancient.go @@ -0,0 +1,227 @@ +// VulcanizeDB +// Copyright © 2020 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgipfsethdb + +import ( + "fmt" + + "github.com/sirupsen/logrus" +) + +const ( + // freezerHeaderTable indicates the name of the freezer header table. + freezerHeaderTable = "headers" + + // freezerHashTable indicates the name of the freezer canonical hash table. + freezerHashTable = "hashes" + + // freezerBodiesTable indicates the name of the freezer block body table. + freezerBodiesTable = "bodies" + + // freezerReceiptTable indicates the name of the freezer receipts table. + freezerReceiptTable = "receipts" + + // freezerDifficultyTable indicates the name of the freezer total difficulty table. + freezerDifficultyTable = "diffs" + + // ancient append Postgres statements + appendAncientHeaderPgStr = "INSERT INTO eth.ancient_headers (block_number, header) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET header = $2" + appendAncientHashPgStr = "INSERT INTO eth.ancient_hashes (block_number, hash) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET hash = $2" + appendAncientBodyPgStr = "INSERT INTO eth.ancient_bodies (block_number, body) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET body = $2" + appendAncientReceiptsPgStr = "INSERT INTO eth.ancient_receipts (block_number, receipts) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET receipts = $2" + appendAncientTDPgStr = "INSERT INTO eth.ancient_tds (block_number, td) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET td = $2" + + // ancient truncate Postgres statements + truncateAncientHeaderPgStr = "DELETE FROM eth.ancient_headers WHERE block_number > $1" + truncateAncientHashPgStr = "DELETE FROM eth.ancient_hashes WHERE block_number > $1" + truncateAncientBodiesPgStr = "DELETE FROM eth.ancient_bodies WHERE block_number > $1" + truncateAncientReceiptsPgStr = "DELETE FROM eth.ancient_receipts WHERE block_number > $1" + truncateAncientTDPgStr = "DELETE FROM eth.ancient_tds WHERE block_number > $1" + + // ancient size Postgres statement + ancientSizePgStr = "SELECT pg_total_relation_size($1)" + + // ancients Postgres statement + ancientsPgStr = "SELECT block_number FROM eth.ancient_headers ORDER BY block_number DESC LIMIT 1" + + // ancient has Postgres statements + hasAncientHeaderPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_headers WHERE block_number = $1)" + hasAncientHashPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_hashes WHERE block_number = $1)" + hasAncientBodyPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_bodies WHERE block_number = $1)" + hasAncientReceiptsPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_receipts WHERE block_number = $1)" + hasAncientTDPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_tds WHERE block_number = $1)" + + // ancient get Postgres statements + getAncientHeaderPgStr = "SELECT header FROM eth.ancient_headers WHERE block_number = $1" + getAncientHashPgStr = "SELECT hash FROM eth.ancient_hashes WHERE block_number = $1" + getAncientBodyPgStr = "SELECT body FROM eth.ancient_bodies WHERE block_number = $1" + getAncientReceiptsPgStr = "SELECT receipts FROM eth.ancient_receipts WHERE block_number = $1" + getAncientTDPgStr = "SELECT td FROM eth.ancient_tds WHERE block_number = $1" +) + +// HasAncient satisfies the ethdb.AncientReader interface +// HasAncient returns an indicator whether the specified data exists in the ancient store +func (d *Database) HasAncient(kind string, number uint64) (bool, error) { + var pgStr string + switch kind { + case freezerHeaderTable: + pgStr = hasAncientHeaderPgStr + case freezerHashTable: + pgStr = hasAncientHashPgStr + case freezerBodiesTable: + pgStr = hasAncientBodyPgStr + case freezerReceiptTable: + pgStr = hasAncientReceiptsPgStr + case freezerDifficultyTable: + pgStr = hasAncientTDPgStr + default: + return false, fmt.Errorf("unexpected ancient kind: %s", kind) + } + has := new(bool) + return *has, d.db.Get(has, pgStr, number) +} + +// Ancient satisfies the ethdb.AncientReader interface +// Ancient retrieves an ancient binary blob from the append-only immutable files +func (d *Database) Ancient(kind string, number uint64) ([]byte, error) { + var pgStr string + switch kind { + case freezerHeaderTable: + pgStr = getAncientHeaderPgStr + case freezerHashTable: + pgStr = getAncientHashPgStr + case freezerBodiesTable: + pgStr = getAncientBodyPgStr + case freezerReceiptTable: + pgStr = getAncientReceiptsPgStr + case freezerDifficultyTable: + pgStr = getAncientTDPgStr + default: + return nil, fmt.Errorf("unexpected ancient kind: %s", kind) + } + data := new([]byte) + return *data, d.db.Get(data, pgStr, number) +} + +// Ancients satisfies the ethdb.AncientReader interface +// Ancients returns the ancient item numbers in the ancient store +func (d *Database) Ancients() (uint64, error) { + num := new(uint64) + return *num, d.db.Get(num, ancientsPgStr) +} + +// AncientSize satisfies the ethdb.AncientReader interface +// AncientSize returns the ancient size of the specified category +func (d *Database) AncientSize(kind string) (uint64, error) { + var tableName string + switch kind { + case freezerHeaderTable: + tableName = "eth.ancient_headers" + case freezerHashTable: + tableName = "eth.ancient_hashes" + case freezerBodiesTable: + tableName = "eth.ancient_bodies" + case freezerReceiptTable: + tableName = "eth.ancient_receipts" + case freezerDifficultyTable: + tableName = "eth.ancient_tds" + default: + return 0, fmt.Errorf("unexpected ancient kind: %s", kind) + } + size := new(uint64) + return *size, d.db.Get(size, ancientSizePgStr, tableName) +} + +// AppendAncient satisfies the ethdb.AncientWriter interface +// AppendAncient injects all binary blobs belong to block at the end of the append-only immutable table files +func (d *Database) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error { + // append in batch + var err error + if d.ancientTx == nil { + d.ancientTx, err = d.db.Beginx() + if err != nil { + return err + } + } + defer func() { + if err != nil { + if err := d.ancientTx.Rollback(); err != nil { + logrus.Error(err) + d.ancientTx = nil + } + } + }() + + if _, err := d.ancientTx.Exec(appendAncientHashPgStr, number, hash); err != nil { + return err + } + if _, err := d.ancientTx.Exec(appendAncientHeaderPgStr, number, header); err != nil { + return err + } + if _, err := d.ancientTx.Exec(appendAncientBodyPgStr, number, body); err != nil { + return err + } + if _, err := d.ancientTx.Exec(appendAncientReceiptsPgStr, number, receipts); err != nil { + return err + } + _, err = d.ancientTx.Exec(appendAncientTDPgStr, number, td) + return err +} + +// TruncateAncients satisfies the ethdb.AncientWriter interface +// TruncateAncients discards all but the first n ancient data from the ancient store +func (d *Database) TruncateAncients(n uint64) error { + // truncate in batch + var err error + if d.ancientTx == nil { + d.ancientTx, err = d.db.Beginx() + if err != nil { + return err + } + } + defer func() { + if err != nil { + if err := d.ancientTx.Rollback(); err != nil { + logrus.Error(err) + d.ancientTx = nil + } + } + }() + if _, err := d.ancientTx.Exec(truncateAncientHeaderPgStr, n); err != nil { + return err + } + if _, err := d.ancientTx.Exec(truncateAncientHashPgStr, n); err != nil { + return err + } + if _, err := d.ancientTx.Exec(truncateAncientBodiesPgStr, n); err != nil { + return err + } + if _, err := d.ancientTx.Exec(truncateAncientReceiptsPgStr, n); err != nil { + return err + } + _, err = d.ancientTx.Exec(truncateAncientTDPgStr, n) + return err +} + +// Sync satisfies the ethdb.AncientWriter interface +// Sync flushes all in-memory ancient store data to disk +func (d *Database) Sync() error { + if d.ancientTx == nil { + return nil + } + return d.ancientTx.Commit() +} diff --git a/postgres/database.go b/postgres/database.go index 1e5a958..c00c129 100644 --- a/postgres/database.go +++ b/postgres/database.go @@ -30,7 +30,7 @@ import ( var errNotSupported = errors.New("this operation is not supported") const ( - hasPgStr = "SELECT exists(select 1 from eth.key_preimages WHERE eth_key = $1)" + hasPgStr = "SELECT exists(SELECT 1 FROM eth.key_preimages WHERE eth_key = $1)" getPgStr = "SELECT data FROM public.blocks INNER JOIN eth.key_preimages ON (ipfs_key = blocks.key) WHERE eth_key = $1" putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING" putPreimagePgStr = "INSERT INTO eth.key_preimages (eth_key, ipfs_key, prefix) VALUES ($1, $2, $3) ON CONFLICT (eth_key) DO UPDATE SET (ipfs_key, prefix) = ($2, $3)" @@ -40,7 +40,8 @@ const ( // Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection type Database struct { - db *sqlx.DB + db *sqlx.DB + ancientTx *sqlx.Tx } // NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS @@ -221,45 +222,3 @@ func (d *Database) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator { func (d *Database) Close() error { return d.db.DB.Close() } - -// HasAncient satisfies the ethdb.AncientReader interface -// HasAncient returns an indicator whether the specified data exists in the ancient store -func (d *Database) HasAncient(kind string, number uint64) (bool, error) { - return false, errNotSupported -} - -// Ancient satisfies the ethdb.AncientReader interface -// Ancient retrieves an ancient binary blob from the append-only immutable files -func (d *Database) Ancient(kind string, number uint64) ([]byte, error) { - return nil, errNotSupported -} - -// Ancients satisfies the ethdb.AncientReader interface -// Ancients returns the ancient item numbers in the ancient store -func (d *Database) Ancients() (uint64, error) { - return 0, errNotSupported -} - -// AncientSize satisfies the ethdb.AncientReader interface -// AncientSize returns the ancient size of the specified category -func (d *Database) AncientSize(kind string) (uint64, error) { - return 0, errNotSupported -} - -// AppendAncient satisfies the ethdb.AncientWriter interface -// AppendAncient injects all binary blobs belong to block at the end of the append-only immutable table files -func (d *Database) AppendAncient(number uint64, hash, header, body, receipt, td []byte) error { - return errNotSupported -} - -// TruncateAncients satisfies the ethdb.AncientWriter interface -// TruncateAncients discards all but the first n ancient data from the ancient store -func (d *Database) TruncateAncients(n uint64) error { - return errNotSupported -} - -// Sync satisfies the ethdb.AncientWriter interface -// Sync flushes all in-memory ancient store data to disk -func (d *Database) Sync() error { - return errNotSupported -}