Compare commits

...

47 Commits

Author SHA1 Message Date
3e6321e710 Geth 1.13 (Deneb/Cancun) upgrade (#3)
Reviewed-on: #3
Reviewed-by: Thomas E Lackey <telackey@noreply.git.vdb.to>
2024-05-23 12:47:27 +00:00
e9420ff2b8
Merge pull request #59 from cerc-io/roy/v5-dev
Deregister groupcache on close
2023-05-31 16:24:24 +08:00
c438c158fc Deregister groupcache on close 2023-05-14 02:33:04 +08:00
Ian Norden
05559e2551
Merge pull request #58 from cerc-io/roy/v5-dev
Update to v5
2023-04-09 12:16:03 -05:00
i-norden
a1e3f601b1 revert using vdb fork of geth 2023-04-09 12:11:46 -05:00
4a4ae2f2ea use statediff indexer config; other nits 2023-04-01 00:13:20 +08:00
4695984e1d vulc => cerc 2023-04-01 00:13:20 +08:00
e015d4333d update docs 2023-04-01 00:13:11 +08:00
Ian Norden
2709f7b8a7
Merge pull request #56 from cerc-io/ian/v5
bump to v1.11.5
2023-03-30 16:26:04 -05:00
i-norden
0e49b87509 update pkg version 2023-03-30 16:13:16 -05:00
i-norden
360b0abbbc use stock v1.11.5 geth; use new cerc test db info 2023-03-30 16:11:05 -05:00
Ian Norden
4a52acb0bb
Merge pull request #50 from cerc-io/ian/v5
v0 blockstore compatible version
2023-03-30 15:51:59 -05:00
i-norden
d2176e69af update postgres v0 subpkg and tests to work with cid keys 2023-02-20 12:44:45 -06:00
i-norden
5451c3c225 refactor v1 pkg to work with new dep paths 2023-02-20 12:44:45 -06:00
i-norden
844cda1107 shared util 2023-02-20 12:44:45 -06:00
i-norden
285b0f0db4 v0 and v1 subdirectories for postgres package; for working with the corresponding version of blockstore format 2023-02-20 12:44:40 -06:00
Michael
68a24ab952
Merge pull request #48 from cerc-io/v4.0.10-alpha-wip
dependency updates for 1.10.26 geth release
2022-11-07 10:52:20 -05:00
Michael Shaw
806f5bbe26 dependency updates for 1.10.26 geth release 2022-11-07 10:25:21 -05:00
Michael Shaw
99cbb3b0c1 missed 23->25 change 2022-09-23 13:33:11 -04:00
Michael Shaw
e3e8b2fbc1 dependency updates for geth-statediff 1.10.25 2022-09-23 13:26:57 -04:00
Michael
13c752b90e
Merge pull request #47 from cerc-io/cerc_refactor
cerc github refactor
2022-09-14 13:54:28 -04:00
Michael Shaw
f62c828c86 cerc github refactor 2022-09-14 01:03:07 -04:00
Michael
b34f7e0564
Merge pull request #46 from vulcanize/rebase-1.10.23-wip
version update for statediffing geth 1.10.23
2022-09-01 16:11:49 -04:00
Michael Shaw
32edf85e5d version update for statediffing geth 1.10.23 2022-09-01 15:58:38 -04:00
prathamesh0
fd19af396d
Log key on db misses (#45) 2022-08-23 12:14:42 +05:30
Michael
b706c596c5
Merge pull request #44 from vulcanize/geth1.10.21-update-wip
Exchange.Interface no longer IsOnline and other updates for blockserv…
2022-08-01 17:06:21 -04:00
Michael Shaw
845d37cacf Exchange.Interface no longer IsOnline and other updates for blockservice version update 2022-08-01 17:05:18 -04:00
Michael
963053f23e
Merge pull request #43 from vulcanize/geth1.10.21-update-wip
updating go.mod to depend on vulcanize fork of geth to avoid divergin…
2022-08-01 16:15:37 -04:00
Michael Shaw
f1c3dfccd3 updating go.mod to depend on vulcanize fork of geth to avoid diverging dependencies that are needed higher up the stack 2022-08-01 15:59:24 -04:00
Michael
4a40d2b14e
Merge pull request #42 from vulcanize/geth1.10.21-update-wip
go mod and sum updated for geth 1.10.21
2022-07-29 15:01:21 -04:00
Michael Shaw
77cc282a82 go mod and sum updated for geth 1.10.21 2022-07-29 14:57:24 -04:00
Abdul Rabbani
f3de4233b7
Merge pull request #40 from vulcanize/feature/update-geth-1.10.19
Update and test
2022-06-17 08:16:51 -04:00
Abdul Rabbani
23177c8dff Update and test
```
08:09:45:~/GitHub/cerc/ipfs-ethdb go test ./... -v
=== RUN   TestIPFSETHDB
Running Suite: IPFS ethdb test
==============================
Random Seed: 1655467793
Will run 9 of 9 specs

•••••••••
Ran 9 of 9 Specs in 0.000 seconds
SUCCESS! -- 9 Passed | 0 Failed | 0 Pending | 0 Skipped
--- PASS: TestIPFSETHDB (0.00s)
PASS
ok      github.com/vulcanize/ipfs-ethdb/v4      0.177s
=== RUN   TestPGIPFSETHDB
Running Suite: PG-IPFS ethdb test
=================================
Random Seed: 1655467793
Will run 9 of 9 specs

•••••••••
Ran 9 of 9 Specs in 0.922 seconds
SUCCESS! -- 9 Passed | 0 Failed | 0 Pending | 0 Skipped
--- PASS: TestPGIPFSETHDB (0.92s)
PASS
ok      github.com/vulcanize/ipfs-ethdb/v4/postgres     1.157s
```
2022-06-17 08:10:46 -04:00
Abdul Rabbani
1c90257717
Merge pull request #39 from vulcanize/feature/update-go-version-sharding
Feature/update go version sharding
2022-05-31 13:03:11 -04:00
Abdul Rabbani
86b530c3a3 Merge branch 'master' into feature/update-go-version-sharding 2022-05-31 13:02:06 -04:00
Abdul Rabbani
0069033d39
Merge pull request #38 from vulcanize/feature/update-go-version
Use v1.18 go
2022-05-31 12:59:01 -04:00
Abdul Rabbani
d22e0f70e2 Use v1.18 go 2022-05-31 12:58:33 -04:00
Abdul Rabbani
b220685662
Merge pull request #37 from vulcanize/release-v3.0.2
Update Go version and Fix Interface
2022-05-31 07:35:26 -04:00
Abdul Rabbani
950fb0802d Update Go version and Fix Interface 2022-05-27 13:50:59 -04:00
Ashwin Phatak
a19b47d67c
Merge pull request #33 from deep-stack/pm-v4-schema-changes
Updates to use v4 schema
2022-05-09 16:30:35 +05:30
ed9f8d7c4e Update module path for v4 2022-05-09 16:29:09 +05:30
08bc2f87ab Add block number while making inserts 2022-05-09 15:19:44 +05:30
a6b49f89f4 Updates to use v4 schema 2022-05-09 15:19:28 +05:30
Ashwin Phatak
2bc23f4deb
Merge pull request #32 from deep-stack/pm-v3-release
Update module path for v3
2022-05-02 19:54:15 +05:30
fb4c911adb Update module path for v3 2022-05-02 19:09:05 +05:30
Ashwin Phatak
2966e45d00
Merge pull request #31 from deep-stack/pm-upgrade-geth
Upgrade geth to v1.10.17
2022-05-02 15:22:57 +05:30
83b3f18991 Upgrade dependencies 2022-05-02 15:09:29 +05:30
26 changed files with 1384 additions and 791 deletions

View File

@ -1,6 +1,6 @@
## ipfs-ethdb ## ipfs-ethdb
[![Go Report Card](https://goreportcard.com/badge/github.com/vulcanize/ipfs-ethdb)](https://goreportcard.com/report/github.com/vulcanize/ipfs-ethdb) [![Go Report Card](https://goreportcard.com/badge/github.com/cerc/ipfs-ethdb)](https://goreportcard.com/report/github.com/cerc-io/ipfs-ethdb)
> go-ethereum ethdb interfaces for Ethereum state data stored in IPFS > go-ethereum ethdb interfaces for Ethereum state data stored in IPFS
@ -11,7 +11,7 @@ interfacing with a state database. These interfaces are used to build higher-lev
which are used to perform the bulk of state related needs. which are used to perform the bulk of state related needs.
Ethereum data can be stored on IPFS, standard codecs for Etheruem data are defined in the [go-cid](https://github.com/ipfs/go-cid) library. Ethereum data can be stored on IPFS, standard codecs for Etheruem data are defined in the [go-cid](https://github.com/ipfs/go-cid) library.
Using our [statediffing geth client](https://github.com/vulcanize/go-ethereum/releases/tag/v1.9.11-statediff-0.0.2) it is feasible to extract every single Using our [statediffing geth client](https://github.com/cerc-io/go-ethereum/releases/tag/v1.9.11-statediff-0.0.2) it is feasible to extract every single
state and storage node and publish it to IPFS. state and storage node and publish it to IPFS.
Geth stores state data in leveldb as key-value pairs between the keccak256 hash of the rlp-encoded object and the rlp-encoded object. Geth stores state data in leveldb as key-value pairs between the keccak256 hash of the rlp-encoded object and the rlp-encoded object.
@ -21,7 +21,7 @@ ethdb interfaces for Ethereum data on IPFS by handling the conversion of a kecca
## Usage ## Usage
To use this module import it and build an ethdb interface around an instance of a [go ipfs blockservice](https://github.com/ipfs/go-blockservice), you can then To use this module import it and build an ethdb interface around an instance of a [Go IPFS blockservice](https://pkg.go.dev/github.com/ipfs/boxo@v0.19.0/blockservice), you can then
employ it as you would the usual [leveldb](https://github.com/ethereum/go-ethereum/tree/master/ethdb/leveldb) or [memorydb](https://github.com/ethereum/go-ethereum/tree/master/ethdb/memorydb) ethdbs employ it as you would the usual [leveldb](https://github.com/ethereum/go-ethereum/tree/master/ethdb/leveldb) or [memorydb](https://github.com/ethereum/go-ethereum/tree/master/ethdb/memorydb) ethdbs
with some exceptions: the AncientReader, AncientWriter, Compacter, and Iteratee/Iterator interfaces are not functionally complete. with some exceptions: the AncientReader, AncientWriter, Compacter, and Iteratee/Iterator interfaces are not functionally complete.
@ -46,11 +46,11 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
"github.com/ipfs/go-blockservice" "github.com/ipfs/boxo/blockservice"
"github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/repo/fsrepo" "github.com/ipfs/go-ipfs/repo/fsrepo"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/vulcanize/ipfs-ethdb" "github.com/cerc-io/ipfs-ethdb/v5"
) )
func main() { func main() {

View File

@ -17,13 +17,14 @@
package ipfsethdb package ipfsethdb
import ( import (
"context"
"errors" "errors"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
lru "github.com/hashicorp/golang-lru" lru "github.com/hashicorp/golang-lru"
"github.com/ipfs/boxo/blockservice"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
) )
var ( var (
@ -103,7 +104,7 @@ func (b *Batch) Write() error {
} }
puts[i] = b puts[i] = b
} }
if err := b.blockService.AddBlocks(puts); err != nil { if err := b.blockService.AddBlocks(context.Background(), puts); err != nil {
return err return err
} }
for _, key := range b.deleteCache.Keys() { for _, key := range b.deleteCache.Keys() {
@ -112,7 +113,7 @@ func (b *Batch) Write() error {
if err != nil { if err != nil {
return err return err
} }
if err := b.blockService.DeleteBlock(c); err != nil { if err := b.blockService.DeleteBlock(context.Background(), c); err != nil {
return err return err
} }
} }

View File

@ -25,7 +25,7 @@ import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
ipfsethdb "github.com/vulcanize/ipfs-ethdb" ipfsethdb "github.com/cerc-io/ipfs-ethdb/v5"
) )
var ( var (

View File

@ -20,11 +20,10 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strconv"
"strings" "strings"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ipfs/go-blockservice" "github.com/ipfs/boxo/blockservice"
) )
var ( var (
@ -69,7 +68,7 @@ func (d *Database) Has(key []byte) (bool, error) {
if err != nil { if err != nil {
return false, err return false, err
} }
return d.blockService.Blockstore().Has(c) return d.blockService.Blockstore().Has(context.Background(), c)
} }
// Get satisfies the ethdb.KeyValueReader interface // Get satisfies the ethdb.KeyValueReader interface
@ -95,7 +94,7 @@ func (d *Database) Put(key []byte, value []byte) error {
if err != nil { if err != nil {
return err return err
} }
return d.blockService.AddBlock(b) return d.blockService.AddBlock(context.Background(), b)
} }
// Delete satisfies the ethdb.KeyValueWriter interface // Delete satisfies the ethdb.KeyValueWriter interface
@ -106,7 +105,7 @@ func (d *Database) Delete(key []byte) error {
if err != nil { if err != nil {
return err return err
} }
return d.blockService.DeleteBlock(c) return d.blockService.DeleteBlock(context.Background(), c)
} }
// DatabaseProperty enum type // DatabaseProperty enum type
@ -135,9 +134,6 @@ func (d *Database) Stat(property string) (string, error) {
return "", err return "", err
} }
switch prop { switch prop {
case ExchangeOnline:
online := d.blockService.Exchange().IsOnline()
return strconv.FormatBool(online), nil
default: default:
return "", fmt.Errorf("unhandled database property") return "", fmt.Errorf("unhandled database property")
} }
@ -227,20 +223,20 @@ func (d *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][
} }
// ReadAncients applies the provided AncientReader function // ReadAncients applies the provided AncientReader function
func (d *Database) ReadAncients(fn func(ethdb.AncientReader) error) (err error) { func (d *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
return errNotSupported return errNotSupported
} }
// TruncateHead satisfies the ethdb.AncientWriter interface. // TruncateHead satisfies the ethdb.AncientWriter interface.
// TruncateHead discards all but the first n ancient data from the ancient store. // TruncateHead discards all but the first n ancient data from the ancient store.
func (d *Database) TruncateHead(n uint64) error { func (d *Database) TruncateHead(n uint64) (uint64, error) {
return errNotSupported return 0, errNotSupported
} }
// TruncateTail satisfies the ethdb.AncientWriter interface. // TruncateTail satisfies the ethdb.AncientWriter interface.
// TruncateTail discards the first n ancient data from the ancient store. // TruncateTail discards the first n ancient data from the ancient store.
func (d *Database) TruncateTail(n uint64) error { func (d *Database) TruncateTail(n uint64) (uint64, error) {
return errNotSupported return 0, errNotSupported
} }
// Sync satisfies the ethdb.AncientWriter interface // Sync satisfies the ethdb.AncientWriter interface
@ -260,3 +256,8 @@ func (d *Database) MigrateTable(string, func([]byte) ([]byte, error)) error {
func (d *Database) NewSnapshot() (ethdb.Snapshot, error) { func (d *Database) NewSnapshot() (ethdb.Snapshot, error) {
return nil, errNotSupported return nil, errNotSupported
} }
// AncientDatadir returns an error as we don't have a backing chain freezer.
func (d *Database) AncientDatadir() (string, error) {
return "", errNotSupported
}

View File

@ -22,11 +22,11 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ipfs/go-blockservice" "github.com/ipfs/boxo/blockservice"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
ipfsethdb "github.com/vulcanize/ipfs-ethdb" ipfsethdb "github.com/cerc-io/ipfs-ethdb/v5"
) )
var ( var (

86
go.mod
View File

@ -1,21 +1,73 @@
module github.com/vulcanize/ipfs-ethdb module github.com/cerc-io/ipfs-ethdb/v5
go 1.13 go 1.21
require ( require (
github.com/ethereum/go-ethereum v1.10.14 github.com/ethereum/go-ethereum v1.13.14
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d github.com/hashicorp/golang-lru v1.0.2
github.com/ipfs/go-block-format v0.0.2 github.com/ipfs/boxo v0.19.0
github.com/ipfs/go-blockservice v0.1.3 github.com/ipfs/go-block-format v0.2.0
github.com/ipfs/go-cid v0.0.5 github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-ipfs-blockstore v1.0.0 github.com/ipfs/go-ipfs-ds-help v1.1.0
github.com/ipfs/go-ipfs-ds-help v1.0.0 github.com/jmoiron/sqlx v1.3.5
github.com/ipfs/go-ipfs-exchange-interface v0.0.1 github.com/lib/pq v1.10.9
github.com/jmoiron/sqlx v1.2.0 github.com/mailgun/groupcache/v2 v2.3.0
github.com/lib/pq v1.0.0 github.com/multiformats/go-multihash v0.2.3
github.com/mailgun/groupcache/v2 v2.2.1 github.com/onsi/ginkgo v1.16.5
github.com/multiformats/go-multihash v0.0.13 github.com/onsi/gomega v1.19.0
github.com/onsi/ginkgo v1.14.0 github.com/sirupsen/logrus v1.6.0
github.com/onsi/gomega v1.10.1 )
google.golang.org/appengine v1.6.7 // indirect
require (
github.com/bits-and-blooms/bitset v1.10.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.12.1 // indirect
github.com/crate-crypto/go-kzg-4844 v0.7.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/ethereum/c-kzg-4844 v0.4.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/holiman/uint256 v1.2.4 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-datastore v0.6.0 // indirect
github.com/ipfs/go-ipfs-util v0.0.3 // indirect
github.com/ipfs/go-ipld-format v0.6.0 // indirect
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mmcloughlin/addchain v0.4.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/segmentio/fasthash v1.0.3 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/supranational/blst v0.3.11 // indirect
go.opentelemetry.io/otel v1.25.0 // indirect
go.opentelemetry.io/otel/metric v1.25.0 // indirect
go.opentelemetry.io/otel/trace v1.25.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
lukechampine.com/blake3 v1.2.2 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
) )

903
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -20,7 +20,7 @@ import (
"context" "context"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ipfs/go-blockservice" "github.com/ipfs/boxo/blockservice"
) )
var _ ethdb.Iterator = &Iterator{} var _ ethdb.Iterator = &Iterator{}

View File

@ -20,11 +20,11 @@ import (
"context" "context"
"errors" "errors"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
) )
var ( var (
@ -52,27 +52,27 @@ func (mbs *MockBlockservice) Exchange() exchange.Interface {
panic("Exchange: implement me") panic("Exchange: implement me")
} }
func (mbs *MockBlockservice) AddBlock(b blocks.Block) error { func (mbs *MockBlockservice) AddBlock(ctx context.Context, b blocks.Block) error {
return mbs.blockStore.Put(b) return mbs.blockStore.Put(ctx, b)
} }
func (mbs *MockBlockservice) AddBlocks(bs []blocks.Block) error { func (mbs *MockBlockservice) AddBlocks(ctx context.Context, bs []blocks.Block) error {
return mbs.blockStore.PutMany(bs) return mbs.blockStore.PutMany(ctx, bs)
} }
func (mbs *MockBlockservice) DeleteBlock(c cid.Cid) error { func (mbs *MockBlockservice) DeleteBlock(ctx context.Context, c cid.Cid) error {
return mbs.blockStore.DeleteBlock(c) return mbs.blockStore.DeleteBlock(ctx, c)
} }
func (mbs *MockBlockservice) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { func (mbs *MockBlockservice) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
return mbs.blockStore.Get(c) return mbs.blockStore.Get(ctx, c)
} }
func (mbs *MockBlockservice) GetBlocks(ctx context.Context, cs []cid.Cid) <-chan blocks.Block { func (mbs *MockBlockservice) GetBlocks(ctx context.Context, cs []cid.Cid) <-chan blocks.Block {
blockChan := make(chan blocks.Block) blockChan := make(chan blocks.Block)
go func() { go func() {
for _, c := range cs { for _, c := range cs {
if b, err := mbs.blockStore.Get(c); err == nil { if b, err := mbs.blockStore.Get(ctx, c); err == nil {
blockChan <- b blockChan <- b
} }
} }
@ -93,17 +93,17 @@ type MockBlockstore struct {
err error err error
} }
func (mbs *MockBlockstore) DeleteBlock(c cid.Cid) error { func (mbs *MockBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error {
delete(mbs.blocks, c.String()) delete(mbs.blocks, c.String())
return mbs.err return mbs.err
} }
func (mbs *MockBlockstore) Has(c cid.Cid) (bool, error) { func (mbs *MockBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
_, ok := mbs.blocks[c.String()] _, ok := mbs.blocks[c.String()]
return ok, mbs.err return ok, mbs.err
} }
func (mbs *MockBlockstore) Get(c cid.Cid) (blocks.Block, error) { func (mbs *MockBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
obj, ok := mbs.blocks[c.String()] obj, ok := mbs.blocks[c.String()]
if !ok { if !ok {
return nil, blockNotFoundErr return nil, blockNotFoundErr
@ -111,7 +111,7 @@ func (mbs *MockBlockstore) Get(c cid.Cid) (blocks.Block, error) {
return obj, mbs.err return obj, mbs.err
} }
func (mbs *MockBlockstore) GetSize(c cid.Cid) (int, error) { func (mbs *MockBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
obj, ok := mbs.blocks[c.String()] obj, ok := mbs.blocks[c.String()]
if !ok { if !ok {
return 0, blockNotFoundErr return 0, blockNotFoundErr
@ -119,12 +119,12 @@ func (mbs *MockBlockstore) GetSize(c cid.Cid) (int, error) {
return len(obj.RawData()), mbs.err return len(obj.RawData()), mbs.err
} }
func (mbs *MockBlockstore) Put(b blocks.Block) error { func (mbs *MockBlockstore) Put(ctx context.Context, b blocks.Block) error {
mbs.blocks[b.Cid().String()] = b mbs.blocks[b.Cid().String()] = b
return mbs.err return mbs.err
} }
func (mbs *MockBlockstore) PutMany(bs []blocks.Block) error { func (mbs *MockBlockstore) PutMany(ctx context.Context, bs []blocks.Block) error {
for _, b := range bs { for _, b := range bs {
mbs.blocks[b.Cid().String()] = b mbs.blocks[b.Cid().String()] = b
} }

View File

@ -1,6 +1,6 @@
## ipfs-ethdb ## ipfs-ethdb
IPFS has been [extended](https://github.com/vulcanize/go-ipfs/releases/tag/v0.4.22-alpha) to [use Postgres](https://github.com/vulcanize/go-ipfs-config/releases/tag/v0.0.8-alpha) as a backing [datastore](https://github.com/ipfs/go-ds-sql/tree/master/postgres). IPFS has been [extended](https://github.com/cerc-io/go-ipfs/releases/tag/v0.4.22-alpha) to [use Postgres](https://github.com/cerc-io/go-ipfs-config/releases/tag/v0.0.8-alpha) as a backing [datastore](https://github.com/ipfs/go-ds-sql/tree/master/postgres).
Interfacing directly with the IPFS-backing Postgres database has some advantages over using the blockservice interface. Interfacing directly with the IPFS-backing Postgres database has some advantages over using the blockservice interface.
Namely, batching of IPFS writes with other Postgres writes and avoiding lock contention on the ipfs repository (lockfile located at the `IPFS_PATH`). Namely, batching of IPFS writes with other Postgres writes and avoiding lock contention on the ipfs repository (lockfile located at the `IPFS_PATH`).
The downside is that we forgo the block-exchange capabilities of the blockservice, and are only able to fetch data contained in the local datastore. The downside is that we forgo the block-exchange capabilities of the blockservice, and are only able to fetch data contained in the local datastore.
@ -18,11 +18,11 @@ import (
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/vulcanize/ipfs-ethdb/postgres" "github.com/cerc-io/ipfs-ethdb/v5/postgres/v1"
) )
func main() { func main() {
connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable" connectStr := "postgresql://vdbm:password@localhost:8077/cerc_testing?sslmode=disable"
db, _ := sqlx.Connect("postgres", connectStr) db, _ := sqlx.Connect("postgres", connectStr)
kvs := pgipfsethdb.NewKeyValueStore(db) kvs := pgipfsethdb.NewKeyValueStore(db)
@ -31,7 +31,11 @@ func main() {
trieNodeIterator := t.NodeIterator([]byte{}) trieNodeIterator := t.NodeIterator([]byte{})
// do stuff with trie node iterator // do stuff with trie node iterator
database := pgipfsethdb.NewDatabase(db) database := pgipfsethdb.NewDatabase(db, pgipfsethdb.CacheConfig{
Name: "db",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
})
stateDatabase := state.NewDatabase(database) stateDatabase := state.NewDatabase(database)
stateDB, _ := state.New(common.Hash{}, stateDatabase, nil) stateDB, _ := state.New(common.Hash{}, stateDatabase, nil)
stateDBNodeIterator := state.NewNodeIterator(stateDB) stateDBNodeIterator := state.NewNodeIterator(stateDB)

42
postgres/shared/util.go Normal file
View File

@ -0,0 +1,42 @@
// VulcanizeDB
// Copyright © 2023 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package shared
import (
"github.com/jmoiron/sqlx"
)
/*
Hostname: "localhost",
Port: 8077,
DatabaseName: "cerc_testing",
Username: "vdbm",
Password: "password",
*/
// TestDB connect to the testing database
// it assumes the database has the IPFS ipld.blocks table present
// DO NOT use a production db for the test db, as it will remove all contents of the ipld.blocks table
func TestDB() (*sqlx.DB, error) {
connectStr := "postgresql://vdbm:password@localhost:8077/cerc_testing?sslmode=disable"
return sqlx.Connect("postgres", connectStr)
}
// ResetTestDB drops all rows in the test db ipld.blocks table
func ResetTestDB(db *sqlx.DB) error {
_, err := db.Exec("TRUNCATE ipld.blocks CASCADE")
return err
}

117
postgres/v0/batch.go Normal file
View File

@ -0,0 +1,117 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb
import (
"math/big"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
)
var _ ethdb.Batch = &Batch{}
// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection
type Batch struct {
db *sqlx.DB
tx *sqlx.Tx
valueSize int
blockNumber *big.Int
}
// NewBatch returns a ethdb.Batch interface for PG-IPFS
func NewBatch(db *sqlx.DB, tx *sqlx.Tx, blockNumber *big.Int) ethdb.Batch {
b := &Batch{
db: db,
tx: tx,
blockNumber: blockNumber,
}
if tx == nil {
b.Reset()
}
return b
}
// Put satisfies the ethdb.Batch interface
// Put inserts the given value into the key-value data store
// Key is expected to be a fully formulated cid key
// TODO: note, now that we expected a cid we could route to the "cids" tables based on prefix instead of to ipld.blocks
// but is it better to handle this routing here, or use a completely different interface since we already have to refactor
// at levels above this package in order to pass in cids instead of raw keccak256 hashes
func (b *Batch) Put(cidBytes []byte, value []byte) (err error) {
// cast and resolve strings from cid.Cast
// this will assert that we have a correctly formatted CID
// and will handle the different string encodings for v0 and v1 CIDs
// (note that this v0 vs v1 is different from the blockstore v0 vs v1)
c, err := cid.Cast(cidBytes)
if err != nil {
return err
}
if _, err = b.tx.Exec(putPgStr, c.String(), value, b.blockNumber.Uint64()); err != nil {
return err
}
b.valueSize += len(value)
return nil
}
// Delete satisfies the ethdb.Batch interface
// Delete removes the key from the key-value data store
func (b *Batch) Delete(cidBytes []byte) (err error) {
c, err := cid.Cast(cidBytes)
if err != nil {
return err
}
_, err = b.tx.Exec(deletePgStr, c.String())
return err
}
// ValueSize satisfies the ethdb.Batch interface
// ValueSize retrieves the amount of data queued up for writing
// The returned value is the total byte length of all data queued to write
func (b *Batch) ValueSize() int {
return b.valueSize
}
// Write satisfies the ethdb.Batch interface
// Write flushes any accumulated data to disk
func (b *Batch) Write() error {
if b.tx == nil {
return nil
}
return b.tx.Commit()
}
// Replay satisfies the ethdb.Batch interface
// Replay replays the batch contents
func (b *Batch) Replay(w ethdb.KeyValueWriter) error {
return errNotSupported
}
// Reset satisfies the ethdb.Batch interface
// Reset resets the batch for reuse
// This should be called after every write
func (b *Batch) Reset() {
var err error
b.tx, err = b.db.Beginx()
if err != nil {
panic(err)
}
b.valueSize = 0
}

138
postgres/v0/batch_test.go Normal file
View File

@ -0,0 +1,138 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb_test
import (
"math/big"
"time"
"github.com/cerc-io/ipfs-ethdb/v5/postgres/shared"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/mailgun/groupcache/v2"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v0"
)
var (
batch ethdb.Batch
testHeader2 = types.Header{Number: big.NewInt(2)}
testValue2, _ = rlp.EncodeToBytes(testHeader2)
testEthKey2 = testHeader2.Hash().Bytes()
testCID2, _ = pgipfsethdb.CIDFromKeccak256(testEthKey2, cid.EthBlock)
)
var _ = Describe("Batch", func() {
BeforeEach(func() {
db, err = shared.TestDB()
Expect(err).ToNot(HaveOccurred())
cacheConfig := pgipfsethdb.CacheConfig{
Name: "db",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
}
database = pgipfsethdb.NewDatabase(db, cacheConfig)
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
Expect(ok).To(BeTrue())
(*databaseWithBlock).BlockNumber = testBlockNumber
batch = database.NewBatch()
})
AfterEach(func() {
groupcache.DeregisterGroup("db")
err = shared.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
})
Describe("Put/Write", func() {
It("adds the key-value pair to the batch", func() {
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
_, err = database.Get(testCID2.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
err = batch.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
err = batch.Put(testCID2.Bytes(), testValue2)
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
val2, err := database.Get(testCID2.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val2).To(Equal(testValue2))
})
})
Describe("Delete/Reset/Write", func() {
It("deletes the key-value pair in the batch", func() {
err = batch.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
err = batch.Put(testCID2.Bytes(), testValue2)
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
batch.Reset()
err = batch.Delete(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
err = batch.Delete(testCID2.Bytes())
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
_, err = database.Get(testCID2.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
})
Describe("ValueSize/Reset", func() {
It("returns the size of data in the batch queued for write", func() {
err = batch.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
err = batch.Put(testCID2.Bytes(), testValue2)
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
size := batch.ValueSize()
Expect(size).To(Equal(len(testValue) + len(testValue2)))
batch.Reset()
size = batch.ValueSize()
Expect(size).To(Equal(0))
})
})
})

365
postgres/v0/database.go Normal file
View File

@ -0,0 +1,365 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb
import (
"context"
"database/sql"
"errors"
"fmt"
"math/big"
"strconv"
"strings"
"time"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
"github.com/mailgun/groupcache/v2"
log "github.com/sirupsen/logrus"
)
var errNotSupported = errors.New("this operation is not supported")
var (
hasPgStr = "SELECT exists(select 1 from ipld.blocks WHERE key = $1 LIMIT 1)"
getPgStr = "SELECT data FROM ipld.blocks WHERE key = $1 LIMIT 1"
putPgStr = "INSERT INTO ipld.blocks (key, data, block_number) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"
deletePgStr = "DELETE FROM ipld.blocks WHERE key = $1"
dbSizePgStr = "SELECT pg_database_size(current_database())"
)
var _ ethdb.Database = &Database{}
// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
type Database struct {
db *sqlx.DB
cache *groupcache.Group
BlockNumber *big.Int
}
func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) {
return 0, errNotSupported
}
type CacheConfig struct {
Name string
Size int
ExpiryDuration time.Duration
}
// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS
func NewKeyValueStore(db *sqlx.DB, cacheConfig CacheConfig) ethdb.KeyValueStore {
database := Database{db: db}
database.InitCache(cacheConfig)
return &database
}
// NewDatabase returns a ethdb.Database interface for PG-IPFS
func NewDatabase(db *sqlx.DB, cacheConfig CacheConfig) ethdb.Database {
database := Database{db: db}
database.InitCache(cacheConfig)
return &database
}
func (d *Database) InitCache(cacheConfig CacheConfig) {
d.cache = groupcache.NewGroup(cacheConfig.Name, int64(cacheConfig.Size), groupcache.GetterFunc(
func(_ context.Context, id string, dest groupcache.Sink) error {
val, err := d.dbGet(id)
if err != nil {
return err
}
// Set the value in the groupcache, with expiry
if err := dest.SetBytes(val, time.Now().Add(cacheConfig.ExpiryDuration)); err != nil {
return err
}
return nil
},
))
}
func (d *Database) GetCacheStats() groupcache.Stats {
return d.cache.Stats
}
// Has satisfies the ethdb.KeyValueReader interface
// Has retrieves if a cid is present in the key-value data store
func (d *Database) Has(cidBytes []byte) (bool, error) {
c, err := cid.Cast(cidBytes)
if err != nil {
return false, err
}
var exists bool
return exists, d.db.Get(&exists, hasPgStr, c.String())
}
// Get retrieves the given key if it's present in the key-value data store
func (d *Database) dbGet(key string) ([]byte, error) {
var data []byte
err := d.db.Get(&data, getPgStr, key)
if err == sql.ErrNoRows {
log.Warn("Database miss for key ", key)
}
return data, err
}
// Get satisfies the ethdb.KeyValueReader interface
// Get retrieves the given cid if it's present in the key-value data store
func (d *Database) Get(cidBytes []byte) ([]byte, error) {
c, err := cid.Cast(cidBytes)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()
var data []byte
return data, d.cache.Get(ctx, c.String(), groupcache.AllocatingByteSliceSink(&data))
}
// Put satisfies the ethdb.KeyValueWriter interface
// Put inserts the given value into the key-value data store
// Key is expected to be a fully formulated cis of value
func (d *Database) Put(cidBytes []byte, value []byte) error {
c, err := cid.Cast(cidBytes)
if err != nil {
return err
}
_, err = d.db.Exec(putPgStr, c.String(), value, d.BlockNumber.Uint64())
return err
}
// Delete satisfies the ethdb.KeyValueWriter interface
// Delete removes the cid from the key-value data store
func (d *Database) Delete(cidBytes []byte) error {
c, err := cid.Cast(cidBytes)
if err != nil {
return err
}
cidString := c.String()
_, err = d.db.Exec(deletePgStr, cidString)
if err != nil {
return err
}
// Remove from cache.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()
err = d.cache.Remove(ctx, cidString)
return err
}
// DatabaseProperty enum type
type DatabaseProperty int
const (
Unknown DatabaseProperty = iota
Size
Idle
InUse
MaxIdleClosed
MaxLifetimeClosed
MaxOpenConnections
OpenConnections
WaitCount
WaitDuration
)
// DatabasePropertyFromString helper function
func DatabasePropertyFromString(property string) (DatabaseProperty, error) {
switch strings.ToLower(property) {
case "size":
return Size, nil
case "idle":
return Idle, nil
case "inuse":
return InUse, nil
case "maxidleclosed":
return MaxIdleClosed, nil
case "maxlifetimeclosed":
return MaxLifetimeClosed, nil
case "maxopenconnections":
return MaxOpenConnections, nil
case "openconnections":
return OpenConnections, nil
case "waitcount":
return WaitCount, nil
case "waitduration":
return WaitDuration, nil
default:
return Unknown, fmt.Errorf("unknown database property")
}
}
// Stat satisfies the ethdb.Stater interface
// Stat returns a particular internal stat of the database
func (d *Database) Stat(property string) (string, error) {
prop, err := DatabasePropertyFromString(property)
if err != nil {
return "", err
}
switch prop {
case Size:
var byteSize string
return byteSize, d.db.Get(&byteSize, dbSizePgStr)
case Idle:
return strconv.Itoa(d.db.Stats().Idle), nil
case InUse:
return strconv.Itoa(d.db.Stats().InUse), nil
case MaxIdleClosed:
return strconv.FormatInt(d.db.Stats().MaxIdleClosed, 10), nil
case MaxLifetimeClosed:
return strconv.FormatInt(d.db.Stats().MaxLifetimeClosed, 10), nil
case MaxOpenConnections:
return strconv.Itoa(d.db.Stats().MaxOpenConnections), nil
case OpenConnections:
return strconv.Itoa(d.db.Stats().OpenConnections), nil
case WaitCount:
return strconv.FormatInt(d.db.Stats().WaitCount, 10), nil
case WaitDuration:
return d.db.Stats().WaitDuration.String(), nil
default:
return "", fmt.Errorf("unhandled database property")
}
}
// Compact satisfies the ethdb.Compacter interface
// Compact flattens the underlying data store for the given key range
func (d *Database) Compact(start []byte, limit []byte) error {
return errNotSupported
}
// NewBatch satisfies the ethdb.Batcher interface
// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called
func (d *Database) NewBatch() ethdb.Batch {
return NewBatch(d.db, nil, d.BlockNumber)
}
// NewBatchWithSize satisfies the ethdb.Batcher interface.
// NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
func (d *Database) NewBatchWithSize(size int) ethdb.Batch {
return NewBatch(d.db, nil, d.BlockNumber)
}
// NewIterator satisfies the ethdb.Iteratee interface
// it creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix, starting at a particular
// initial key (or after, if it does not exist).
//
// Note: This method assumes that the prefix is NOT part of the start, so there's
// no need for the caller to prepend the prefix to the start
func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
return NewIterator(start, prefix, d.db)
}
// Close satisfies the io.Closer interface.
// Close closes the db connection and deregisters from groupcache.
func (d *Database) Close() error {
groupcache.DeregisterGroup(d.cache.Name())
return d.db.DB.Close()
}
// HasAncient satisfies the ethdb.AncientReader interface
// HasAncient returns an indicator whether the specified data exists in the ancient store
func (d *Database) HasAncient(kind string, number uint64) (bool, error) {
return false, errNotSupported
}
// Ancient satisfies the ethdb.AncientReader interface
// Ancient retrieves an ancient binary blob from the append-only immutable files
func (d *Database) Ancient(kind string, number uint64) ([]byte, error) {
return nil, errNotSupported
}
// Ancients satisfies the ethdb.AncientReader interface
// Ancients returns the ancient item numbers in the ancient store
func (d *Database) Ancients() (uint64, error) {
return 0, errNotSupported
}
// Tail satisfies the ethdb.AncientReader interface.
// Tail returns the number of first stored item in the freezer.
func (d *Database) Tail() (uint64, error) {
return 0, errNotSupported
}
// AncientSize satisfies the ethdb.AncientReader interface
// AncientSize returns the ancient size of the specified category
func (d *Database) AncientSize(kind string) (uint64, error) {
return 0, errNotSupported
}
// AncientRange retrieves all the items in a range, starting from the index 'start'.
// It will return
// - at most 'count' items,
// - at least 1 item (even if exceeding the maxBytes), but will otherwise
// return as many items as fit into maxBytes.
func (d *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
return nil, errNotSupported
}
// ReadAncients applies the provided AncientReader function
func (d *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
return errNotSupported
}
// TruncateHead satisfies the ethdb.AncientWriter interface.
// TruncateHead discards all but the first n ancient data from the ancient store.
func (d *Database) TruncateHead(n uint64) (uint64, error) {
return 0, errNotSupported
}
// TruncateTail satisfies the ethdb.AncientWriter interface.
// TruncateTail discards the first n ancient data from the ancient store.
func (d *Database) TruncateTail(n uint64) (uint64, error) {
return 0, errNotSupported
}
// Sync satisfies the ethdb.AncientWriter interface
// Sync flushes all in-memory ancient store data to disk
func (d *Database) Sync() error {
return errNotSupported
}
// MigrateTable satisfies the ethdb.AncientWriter interface.
// MigrateTable processes and migrates entries of a given table to a new format.
func (d *Database) MigrateTable(string, func([]byte) ([]byte, error)) error {
return errNotSupported
}
// NewSnapshot satisfies the ethdb.Snapshotter interface.
// NewSnapshot creates a database snapshot based on the current state.
func (d *Database) NewSnapshot() (ethdb.Snapshot, error) {
return nil, errNotSupported
}
// AncientDatadir returns an error as we don't have a backing chain freezer.
func (d *Database) AncientDatadir() (string, error) {
return "", errNotSupported
}

View File

@ -0,0 +1,133 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb_test
import (
"math/big"
"time"
"github.com/cerc-io/ipfs-ethdb/v5/postgres/shared"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/jmoiron/sqlx"
"github.com/mailgun/groupcache/v2"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v0"
)
var (
database ethdb.Database
db *sqlx.DB
err error
testBlockNumber = big.NewInt(1337)
testHeader = types.Header{Number: testBlockNumber}
testValue, _ = rlp.EncodeToBytes(testHeader)
testEthKey = testHeader.Hash().Bytes()
testCID, _ = pgipfsethdb.CIDFromKeccak256(testEthKey, cid.EthBlock)
)
var _ = Describe("Database", func() {
BeforeEach(func() {
db, err = shared.TestDB()
Expect(err).ToNot(HaveOccurred())
cacheConfig := pgipfsethdb.CacheConfig{
Name: "db",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
}
database = pgipfsethdb.NewDatabase(db, cacheConfig)
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
Expect(ok).To(BeTrue())
(*databaseWithBlock).BlockNumber = testBlockNumber
})
AfterEach(func() {
groupcache.DeregisterGroup("db")
err = shared.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
err = db.Close()
Expect(err).ToNot(HaveOccurred())
})
Describe("Has", func() {
It("returns false if a key-pair doesn't exist in the db", func() {
has, err := database.Has(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(has).ToNot(BeTrue())
})
It("returns true if a key-pair exists in the db", func() {
_, err = db.Exec("INSERT into ipld.blocks (key, data, block_number) VALUES ($1, $2, $3)", testCID.String(), testValue, testBlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred())
has, err := database.Has(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(has).To(BeTrue())
})
})
Describe("Get", func() {
It("throws an err if the key-pair doesn't exist in the db", func() {
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
It("returns the value associated with the key, if the pair exists", func() {
_, err = db.Exec("INSERT into ipld.blocks (key, data, block_number) VALUES ($1, $2, $3)", testCID.String(), testValue, testBlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Put", func() {
It("persists the key-value pair in the database", func() {
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
err = database.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Delete", func() {
It("removes the key-value pair from the database", func() {
err = database.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
err = database.Delete(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
})
})

91
postgres/v0/iterator.go Normal file
View File

@ -0,0 +1,91 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb
import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ipfs/go-cid"
"github.com/jmoiron/sqlx"
)
var _ ethdb.Iterator = &Iterator{}
// Iterator is the type that satisfies the ethdb.Iterator interface for PG-IPFS Ethereum data using a direct Postgres connection
// Iteratee interface is used in Geth for various tests, trie/sync_bloom.go (for fast sync),
// rawdb.InspectDatabase, and the new core/state/snapshot features.
// This should not be confused with trie.NodeIterator or state.NodeIteraor (which can be constructed
// from the ethdb.KeyValueStoreand ethdb.Database interfaces)
type Iterator struct {
db *sqlx.DB
currentKey, prefix []byte
err error
}
// NewIterator returns an ethdb.Iterator interface for PG-IPFS
func NewIterator(start, prefix []byte, db *sqlx.DB) ethdb.Iterator {
return &Iterator{
db: db,
prefix: prefix,
currentKey: start,
}
}
// Next satisfies the ethdb.Iterator interface
// Next moves the iterator to the next key/value pair
// It returns whether the iterator is exhausted
func (i *Iterator) Next() bool {
// this is complicated by the ipfs db keys not being the keccak256 hashes
// go-ethereum usage of this method expects the iteration to occur over keccak256 keys
panic("implement me: Next")
}
// Error satisfies the ethdb.Iterator interface
// Error returns any accumulated error
// Exhausting all the key/value pairs is not considered to be an error
func (i *Iterator) Error() error {
return i.err
}
// Key satisfies the ethdb.Iterator interface
// Key returns the key of the current key/value pair, or nil if done
// The caller should not modify the contents of the returned slice
// and its contents may change on the next call to Next
func (i *Iterator) Key() []byte {
return i.currentKey
}
// Value satisfies the ethdb.Iterator interface
// Value returns the value of the current key/value pair, or nil if done
// The caller should not modify the contents of the returned slice
// and its contents may change on the next call to Next
func (i *Iterator) Value() []byte {
c, err := cid.Cast(i.currentKey)
if err != nil {
i.err = err
return nil
}
var data []byte
i.err = i.db.Get(&data, getPgStr, c)
return data
}
// Release satisfies the ethdb.Iterator interface
// Release releases associated resources
// Release should always succeed and can be called multiple times without causing error
func (i *Iterator) Release() {
i.db.Close()
}

32
postgres/v0/util.go Normal file
View File

@ -0,0 +1,32 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb
import (
"github.com/ipfs/go-cid"
_ "github.com/lib/pq" //postgres driver
"github.com/multiformats/go-multihash"
)
// CIDFromKeccak256 converts keccak256 hash bytes into a v1 cid
func CIDFromKeccak256(hash []byte, codecType uint64) (cid.Cid, error) {
mh, err := multihash.Encode(hash, multihash.KECCAK_256)
if err != nil {
return cid.Cid{}, err
}
return cid.NewCidV1(codecType, mh), nil
}

View File

@ -17,6 +17,8 @@
package pgipfsethdb package pgipfsethdb
import ( import (
"math/big"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
) )
@ -28,13 +30,16 @@ type Batch struct {
db *sqlx.DB db *sqlx.DB
tx *sqlx.Tx tx *sqlx.Tx
valueSize int valueSize int
blockNumber *big.Int
} }
// NewBatch returns a ethdb.Batch interface for PG-IPFS // NewBatch returns a ethdb.Batch interface for PG-IPFS
func NewBatch(db *sqlx.DB, tx *sqlx.Tx) ethdb.Batch { func NewBatch(db *sqlx.DB, tx *sqlx.Tx, blockNumber *big.Int) ethdb.Batch {
b := &Batch{ b := &Batch{
db: db, db: db,
tx: tx, tx: tx,
blockNumber: blockNumber,
} }
if tx == nil { if tx == nil {
b.Reset() b.Reset()
@ -50,7 +55,7 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
if err != nil { if err != nil {
return err return err
} }
if _, err = b.tx.Exec(putPgStr, mhKey, value); err != nil { if _, err = b.tx.Exec(putPgStr, mhKey, value, b.blockNumber.Uint64()); err != nil {
return err return err
} }
b.valueSize += len(value) b.valueSize += len(value)

View File

@ -20,6 +20,8 @@ import (
"math/big" "math/big"
"time" "time"
"github.com/cerc-io/ipfs-ethdb/v5/postgres/shared"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
@ -27,7 +29,7 @@ import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres" pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v1"
) )
var ( var (
@ -39,7 +41,7 @@ var (
var _ = Describe("Batch", func() { var _ = Describe("Batch", func() {
BeforeEach(func() { BeforeEach(func() {
db, err = pgipfsethdb.TestDB() db, err = shared.TestDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
cacheConfig := pgipfsethdb.CacheConfig{ cacheConfig := pgipfsethdb.CacheConfig{
@ -49,11 +51,16 @@ var _ = Describe("Batch", func() {
} }
database = pgipfsethdb.NewDatabase(db, cacheConfig) database = pgipfsethdb.NewDatabase(db, cacheConfig)
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
Expect(ok).To(BeTrue())
(*databaseWithBlock).BlockNumber = testBlockNumber
batch = database.NewBatch() batch = database.NewBatch()
}) })
AfterEach(func() { AfterEach(func() {
groupcache.DeregisterGroup("db") groupcache.DeregisterGroup("db")
err = pgipfsethdb.ResetTestDB(db) err = shared.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })

View File

@ -18,8 +18,10 @@ package pgipfsethdb
import ( import (
"context" "context"
"database/sql"
"errors" "errors"
"fmt" "fmt"
"math/big"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -27,16 +29,23 @@ import (
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/mailgun/groupcache/v2" "github.com/mailgun/groupcache/v2"
log "github.com/sirupsen/logrus"
) )
var errNotSupported = errors.New("this operation is not supported") var errNotSupported = errors.New("this operation is not supported")
var ( var (
hasPgStr = "SELECT exists(select 1 from public.blocks WHERE key = $1)" hasPgStr = "SELECT exists(select 1 from ipld.blocks WHERE key = $1 LIMIT 1)"
getPgStr = "SELECT data FROM public.blocks WHERE key = $1" getPgStr = "SELECT data FROM ipld.blocks WHERE key = $1 LIMIT 1"
putPgStr = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING" putPgStr = "INSERT INTO ipld.blocks (key, data, block_number) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"
deletePgStr = "DELETE FROM public.blocks WHERE key = $1" deletePgStr = "DELETE FROM ipld.blocks WHERE key = $1"
dbSizePgStr = "SELECT pg_database_size(current_database())" dbSizePgStr = "SELECT pg_database_size(current_database())"
DefaultCacheConfig = CacheConfig{
Name: "db",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
}
) )
var _ ethdb.Database = &Database{} var _ ethdb.Database = &Database{}
@ -45,6 +54,8 @@ var _ ethdb.Database = &Database{}
type Database struct { type Database struct {
db *sqlx.DB db *sqlx.DB
cache *groupcache.Group cache *groupcache.Group
BlockNumber *big.Int
} }
func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) { func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) {
@ -110,7 +121,12 @@ func (d *Database) Has(key []byte) (bool, error) {
// Get retrieves the given key if it's present in the key-value data store // Get retrieves the given key if it's present in the key-value data store
func (d *Database) dbGet(key string) ([]byte, error) { func (d *Database) dbGet(key string) ([]byte, error) {
var data []byte var data []byte
return data, d.db.Get(&data, getPgStr, key) err := d.db.Get(&data, getPgStr, key)
if err == sql.ErrNoRows {
log.Warn("Database miss for key", key)
}
return data, err
} }
// Get satisfies the ethdb.KeyValueReader interface // Get satisfies the ethdb.KeyValueReader interface
@ -136,7 +152,7 @@ func (d *Database) Put(key []byte, value []byte) error {
if err != nil { if err != nil {
return err return err
} }
_, err = d.db.Exec(putPgStr, mhKey, value) _, err = d.db.Exec(putPgStr, mhKey, value, d.BlockNumber.Uint64())
return err return err
} }
@ -245,13 +261,13 @@ func (d *Database) Compact(start []byte, limit []byte) error {
// NewBatch creates a write-only database that buffers changes to its host db // NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called // until a final write is called
func (d *Database) NewBatch() ethdb.Batch { func (d *Database) NewBatch() ethdb.Batch {
return NewBatch(d.db, nil) return NewBatch(d.db, nil, d.BlockNumber)
} }
// NewBatchWithSize satisfies the ethdb.Batcher interface. // NewBatchWithSize satisfies the ethdb.Batcher interface.
// NewBatchWithSize creates a write-only database batch with pre-allocated buffer. // NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
func (d *Database) NewBatchWithSize(size int) ethdb.Batch { func (d *Database) NewBatchWithSize(size int) ethdb.Batch {
return NewBatch(d.db, nil) return NewBatch(d.db, nil, d.BlockNumber)
} }
// NewIterator satisfies the ethdb.Iteratee interface // NewIterator satisfies the ethdb.Iteratee interface
@ -311,20 +327,20 @@ func (d *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][
} }
// ReadAncients applies the provided AncientReader function // ReadAncients applies the provided AncientReader function
func (d *Database) ReadAncients(fn func(ethdb.AncientReader) error) (err error) { func (d *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
return errNotSupported return errNotSupported
} }
// TruncateHead satisfies the ethdb.AncientWriter interface. // TruncateHead satisfies the ethdb.AncientWriter interface.
// TruncateHead discards all but the first n ancient data from the ancient store. // TruncateHead discards all but the first n ancient data from the ancient store.
func (d *Database) TruncateHead(n uint64) error { func (d *Database) TruncateHead(n uint64) (uint64, error) {
return errNotSupported return 0, errNotSupported
} }
// TruncateTail satisfies the ethdb.AncientWriter interface. // TruncateTail satisfies the ethdb.AncientWriter interface.
// TruncateTail discards the first n ancient data from the ancient store. // TruncateTail discards the first n ancient data from the ancient store.
func (d *Database) TruncateTail(n uint64) error { func (d *Database) TruncateTail(n uint64) (uint64, error) {
return errNotSupported return 0, errNotSupported
} }
// Sync satisfies the ethdb.AncientWriter interface // Sync satisfies the ethdb.AncientWriter interface
@ -344,3 +360,8 @@ func (d *Database) MigrateTable(string, func([]byte) ([]byte, error)) error {
func (d *Database) NewSnapshot() (ethdb.Snapshot, error) { func (d *Database) NewSnapshot() (ethdb.Snapshot, error) {
return nil, errNotSupported return nil, errNotSupported
} }
// AncientDatadir returns an error as we don't have a backing chain freezer.
func (d *Database) AncientDatadir() (string, error) {
return "", errNotSupported
}

View File

@ -20,6 +20,8 @@ import (
"math/big" "math/big"
"time" "time"
"github.com/cerc-io/ipfs-ethdb/v5/postgres/shared"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
@ -28,14 +30,15 @@ import (
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres" pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v1"
) )
var ( var (
database ethdb.Database database ethdb.Database
db *sqlx.DB db *sqlx.DB
err error err error
testHeader = types.Header{Number: big.NewInt(1337)} testBlockNumber = big.NewInt(1337)
testHeader = types.Header{Number: testBlockNumber}
testValue, _ = rlp.EncodeToBytes(testHeader) testValue, _ = rlp.EncodeToBytes(testHeader)
testEthKey = testHeader.Hash().Bytes() testEthKey = testHeader.Hash().Bytes()
testMhKey, _ = pgipfsethdb.MultihashKeyFromKeccak256(testEthKey) testMhKey, _ = pgipfsethdb.MultihashKeyFromKeccak256(testEthKey)
@ -43,7 +46,7 @@ var (
var _ = Describe("Database", func() { var _ = Describe("Database", func() {
BeforeEach(func() { BeforeEach(func() {
db, err = pgipfsethdb.TestDB() db, err = shared.TestDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
cacheConfig := pgipfsethdb.CacheConfig{ cacheConfig := pgipfsethdb.CacheConfig{
@ -53,10 +56,16 @@ var _ = Describe("Database", func() {
} }
database = pgipfsethdb.NewDatabase(db, cacheConfig) database = pgipfsethdb.NewDatabase(db, cacheConfig)
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
Expect(ok).To(BeTrue())
(*databaseWithBlock).BlockNumber = testBlockNumber
}) })
AfterEach(func() { AfterEach(func() {
groupcache.DeregisterGroup("db") groupcache.DeregisterGroup("db")
err = pgipfsethdb.ResetTestDB(db) err = shared.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
err = db.Close()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
@ -67,7 +76,7 @@ var _ = Describe("Database", func() {
Expect(has).ToNot(BeTrue()) Expect(has).ToNot(BeTrue())
}) })
It("returns true if a key-pair exists in the db", func() { It("returns true if a key-pair exists in the db", func() {
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue) _, err = db.Exec("INSERT into ipld.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
has, err := database.Has(testEthKey) has, err := database.Has(testEthKey)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
@ -82,7 +91,7 @@ var _ = Describe("Database", func() {
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
}) })
It("returns the value associated with the key, if the pair exists", func() { It("returns the value associated with the key, if the pair exists", func() {
_, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue) _, err = db.Exec("INSERT into ipld.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testEthKey) val, err := database.Get(testEthKey)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())

View File

@ -0,0 +1,29 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb_test
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestPGIPFSETHDB(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "PG-IPFS ethdb test")
}

View File

@ -17,9 +17,8 @@
package pgipfsethdb package pgipfsethdb
import ( import (
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/boxo/blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help" dshelp "github.com/ipfs/go-ipfs-ds-help"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq" //postgres driver _ "github.com/lib/pq" //postgres driver
"github.com/multiformats/go-multihash" "github.com/multiformats/go-multihash"
) )
@ -33,17 +32,3 @@ func MultihashKeyFromKeccak256(h []byte) (string, error) {
dbKey := dshelp.MultihashToDsKey(mh) dbKey := dshelp.MultihashToDsKey(mh)
return blockstore.BlockPrefix.String() + dbKey.String(), nil return blockstore.BlockPrefix.String() + dbKey.String(), nil
} }
// TestDB connect to the testing database
// it assumes the database has the IPFS public.blocks table present
// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table
func TestDB() (*sqlx.DB, error) {
connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable"
return sqlx.Connect("postgres", connectStr)
}
// ResetTestDB drops all rows in the test db public.blocks table
func ResetTestDB(db *sqlx.DB) error {
_, err := db.Exec("TRUNCATE public.blocks CASCADE")
return err
}

View File

@ -29,7 +29,7 @@ func Keccak256ToCid(h []byte, codec uint64) (cid.Cid, error) {
if err != nil { if err != nil {
return cid.Cid{}, err return cid.Cid{}, err
} }
return cid.NewCidV1(codec, multihash.Multihash(buf)), nil return cid.NewCidV1(codec, buf), nil
} }
// NewBlock takes a keccak256 hash key and the rlp []byte value it was derived from and creates an ipfs block object // NewBlock takes a keccak256 hash key and the rlp []byte value it was derived from and creates an ipfs block object