Compare commits

..

No commits in common. "v5" and "release-v0.0.6" have entirely different histories.

26 changed files with 802 additions and 1447 deletions

View File

@ -1,6 +1,6 @@
## ipfs-ethdb
[![Go Report Card](https://goreportcard.com/badge/github.com/cerc/ipfs-ethdb)](https://goreportcard.com/report/github.com/cerc-io/ipfs-ethdb)
[![Go Report Card](https://goreportcard.com/badge/github.com/vulcanize/ipfs-ethdb)](https://goreportcard.com/report/github.com/vulcanize/ipfs-ethdb)
> go-ethereum ethdb interfaces for Ethereum state data stored in IPFS
@ -11,7 +11,7 @@ interfacing with a state database. These interfaces are used to build higher-lev
which are used to perform the bulk of state related needs.
Ethereum data can be stored on IPFS, standard codecs for Etheruem data are defined in the [go-cid](https://github.com/ipfs/go-cid) library.
Using our [statediffing geth client](https://github.com/cerc-io/go-ethereum/releases/tag/v1.9.11-statediff-0.0.2) it is feasible to extract every single
Using our [statediffing geth client](https://github.com/vulcanize/go-ethereum/releases/tag/v1.9.11-statediff-0.0.2) it is feasible to extract every single
state and storage node and publish it to IPFS.
Geth stores state data in leveldb as key-value pairs between the keccak256 hash of the rlp-encoded object and the rlp-encoded object.
@ -21,7 +21,7 @@ ethdb interfaces for Ethereum data on IPFS by handling the conversion of a kecca
## Usage
To use this module import it and build an ethdb interface around an instance of a [Go IPFS blockservice](https://pkg.go.dev/github.com/ipfs/boxo@v0.19.0/blockservice), you can then
To use this module import it and build an ethdb interface around an instance of a [go ipfs blockservice](https://github.com/ipfs/go-blockservice), you can then
employ it as you would the usual [leveldb](https://github.com/ethereum/go-ethereum/tree/master/ethdb/leveldb) or [memorydb](https://github.com/ethereum/go-ethereum/tree/master/ethdb/memorydb) ethdbs
with some exceptions: the AncientReader, AncientWriter, Compacter, and Iteratee/Iterator interfaces are not functionally complete.
@ -34,7 +34,7 @@ Ancient interfaces are used for Ancient/frozen data operations (e.g. rawdb/table
Outside of these primarily auxiliary capabilities, this package satisfies the interfaces required for many state operations using Ethereum data on IPFS.
e.g.
go-ethereum trie.NodeIterator and state.NodeIterator can be constructed from the ethdb.KeyValueStore and ethdb.Database interfaces, respectively:
```go
@ -42,15 +42,15 @@ package main
import (
"context"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/trie"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/repo/fsrepo"
"github.com/jmoiron/sqlx"
"github.com/cerc-io/ipfs-ethdb/v5"
"github.com/vulcanize/ipfs-ethdb"
)
func main() {

View File

@ -17,14 +17,13 @@
package ipfsethdb
import (
"context"
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
lru "github.com/hashicorp/golang-lru"
"github.com/ipfs/boxo/blockservice"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
)
var (
@ -104,7 +103,7 @@ func (b *Batch) Write() error {
}
puts[i] = b
}
if err := b.blockService.AddBlocks(context.Background(), puts); err != nil {
if err := b.blockService.AddBlocks(puts); err != nil {
return err
}
for _, key := range b.deleteCache.Keys() {
@ -113,7 +112,7 @@ func (b *Batch) Write() error {
if err != nil {
return err
}
if err := b.blockService.DeleteBlock(context.Background(), c); err != nil {
if err := b.blockService.DeleteBlock(c); err != nil {
return err
}
}

View File

@ -25,7 +25,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
ipfsethdb "github.com/cerc-io/ipfs-ethdb/v5"
ipfsethdb "github.com/vulcanize/ipfs-ethdb"
)
var (

View File

@ -20,10 +20,11 @@ import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/go-blockservice"
)
var (
@ -68,7 +69,7 @@ func (d *Database) Has(key []byte) (bool, error) {
if err != nil {
return false, err
}
return d.blockService.Blockstore().Has(context.Background(), c)
return d.blockService.Blockstore().Has(c)
}
// Get satisfies the ethdb.KeyValueReader interface
@ -94,7 +95,7 @@ func (d *Database) Put(key []byte, value []byte) error {
if err != nil {
return err
}
return d.blockService.AddBlock(context.Background(), b)
return d.blockService.AddBlock(b)
}
// Delete satisfies the ethdb.KeyValueWriter interface
@ -105,7 +106,7 @@ func (d *Database) Delete(key []byte) error {
if err != nil {
return err
}
return d.blockService.DeleteBlock(context.Background(), c)
return d.blockService.DeleteBlock(c)
}
// DatabaseProperty enum type
@ -134,6 +135,9 @@ func (d *Database) Stat(property string) (string, error) {
return "", err
}
switch prop {
case ExchangeOnline:
online := d.blockService.Exchange().IsOnline()
return strconv.FormatBool(online), nil
default:
return "", fmt.Errorf("unhandled database property")
}
@ -156,16 +160,6 @@ func (d *Database) NewBatch() ethdb.Batch {
return b
}
// NewBatchWithSize satisfies the ethdb.Batcher interface.
// NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
func (d *Database) NewBatchWithSize(size int) ethdb.Batch {
b, err := NewBatch(d.blockService, size)
if err != nil {
panic(err)
}
return b
}
// NewIterator satisfies the ethdb.Iteratee interface
// it creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix, starting at a particular
@ -201,42 +195,36 @@ func (d *Database) Ancients() (uint64, error) {
return 0, errNotSupported
}
// Tail satisfies the ethdb.AncientReader interface.
// Tail returns the number of first stored item in the freezer.
func (d *Database) Tail() (uint64, error) {
return 0, errNotSupported
}
// AncientSize satisfies the ethdb.AncientReader interface
// AncientSize returns the ancient size of the specified category
func (d *Database) AncientSize(kind string) (uint64, error) {
return 0, errNotSupported
}
// AppendAncient satisfies the ethdb.AncientWriter interface
// AppendAncient injects all binary blobs belong to block at the end of the append-only immutable table files
func (d *Database) AppendAncient(number uint64, hash, header, body, receipt, td []byte) error {
return errNotSupported
}
// AncientRange retrieves all the items in a range, starting from the index 'start'.
// It will return
// - at most 'count' items,
// - at least 1 item (even if exceeding the maxBytes), but will otherwise
// return as many items as fit into maxBytes.
// - at most 'count' items,
// - at least 1 item (even if exceeding the maxBytes), but will otherwise
// return as many items as fit into maxBytes.
func (d *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
return nil, errNotSupported
}
// ReadAncients applies the provided AncientReader function
func (d *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
func (d *Database) ReadAncients(fn func(ethdb.AncientReader) error) (err error) {
return errNotSupported
}
// TruncateHead satisfies the ethdb.AncientWriter interface.
// TruncateHead discards all but the first n ancient data from the ancient store.
func (d *Database) TruncateHead(n uint64) (uint64, error) {
return 0, errNotSupported
}
// TruncateTail satisfies the ethdb.AncientWriter interface.
// TruncateTail discards the first n ancient data from the ancient store.
func (d *Database) TruncateTail(n uint64) (uint64, error) {
return 0, errNotSupported
// TruncateAncients satisfies the ethdb.AncientWriter interface
// TruncateAncients discards all but the first n ancient data from the ancient store
func (d *Database) TruncateAncients(n uint64) error {
return errNotSupported
}
// Sync satisfies the ethdb.AncientWriter interface
@ -244,20 +232,3 @@ func (d *Database) TruncateTail(n uint64) (uint64, error) {
func (d *Database) Sync() error {
return errNotSupported
}
// MigrateTable satisfies the ethdb.AncientWriter interface.
// MigrateTable processes and migrates entries of a given table to a new format.
func (d *Database) MigrateTable(string, func([]byte) ([]byte, error)) error {
return errNotSupported
}
// NewSnapshot satisfies the ethdb.Snapshotter interface.
// NewSnapshot creates a database snapshot based on the current state.
func (d *Database) NewSnapshot() (ethdb.Snapshot, error) {
return nil, errNotSupported
}
// AncientDatadir returns an error as we don't have a backing chain freezer.
func (d *Database) AncientDatadir() (string, error) {
return "", errNotSupported
}

View File

@ -22,11 +22,11 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/go-blockservice"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
ipfsethdb "github.com/cerc-io/ipfs-ethdb/v5"
ipfsethdb "github.com/vulcanize/ipfs-ethdb"
)
var (

86
go.mod
View File

@ -1,73 +1,21 @@
module github.com/cerc-io/ipfs-ethdb/v5
module github.com/vulcanize/ipfs-ethdb
go 1.21
go 1.13
require (
github.com/ethereum/go-ethereum v1.13.14
github.com/hashicorp/golang-lru v1.0.2
github.com/ipfs/boxo v0.19.0
github.com/ipfs/go-block-format v0.2.0
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-ipfs-ds-help v1.1.0
github.com/jmoiron/sqlx v1.3.5
github.com/lib/pq v1.10.9
github.com/mailgun/groupcache/v2 v2.3.0
github.com/multiformats/go-multihash v0.2.3
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.19.0
github.com/sirupsen/logrus v1.6.0
)
require (
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.12.1 // indirect
github.com/crate-crypto/go-kzg-4844 v0.7.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/ethereum/c-kzg-4844 v0.4.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-datastore v0.6.0 // indirect
github.com/ipfs/go-ipfs-util v0.0.3 // indirect
github.com/ipfs/go-ipld-format v0.6.0 // indirect
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mmcloughlin/addchain v0.4.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/segmentio/fasthash v1.0.3 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/supranational/blst v0.3.11 // indirect
go.opentelemetry.io/otel v1.25.0 // indirect
go.opentelemetry.io/otel/metric v1.25.0 // indirect
go.opentelemetry.io/otel/trace v1.25.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
lukechampine.com/blake3 v1.2.2 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
github.com/ethereum/go-ethereum v1.10.14
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.5
github.com/ipfs/go-ipfs-blockstore v1.0.0
github.com/ipfs/go-ipfs-ds-help v1.0.0
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
github.com/jmoiron/sqlx v1.2.0
github.com/lib/pq v1.0.0
github.com/mailgun/groupcache/v2 v2.2.1
github.com/multiformats/go-multihash v0.0.13
github.com/onsi/ginkgo v1.14.0
github.com/onsi/gomega v1.10.1
google.golang.org/appengine v1.6.7 // indirect
)

903
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -20,7 +20,7 @@ import (
"context"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/go-blockservice"
)
var _ ethdb.Iterator = &Iterator{}

View File

@ -20,11 +20,11 @@ import (
"context"
"errors"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
)
var (
@ -52,27 +52,27 @@ func (mbs *MockBlockservice) Exchange() exchange.Interface {
panic("Exchange: implement me")
}
func (mbs *MockBlockservice) AddBlock(ctx context.Context, b blocks.Block) error {
return mbs.blockStore.Put(ctx, b)
func (mbs *MockBlockservice) AddBlock(b blocks.Block) error {
return mbs.blockStore.Put(b)
}
func (mbs *MockBlockservice) AddBlocks(ctx context.Context, bs []blocks.Block) error {
return mbs.blockStore.PutMany(ctx, bs)
func (mbs *MockBlockservice) AddBlocks(bs []blocks.Block) error {
return mbs.blockStore.PutMany(bs)
}
func (mbs *MockBlockservice) DeleteBlock(ctx context.Context, c cid.Cid) error {
return mbs.blockStore.DeleteBlock(ctx, c)
func (mbs *MockBlockservice) DeleteBlock(c cid.Cid) error {
return mbs.blockStore.DeleteBlock(c)
}
func (mbs *MockBlockservice) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
return mbs.blockStore.Get(ctx, c)
return mbs.blockStore.Get(c)
}
func (mbs *MockBlockservice) GetBlocks(ctx context.Context, cs []cid.Cid) <-chan blocks.Block {
blockChan := make(chan blocks.Block)
go func() {
for _, c := range cs {
if b, err := mbs.blockStore.Get(ctx, c); err == nil {
if b, err := mbs.blockStore.Get(c); err == nil {
blockChan <- b
}
}
@ -93,17 +93,17 @@ type MockBlockstore struct {
err error
}
func (mbs *MockBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error {
func (mbs *MockBlockstore) DeleteBlock(c cid.Cid) error {
delete(mbs.blocks, c.String())
return mbs.err
}
func (mbs *MockBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
func (mbs *MockBlockstore) Has(c cid.Cid) (bool, error) {
_, ok := mbs.blocks[c.String()]
return ok, mbs.err
}
func (mbs *MockBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
func (mbs *MockBlockstore) Get(c cid.Cid) (blocks.Block, error) {
obj, ok := mbs.blocks[c.String()]
if !ok {
return nil, blockNotFoundErr
@ -111,7 +111,7 @@ func (mbs *MockBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, er
return obj, mbs.err
}
func (mbs *MockBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
func (mbs *MockBlockstore) GetSize(c cid.Cid) (int, error) {
obj, ok := mbs.blocks[c.String()]
if !ok {
return 0, blockNotFoundErr
@ -119,12 +119,12 @@ func (mbs *MockBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error)
return len(obj.RawData()), mbs.err
}
func (mbs *MockBlockstore) Put(ctx context.Context, b blocks.Block) error {
func (mbs *MockBlockstore) Put(b blocks.Block) error {
mbs.blocks[b.Cid().String()] = b
return mbs.err
}
func (mbs *MockBlockstore) PutMany(ctx context.Context, bs []blocks.Block) error {
func (mbs *MockBlockstore) PutMany(bs []blocks.Block) error {
for _, b := range bs {
mbs.blocks[b.Cid().String()] = b
}

View File

@ -17,8 +17,6 @@
package pgipfsethdb
import (
"math/big"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
)
@ -30,16 +28,13 @@ type Batch struct {
db *sqlx.DB
tx *sqlx.Tx
valueSize int
blockNumber *big.Int
}
// NewBatch returns a ethdb.Batch interface for PG-IPFS
func NewBatch(db *sqlx.DB, tx *sqlx.Tx, blockNumber *big.Int) ethdb.Batch {
func NewBatch(db *sqlx.DB, tx *sqlx.Tx) ethdb.Batch {
b := &Batch{
db: db,
tx: tx,
blockNumber: blockNumber,
db: db,
tx: tx,
}
if tx == nil {
b.Reset()
@ -55,7 +50,7 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
if err != nil {
return err
}
if _, err = b.tx.Exec(putPgStr, mhKey, value, b.blockNumber.Uint64()); err != nil {
if _, err = b.tx.Exec(putPgStr, mhKey, value); err != nil {
return err
}
b.valueSize += len(value)

View File

@ -20,8 +20,6 @@ import (
"math/big"
"time"
"github.com/cerc-io/ipfs-ethdb/v5/postgres/shared"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
@ -29,7 +27,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v1"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
)
var (
@ -41,7 +39,7 @@ var (
var _ = Describe("Batch", func() {
BeforeEach(func() {
db, err = shared.TestDB()
db, err = pgipfsethdb.TestDB()
Expect(err).ToNot(HaveOccurred())
cacheConfig := pgipfsethdb.CacheConfig{
@ -51,16 +49,11 @@ var _ = Describe("Batch", func() {
}
database = pgipfsethdb.NewDatabase(db, cacheConfig)
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
Expect(ok).To(BeTrue())
(*databaseWithBlock).BlockNumber = testBlockNumber
batch = database.NewBatch()
})
AfterEach(func() {
groupcache.DeregisterGroup("db")
err = shared.ResetTestDB(db)
err = pgipfsethdb.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
})

View File

@ -18,10 +18,8 @@ package pgipfsethdb
import (
"context"
"database/sql"
"errors"
"fmt"
"math/big"
"strconv"
"strings"
"time"
@ -29,23 +27,16 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
"github.com/mailgun/groupcache/v2"
log "github.com/sirupsen/logrus"
)
var errNotSupported = errors.New("this operation is not supported")
var (
hasPgStr = "SELECT exists(select 1 from ipld.blocks WHERE key = $1 LIMIT 1)"
getPgStr = "SELECT data FROM ipld.blocks WHERE key = $1 LIMIT 1"
putPgStr = "INSERT INTO ipld.blocks (key, data, block_number) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"
deletePgStr = "DELETE FROM ipld.blocks WHERE key = $1"
hasPgStr = "SELECT exists(select 1 from public.blocks WHERE key = $1)"
getPgStr = "SELECT data FROM public.blocks WHERE key = $1"
putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING"
deletePgStr = "DELETE FROM public.blocks WHERE key = $1"
dbSizePgStr = "SELECT pg_database_size(current_database())"
DefaultCacheConfig = CacheConfig{
Name: "db",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
}
)
var _ ethdb.Database = &Database{}
@ -54,8 +45,6 @@ var _ ethdb.Database = &Database{}
type Database struct {
db *sqlx.DB
cache *groupcache.Group
BlockNumber *big.Int
}
func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) {
@ -121,12 +110,7 @@ func (d *Database) Has(key []byte) (bool, error) {
// Get retrieves the given key if it's present in the key-value data store
func (d *Database) dbGet(key string) ([]byte, error) {
var data []byte
err := d.db.Get(&data, getPgStr, key)
if err == sql.ErrNoRows {
log.Warn("Database miss for key", key)
}
return data, err
return data, d.db.Get(&data, getPgStr, key)
}
// Get satisfies the ethdb.KeyValueReader interface
@ -152,7 +136,7 @@ func (d *Database) Put(key []byte, value []byte) error {
if err != nil {
return err
}
_, err = d.db.Exec(putPgStr, mhKey, value, d.BlockNumber.Uint64())
_, err = d.db.Exec(putPgStr, mhKey, value)
return err
}
@ -261,13 +245,7 @@ func (d *Database) Compact(start []byte, limit []byte) error {
// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called
func (d *Database) NewBatch() ethdb.Batch {
return NewBatch(d.db, nil, d.BlockNumber)
}
// NewBatchWithSize satisfies the ethdb.Batcher interface.
// NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
func (d *Database) NewBatchWithSize(size int) ethdb.Batch {
return NewBatch(d.db, nil, d.BlockNumber)
return NewBatch(d.db, nil)
}
// NewIterator satisfies the ethdb.Iteratee interface
@ -305,42 +283,36 @@ func (d *Database) Ancients() (uint64, error) {
return 0, errNotSupported
}
// Tail satisfies the ethdb.AncientReader interface.
// Tail returns the number of first stored item in the freezer.
func (d *Database) Tail() (uint64, error) {
return 0, errNotSupported
}
// AncientSize satisfies the ethdb.AncientReader interface
// AncientSize returns the ancient size of the specified category
func (d *Database) AncientSize(kind string) (uint64, error) {
return 0, errNotSupported
}
// AppendAncient satisfies the ethdb.AncientWriter interface
// AppendAncient injects all binary blobs belong to block at the end of the append-only immutable table files
func (d *Database) AppendAncient(number uint64, hash, header, body, receipt, td []byte) error {
return errNotSupported
}
// AncientRange retrieves all the items in a range, starting from the index 'start'.
// It will return
// - at most 'count' items,
// - at least 1 item (even if exceeding the maxBytes), but will otherwise
// return as many items as fit into maxBytes.
// - at most 'count' items,
// - at least 1 item (even if exceeding the maxBytes), but will otherwise
// return as many items as fit into maxBytes.
func (d *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
return nil, errNotSupported
}
// ReadAncients applies the provided AncientReader function
func (d *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
func (d *Database) ReadAncients(fn func(ethdb.AncientReader) error) (err error) {
return errNotSupported
}
// TruncateHead satisfies the ethdb.AncientWriter interface.
// TruncateHead discards all but the first n ancient data from the ancient store.
func (d *Database) TruncateHead(n uint64) (uint64, error) {
return 0, errNotSupported
}
// TruncateTail satisfies the ethdb.AncientWriter interface.
// TruncateTail discards the first n ancient data from the ancient store.
func (d *Database) TruncateTail(n uint64) (uint64, error) {
return 0, errNotSupported
// TruncateAncients satisfies the ethdb.AncientWriter interface
// TruncateAncients discards all but the first n ancient data from the ancient store
func (d *Database) TruncateAncients(n uint64) error {
return errNotSupported
}
// Sync satisfies the ethdb.AncientWriter interface
@ -348,20 +320,3 @@ func (d *Database) TruncateTail(n uint64) (uint64, error) {
func (d *Database) Sync() error {
return errNotSupported
}
// MigrateTable satisfies the ethdb.AncientWriter interface.
// MigrateTable processes and migrates entries of a given table to a new format.
func (d *Database) MigrateTable(string, func([]byte) ([]byte, error)) error {
return errNotSupported
}
// NewSnapshot satisfies the ethdb.Snapshotter interface.
// NewSnapshot creates a database snapshot based on the current state.
func (d *Database) NewSnapshot() (ethdb.Snapshot, error) {
return nil, errNotSupported
}
// AncientDatadir returns an error as we don't have a backing chain freezer.
func (d *Database) AncientDatadir() (string, error) {
return "", errNotSupported
}

View File

@ -20,8 +20,6 @@ import (
"math/big"
"time"
"github.com/cerc-io/ipfs-ethdb/v5/postgres/shared"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
@ -30,23 +28,22 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v1"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
)
var (
database ethdb.Database
db *sqlx.DB
err error
testBlockNumber = big.NewInt(1337)
testHeader = types.Header{Number: testBlockNumber}
testValue, _ = rlp.EncodeToBytes(testHeader)
testEthKey = testHeader.Hash().Bytes()
testMhKey, _ = pgipfsethdb.MultihashKeyFromKeccak256(testEthKey)
database ethdb.Database
db *sqlx.DB
err error
testHeader = types.Header{Number: big.NewInt(1337)}
testValue, _ = rlp.EncodeToBytes(testHeader)
testEthKey = testHeader.Hash().Bytes()
testMhKey, _ = pgipfsethdb.MultihashKeyFromKeccak256(testEthKey)
)
var _ = Describe("Database", func() {
BeforeEach(func() {
db, err = shared.TestDB()
db, err = pgipfsethdb.TestDB()
Expect(err).ToNot(HaveOccurred())
cacheConfig := pgipfsethdb.CacheConfig{
@ -56,16 +53,10 @@ var _ = Describe("Database", func() {
}
database = pgipfsethdb.NewDatabase(db, cacheConfig)
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
Expect(ok).To(BeTrue())
(*databaseWithBlock).BlockNumber = testBlockNumber
})
AfterEach(func() {
groupcache.DeregisterGroup("db")
err = shared.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
err = db.Close()
err = pgipfsethdb.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
})
@ -76,7 +67,7 @@ var _ = Describe("Database", func() {
Expect(has).ToNot(BeTrue())
})
It("returns true if a key-pair exists in the db", func() {
_, err = db.Exec("INSERT into ipld.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue)
Expect(err).ToNot(HaveOccurred())
has, err := database.Has(testEthKey)
Expect(err).ToNot(HaveOccurred())
@ -91,7 +82,7 @@ var _ = Describe("Database", func() {
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
It("returns the value associated with the key, if the pair exists", func() {
_, err = db.Exec("INSERT into ipld.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testEthKey)
Expect(err).ToNot(HaveOccurred())

View File

@ -1,6 +1,6 @@
## ipfs-ethdb
IPFS has been [extended](https://github.com/cerc-io/go-ipfs/releases/tag/v0.4.22-alpha) to [use Postgres](https://github.com/cerc-io/go-ipfs-config/releases/tag/v0.0.8-alpha) as a backing [datastore](https://github.com/ipfs/go-ds-sql/tree/master/postgres).
IPFS has been [extended](https://github.com/vulcanize/go-ipfs/releases/tag/v0.4.22-alpha) to [use Postgres](https://github.com/vulcanize/go-ipfs-config/releases/tag/v0.0.8-alpha) as a backing [datastore](https://github.com/ipfs/go-ds-sql/tree/master/postgres).
Interfacing directly with the IPFS-backing Postgres database has some advantages over using the blockservice interface.
Namely, batching of IPFS writes with other Postgres writes and avoiding lock contention on the ipfs repository (lockfile located at the `IPFS_PATH`).
The downside is that we forgo the block-exchange capabilities of the blockservice, and are only able to fetch data contained in the local datastore.
@ -18,11 +18,11 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/trie"
"github.com/jmoiron/sqlx"
"github.com/cerc-io/ipfs-ethdb/v5/postgres/v1"
"github.com/vulcanize/ipfs-ethdb/postgres"
)
func main() {
connectStr := "postgresql://vdbm:password@localhost:8077/cerc_testing?sslmode=disable"
connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable"
db, _ := sqlx.Connect("postgres", connectStr)
kvs := pgipfsethdb.NewKeyValueStore(db)
@ -31,11 +31,7 @@ func main() {
trieNodeIterator := t.NodeIterator([]byte{})
// do stuff with trie node iterator
database := pgipfsethdb.NewDatabase(db, pgipfsethdb.CacheConfig{
Name: "db",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
})
database := pgipfsethdb.NewDatabase(db)
stateDatabase := state.NewDatabase(database)
stateDB, _ := state.New(common.Hash{}, stateDatabase, nil)
stateDBNodeIterator := state.NewNodeIterator(stateDB)

View File

@ -1,42 +0,0 @@
// VulcanizeDB
// Copyright © 2023 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package shared
import (
"github.com/jmoiron/sqlx"
)
/*
Hostname: "localhost",
Port: 8077,
DatabaseName: "cerc_testing",
Username: "vdbm",
Password: "password",
*/
// TestDB connect to the testing database
// it assumes the database has the IPFS ipld.blocks table present
// DO NOT use a production db for the test db, as it will remove all contents of the ipld.blocks table
func TestDB() (*sqlx.DB, error) {
connectStr := "postgresql://vdbm:password@localhost:8077/cerc_testing?sslmode=disable"
return sqlx.Connect("postgres", connectStr)
}
// ResetTestDB drops all rows in the test db ipld.blocks table
func ResetTestDB(db *sqlx.DB) error {
_, err := db.Exec("TRUNCATE ipld.blocks CASCADE")
return err
}

View File

@ -17,8 +17,9 @@
package pgipfsethdb
import (
blockstore "github.com/ipfs/boxo/blockstore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq" //postgres driver
"github.com/multiformats/go-multihash"
)
@ -32,3 +33,17 @@ func MultihashKeyFromKeccak256(h []byte) (string, error) {
dbKey := dshelp.MultihashToDsKey(mh)
return blockstore.BlockPrefix.String() + dbKey.String(), nil
}
// TestDB connect to the testing database
// it assumes the database has the IPFS public.blocks table present
// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table
func TestDB() (*sqlx.DB, error) {
connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable"
return sqlx.Connect("postgres", connectStr)
}
// ResetTestDB drops all rows in the test db public.blocks table
func ResetTestDB(db *sqlx.DB) error {
_, err := db.Exec("TRUNCATE public.blocks CASCADE")
return err
}

View File

@ -1,117 +0,0 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb
import (
"math/big"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
)
var _ ethdb.Batch = &Batch{}
// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection
type Batch struct {
db *sqlx.DB
tx *sqlx.Tx
valueSize int
blockNumber *big.Int
}
// NewBatch returns a ethdb.Batch interface for PG-IPFS
func NewBatch(db *sqlx.DB, tx *sqlx.Tx, blockNumber *big.Int) ethdb.Batch {
b := &Batch{
db: db,
tx: tx,
blockNumber: blockNumber,
}
if tx == nil {
b.Reset()
}
return b
}
// Put satisfies the ethdb.Batch interface
// Put inserts the given value into the key-value data store
// Key is expected to be a fully formulated cid key
// TODO: note, now that we expected a cid we could route to the "cids" tables based on prefix instead of to ipld.blocks
// but is it better to handle this routing here, or use a completely different interface since we already have to refactor
// at levels above this package in order to pass in cids instead of raw keccak256 hashes
func (b *Batch) Put(cidBytes []byte, value []byte) (err error) {
// cast and resolve strings from cid.Cast
// this will assert that we have a correctly formatted CID
// and will handle the different string encodings for v0 and v1 CIDs
// (note that this v0 vs v1 is different from the blockstore v0 vs v1)
c, err := cid.Cast(cidBytes)
if err != nil {
return err
}
if _, err = b.tx.Exec(putPgStr, c.String(), value, b.blockNumber.Uint64()); err != nil {
return err
}
b.valueSize += len(value)
return nil
}
// Delete satisfies the ethdb.Batch interface
// Delete removes the key from the key-value data store
func (b *Batch) Delete(cidBytes []byte) (err error) {
c, err := cid.Cast(cidBytes)
if err != nil {
return err
}
_, err = b.tx.Exec(deletePgStr, c.String())
return err
}
// ValueSize satisfies the ethdb.Batch interface
// ValueSize retrieves the amount of data queued up for writing
// The returned value is the total byte length of all data queued to write
func (b *Batch) ValueSize() int {
return b.valueSize
}
// Write satisfies the ethdb.Batch interface
// Write flushes any accumulated data to disk
func (b *Batch) Write() error {
if b.tx == nil {
return nil
}
return b.tx.Commit()
}
// Replay satisfies the ethdb.Batch interface
// Replay replays the batch contents
func (b *Batch) Replay(w ethdb.KeyValueWriter) error {
return errNotSupported
}
// Reset satisfies the ethdb.Batch interface
// Reset resets the batch for reuse
// This should be called after every write
func (b *Batch) Reset() {
var err error
b.tx, err = b.db.Beginx()
if err != nil {
panic(err)
}
b.valueSize = 0
}

View File

@ -1,138 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb_test
import (
"math/big"
"time"
"github.com/cerc-io/ipfs-ethdb/v5/postgres/shared"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/mailgun/groupcache/v2"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v0"
)
var (
batch ethdb.Batch
testHeader2 = types.Header{Number: big.NewInt(2)}
testValue2, _ = rlp.EncodeToBytes(testHeader2)
testEthKey2 = testHeader2.Hash().Bytes()
testCID2, _ = pgipfsethdb.CIDFromKeccak256(testEthKey2, cid.EthBlock)
)
var _ = Describe("Batch", func() {
BeforeEach(func() {
db, err = shared.TestDB()
Expect(err).ToNot(HaveOccurred())
cacheConfig := pgipfsethdb.CacheConfig{
Name: "db",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
}
database = pgipfsethdb.NewDatabase(db, cacheConfig)
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
Expect(ok).To(BeTrue())
(*databaseWithBlock).BlockNumber = testBlockNumber
batch = database.NewBatch()
})
AfterEach(func() {
groupcache.DeregisterGroup("db")
err = shared.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
})
Describe("Put/Write", func() {
It("adds the key-value pair to the batch", func() {
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
_, err = database.Get(testCID2.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
err = batch.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
err = batch.Put(testCID2.Bytes(), testValue2)
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
val2, err := database.Get(testCID2.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val2).To(Equal(testValue2))
})
})
Describe("Delete/Reset/Write", func() {
It("deletes the key-value pair in the batch", func() {
err = batch.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
err = batch.Put(testCID2.Bytes(), testValue2)
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
batch.Reset()
err = batch.Delete(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
err = batch.Delete(testCID2.Bytes())
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
_, err = database.Get(testCID2.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
})
Describe("ValueSize/Reset", func() {
It("returns the size of data in the batch queued for write", func() {
err = batch.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
err = batch.Put(testCID2.Bytes(), testValue2)
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
size := batch.ValueSize()
Expect(size).To(Equal(len(testValue) + len(testValue2)))
batch.Reset()
size = batch.ValueSize()
Expect(size).To(Equal(0))
})
})
})

View File

@ -1,365 +0,0 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb
import (
"context"
"database/sql"
"errors"
"fmt"
"math/big"
"strconv"
"strings"
"time"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
"github.com/mailgun/groupcache/v2"
log "github.com/sirupsen/logrus"
)
var errNotSupported = errors.New("this operation is not supported")
var (
hasPgStr = "SELECT exists(select 1 from ipld.blocks WHERE key = $1 LIMIT 1)"
getPgStr = "SELECT data FROM ipld.blocks WHERE key = $1 LIMIT 1"
putPgStr = "INSERT INTO ipld.blocks (key, data, block_number) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"
deletePgStr = "DELETE FROM ipld.blocks WHERE key = $1"
dbSizePgStr = "SELECT pg_database_size(current_database())"
)
var _ ethdb.Database = &Database{}
// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
type Database struct {
db *sqlx.DB
cache *groupcache.Group
BlockNumber *big.Int
}
func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) {
return 0, errNotSupported
}
type CacheConfig struct {
Name string
Size int
ExpiryDuration time.Duration
}
// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS
func NewKeyValueStore(db *sqlx.DB, cacheConfig CacheConfig) ethdb.KeyValueStore {
database := Database{db: db}
database.InitCache(cacheConfig)
return &database
}
// NewDatabase returns a ethdb.Database interface for PG-IPFS
func NewDatabase(db *sqlx.DB, cacheConfig CacheConfig) ethdb.Database {
database := Database{db: db}
database.InitCache(cacheConfig)
return &database
}
func (d *Database) InitCache(cacheConfig CacheConfig) {
d.cache = groupcache.NewGroup(cacheConfig.Name, int64(cacheConfig.Size), groupcache.GetterFunc(
func(_ context.Context, id string, dest groupcache.Sink) error {
val, err := d.dbGet(id)
if err != nil {
return err
}
// Set the value in the groupcache, with expiry
if err := dest.SetBytes(val, time.Now().Add(cacheConfig.ExpiryDuration)); err != nil {
return err
}
return nil
},
))
}
func (d *Database) GetCacheStats() groupcache.Stats {
return d.cache.Stats
}
// Has satisfies the ethdb.KeyValueReader interface
// Has retrieves if a cid is present in the key-value data store
func (d *Database) Has(cidBytes []byte) (bool, error) {
c, err := cid.Cast(cidBytes)
if err != nil {
return false, err
}
var exists bool
return exists, d.db.Get(&exists, hasPgStr, c.String())
}
// Get retrieves the given key if it's present in the key-value data store
func (d *Database) dbGet(key string) ([]byte, error) {
var data []byte
err := d.db.Get(&data, getPgStr, key)
if err == sql.ErrNoRows {
log.Warn("Database miss for key ", key)
}
return data, err
}
// Get satisfies the ethdb.KeyValueReader interface
// Get retrieves the given cid if it's present in the key-value data store
func (d *Database) Get(cidBytes []byte) ([]byte, error) {
c, err := cid.Cast(cidBytes)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()
var data []byte
return data, d.cache.Get(ctx, c.String(), groupcache.AllocatingByteSliceSink(&data))
}
// Put satisfies the ethdb.KeyValueWriter interface
// Put inserts the given value into the key-value data store
// Key is expected to be a fully formulated cis of value
func (d *Database) Put(cidBytes []byte, value []byte) error {
c, err := cid.Cast(cidBytes)
if err != nil {
return err
}
_, err = d.db.Exec(putPgStr, c.String(), value, d.BlockNumber.Uint64())
return err
}
// Delete satisfies the ethdb.KeyValueWriter interface
// Delete removes the cid from the key-value data store
func (d *Database) Delete(cidBytes []byte) error {
c, err := cid.Cast(cidBytes)
if err != nil {
return err
}
cidString := c.String()
_, err = d.db.Exec(deletePgStr, cidString)
if err != nil {
return err
}
// Remove from cache.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()
err = d.cache.Remove(ctx, cidString)
return err
}
// DatabaseProperty enum type
type DatabaseProperty int
const (
Unknown DatabaseProperty = iota
Size
Idle
InUse
MaxIdleClosed
MaxLifetimeClosed
MaxOpenConnections
OpenConnections
WaitCount
WaitDuration
)
// DatabasePropertyFromString helper function
func DatabasePropertyFromString(property string) (DatabaseProperty, error) {
switch strings.ToLower(property) {
case "size":
return Size, nil
case "idle":
return Idle, nil
case "inuse":
return InUse, nil
case "maxidleclosed":
return MaxIdleClosed, nil
case "maxlifetimeclosed":
return MaxLifetimeClosed, nil
case "maxopenconnections":
return MaxOpenConnections, nil
case "openconnections":
return OpenConnections, nil
case "waitcount":
return WaitCount, nil
case "waitduration":
return WaitDuration, nil
default:
return Unknown, fmt.Errorf("unknown database property")
}
}
// Stat satisfies the ethdb.Stater interface
// Stat returns a particular internal stat of the database
func (d *Database) Stat(property string) (string, error) {
prop, err := DatabasePropertyFromString(property)
if err != nil {
return "", err
}
switch prop {
case Size:
var byteSize string
return byteSize, d.db.Get(&byteSize, dbSizePgStr)
case Idle:
return strconv.Itoa(d.db.Stats().Idle), nil
case InUse:
return strconv.Itoa(d.db.Stats().InUse), nil
case MaxIdleClosed:
return strconv.FormatInt(d.db.Stats().MaxIdleClosed, 10), nil
case MaxLifetimeClosed:
return strconv.FormatInt(d.db.Stats().MaxLifetimeClosed, 10), nil
case MaxOpenConnections:
return strconv.Itoa(d.db.Stats().MaxOpenConnections), nil
case OpenConnections:
return strconv.Itoa(d.db.Stats().OpenConnections), nil
case WaitCount:
return strconv.FormatInt(d.db.Stats().WaitCount, 10), nil
case WaitDuration:
return d.db.Stats().WaitDuration.String(), nil
default:
return "", fmt.Errorf("unhandled database property")
}
}
// Compact satisfies the ethdb.Compacter interface
// Compact flattens the underlying data store for the given key range
func (d *Database) Compact(start []byte, limit []byte) error {
return errNotSupported
}
// NewBatch satisfies the ethdb.Batcher interface
// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called
func (d *Database) NewBatch() ethdb.Batch {
return NewBatch(d.db, nil, d.BlockNumber)
}
// NewBatchWithSize satisfies the ethdb.Batcher interface.
// NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
func (d *Database) NewBatchWithSize(size int) ethdb.Batch {
return NewBatch(d.db, nil, d.BlockNumber)
}
// NewIterator satisfies the ethdb.Iteratee interface
// it creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix, starting at a particular
// initial key (or after, if it does not exist).
//
// Note: This method assumes that the prefix is NOT part of the start, so there's
// no need for the caller to prepend the prefix to the start
func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
return NewIterator(start, prefix, d.db)
}
// Close satisfies the io.Closer interface.
// Close closes the db connection and deregisters from groupcache.
func (d *Database) Close() error {
groupcache.DeregisterGroup(d.cache.Name())
return d.db.DB.Close()
}
// HasAncient satisfies the ethdb.AncientReader interface
// HasAncient returns an indicator whether the specified data exists in the ancient store
func (d *Database) HasAncient(kind string, number uint64) (bool, error) {
return false, errNotSupported
}
// Ancient satisfies the ethdb.AncientReader interface
// Ancient retrieves an ancient binary blob from the append-only immutable files
func (d *Database) Ancient(kind string, number uint64) ([]byte, error) {
return nil, errNotSupported
}
// Ancients satisfies the ethdb.AncientReader interface
// Ancients returns the ancient item numbers in the ancient store
func (d *Database) Ancients() (uint64, error) {
return 0, errNotSupported
}
// Tail satisfies the ethdb.AncientReader interface.
// Tail returns the number of first stored item in the freezer.
func (d *Database) Tail() (uint64, error) {
return 0, errNotSupported
}
// AncientSize satisfies the ethdb.AncientReader interface
// AncientSize returns the ancient size of the specified category
func (d *Database) AncientSize(kind string) (uint64, error) {
return 0, errNotSupported
}
// AncientRange retrieves all the items in a range, starting from the index 'start'.
// It will return
// - at most 'count' items,
// - at least 1 item (even if exceeding the maxBytes), but will otherwise
// return as many items as fit into maxBytes.
func (d *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
return nil, errNotSupported
}
// ReadAncients applies the provided AncientReader function
func (d *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
return errNotSupported
}
// TruncateHead satisfies the ethdb.AncientWriter interface.
// TruncateHead discards all but the first n ancient data from the ancient store.
func (d *Database) TruncateHead(n uint64) (uint64, error) {
return 0, errNotSupported
}
// TruncateTail satisfies the ethdb.AncientWriter interface.
// TruncateTail discards the first n ancient data from the ancient store.
func (d *Database) TruncateTail(n uint64) (uint64, error) {
return 0, errNotSupported
}
// Sync satisfies the ethdb.AncientWriter interface
// Sync flushes all in-memory ancient store data to disk
func (d *Database) Sync() error {
return errNotSupported
}
// MigrateTable satisfies the ethdb.AncientWriter interface.
// MigrateTable processes and migrates entries of a given table to a new format.
func (d *Database) MigrateTable(string, func([]byte) ([]byte, error)) error {
return errNotSupported
}
// NewSnapshot satisfies the ethdb.Snapshotter interface.
// NewSnapshot creates a database snapshot based on the current state.
func (d *Database) NewSnapshot() (ethdb.Snapshot, error) {
return nil, errNotSupported
}
// AncientDatadir returns an error as we don't have a backing chain freezer.
func (d *Database) AncientDatadir() (string, error) {
return "", errNotSupported
}

View File

@ -1,133 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb_test
import (
"math/big"
"time"
"github.com/cerc-io/ipfs-ethdb/v5/postgres/shared"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/jmoiron/sqlx"
"github.com/mailgun/groupcache/v2"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v0"
)
var (
database ethdb.Database
db *sqlx.DB
err error
testBlockNumber = big.NewInt(1337)
testHeader = types.Header{Number: testBlockNumber}
testValue, _ = rlp.EncodeToBytes(testHeader)
testEthKey = testHeader.Hash().Bytes()
testCID, _ = pgipfsethdb.CIDFromKeccak256(testEthKey, cid.EthBlock)
)
var _ = Describe("Database", func() {
BeforeEach(func() {
db, err = shared.TestDB()
Expect(err).ToNot(HaveOccurred())
cacheConfig := pgipfsethdb.CacheConfig{
Name: "db",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
}
database = pgipfsethdb.NewDatabase(db, cacheConfig)
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
Expect(ok).To(BeTrue())
(*databaseWithBlock).BlockNumber = testBlockNumber
})
AfterEach(func() {
groupcache.DeregisterGroup("db")
err = shared.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
err = db.Close()
Expect(err).ToNot(HaveOccurred())
})
Describe("Has", func() {
It("returns false if a key-pair doesn't exist in the db", func() {
has, err := database.Has(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(has).ToNot(BeTrue())
})
It("returns true if a key-pair exists in the db", func() {
_, err = db.Exec("INSERT into ipld.blocks (key, data, block_number) VALUES ($1, $2, $3)", testCID.String(), testValue, testBlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred())
has, err := database.Has(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(has).To(BeTrue())
})
})
Describe("Get", func() {
It("throws an err if the key-pair doesn't exist in the db", func() {
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
It("returns the value associated with the key, if the pair exists", func() {
_, err = db.Exec("INSERT into ipld.blocks (key, data, block_number) VALUES ($1, $2, $3)", testCID.String(), testValue, testBlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Put", func() {
It("persists the key-value pair in the database", func() {
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
err = database.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Delete", func() {
It("removes the key-value pair from the database", func() {
err = database.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
err = database.Delete(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
})
})

View File

@ -1,91 +0,0 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb
import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ipfs/go-cid"
"github.com/jmoiron/sqlx"
)
var _ ethdb.Iterator = &Iterator{}
// Iterator is the type that satisfies the ethdb.Iterator interface for PG-IPFS Ethereum data using a direct Postgres connection
// Iteratee interface is used in Geth for various tests, trie/sync_bloom.go (for fast sync),
// rawdb.InspectDatabase, and the new core/state/snapshot features.
// This should not be confused with trie.NodeIterator or state.NodeIteraor (which can be constructed
// from the ethdb.KeyValueStoreand ethdb.Database interfaces)
type Iterator struct {
db *sqlx.DB
currentKey, prefix []byte
err error
}
// NewIterator returns an ethdb.Iterator interface for PG-IPFS
func NewIterator(start, prefix []byte, db *sqlx.DB) ethdb.Iterator {
return &Iterator{
db: db,
prefix: prefix,
currentKey: start,
}
}
// Next satisfies the ethdb.Iterator interface
// Next moves the iterator to the next key/value pair
// It returns whether the iterator is exhausted
func (i *Iterator) Next() bool {
// this is complicated by the ipfs db keys not being the keccak256 hashes
// go-ethereum usage of this method expects the iteration to occur over keccak256 keys
panic("implement me: Next")
}
// Error satisfies the ethdb.Iterator interface
// Error returns any accumulated error
// Exhausting all the key/value pairs is not considered to be an error
func (i *Iterator) Error() error {
return i.err
}
// Key satisfies the ethdb.Iterator interface
// Key returns the key of the current key/value pair, or nil if done
// The caller should not modify the contents of the returned slice
// and its contents may change on the next call to Next
func (i *Iterator) Key() []byte {
return i.currentKey
}
// Value satisfies the ethdb.Iterator interface
// Value returns the value of the current key/value pair, or nil if done
// The caller should not modify the contents of the returned slice
// and its contents may change on the next call to Next
func (i *Iterator) Value() []byte {
c, err := cid.Cast(i.currentKey)
if err != nil {
i.err = err
return nil
}
var data []byte
i.err = i.db.Get(&data, getPgStr, c)
return data
}
// Release satisfies the ethdb.Iterator interface
// Release releases associated resources
// Release should always succeed and can be called multiple times without causing error
func (i *Iterator) Release() {
i.db.Close()
}

View File

@ -1,32 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb
import (
"github.com/ipfs/go-cid"
_ "github.com/lib/pq" //postgres driver
"github.com/multiformats/go-multihash"
)
// CIDFromKeccak256 converts keccak256 hash bytes into a v1 cid
func CIDFromKeccak256(hash []byte, codecType uint64) (cid.Cid, error) {
mh, err := multihash.Encode(hash, multihash.KECCAK_256)
if err != nil {
return cid.Cid{}, err
}
return cid.NewCidV1(codecType, mh), nil
}

View File

@ -1,29 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb_test
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestPGIPFSETHDB(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "PG-IPFS ethdb test")
}

View File

@ -29,7 +29,7 @@ func Keccak256ToCid(h []byte, codec uint64) (cid.Cid, error) {
if err != nil {
return cid.Cid{}, err
}
return cid.NewCidV1(codec, buf), nil
return cid.NewCidV1(codec, multihash.Multihash(buf)), nil
}
// NewBlock takes a keccak256 hash key and the rlp []byte value it was derived from and creates an ipfs block object