forked from cerc-io/ipfs-ethdb
Merge pull request #17 from vulcanize/ancient_support_1.9.10
Ancient support 1.9.10
This commit is contained in:
commit
1574d1a968
238
postgres/ancient.go
Normal file
238
postgres/ancient.go
Normal file
@ -0,0 +1,238 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2020 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package pgipfsethdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// FreezerHeaderTable indicates the name of the freezer header table.
|
||||||
|
FreezerHeaderTable = "headers"
|
||||||
|
|
||||||
|
// FreezerHashTable indicates the name of the freezer canonical hash table.
|
||||||
|
FreezerHashTable = "hashes"
|
||||||
|
|
||||||
|
// FreezerBodiesTable indicates the name of the freezer block body table.
|
||||||
|
FreezerBodiesTable = "bodies"
|
||||||
|
|
||||||
|
// FreezerReceiptTable indicates the name of the freezer receipts table.
|
||||||
|
FreezerReceiptTable = "receipts"
|
||||||
|
|
||||||
|
// FreezerDifficultyTable indicates the name of the freezer total difficulty table.
|
||||||
|
FreezerDifficultyTable = "diffs"
|
||||||
|
|
||||||
|
// ancient append Postgres statements
|
||||||
|
appendAncientHeaderPgStr = "INSERT INTO eth.ancient_headers (block_number, header) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET header = $2"
|
||||||
|
appendAncientHashPgStr = "INSERT INTO eth.ancient_hashes (block_number, hash) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET hash = $2"
|
||||||
|
appendAncientBodyPgStr = "INSERT INTO eth.ancient_bodies (block_number, body) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET body = $2"
|
||||||
|
appendAncientReceiptsPgStr = "INSERT INTO eth.ancient_receipts (block_number, receipts) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET receipts = $2"
|
||||||
|
appendAncientTDPgStr = "INSERT INTO eth.ancient_tds (block_number, td) VALUES ($1, $2) ON CONFLICT (block_number) DO UPDATE SET td = $2"
|
||||||
|
|
||||||
|
// ancient truncate Postgres statements
|
||||||
|
truncateAncientHeaderPgStr = "DELETE FROM eth.ancient_headers WHERE block_number > $1"
|
||||||
|
truncateAncientHashPgStr = "DELETE FROM eth.ancient_hashes WHERE block_number > $1"
|
||||||
|
truncateAncientBodiesPgStr = "DELETE FROM eth.ancient_bodies WHERE block_number > $1"
|
||||||
|
truncateAncientReceiptsPgStr = "DELETE FROM eth.ancient_receipts WHERE block_number > $1"
|
||||||
|
truncateAncientTDPgStr = "DELETE FROM eth.ancient_tds WHERE block_number > $1"
|
||||||
|
|
||||||
|
// ancient size Postgres statement
|
||||||
|
ancientSizePgStr = "SELECT pg_total_relation_size($1)"
|
||||||
|
|
||||||
|
// ancients Postgres statement
|
||||||
|
ancientsPgStr = "SELECT block_number FROM eth.ancient_headers ORDER BY block_number DESC LIMIT 1"
|
||||||
|
|
||||||
|
// ancient has Postgres statements
|
||||||
|
hasAncientHeaderPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_headers WHERE block_number = $1)"
|
||||||
|
hasAncientHashPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_hashes WHERE block_number = $1)"
|
||||||
|
hasAncientBodyPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_bodies WHERE block_number = $1)"
|
||||||
|
hasAncientReceiptsPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_receipts WHERE block_number = $1)"
|
||||||
|
hasAncientTDPgStr = "SELECT exists(SELECT 1 FROM eth.ancient_tds WHERE block_number = $1)"
|
||||||
|
|
||||||
|
// ancient get Postgres statements
|
||||||
|
getAncientHeaderPgStr = "SELECT header FROM eth.ancient_headers WHERE block_number = $1"
|
||||||
|
getAncientHashPgStr = "SELECT hash FROM eth.ancient_hashes WHERE block_number = $1"
|
||||||
|
getAncientBodyPgStr = "SELECT body FROM eth.ancient_bodies WHERE block_number = $1"
|
||||||
|
getAncientReceiptsPgStr = "SELECT receipts FROM eth.ancient_receipts WHERE block_number = $1"
|
||||||
|
getAncientTDPgStr = "SELECT td FROM eth.ancient_tds WHERE block_number = $1"
|
||||||
|
)
|
||||||
|
|
||||||
|
// HasAncient satisfies the ethdb.AncientReader interface
|
||||||
|
// HasAncient returns an indicator whether the specified data exists in the ancient store
|
||||||
|
func (d *Database) HasAncient(kind string, number uint64) (bool, error) {
|
||||||
|
var pgStr string
|
||||||
|
switch kind {
|
||||||
|
case FreezerHeaderTable:
|
||||||
|
pgStr = hasAncientHeaderPgStr
|
||||||
|
case FreezerHashTable:
|
||||||
|
pgStr = hasAncientHashPgStr
|
||||||
|
case FreezerBodiesTable:
|
||||||
|
pgStr = hasAncientBodyPgStr
|
||||||
|
case FreezerReceiptTable:
|
||||||
|
pgStr = hasAncientReceiptsPgStr
|
||||||
|
case FreezerDifficultyTable:
|
||||||
|
pgStr = hasAncientTDPgStr
|
||||||
|
default:
|
||||||
|
return false, fmt.Errorf("unexpected ancient kind: %s", kind)
|
||||||
|
}
|
||||||
|
has := new(bool)
|
||||||
|
return *has, d.db.Get(has, pgStr, number)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ancient satisfies the ethdb.AncientReader interface
|
||||||
|
// Ancient retrieves an ancient binary blob from the append-only immutable files
|
||||||
|
func (d *Database) Ancient(kind string, number uint64) ([]byte, error) {
|
||||||
|
var pgStr string
|
||||||
|
switch kind {
|
||||||
|
case FreezerHeaderTable:
|
||||||
|
pgStr = getAncientHeaderPgStr
|
||||||
|
case FreezerHashTable:
|
||||||
|
pgStr = getAncientHashPgStr
|
||||||
|
case FreezerBodiesTable:
|
||||||
|
pgStr = getAncientBodyPgStr
|
||||||
|
case FreezerReceiptTable:
|
||||||
|
pgStr = getAncientReceiptsPgStr
|
||||||
|
case FreezerDifficultyTable:
|
||||||
|
pgStr = getAncientTDPgStr
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unexpected ancient kind: %s", kind)
|
||||||
|
}
|
||||||
|
data := new([]byte)
|
||||||
|
return *data, d.db.Get(data, pgStr, number)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ancients satisfies the ethdb.AncientReader interface
|
||||||
|
// Ancients returns the ancient item numbers in the ancient store
|
||||||
|
func (d *Database) Ancients() (uint64, error) {
|
||||||
|
num := new(uint64)
|
||||||
|
if err := d.db.Get(num, ancientsPgStr); err != nil {
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return 0, nil
|
||||||
|
}
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return *num, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AncientSize satisfies the ethdb.AncientReader interface
|
||||||
|
// AncientSize returns the ancient size of the specified category
|
||||||
|
func (d *Database) AncientSize(kind string) (uint64, error) {
|
||||||
|
var tableName string
|
||||||
|
switch kind {
|
||||||
|
case FreezerHeaderTable:
|
||||||
|
tableName = "eth.ancient_headers"
|
||||||
|
case FreezerHashTable:
|
||||||
|
tableName = "eth.ancient_hashes"
|
||||||
|
case FreezerBodiesTable:
|
||||||
|
tableName = "eth.ancient_bodies"
|
||||||
|
case FreezerReceiptTable:
|
||||||
|
tableName = "eth.ancient_receipts"
|
||||||
|
case FreezerDifficultyTable:
|
||||||
|
tableName = "eth.ancient_tds"
|
||||||
|
default:
|
||||||
|
return 0, fmt.Errorf("unexpected ancient kind: %s", kind)
|
||||||
|
}
|
||||||
|
size := new(uint64)
|
||||||
|
return *size, d.db.Get(size, ancientSizePgStr, tableName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AppendAncient satisfies the ethdb.AncientWriter interface
|
||||||
|
// AppendAncient injects all binary blobs belong to block at the end of the append-only immutable table files
|
||||||
|
func (d *Database) AppendAncient(number uint64, hash, header, body, receipts, td []byte) error {
|
||||||
|
// append in batch
|
||||||
|
var err error
|
||||||
|
if d.ancientTx == nil {
|
||||||
|
d.ancientTx, err = d.db.Beginx()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
if err := d.ancientTx.Rollback(); err != nil {
|
||||||
|
logrus.Error(err)
|
||||||
|
d.ancientTx = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if _, err := d.ancientTx.Exec(appendAncientHashPgStr, number, hash); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := d.ancientTx.Exec(appendAncientHeaderPgStr, number, header); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := d.ancientTx.Exec(appendAncientBodyPgStr, number, body); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := d.ancientTx.Exec(appendAncientReceiptsPgStr, number, receipts); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = d.ancientTx.Exec(appendAncientTDPgStr, number, td)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TruncateAncients satisfies the ethdb.AncientWriter interface
|
||||||
|
// TruncateAncients discards all but the first n ancient data from the ancient store
|
||||||
|
func (d *Database) TruncateAncients(n uint64) error {
|
||||||
|
// truncate in batch
|
||||||
|
var err error
|
||||||
|
if d.ancientTx == nil {
|
||||||
|
d.ancientTx, err = d.db.Beginx()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
if err := d.ancientTx.Rollback(); err != nil {
|
||||||
|
logrus.Error(err)
|
||||||
|
d.ancientTx = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
if _, err := d.ancientTx.Exec(truncateAncientHeaderPgStr, n); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := d.ancientTx.Exec(truncateAncientHashPgStr, n); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := d.ancientTx.Exec(truncateAncientBodiesPgStr, n); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := d.ancientTx.Exec(truncateAncientReceiptsPgStr, n); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = d.ancientTx.Exec(truncateAncientTDPgStr, n)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sync satisfies the ethdb.AncientWriter interface
|
||||||
|
// Sync flushes all in-memory ancient store data to disk
|
||||||
|
func (d *Database) Sync() error {
|
||||||
|
if d.ancientTx == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := d.ancientTx.Commit(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
d.ancientTx = nil
|
||||||
|
return nil
|
||||||
|
}
|
232
postgres/ancient_test.go
Normal file
232
postgres/ancient_test.go
Normal file
@ -0,0 +1,232 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package pgipfsethdb_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/big"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
|
. "github.com/onsi/ginkgo"
|
||||||
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
|
"github.com/vulcanize/ipfs-ethdb/postgres"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ancientDB ethdb.Database
|
||||||
|
testBlockNumber uint64 = 1
|
||||||
|
testAncientHeader = types.Header{Number: big.NewInt(2)}
|
||||||
|
testAncientHeaderRLP, _ = rlp.EncodeToBytes(testHeader2)
|
||||||
|
testAncientHash = testAncientHeader.Hash().Bytes()
|
||||||
|
testAncientBodyBytes = make([]byte, 10000)
|
||||||
|
testAncientReceiptsBytes = make([]byte, 5000)
|
||||||
|
testAncientTD, _ = new(big.Int).SetString("1000000000000000000000", 10)
|
||||||
|
testAncientTDBytes = testAncientTD.Bytes()
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ = Describe("Ancient", func() {
|
||||||
|
BeforeEach(func() {
|
||||||
|
db, err = pgipfsethdb.TestDB()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
ancientDB = pgipfsethdb.NewDatabase(db)
|
||||||
|
|
||||||
|
})
|
||||||
|
AfterEach(func() {
|
||||||
|
err = pgipfsethdb.ResetTestDB(db)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
})
|
||||||
|
|
||||||
|
Describe("AppendAncient/Sync/Has", func() {
|
||||||
|
It("adds eth objects to the Ancient database and returns whether or not an ancient record exists", func() {
|
||||||
|
hasAncient(testBlockNumber, false)
|
||||||
|
|
||||||
|
err = ancientDB.AppendAncient(testBlockNumber, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
hasAncient(testBlockNumber, false)
|
||||||
|
|
||||||
|
err = ancientDB.Sync()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
hasAncient(testBlockNumber, true)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Describe("AppendAncient/Sync/Ancient", func() {
|
||||||
|
It("adds the eth objects to the Ancient database and returns the ancient objects on request", func() {
|
||||||
|
hasAncient(testBlockNumber, false)
|
||||||
|
|
||||||
|
_, err := ancientDB.Ancient(pgipfsethdb.FreezerHeaderTable, testBlockNumber)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
_, err = ancientDB.Ancient(pgipfsethdb.FreezerHashTable, testBlockNumber)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
_, err = ancientDB.Ancient(pgipfsethdb.FreezerBodiesTable, testBlockNumber)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
_, err = ancientDB.Ancient(pgipfsethdb.FreezerReceiptTable, testBlockNumber)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
_, err = ancientDB.Ancient(pgipfsethdb.FreezerDifficultyTable, testBlockNumber)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
|
||||||
|
err = ancientDB.AppendAncient(testBlockNumber, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
err = ancientDB.Sync()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
hasAncient(testBlockNumber, true)
|
||||||
|
|
||||||
|
ancientHeader, err := ancientDB.Ancient(pgipfsethdb.FreezerHeaderTable, testBlockNumber)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(ancientHeader).To(Equal(testAncientHeaderRLP))
|
||||||
|
|
||||||
|
ancientHash, err := ancientDB.Ancient(pgipfsethdb.FreezerHashTable, testBlockNumber)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(ancientHash).To(Equal(testAncientHash))
|
||||||
|
|
||||||
|
ancientBody, err := ancientDB.Ancient(pgipfsethdb.FreezerBodiesTable, testBlockNumber)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(ancientBody).To(Equal(testAncientBodyBytes))
|
||||||
|
|
||||||
|
ancientReceipts, err := ancientDB.Ancient(pgipfsethdb.FreezerReceiptTable, testBlockNumber)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(ancientReceipts).To(Equal(testAncientReceiptsBytes))
|
||||||
|
|
||||||
|
ancientTD, err := ancientDB.Ancient(pgipfsethdb.FreezerDifficultyTable, testBlockNumber)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(ancientTD).To(Equal(testAncientTDBytes))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Describe("AppendAncient/Sync/Ancients", func() {
|
||||||
|
It("returns the height of the ancient database", func() {
|
||||||
|
ancients, err := ancientDB.Ancients()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(ancients).To(Equal(uint64(0)))
|
||||||
|
|
||||||
|
for i := uint64(0); i <= 100; i++ {
|
||||||
|
hasAncient(i, false)
|
||||||
|
err = ancientDB.AppendAncient(i, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ancientDB.Sync()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
for i := uint64(0); i <= 100; i++ {
|
||||||
|
hasAncient(i, true)
|
||||||
|
}
|
||||||
|
ancients, err = ancientDB.Ancients()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(ancients).To(Equal(uint64(100)))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Describe("AppendAncient/Truncate/Sync", func() {
|
||||||
|
It("truncates the ancient database to the provided height", func() {
|
||||||
|
for i := uint64(0); i <= 100; i++ {
|
||||||
|
hasAncient(i, false)
|
||||||
|
err = ancientDB.AppendAncient(i, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ancientDB.Sync()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
err = ancientDB.TruncateAncients(50)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
for i := uint64(0); i <= 100; i++ {
|
||||||
|
hasAncient(i, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
ancients, err := ancientDB.Ancients()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(ancients).To(Equal(uint64(100)))
|
||||||
|
|
||||||
|
err = ancientDB.Sync()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
for i := uint64(0); i <= 100; i++ {
|
||||||
|
if i <= 50 {
|
||||||
|
hasAncient(i, true)
|
||||||
|
} else {
|
||||||
|
hasAncient(i, false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ancients, err = ancientDB.Ancients()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(ancients).To(Equal(uint64(50)))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Describe("AppendAncient/Sync/AncientSize", func() {
|
||||||
|
It("adds the eth objects to the Ancient database and returns the ancient objects on request", func() {
|
||||||
|
for i := uint64(0); i <= 100; i++ {
|
||||||
|
hasAncient(i, false)
|
||||||
|
err = ancientDB.AppendAncient(i, testAncientHash, testAncientHeaderRLP, testAncientBodyBytes, testAncientReceiptsBytes, testAncientTDBytes)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
}
|
||||||
|
|
||||||
|
err = ancientDB.Sync()
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
for i := uint64(0); i <= 100; i++ {
|
||||||
|
hasAncient(i, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
ancientHeaderSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerHeaderTable)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(ancientHeaderSize).To(Equal(uint64(106496)))
|
||||||
|
|
||||||
|
ancientHashSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerHashTable)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(ancientHashSize).To(Equal(uint64(32768)))
|
||||||
|
|
||||||
|
ancientBodySize, err := ancientDB.AncientSize(pgipfsethdb.FreezerBodiesTable)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(ancientBodySize).To(Equal(uint64(73728)))
|
||||||
|
|
||||||
|
ancientReceiptsSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerReceiptTable)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(ancientReceiptsSize).To(Equal(uint64(65536)))
|
||||||
|
|
||||||
|
ancientTDSize, err := ancientDB.AncientSize(pgipfsethdb.FreezerDifficultyTable)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(ancientTDSize).To(Equal(uint64(32768)))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
func hasAncient(blockNumber uint64, shouldHave bool) {
|
||||||
|
has, err := ancientDB.HasAncient(pgipfsethdb.FreezerHeaderTable, blockNumber)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(has).To(Equal(shouldHave))
|
||||||
|
has, err = ancientDB.HasAncient(pgipfsethdb.FreezerHashTable, blockNumber)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(has).To(Equal(shouldHave))
|
||||||
|
has, err = ancientDB.HasAncient(pgipfsethdb.FreezerBodiesTable, blockNumber)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(has).To(Equal(shouldHave))
|
||||||
|
has, err = ancientDB.HasAncient(pgipfsethdb.FreezerReceiptTable, blockNumber)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(has).To(Equal(shouldHave))
|
||||||
|
has, err = ancientDB.HasAncient(pgipfsethdb.FreezerDifficultyTable, blockNumber)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(has).To(Equal(shouldHave))
|
||||||
|
}
|
@ -17,22 +17,25 @@
|
|||||||
package pgipfsethdb
|
package pgipfsethdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection
|
// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection
|
||||||
type Batch struct {
|
type Batch struct {
|
||||||
db *sqlx.DB
|
db *sqlx.DB
|
||||||
tx *sqlx.Tx
|
tx *sqlx.Tx
|
||||||
valueSize int
|
valueSize int
|
||||||
|
replayCache map[string][]byte
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBatch returns a ethdb.Batch interface for PG-IPFS
|
// NewBatch returns a ethdb.Batch interface for PG-IPFS
|
||||||
func NewBatch(db *sqlx.DB, tx *sqlx.Tx) ethdb.Batch {
|
func NewBatch(db *sqlx.DB, tx *sqlx.Tx) ethdb.Batch {
|
||||||
b := &Batch{
|
b := &Batch{
|
||||||
db: db,
|
db: db,
|
||||||
tx: tx,
|
tx: tx,
|
||||||
|
replayCache: make(map[string][]byte),
|
||||||
}
|
}
|
||||||
if tx == nil {
|
if tx == nil {
|
||||||
b.Reset()
|
b.Reset()
|
||||||
@ -55,6 +58,7 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
b.valueSize += len(value)
|
b.valueSize += len(value)
|
||||||
|
b.replayCache[common.Bytes2Hex(key)] = value
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -62,7 +66,11 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
|
|||||||
// Delete removes the key from the key-value data store
|
// Delete removes the key from the key-value data store
|
||||||
func (b *Batch) Delete(key []byte) (err error) {
|
func (b *Batch) Delete(key []byte) (err error) {
|
||||||
_, err = b.tx.Exec(deletePgStr, key)
|
_, err = b.tx.Exec(deletePgStr, key)
|
||||||
return err
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
delete(b.replayCache, common.Bytes2Hex(key))
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ValueSize satisfies the ethdb.Batch interface
|
// ValueSize satisfies the ethdb.Batch interface
|
||||||
@ -79,13 +87,27 @@ func (b *Batch) Write() error {
|
|||||||
if b.tx == nil {
|
if b.tx == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return b.tx.Commit()
|
if err := b.tx.Commit(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
b.replayCache = nil
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Replay satisfies the ethdb.Batch interface
|
// Replay satisfies the ethdb.Batch interface
|
||||||
// Replay replays the batch contents
|
// Replay replays the batch contents
|
||||||
func (b *Batch) Replay(w ethdb.KeyValueWriter) error {
|
func (b *Batch) Replay(w ethdb.KeyValueWriter) error {
|
||||||
return errNotSupported
|
if b.tx != nil {
|
||||||
|
b.tx.Rollback()
|
||||||
|
b.tx = nil
|
||||||
|
}
|
||||||
|
for key, value := range b.replayCache {
|
||||||
|
if err := w.Put(common.Hex2Bytes(key), value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
b.replayCache = nil
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset satisfies the ethdb.Batch interface
|
// Reset satisfies the ethdb.Batch interface
|
||||||
@ -97,5 +119,6 @@ func (b *Batch) Reset() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
b.replayCache = make(map[string][]byte)
|
||||||
b.valueSize = 0
|
b.valueSize = 0
|
||||||
}
|
}
|
||||||
|
@ -115,4 +115,30 @@ var _ = Describe("Batch", func() {
|
|||||||
Expect(size).To(Equal(0))
|
Expect(size).To(Equal(0))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Describe("Replay", func() {
|
||||||
|
It("returns the size of data in the batch queued for write", func() {
|
||||||
|
err = batch.Put(testKeccakEthKey, testValue)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
err = batch.Put(testKeccakEthKey2, testValue2)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
_, err = database.Get(testKeccakEthKey)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
|
||||||
|
_, err = database.Get(testKeccakEthKey2)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
|
||||||
|
|
||||||
|
err = batch.Replay(database)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
val, err := database.Get(testKeccakEthKey)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(val).To(Equal(testValue))
|
||||||
|
val2, err := database.Get(testKeccakEthKey2)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(val2).To(Equal(testValue2))
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
@ -17,7 +17,6 @@
|
|||||||
package pgipfsethdb
|
package pgipfsethdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
@ -27,10 +26,8 @@ import (
|
|||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errNotSupported = errors.New("this operation is not supported")
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
hasPgStr = "SELECT exists(select 1 from eth.key_preimages WHERE eth_key = $1)"
|
hasPgStr = "SELECT exists(SELECT 1 FROM eth.key_preimages WHERE eth_key = $1)"
|
||||||
getPgStr = "SELECT data FROM public.blocks INNER JOIN eth.key_preimages ON (ipfs_key = blocks.key) WHERE eth_key = $1"
|
getPgStr = "SELECT data FROM public.blocks INNER JOIN eth.key_preimages ON (ipfs_key = blocks.key) WHERE eth_key = $1"
|
||||||
putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING"
|
putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING"
|
||||||
putPreimagePgStr = "INSERT INTO eth.key_preimages (eth_key, ipfs_key, prefix) VALUES ($1, $2, $3) ON CONFLICT (eth_key) DO UPDATE SET (ipfs_key, prefix) = ($2, $3)"
|
putPreimagePgStr = "INSERT INTO eth.key_preimages (eth_key, ipfs_key, prefix) VALUES ($1, $2, $3) ON CONFLICT (eth_key) DO UPDATE SET (ipfs_key, prefix) = ($2, $3)"
|
||||||
@ -40,7 +37,8 @@ const (
|
|||||||
|
|
||||||
// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
|
// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
|
||||||
type Database struct {
|
type Database struct {
|
||||||
db *sqlx.DB
|
db *sqlx.DB
|
||||||
|
ancientTx *sqlx.Tx
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS
|
// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS
|
||||||
@ -187,7 +185,8 @@ func (d *Database) Stat(property string) (string, error) {
|
|||||||
// Compact satisfies the ethdb.Compacter interface
|
// Compact satisfies the ethdb.Compacter interface
|
||||||
// Compact flattens the underlying data store for the given key range
|
// Compact flattens the underlying data store for the given key range
|
||||||
func (d *Database) Compact(start []byte, limit []byte) error {
|
func (d *Database) Compact(start []byte, limit []byte) error {
|
||||||
return errNotSupported
|
// this leveldb functionality doesn't translate to Postgres
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBatch satisfies the ethdb.Batcher interface
|
// NewBatch satisfies the ethdb.Batcher interface
|
||||||
@ -221,45 +220,3 @@ func (d *Database) NewIteratorWithPrefix(prefix []byte) ethdb.Iterator {
|
|||||||
func (d *Database) Close() error {
|
func (d *Database) Close() error {
|
||||||
return d.db.DB.Close()
|
return d.db.DB.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasAncient satisfies the ethdb.AncientReader interface
|
|
||||||
// HasAncient returns an indicator whether the specified data exists in the ancient store
|
|
||||||
func (d *Database) HasAncient(kind string, number uint64) (bool, error) {
|
|
||||||
return false, errNotSupported
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ancient satisfies the ethdb.AncientReader interface
|
|
||||||
// Ancient retrieves an ancient binary blob from the append-only immutable files
|
|
||||||
func (d *Database) Ancient(kind string, number uint64) ([]byte, error) {
|
|
||||||
return nil, errNotSupported
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ancients satisfies the ethdb.AncientReader interface
|
|
||||||
// Ancients returns the ancient item numbers in the ancient store
|
|
||||||
func (d *Database) Ancients() (uint64, error) {
|
|
||||||
return 0, errNotSupported
|
|
||||||
}
|
|
||||||
|
|
||||||
// AncientSize satisfies the ethdb.AncientReader interface
|
|
||||||
// AncientSize returns the ancient size of the specified category
|
|
||||||
func (d *Database) AncientSize(kind string) (uint64, error) {
|
|
||||||
return 0, errNotSupported
|
|
||||||
}
|
|
||||||
|
|
||||||
// AppendAncient satisfies the ethdb.AncientWriter interface
|
|
||||||
// AppendAncient injects all binary blobs belong to block at the end of the append-only immutable table files
|
|
||||||
func (d *Database) AppendAncient(number uint64, hash, header, body, receipt, td []byte) error {
|
|
||||||
return errNotSupported
|
|
||||||
}
|
|
||||||
|
|
||||||
// TruncateAncients satisfies the ethdb.AncientWriter interface
|
|
||||||
// TruncateAncients discards all but the first n ancient data from the ancient store
|
|
||||||
func (d *Database) TruncateAncients(n uint64) error {
|
|
||||||
return errNotSupported
|
|
||||||
}
|
|
||||||
|
|
||||||
// Sync satisfies the ethdb.AncientWriter interface
|
|
||||||
// Sync flushes all in-memory ancient store data to disk
|
|
||||||
func (d *Database) Sync() error {
|
|
||||||
return errNotSupported
|
|
||||||
}
|
|
||||||
|
@ -0,0 +1,8 @@
|
|||||||
|
-- +goose Up
|
||||||
|
CREATE TABLE IF NOT EXISTS eth.ancient_headers (
|
||||||
|
block_number BIGINT UNIQUE NOT NULL,
|
||||||
|
header BYTEA NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
-- +goose Down
|
||||||
|
DROP TABLE eth.ancient_headers;
|
@ -0,0 +1,8 @@
|
|||||||
|
-- +goose Up
|
||||||
|
CREATE TABLE IF NOT EXISTS eth.ancient_hashes (
|
||||||
|
block_number BIGINT UNIQUE NOT NULL,
|
||||||
|
hash BYTEA NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
-- +goose Down
|
||||||
|
DROP TABLE eth.ancient_hashes;
|
@ -0,0 +1,8 @@
|
|||||||
|
-- +goose Up
|
||||||
|
CREATE TABLE IF NOT EXISTS eth.ancient_bodies (
|
||||||
|
block_number BIGINT UNIQUE NOT NULL,
|
||||||
|
body BYTEA NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
-- +goose Down
|
||||||
|
DROP TABLE eth.ancient_bodies;
|
@ -0,0 +1,8 @@
|
|||||||
|
-- +goose Up
|
||||||
|
CREATE TABLE IF NOT EXISTS eth.ancient_receipts (
|
||||||
|
block_number BIGINT UNIQUE NOT NULL,
|
||||||
|
receipts BYTEA NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
-- +goose Down
|
||||||
|
DROP TABLE eth.ancient_receipts;
|
@ -0,0 +1,8 @@
|
|||||||
|
-- +goose Up
|
||||||
|
CREATE TABLE IF NOT EXISTS eth.ancient_tds (
|
||||||
|
block_number BIGINT UNIQUE NOT NULL,
|
||||||
|
td BYTEA NOT NULL
|
||||||
|
);
|
||||||
|
|
||||||
|
-- +goose Down
|
||||||
|
DROP TABLE eth.ancient_tds;
|
@ -16,7 +16,9 @@
|
|||||||
|
|
||||||
package pgipfsethdb
|
package pgipfsethdb
|
||||||
|
|
||||||
import "github.com/jmoiron/sqlx"
|
import (
|
||||||
|
"github.com/jmoiron/sqlx"
|
||||||
|
)
|
||||||
|
|
||||||
// TestDB connect to the testing database
|
// TestDB connect to the testing database
|
||||||
// it assumes the database has the IPFS public.blocks table present
|
// it assumes the database has the IPFS public.blocks table present
|
||||||
@ -28,6 +30,38 @@ func TestDB() (*sqlx.DB, error) {
|
|||||||
|
|
||||||
// ResetTestDB drops all rows in the test db public.blocks table
|
// ResetTestDB drops all rows in the test db public.blocks table
|
||||||
func ResetTestDB(db *sqlx.DB) error {
|
func ResetTestDB(db *sqlx.DB) error {
|
||||||
_, err := db.Exec("TRUNCATE public.blocks CASCADE")
|
tx, err := db.Beginx()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if p := recover(); p != nil {
|
||||||
|
tx.Rollback()
|
||||||
|
panic(p)
|
||||||
|
} else if err != nil {
|
||||||
|
tx.Rollback()
|
||||||
|
} else {
|
||||||
|
err = tx.Commit()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
if _, err := tx.Exec("TRUNCATE public.blocks CASCADE"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := tx.Exec("TRUNCATE eth.key_preimages CASCADE"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := tx.Exec("TRUNCATE eth.ancient_headers CASCADE"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := tx.Exec("TRUNCATE eth.ancient_hashes CASCADE"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := tx.Exec("TRUNCATE eth.ancient_bodies CASCADE"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := tx.Exec("TRUNCATE eth.ancient_receipts CASCADE"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = tx.Exec("TRUNCATE eth.ancient_tds CASCADE")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user