Compare commits

1 commit

Author | SHA1 | Date
---|---|---
 | 0e24a9f543 |
.github/workflows/issues-notion-sync.yml (vendored, new file, 29 lines)

@@ -0,0 +1,29 @@
+name: Notion Sync
+
+on:
+  workflow_dispatch:
+  issues:
+    types:
+      [
+        opened,
+        edited,
+        labeled,
+        unlabeled,
+        assigned,
+        unassigned,
+        milestoned,
+        demilestoned,
+        reopened,
+        closed,
+      ]
+
+jobs:
+  notion_job:
+    runs-on: ubuntu-latest
+    name: Add GitHub Issues to Notion
+    steps:
+      - name: Add GitHub Issues to Notion
+        uses: vulcanize/notion-github-action@v1.2.4-issueid
+        with:
+          notion-token: ${{ secrets.NOTION_TOKEN }}
+          notion-db: ${{ secrets.NOTION_DATABASE }}
README.md (10 lines changed)

@@ -1,6 +1,6 @@
 ## ipfs-ethdb
 
-[![Go Report Card](https://goreportcard.com/badge/github.com/cerc/ipfs-ethdb)](https://goreportcard.com/report/github.com/cerc-io/ipfs-ethdb)
+[![Go Report Card](https://goreportcard.com/badge/github.com/vulcanize/ipfs-ethdb)](https://goreportcard.com/report/github.com/vulcanize/ipfs-ethdb)
 
 > go-ethereum ethdb interfaces for Ethereum state data stored in IPFS
 
@@ -11,7 +11,7 @@ interfacing with a state database. These interfaces are used to build higher-lev
 which are used to perform the bulk of state related needs.
 
 Ethereum data can be stored on IPFS, standard codecs for Etheruem data are defined in the [go-cid](https://github.com/ipfs/go-cid) library.
-Using our [statediffing geth client](https://github.com/cerc-io/go-ethereum/releases/tag/v1.9.11-statediff-0.0.2) it is feasible to extract every single
+Using our [statediffing geth client](https://github.com/vulcanize/go-ethereum/releases/tag/v1.9.11-statediff-0.0.2) it is feasible to extract every single
 state and storage node and publish it to IPFS.
 
 Geth stores state data in leveldb as key-value pairs between the keccak256 hash of the rlp-encoded object and the rlp-encoded object.
@@ -21,7 +21,7 @@ ethdb interfaces for Ethereum data on IPFS by handling the conversion of a kecca
 
 
 ## Usage
-To use this module import it and build an ethdb interface around an instance of a [Go IPFS blockservice](https://pkg.go.dev/github.com/ipfs/boxo@v0.19.0/blockservice), you can then
+To use this module import it and build an ethdb interface around an instance of a [go ipfs blockservice](https://github.com/ipfs/go-blockservice), you can then
 employ it as you would the usual [leveldb](https://github.com/ethereum/go-ethereum/tree/master/ethdb/leveldb) or [memorydb](https://github.com/ethereum/go-ethereum/tree/master/ethdb/memorydb) ethdbs
 with some exceptions: the AncientReader, AncientWriter, Compacter, and Iteratee/Iterator interfaces are not functionally complete.
 
@@ -46,11 +46,11 @@ import (
     "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/core/state"
     "github.com/ethereum/go-ethereum/trie"
-    "github.com/ipfs/boxo/blockservice"
+    "github.com/ipfs/go-blockservice"
     "github.com/ipfs/go-ipfs/core"
     "github.com/ipfs/go-ipfs/repo/fsrepo"
     "github.com/jmoiron/sqlx"
-    "github.com/cerc-io/ipfs-ethdb/v5"
+    "github.com/vulcanize/ipfs-ethdb"
 )
 
 func main() {
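To make the usage the README describes concrete, here is a minimal sketch of wrapping a blockservice in this package's ethdb adapter and handing it to go-ethereum's state code. It is a sketch under assumptions: the in-memory datastore/blockstore wiring and the `ipfsethdb.NewDatabase(blockService)` constructor are illustrative guesses based on the imports and the `state.NewDatabase`/`state.New` calls visible in the README hunks above, not signatures confirmed by this diff.

```go
package main

import (
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/state"
    "github.com/ipfs/go-blockservice"
    "github.com/ipfs/go-datastore"
    dssync "github.com/ipfs/go-datastore/sync"
    blockstore "github.com/ipfs/go-ipfs-blockstore"

    ipfsethdb "github.com/vulcanize/ipfs-ethdb/v3"
)

func main() {
    // In-memory datastore -> blockstore -> blockservice; a real deployment would
    // back this with an IPFS repo (the go-ipfs/core and fsrepo imports in the README).
    ds := dssync.MutexWrap(datastore.NewMapDatastore())
    bstore := blockstore.NewBlockstore(ds)
    bs := blockservice.New(bstore, nil) // nil exchange: local-only block access

    // Assumed constructor: wrap the blockservice as an ethdb.Database.
    ethDB := ipfsethdb.NewDatabase(bs)

    // Use it wherever go-ethereum expects an ethdb, e.g. to open a state database.
    stateDatabase := state.NewDatabase(ethDB)
    stateDB, _ := state.New(common.Hash{}, stateDatabase, nil)
    _ = stateDB
}
```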
batch.go (7 lines changed)

@@ -17,14 +17,13 @@
 package ipfsethdb
 
 import (
-    "context"
     "errors"
 
     "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/ethdb"
     lru "github.com/hashicorp/golang-lru"
-    "github.com/ipfs/boxo/blockservice"
     blocks "github.com/ipfs/go-block-format"
+    "github.com/ipfs/go-blockservice"
 )
 
 var (
@@ -104,7 +103,7 @@ func (b *Batch) Write() error {
         }
         puts[i] = b
     }
-    if err := b.blockService.AddBlocks(context.Background(), puts); err != nil {
+    if err := b.blockService.AddBlocks(puts); err != nil {
         return err
     }
     for _, key := range b.deleteCache.Keys() {
@@ -113,7 +112,7 @@ func (b *Batch) Write() error {
         if err != nil {
             return err
         }
-        if err := b.blockService.DeleteBlock(context.Background(), c); err != nil {
+        if err := b.blockService.DeleteBlock(c); err != nil {
             return err
         }
     }
@@ -25,7 +25,7 @@ import (
     . "github.com/onsi/ginkgo"
     . "github.com/onsi/gomega"
 
-    ipfsethdb "github.com/cerc-io/ipfs-ethdb/v5"
+    ipfsethdb "github.com/vulcanize/ipfs-ethdb/v3"
 )
 
 var (
database.go (26 lines changed)

@@ -20,10 +20,11 @@ import (
     "context"
     "errors"
     "fmt"
+    "strconv"
     "strings"
 
     "github.com/ethereum/go-ethereum/ethdb"
-    "github.com/ipfs/boxo/blockservice"
+    "github.com/ipfs/go-blockservice"
 )
 
 var (
@@ -68,7 +69,7 @@ func (d *Database) Has(key []byte) (bool, error) {
     if err != nil {
         return false, err
     }
-    return d.blockService.Blockstore().Has(context.Background(), c)
+    return d.blockService.Blockstore().Has(c)
 }
 
 // Get satisfies the ethdb.KeyValueReader interface
@@ -94,7 +95,7 @@ func (d *Database) Put(key []byte, value []byte) error {
     if err != nil {
         return err
     }
-    return d.blockService.AddBlock(context.Background(), b)
+    return d.blockService.AddBlock(b)
 }
 
 // Delete satisfies the ethdb.KeyValueWriter interface
@@ -105,7 +106,7 @@ func (d *Database) Delete(key []byte) error {
     if err != nil {
         return err
     }
-    return d.blockService.DeleteBlock(context.Background(), c)
+    return d.blockService.DeleteBlock(c)
 }
 
 // DatabaseProperty enum type
@@ -134,6 +135,9 @@ func (d *Database) Stat(property string) (string, error) {
         return "", err
     }
     switch prop {
+    case ExchangeOnline:
+        online := d.blockService.Exchange().IsOnline()
+        return strconv.FormatBool(online), nil
     default:
         return "", fmt.Errorf("unhandled database property")
     }
@@ -215,9 +219,9 @@ func (d *Database) AncientSize(kind string) (uint64, error) {
 
 // AncientRange retrieves all the items in a range, starting from the index 'start'.
 // It will return
 // - at most 'count' items,
 // - at least 1 item (even if exceeding the maxBytes), but will otherwise
 // return as many items as fit into maxBytes.
 func (d *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
     return nil, errNotSupported
 }
@@ -229,14 +233,14 @@ func (d *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error
 
 // TruncateHead satisfies the ethdb.AncientWriter interface.
 // TruncateHead discards all but the first n ancient data from the ancient store.
-func (d *Database) TruncateHead(n uint64) (uint64, error) {
-    return 0, errNotSupported
+func (d *Database) TruncateHead(n uint64) error {
+    return errNotSupported
 }
 
 // TruncateTail satisfies the ethdb.AncientWriter interface.
 // TruncateTail discards the first n ancient data from the ancient store.
-func (d *Database) TruncateTail(n uint64) (uint64, error) {
-    return 0, errNotSupported
+func (d *Database) TruncateTail(n uint64) error {
+    return errNotSupported
 }
 
 // Sync satisfies the ethdb.AncientWriter interface
@@ -22,11 +22,11 @@ import (
     "github.com/ethereum/go-ethereum/core/types"
     "github.com/ethereum/go-ethereum/ethdb"
     "github.com/ethereum/go-ethereum/rlp"
-    "github.com/ipfs/boxo/blockservice"
+    "github.com/ipfs/go-blockservice"
     . "github.com/onsi/ginkgo"
     . "github.com/onsi/gomega"
 
-    ipfsethdb "github.com/cerc-io/ipfs-ethdb/v5"
+    ipfsethdb "github.com/vulcanize/ipfs-ethdb/v3"
 )
 
 var (
go.mod (97 lines changed)

@@ -1,73 +1,64 @@
-module github.com/cerc-io/ipfs-ethdb/v5
+module github.com/vulcanize/ipfs-ethdb/v3
 
-go 1.21
+go 1.18
 
 require (
-    github.com/ethereum/go-ethereum v1.13.14
-    github.com/hashicorp/golang-lru v1.0.2
-    github.com/ipfs/boxo v0.19.0
-    github.com/ipfs/go-block-format v0.2.0
-    github.com/ipfs/go-cid v0.4.1
-    github.com/ipfs/go-ipfs-ds-help v1.1.0
+    github.com/ethereum/go-ethereum v1.10.18
+    github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
+    github.com/ipfs/go-block-format v0.0.3
+    github.com/ipfs/go-blockservice v0.1.3
+    github.com/ipfs/go-cid v0.0.7
+    github.com/ipfs/go-ipfs-blockstore v1.0.1
+    github.com/ipfs/go-ipfs-ds-help v1.0.0
+    github.com/ipfs/go-ipfs-exchange-interface v0.0.1
     github.com/jmoiron/sqlx v1.3.5
-    github.com/lib/pq v1.10.9
+    github.com/lib/pq v1.10.5
     github.com/mailgun/groupcache/v2 v2.3.0
-    github.com/multiformats/go-multihash v0.2.3
+    github.com/multiformats/go-multihash v0.1.0
     github.com/onsi/ginkgo v1.16.5
     github.com/onsi/gomega v1.19.0
-    github.com/sirupsen/logrus v1.6.0
 )
 
+require github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
+
 require (
-    github.com/bits-and-blooms/bitset v1.10.0 // indirect
-    github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
-    github.com/consensys/bavard v0.1.13 // indirect
-    github.com/consensys/gnark-crypto v0.12.1 // indirect
-    github.com/crate-crypto/go-kzg-4844 v0.7.0 // indirect
-    github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
-    github.com/ethereum/c-kzg-4844 v0.4.0 // indirect
-    github.com/fsnotify/fsnotify v1.6.0 // indirect
-    github.com/go-logr/logr v1.4.1 // indirect
-    github.com/go-logr/stdr v1.2.2 // indirect
-    github.com/golang/protobuf v1.5.3 // indirect
-    github.com/google/uuid v1.6.0 // indirect
-    github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
-    github.com/holiman/uint256 v1.2.4 // indirect
+    github.com/btcsuite/btcd v0.22.0-beta // indirect
+    github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
+    github.com/fsnotify/fsnotify v1.4.9 // indirect
+    github.com/gogo/protobuf v1.3.1 // indirect
+    github.com/golang/protobuf v1.5.2 // indirect
+    github.com/google/uuid v1.2.0 // indirect
     github.com/ipfs/bbloom v0.0.4 // indirect
-    github.com/ipfs/go-datastore v0.6.0 // indirect
-    github.com/ipfs/go-ipfs-util v0.0.3 // indirect
-    github.com/ipfs/go-ipld-format v0.6.0 // indirect
-    github.com/ipfs/go-log/v2 v2.5.1 // indirect
+    github.com/ipfs/go-datastore v0.4.4 // indirect
+    github.com/ipfs/go-ipfs-util v0.0.2 // indirect
+    github.com/ipfs/go-log v0.0.1 // indirect
     github.com/ipfs/go-metrics-interface v0.0.1 // indirect
-    github.com/jbenet/goprocess v0.1.4 // indirect
-    github.com/klauspost/cpuid/v2 v2.2.7 // indirect
+    github.com/ipfs/go-verifcid v0.0.1 // indirect
+    github.com/jbenet/goprocess v0.1.3 // indirect
+    github.com/klauspost/cpuid/v2 v2.0.9 // indirect
     github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
-    github.com/mattn/go-isatty v0.0.20 // indirect
-    github.com/minio/sha256-simd v1.0.1 // indirect
-    github.com/mmcloughlin/addchain v0.4.0 // indirect
+    github.com/mattn/go-colorable v0.1.8 // indirect
+    github.com/mattn/go-isatty v0.0.12 // indirect
+    github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
+    github.com/minio/sha256-simd v1.0.0 // indirect
     github.com/mr-tron/base58 v1.2.0 // indirect
-    github.com/multiformats/go-base32 v0.1.0 // indirect
-    github.com/multiformats/go-base36 v0.2.0 // indirect
-    github.com/multiformats/go-multibase v0.2.0 // indirect
-    github.com/multiformats/go-varint v0.0.7 // indirect
+    github.com/multiformats/go-base32 v0.0.3 // indirect
+    github.com/multiformats/go-base36 v0.1.0 // indirect
+    github.com/multiformats/go-multibase v0.0.3 // indirect
+    github.com/multiformats/go-varint v0.0.6 // indirect
     github.com/nxadm/tail v1.4.8 // indirect
+    github.com/opentracing/opentracing-go v1.1.0 // indirect
     github.com/segmentio/fasthash v1.0.3 // indirect
+    github.com/sirupsen/logrus v1.6.0 // indirect
     github.com/spaolacci/murmur3 v1.1.0 // indirect
-    github.com/supranational/blst v0.3.11 // indirect
-    go.opentelemetry.io/otel v1.25.0 // indirect
-    go.opentelemetry.io/otel/metric v1.25.0 // indirect
-    go.opentelemetry.io/otel/trace v1.25.0 // indirect
-    go.uber.org/multierr v1.11.0 // indirect
-    go.uber.org/zap v1.27.0 // indirect
-    golang.org/x/crypto v0.22.0 // indirect
-    golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc // indirect
-    golang.org/x/net v0.21.0 // indirect
-    golang.org/x/sync v0.6.0 // indirect
-    golang.org/x/sys v0.19.0 // indirect
-    golang.org/x/text v0.14.0 // indirect
-    google.golang.org/protobuf v1.32.0 // indirect
+    github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc // indirect
+    go.uber.org/atomic v1.6.0 // indirect
+    golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
+    golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
+    golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
+    golang.org/x/text v0.3.7 // indirect
+    google.golang.org/protobuf v1.26.0 // indirect
     gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
     gopkg.in/yaml.v2 v2.4.0 // indirect
-    lukechampine.com/blake3 v1.2.2 // indirect
-    rsc.io/tmplfunc v0.0.3 // indirect
+    lukechampine.com/blake3 v1.1.6 // indirect
 )
@@ -20,7 +20,7 @@ import (
     "context"
 
     "github.com/ethereum/go-ethereum/ethdb"
-    "github.com/ipfs/boxo/blockservice"
+    "github.com/ipfs/go-blockservice"
 )
 
 var _ ethdb.Iterator = &Iterator{}
@@ -20,11 +20,11 @@ import (
     "context"
     "errors"
 
-    "github.com/ipfs/boxo/blockservice"
-    "github.com/ipfs/boxo/blockstore"
-    "github.com/ipfs/boxo/exchange"
     blocks "github.com/ipfs/go-block-format"
+    "github.com/ipfs/go-blockservice"
     "github.com/ipfs/go-cid"
+    blockstore "github.com/ipfs/go-ipfs-blockstore"
+    exchange "github.com/ipfs/go-ipfs-exchange-interface"
 )
 
 var (
@@ -52,27 +52,27 @@ func (mbs *MockBlockservice) Exchange() exchange.Interface {
     panic("Exchange: implement me")
 }
 
-func (mbs *MockBlockservice) AddBlock(ctx context.Context, b blocks.Block) error {
-    return mbs.blockStore.Put(ctx, b)
+func (mbs *MockBlockservice) AddBlock(b blocks.Block) error {
+    return mbs.blockStore.Put(b)
 }
 
-func (mbs *MockBlockservice) AddBlocks(ctx context.Context, bs []blocks.Block) error {
-    return mbs.blockStore.PutMany(ctx, bs)
+func (mbs *MockBlockservice) AddBlocks(bs []blocks.Block) error {
+    return mbs.blockStore.PutMany(bs)
 }
 
-func (mbs *MockBlockservice) DeleteBlock(ctx context.Context, c cid.Cid) error {
-    return mbs.blockStore.DeleteBlock(ctx, c)
+func (mbs *MockBlockservice) DeleteBlock(c cid.Cid) error {
+    return mbs.blockStore.DeleteBlock(c)
 }
 
 func (mbs *MockBlockservice) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
-    return mbs.blockStore.Get(ctx, c)
+    return mbs.blockStore.Get(c)
 }
 
 func (mbs *MockBlockservice) GetBlocks(ctx context.Context, cs []cid.Cid) <-chan blocks.Block {
     blockChan := make(chan blocks.Block)
     go func() {
         for _, c := range cs {
-            if b, err := mbs.blockStore.Get(ctx, c); err == nil {
+            if b, err := mbs.blockStore.Get(c); err == nil {
                 blockChan <- b
             }
         }
@@ -93,17 +93,17 @@ type MockBlockstore struct {
     err error
 }
 
-func (mbs *MockBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error {
+func (mbs *MockBlockstore) DeleteBlock(c cid.Cid) error {
     delete(mbs.blocks, c.String())
     return mbs.err
 }
 
-func (mbs *MockBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
+func (mbs *MockBlockstore) Has(c cid.Cid) (bool, error) {
     _, ok := mbs.blocks[c.String()]
     return ok, mbs.err
 }
 
-func (mbs *MockBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
+func (mbs *MockBlockstore) Get(c cid.Cid) (blocks.Block, error) {
     obj, ok := mbs.blocks[c.String()]
     if !ok {
         return nil, blockNotFoundErr
@@ -111,7 +111,7 @@ func (mbs *MockBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, er
     return obj, mbs.err
 }
 
-func (mbs *MockBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
+func (mbs *MockBlockstore) GetSize(c cid.Cid) (int, error) {
     obj, ok := mbs.blocks[c.String()]
     if !ok {
         return 0, blockNotFoundErr
@@ -119,12 +119,12 @@ func (mbs *MockBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error)
     return len(obj.RawData()), mbs.err
 }
 
-func (mbs *MockBlockstore) Put(ctx context.Context, b blocks.Block) error {
+func (mbs *MockBlockstore) Put(b blocks.Block) error {
     mbs.blocks[b.Cid().String()] = b
     return mbs.err
 }
 
-func (mbs *MockBlockstore) PutMany(ctx context.Context, bs []blocks.Block) error {
+func (mbs *MockBlockstore) PutMany(bs []blocks.Block) error {
     for _, b := range bs {
         mbs.blocks[b.Cid().String()] = b
     }
@@ -17,8 +17,6 @@
 package pgipfsethdb
 
 import (
-    "math/big"
-
     "github.com/ethereum/go-ethereum/ethdb"
     "github.com/jmoiron/sqlx"
 )
@@ -30,16 +28,13 @@ type Batch struct {
     db        *sqlx.DB
     tx        *sqlx.Tx
     valueSize int
-
-    blockNumber *big.Int
 }
 
 // NewBatch returns a ethdb.Batch interface for PG-IPFS
-func NewBatch(db *sqlx.DB, tx *sqlx.Tx, blockNumber *big.Int) ethdb.Batch {
+func NewBatch(db *sqlx.DB, tx *sqlx.Tx) ethdb.Batch {
     b := &Batch{
         db: db,
         tx: tx,
-        blockNumber: blockNumber,
     }
     if tx == nil {
         b.Reset()
@@ -55,7 +50,7 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
     if err != nil {
         return err
     }
-    if _, err = b.tx.Exec(putPgStr, mhKey, value, b.blockNumber.Uint64()); err != nil {
+    if _, err = b.tx.Exec(putPgStr, mhKey, value); err != nil {
         return err
     }
     b.valueSize += len(value)
@@ -20,8 +20,6 @@ import (
     "math/big"
     "time"
 
-    "github.com/cerc-io/ipfs-ethdb/v5/postgres/shared"
-
     "github.com/ethereum/go-ethereum/core/types"
     "github.com/ethereum/go-ethereum/ethdb"
     "github.com/ethereum/go-ethereum/rlp"
@@ -29,7 +27,7 @@ import (
     . "github.com/onsi/ginkgo"
     . "github.com/onsi/gomega"
 
-    pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v1"
+    pgipfsethdb "github.com/vulcanize/ipfs-ethdb/v3/postgres"
 )
 
 var (
@@ -41,7 +39,7 @@ var (
 
 var _ = Describe("Batch", func() {
     BeforeEach(func() {
-        db, err = shared.TestDB()
+        db, err = pgipfsethdb.TestDB()
         Expect(err).ToNot(HaveOccurred())
 
         cacheConfig := pgipfsethdb.CacheConfig{
@@ -51,16 +49,11 @@ var _ = Describe("Batch", func() {
         }
 
         database = pgipfsethdb.NewDatabase(db, cacheConfig)
 
-        databaseWithBlock, ok := database.(*pgipfsethdb.Database)
-        Expect(ok).To(BeTrue())
-        (*databaseWithBlock).BlockNumber = testBlockNumber
-
         batch = database.NewBatch()
     })
     AfterEach(func() {
         groupcache.DeregisterGroup("db")
-        err = shared.ResetTestDB(db)
+        err = pgipfsethdb.ResetTestDB(db)
         Expect(err).ToNot(HaveOccurred())
     })
@@ -18,10 +18,8 @@ package pgipfsethdb
 
 import (
     "context"
-    "database/sql"
     "errors"
     "fmt"
-    "math/big"
     "strconv"
     "strings"
     "time"
@@ -29,23 +27,16 @@ import (
     "github.com/ethereum/go-ethereum/ethdb"
     "github.com/jmoiron/sqlx"
     "github.com/mailgun/groupcache/v2"
-    log "github.com/sirupsen/logrus"
 )
 
 var errNotSupported = errors.New("this operation is not supported")
 
 var (
-    hasPgStr    = "SELECT exists(select 1 from ipld.blocks WHERE key = $1 LIMIT 1)"
-    getPgStr    = "SELECT data FROM ipld.blocks WHERE key = $1 LIMIT 1"
-    putPgStr    = "INSERT INTO ipld.blocks (key, data, block_number) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"
-    deletePgStr = "DELETE FROM ipld.blocks WHERE key = $1"
+    hasPgStr    = "SELECT exists(select 1 from public.blocks WHERE key = $1)"
+    getPgStr    = "SELECT data FROM public.blocks WHERE key = $1"
+    putPgStr    = "INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING"
+    deletePgStr = "DELETE FROM public.blocks WHERE key = $1"
     dbSizePgStr = "SELECT pg_database_size(current_database())"
-
-    DefaultCacheConfig = CacheConfig{
-        Name:           "db",
-        Size:           3000000, // 3MB
-        ExpiryDuration: time.Hour,
-    }
 )
 
 var _ ethdb.Database = &Database{}
@@ -54,8 +45,6 @@ var _ ethdb.Database = &Database{}
 type Database struct {
     db    *sqlx.DB
     cache *groupcache.Group
-
-    BlockNumber *big.Int
 }
 
 func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) {
@@ -121,12 +110,7 @@ func (d *Database) Has(key []byte) (bool, error) {
 // Get retrieves the given key if it's present in the key-value data store
 func (d *Database) dbGet(key string) ([]byte, error) {
     var data []byte
-    err := d.db.Get(&data, getPgStr, key)
-    if err == sql.ErrNoRows {
-        log.Warn("Database miss for key", key)
-    }
-
-    return data, err
+    return data, d.db.Get(&data, getPgStr, key)
 }
 
 // Get satisfies the ethdb.KeyValueReader interface
@@ -152,7 +136,7 @@ func (d *Database) Put(key []byte, value []byte) error {
     if err != nil {
         return err
     }
-    _, err = d.db.Exec(putPgStr, mhKey, value, d.BlockNumber.Uint64())
+    _, err = d.db.Exec(putPgStr, mhKey, value)
     return err
 }
 
@@ -261,13 +245,13 @@ func (d *Database) Compact(start []byte, limit []byte) error {
 // NewBatch creates a write-only database that buffers changes to its host db
 // until a final write is called
 func (d *Database) NewBatch() ethdb.Batch {
-    return NewBatch(d.db, nil, d.BlockNumber)
+    return NewBatch(d.db, nil)
 }
 
 // NewBatchWithSize satisfies the ethdb.Batcher interface.
 // NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
 func (d *Database) NewBatchWithSize(size int) ethdb.Batch {
-    return NewBatch(d.db, nil, d.BlockNumber)
+    return NewBatch(d.db, nil)
 }
 
 // NewIterator satisfies the ethdb.Iteratee interface
@@ -319,9 +303,9 @@ func (d *Database) AncientSize(kind string) (uint64, error) {
 
 // AncientRange retrieves all the items in a range, starting from the index 'start'.
 // It will return
 // - at most 'count' items,
 // - at least 1 item (even if exceeding the maxBytes), but will otherwise
 // return as many items as fit into maxBytes.
 func (d *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
     return nil, errNotSupported
 }
@@ -333,14 +317,14 @@ func (d *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error
 
 // TruncateHead satisfies the ethdb.AncientWriter interface.
 // TruncateHead discards all but the first n ancient data from the ancient store.
-func (d *Database) TruncateHead(n uint64) (uint64, error) {
-    return 0, errNotSupported
+func (d *Database) TruncateHead(n uint64) error {
+    return errNotSupported
 }
 
 // TruncateTail satisfies the ethdb.AncientWriter interface.
 // TruncateTail discards the first n ancient data from the ancient store.
-func (d *Database) TruncateTail(n uint64) (uint64, error) {
-    return 0, errNotSupported
+func (d *Database) TruncateTail(n uint64) error {
+    return errNotSupported
 }
 
 // Sync satisfies the ethdb.AncientWriter interface
@@ -20,8 +20,6 @@ import (
     "math/big"
     "time"
 
-    "github.com/cerc-io/ipfs-ethdb/v5/postgres/shared"
-
     "github.com/ethereum/go-ethereum/core/types"
     "github.com/ethereum/go-ethereum/ethdb"
     "github.com/ethereum/go-ethereum/rlp"
@@ -30,23 +28,22 @@ import (
     . "github.com/onsi/ginkgo"
     . "github.com/onsi/gomega"
 
-    pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v1"
+    pgipfsethdb "github.com/vulcanize/ipfs-ethdb/v3/postgres"
 )
 
 var (
     database ethdb.Database
     db       *sqlx.DB
     err      error
-    testBlockNumber = big.NewInt(1337)
-    testHeader      = types.Header{Number: testBlockNumber}
-    testValue, _    = rlp.EncodeToBytes(testHeader)
-    testEthKey      = testHeader.Hash().Bytes()
-    testMhKey, _    = pgipfsethdb.MultihashKeyFromKeccak256(testEthKey)
+    testHeader   = types.Header{Number: big.NewInt(1337)}
+    testValue, _ = rlp.EncodeToBytes(testHeader)
+    testEthKey   = testHeader.Hash().Bytes()
+    testMhKey, _ = pgipfsethdb.MultihashKeyFromKeccak256(testEthKey)
 )
 
 var _ = Describe("Database", func() {
     BeforeEach(func() {
-        db, err = shared.TestDB()
+        db, err = pgipfsethdb.TestDB()
         Expect(err).ToNot(HaveOccurred())
 
         cacheConfig := pgipfsethdb.CacheConfig{
@@ -56,16 +53,10 @@ var _ = Describe("Database", func() {
         }
 
         database = pgipfsethdb.NewDatabase(db, cacheConfig)
-
-        databaseWithBlock, ok := database.(*pgipfsethdb.Database)
-        Expect(ok).To(BeTrue())
-        (*databaseWithBlock).BlockNumber = testBlockNumber
     })
     AfterEach(func() {
         groupcache.DeregisterGroup("db")
-        err = shared.ResetTestDB(db)
-        Expect(err).ToNot(HaveOccurred())
-        err = db.Close()
+        err = pgipfsethdb.ResetTestDB(db)
         Expect(err).ToNot(HaveOccurred())
     })
 
@@ -76,7 +67,7 @@ var _ = Describe("Database", func() {
             Expect(has).ToNot(BeTrue())
         })
         It("returns true if a key-pair exists in the db", func() {
-            _, err = db.Exec("INSERT into ipld.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
+            _, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue)
             Expect(err).ToNot(HaveOccurred())
             has, err := database.Has(testEthKey)
             Expect(err).ToNot(HaveOccurred())
@@ -91,7 +82,7 @@ var _ = Describe("Database", func() {
             Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
         })
        It("returns the value associated with the key, if the pair exists", func() {
-            _, err = db.Exec("INSERT into ipld.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
+            _, err = db.Exec("INSERT into public.blocks (key, data) VALUES ($1, $2)", testMhKey, testValue)
             Expect(err).ToNot(HaveOccurred())
             val, err := database.Get(testEthKey)
             Expect(err).ToNot(HaveOccurred())
@@ -1,6 +1,6 @@
 ## ipfs-ethdb
 
-IPFS has been [extended](https://github.com/cerc-io/go-ipfs/releases/tag/v0.4.22-alpha) to [use Postgres](https://github.com/cerc-io/go-ipfs-config/releases/tag/v0.0.8-alpha) as a backing [datastore](https://github.com/ipfs/go-ds-sql/tree/master/postgres).
+IPFS has been [extended](https://github.com/vulcanize/go-ipfs/releases/tag/v0.4.22-alpha) to [use Postgres](https://github.com/vulcanize/go-ipfs-config/releases/tag/v0.0.8-alpha) as a backing [datastore](https://github.com/ipfs/go-ds-sql/tree/master/postgres).
 Interfacing directly with the IPFS-backing Postgres database has some advantages over using the blockservice interface.
 Namely, batching of IPFS writes with other Postgres writes and avoiding lock contention on the ipfs repository (lockfile located at the `IPFS_PATH`).
 The downside is that we forgo the block-exchange capabilities of the blockservice, and are only able to fetch data contained in the local datastore.
@@ -18,11 +18,11 @@ import (
     "github.com/ethereum/go-ethereum/core/state"
     "github.com/ethereum/go-ethereum/trie"
     "github.com/jmoiron/sqlx"
-    "github.com/cerc-io/ipfs-ethdb/v5/postgres/v1"
+    "github.com/vulcanize/ipfs-ethdb/v3/postgres"
 )
 
 func main() {
-    connectStr := "postgresql://vdbm:password@localhost:8077/cerc_testing?sslmode=disable"
+    connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable"
     db, _ := sqlx.Connect("postgres", connectStr)
 
     kvs := pgipfsethdb.NewKeyValueStore(db)
@@ -31,11 +31,7 @@ func main() {
     trieNodeIterator := t.NodeIterator([]byte{})
     // do stuff with trie node iterator
 
-    database := pgipfsethdb.NewDatabase(db, pgipfsethdb.CacheConfig{
-        Name:           "db",
-        Size:           3000000, // 3MB
-        ExpiryDuration: time.Hour,
-    })
+    database := pgipfsethdb.NewDatabase(db)
     stateDatabase := state.NewDatabase(database)
     stateDB, _ := state.New(common.Hash{}, stateDatabase, nil)
     stateDBNodeIterator := state.NewNodeIterator(stateDB)
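For context on the Postgres-backed path this README section describes, here is a small usage sketch pieced together from calls that appear elsewhere in this diff (the sqlx connection style of the TestDB helper, CacheConfig, and the two-argument NewDatabase used by the test files). Treat it as illustrative rather than authoritative: the README hunk above still shows an older one-argument NewDatabase, so the exact signature depends on which side of this compare you build against.

```go
package main

import (
    "time"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/state"
    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq" // postgres driver

    pgipfsethdb "github.com/vulcanize/ipfs-ethdb/v3/postgres"
)

func main() {
    // Connect to the IPFS-backing Postgres database (same connection-string style
    // as the TestDB helper shown later in this diff).
    db, err := sqlx.Connect("postgres", "postgresql://localhost:5432/vulcanize_testing?sslmode=disable")
    if err != nil {
        panic(err)
    }

    // Cache settings mirror the values used in the test files in this diff.
    cacheConfig := pgipfsethdb.CacheConfig{
        Name:           "db",
        Size:           3000000, // 3MB
        ExpiryDuration: time.Hour,
    }

    // Build an ethdb.Database directly over Postgres and use it with go-ethereum.
    database := pgipfsethdb.NewDatabase(db, cacheConfig)
    stateDatabase := state.NewDatabase(database)
    stateDB, _ := state.New(common.Hash{}, stateDatabase, nil)
    _ = stateDB
}
```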
@@ -1,42 +0,0 @@
-// VulcanizeDB
-// Copyright © 2023 Vulcanize
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package shared
-
-import (
-    "github.com/jmoiron/sqlx"
-)
-
-/*
-    Hostname:     "localhost",
-    Port:         8077,
-    DatabaseName: "cerc_testing",
-    Username:     "vdbm",
-    Password:     "password",
-*/
-
-// TestDB connect to the testing database
-// it assumes the database has the IPFS ipld.blocks table present
-// DO NOT use a production db for the test db, as it will remove all contents of the ipld.blocks table
-func TestDB() (*sqlx.DB, error) {
-    connectStr := "postgresql://vdbm:password@localhost:8077/cerc_testing?sslmode=disable"
-    return sqlx.Connect("postgres", connectStr)
-}
-
-// ResetTestDB drops all rows in the test db ipld.blocks table
-func ResetTestDB(db *sqlx.DB) error {
-    _, err := db.Exec("TRUNCATE ipld.blocks CASCADE")
-    return err
-}
@@ -17,8 +17,9 @@
 package pgipfsethdb
 
 import (
-    blockstore "github.com/ipfs/boxo/blockstore"
+    blockstore "github.com/ipfs/go-ipfs-blockstore"
     dshelp "github.com/ipfs/go-ipfs-ds-help"
+    "github.com/jmoiron/sqlx"
     _ "github.com/lib/pq" //postgres driver
     "github.com/multiformats/go-multihash"
 )
@@ -32,3 +33,17 @@ func MultihashKeyFromKeccak256(h []byte) (string, error) {
     dbKey := dshelp.MultihashToDsKey(mh)
     return blockstore.BlockPrefix.String() + dbKey.String(), nil
 }
+
+// TestDB connect to the testing database
+// it assumes the database has the IPFS public.blocks table present
+// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table
+func TestDB() (*sqlx.DB, error) {
+    connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable"
+    return sqlx.Connect("postgres", connectStr)
+}
+
+// ResetTestDB drops all rows in the test db public.blocks table
+func ResetTestDB(db *sqlx.DB) error {
+    _, err := db.Exec("TRUNCATE public.blocks CASCADE")
+    return err
+}
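As a quick illustration of how the helper above fits into the Has/Get/Put paths: the ethdb layer receives raw keccak256 hashes from go-ethereum, and MultihashKeyFromKeccak256 turns each one into the /blocks-prefixed key that the public.blocks SQL statements query. A minimal sketch; only MultihashKeyFromKeccak256 itself is taken from this diff, the surrounding wiring is assumed.

```go
package main

import (
    "fmt"

    "github.com/ethereum/go-ethereum/crypto"

    pgipfsethdb "github.com/vulcanize/ipfs-ethdb/v3/postgres"
)

func main() {
    // go-ethereum keys state data by the keccak256 hash of the RLP-encoded object.
    ethKey := crypto.Keccak256([]byte("rlp-encoded object"))

    // MultihashKeyFromKeccak256 wraps that hash in a multihash and prefixes it with
    // the blockstore's /blocks prefix, producing the `key` column value used by
    // hasPgStr/getPgStr/putPgStr above.
    mhKey, err := pgipfsethdb.MultihashKeyFromKeccak256(ethKey)
    if err != nil {
        panic(err)
    }
    fmt.Println(mhKey) // e.g. /blocks/<base32-encoded multihash>
}
```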
@@ -1,117 +0,0 @@
-// VulcanizeDB
-// Copyright © 2020 Vulcanize
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package pgipfsethdb
-
-import (
-    "math/big"
-
-    "github.com/ipfs/go-cid"
-
-    "github.com/ethereum/go-ethereum/ethdb"
-    "github.com/jmoiron/sqlx"
-)
-
-var _ ethdb.Batch = &Batch{}
-
-// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection
-type Batch struct {
-    db        *sqlx.DB
-    tx        *sqlx.Tx
-    valueSize int
-
-    blockNumber *big.Int
-}
-
-// NewBatch returns a ethdb.Batch interface for PG-IPFS
-func NewBatch(db *sqlx.DB, tx *sqlx.Tx, blockNumber *big.Int) ethdb.Batch {
-    b := &Batch{
-        db: db,
-        tx: tx,
-        blockNumber: blockNumber,
-    }
-    if tx == nil {
-        b.Reset()
-    }
-    return b
-}
-
-// Put satisfies the ethdb.Batch interface
-// Put inserts the given value into the key-value data store
-// Key is expected to be a fully formulated cid key
-// TODO: note, now that we expected a cid we could route to the "cids" tables based on prefix instead of to ipld.blocks
-// but is it better to handle this routing here, or use a completely different interface since we already have to refactor
-// at levels above this package in order to pass in cids instead of raw keccak256 hashes
-func (b *Batch) Put(cidBytes []byte, value []byte) (err error) {
-    // cast and resolve strings from cid.Cast
-    // this will assert that we have a correctly formatted CID
-    // and will handle the different string encodings for v0 and v1 CIDs
-    // (note that this v0 vs v1 is different from the blockstore v0 vs v1)
-    c, err := cid.Cast(cidBytes)
-    if err != nil {
-        return err
-    }
-    if _, err = b.tx.Exec(putPgStr, c.String(), value, b.blockNumber.Uint64()); err != nil {
-        return err
-    }
-    b.valueSize += len(value)
-    return nil
-}
-
-// Delete satisfies the ethdb.Batch interface
-// Delete removes the key from the key-value data store
-func (b *Batch) Delete(cidBytes []byte) (err error) {
-    c, err := cid.Cast(cidBytes)
-    if err != nil {
-        return err
-    }
-    _, err = b.tx.Exec(deletePgStr, c.String())
-    return err
-}
-
-// ValueSize satisfies the ethdb.Batch interface
-// ValueSize retrieves the amount of data queued up for writing
-// The returned value is the total byte length of all data queued to write
-func (b *Batch) ValueSize() int {
-    return b.valueSize
-}
-
-// Write satisfies the ethdb.Batch interface
-// Write flushes any accumulated data to disk
-func (b *Batch) Write() error {
-    if b.tx == nil {
-        return nil
-    }
-    return b.tx.Commit()
-}
-
-// Replay satisfies the ethdb.Batch interface
-// Replay replays the batch contents
-func (b *Batch) Replay(w ethdb.KeyValueWriter) error {
-    return errNotSupported
-}
-
-// Reset satisfies the ethdb.Batch interface
-// Reset resets the batch for reuse
-// This should be called after every write
-func (b *Batch) Reset() {
-    var err error
-    b.tx, err = b.db.Beginx()
-    if err != nil {
-        panic(err)
-    }
-    b.valueSize = 0
-}
@@ -1,138 +0,0 @@
-// VulcanizeDB
-// Copyright © 2019 Vulcanize
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU Affero General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU Affero General Public License for more details.
-
-// You should have received a copy of the GNU Affero General Public License
-// along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-package pgipfsethdb_test
-
-import (
-    "math/big"
-    "time"
-
-    "github.com/cerc-io/ipfs-ethdb/v5/postgres/shared"
-
-    "github.com/ipfs/go-cid"
-
-    "github.com/ethereum/go-ethereum/core/types"
-    "github.com/ethereum/go-ethereum/ethdb"
-    "github.com/ethereum/go-ethereum/rlp"
-    "github.com/mailgun/groupcache/v2"
-    . "github.com/onsi/ginkgo"
-    . "github.com/onsi/gomega"
-
-    pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v0"
-)
-
-var (
-    batch         ethdb.Batch
-    testHeader2   = types.Header{Number: big.NewInt(2)}
-    testValue2, _ = rlp.EncodeToBytes(testHeader2)
-    testEthKey2   = testHeader2.Hash().Bytes()
-    testCID2, _   = pgipfsethdb.CIDFromKeccak256(testEthKey2, cid.EthBlock)
-)
-
-var _ = Describe("Batch", func() {
-    BeforeEach(func() {
-        db, err = shared.TestDB()
-        Expect(err).ToNot(HaveOccurred())
-
-        cacheConfig := pgipfsethdb.CacheConfig{
-            Name:           "db",
-            Size:           3000000, // 3MB
-            ExpiryDuration: time.Hour,
-        }
-
-        database = pgipfsethdb.NewDatabase(db, cacheConfig)
-
-        databaseWithBlock, ok := database.(*pgipfsethdb.Database)
-        Expect(ok).To(BeTrue())
-        (*databaseWithBlock).BlockNumber = testBlockNumber
-
-        batch = database.NewBatch()
-    })
-    AfterEach(func() {
-        groupcache.DeregisterGroup("db")
-        err = shared.ResetTestDB(db)
-        Expect(err).ToNot(HaveOccurred())
-    })
-
-    Describe("Put/Write", func() {
-        It("adds the key-value pair to the batch", func() {
-            _, err = database.Get(testCID.Bytes())
-            Expect(err).To(HaveOccurred())
-            Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
-            _, err = database.Get(testCID2.Bytes())
-            Expect(err).To(HaveOccurred())
-            Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
-
-            err = batch.Put(testCID.Bytes(), testValue)
-            Expect(err).ToNot(HaveOccurred())
-            err = batch.Put(testCID2.Bytes(), testValue2)
-            Expect(err).ToNot(HaveOccurred())
-            err = batch.Write()
-            Expect(err).ToNot(HaveOccurred())
-
-            val, err := database.Get(testCID.Bytes())
-            Expect(err).ToNot(HaveOccurred())
-            Expect(val).To(Equal(testValue))
-            val2, err := database.Get(testCID2.Bytes())
-            Expect(err).ToNot(HaveOccurred())
-            Expect(val2).To(Equal(testValue2))
-        })
-    })
-
-    Describe("Delete/Reset/Write", func() {
-        It("deletes the key-value pair in the batch", func() {
-            err = batch.Put(testCID.Bytes(), testValue)
-            Expect(err).ToNot(HaveOccurred())
-            err = batch.Put(testCID2.Bytes(), testValue2)
-            Expect(err).ToNot(HaveOccurred())
-            err = batch.Write()
-            Expect(err).ToNot(HaveOccurred())
-
-            batch.Reset()
-            err = batch.Delete(testCID.Bytes())
-            Expect(err).ToNot(HaveOccurred())
-            err = batch.Delete(testCID2.Bytes())
-            Expect(err).ToNot(HaveOccurred())
-            err = batch.Write()
-            Expect(err).ToNot(HaveOccurred())
-
-            _, err = database.Get(testCID.Bytes())
-            Expect(err).To(HaveOccurred())
-            Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
-            _, err = database.Get(testCID2.Bytes())
-            Expect(err).To(HaveOccurred())
-            Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
-        })
-    })
-
-    Describe("ValueSize/Reset", func() {
-        It("returns the size of data in the batch queued for write", func() {
-            err = batch.Put(testCID.Bytes(), testValue)
-            Expect(err).ToNot(HaveOccurred())
-            err = batch.Put(testCID2.Bytes(), testValue2)
-            Expect(err).ToNot(HaveOccurred())
-            err = batch.Write()
-            Expect(err).ToNot(HaveOccurred())
-
-            size := batch.ValueSize()
-            Expect(size).To(Equal(len(testValue) + len(testValue2)))
-
-            batch.Reset()
-            size = batch.ValueSize()
-            Expect(size).To(Equal(0))
-        })
-    })
-})
@ -1,365 +0,0 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.

// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package pgipfsethdb

import (
    "context"
    "database/sql"
    "errors"
    "fmt"
    "math/big"
    "strconv"
    "strings"
    "time"

    "github.com/ipfs/go-cid"

    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/jmoiron/sqlx"
    "github.com/mailgun/groupcache/v2"
    log "github.com/sirupsen/logrus"
)

var errNotSupported = errors.New("this operation is not supported")

var (
    hasPgStr    = "SELECT exists(select 1 from ipld.blocks WHERE key = $1 LIMIT 1)"
    getPgStr    = "SELECT data FROM ipld.blocks WHERE key = $1 LIMIT 1"
    putPgStr    = "INSERT INTO ipld.blocks (key, data, block_number) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"
    deletePgStr = "DELETE FROM ipld.blocks WHERE key = $1"
    dbSizePgStr = "SELECT pg_database_size(current_database())"
)

var _ ethdb.Database = &Database{}

// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
type Database struct {
    db    *sqlx.DB
    cache *groupcache.Group

    BlockNumber *big.Int
}

func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) {
    return 0, errNotSupported
}

type CacheConfig struct {
    Name           string
    Size           int
    ExpiryDuration time.Duration
}

// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS
func NewKeyValueStore(db *sqlx.DB, cacheConfig CacheConfig) ethdb.KeyValueStore {
    database := Database{db: db}
    database.InitCache(cacheConfig)

    return &database
}

// NewDatabase returns a ethdb.Database interface for PG-IPFS
func NewDatabase(db *sqlx.DB, cacheConfig CacheConfig) ethdb.Database {
    database := Database{db: db}
    database.InitCache(cacheConfig)

    return &database
}

func (d *Database) InitCache(cacheConfig CacheConfig) {
    d.cache = groupcache.NewGroup(cacheConfig.Name, int64(cacheConfig.Size), groupcache.GetterFunc(
        func(_ context.Context, id string, dest groupcache.Sink) error {
            val, err := d.dbGet(id)

            if err != nil {
                return err
            }

            // Set the value in the groupcache, with expiry
            if err := dest.SetBytes(val, time.Now().Add(cacheConfig.ExpiryDuration)); err != nil {
                return err
            }

            return nil
        },
    ))
}

func (d *Database) GetCacheStats() groupcache.Stats {
    return d.cache.Stats
}

// Has satisfies the ethdb.KeyValueReader interface
// Has retrieves if a cid is present in the key-value data store
func (d *Database) Has(cidBytes []byte) (bool, error) {
    c, err := cid.Cast(cidBytes)
    if err != nil {
        return false, err
    }
    var exists bool
    return exists, d.db.Get(&exists, hasPgStr, c.String())
}

// Get retrieves the given key if it's present in the key-value data store
func (d *Database) dbGet(key string) ([]byte, error) {
    var data []byte
    err := d.db.Get(&data, getPgStr, key)
    if err == sql.ErrNoRows {
        log.Warn("Database miss for key ", key)
    }

    return data, err
}

// Get satisfies the ethdb.KeyValueReader interface
// Get retrieves the given cid if it's present in the key-value data store
func (d *Database) Get(cidBytes []byte) ([]byte, error) {
    c, err := cid.Cast(cidBytes)
    if err != nil {
        return nil, err
    }

    ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
    defer cancel()

    var data []byte
    return data, d.cache.Get(ctx, c.String(), groupcache.AllocatingByteSliceSink(&data))
}

// Put satisfies the ethdb.KeyValueWriter interface
// Put inserts the given value into the key-value data store
// Key is expected to be a fully formulated cid of the value
func (d *Database) Put(cidBytes []byte, value []byte) error {
    c, err := cid.Cast(cidBytes)
    if err != nil {
        return err
    }
    _, err = d.db.Exec(putPgStr, c.String(), value, d.BlockNumber.Uint64())
    return err
}

// Delete satisfies the ethdb.KeyValueWriter interface
// Delete removes the cid from the key-value data store
func (d *Database) Delete(cidBytes []byte) error {
    c, err := cid.Cast(cidBytes)
    if err != nil {
        return err
    }
    cidString := c.String()

    _, err = d.db.Exec(deletePgStr, cidString)
    if err != nil {
        return err
    }

    // Remove from cache.
    ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
    defer cancel()
    err = d.cache.Remove(ctx, cidString)

    return err
}

// DatabaseProperty enum type
type DatabaseProperty int

const (
    Unknown DatabaseProperty = iota
    Size
    Idle
    InUse
    MaxIdleClosed
    MaxLifetimeClosed
    MaxOpenConnections
    OpenConnections
    WaitCount
    WaitDuration
)

// DatabasePropertyFromString helper function
func DatabasePropertyFromString(property string) (DatabaseProperty, error) {
    switch strings.ToLower(property) {
    case "size":
        return Size, nil
    case "idle":
        return Idle, nil
    case "inuse":
        return InUse, nil
    case "maxidleclosed":
        return MaxIdleClosed, nil
    case "maxlifetimeclosed":
        return MaxLifetimeClosed, nil
    case "maxopenconnections":
        return MaxOpenConnections, nil
    case "openconnections":
        return OpenConnections, nil
    case "waitcount":
        return WaitCount, nil
    case "waitduration":
        return WaitDuration, nil
    default:
        return Unknown, fmt.Errorf("unknown database property")
    }
}

// Stat satisfies the ethdb.Stater interface
// Stat returns a particular internal stat of the database
func (d *Database) Stat(property string) (string, error) {
    prop, err := DatabasePropertyFromString(property)
    if err != nil {
        return "", err
    }
    switch prop {
    case Size:
        var byteSize string
        return byteSize, d.db.Get(&byteSize, dbSizePgStr)
    case Idle:
        return strconv.Itoa(d.db.Stats().Idle), nil
    case InUse:
        return strconv.Itoa(d.db.Stats().InUse), nil
    case MaxIdleClosed:
        return strconv.FormatInt(d.db.Stats().MaxIdleClosed, 10), nil
    case MaxLifetimeClosed:
        return strconv.FormatInt(d.db.Stats().MaxLifetimeClosed, 10), nil
    case MaxOpenConnections:
        return strconv.Itoa(d.db.Stats().MaxOpenConnections), nil
    case OpenConnections:
        return strconv.Itoa(d.db.Stats().OpenConnections), nil
    case WaitCount:
        return strconv.FormatInt(d.db.Stats().WaitCount, 10), nil
    case WaitDuration:
        return d.db.Stats().WaitDuration.String(), nil
    default:
        return "", fmt.Errorf("unhandled database property")
    }
}

// Compact satisfies the ethdb.Compacter interface
// Compact flattens the underlying data store for the given key range
func (d *Database) Compact(start []byte, limit []byte) error {
    return errNotSupported
}

// NewBatch satisfies the ethdb.Batcher interface
// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called
func (d *Database) NewBatch() ethdb.Batch {
    return NewBatch(d.db, nil, d.BlockNumber)
}

// NewBatchWithSize satisfies the ethdb.Batcher interface.
// NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
func (d *Database) NewBatchWithSize(size int) ethdb.Batch {
    return NewBatch(d.db, nil, d.BlockNumber)
}

// NewIterator satisfies the ethdb.Iteratee interface
// it creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix, starting at a particular
// initial key (or after, if it does not exist).
//
// Note: This method assumes that the prefix is NOT part of the start, so there's
// no need for the caller to prepend the prefix to the start
func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
    return NewIterator(start, prefix, d.db)
}

// Close satisfies the io.Closer interface.
// Close closes the db connection and deregisters from groupcache.
func (d *Database) Close() error {
    groupcache.DeregisterGroup(d.cache.Name())
    return d.db.DB.Close()
}

// HasAncient satisfies the ethdb.AncientReader interface
// HasAncient returns an indicator whether the specified data exists in the ancient store
func (d *Database) HasAncient(kind string, number uint64) (bool, error) {
    return false, errNotSupported
}

// Ancient satisfies the ethdb.AncientReader interface
// Ancient retrieves an ancient binary blob from the append-only immutable files
func (d *Database) Ancient(kind string, number uint64) ([]byte, error) {
    return nil, errNotSupported
}

// Ancients satisfies the ethdb.AncientReader interface
// Ancients returns the ancient item numbers in the ancient store
func (d *Database) Ancients() (uint64, error) {
    return 0, errNotSupported
}

// Tail satisfies the ethdb.AncientReader interface.
// Tail returns the number of first stored item in the freezer.
func (d *Database) Tail() (uint64, error) {
    return 0, errNotSupported
}

// AncientSize satisfies the ethdb.AncientReader interface
// AncientSize returns the ancient size of the specified category
func (d *Database) AncientSize(kind string) (uint64, error) {
    return 0, errNotSupported
}

// AncientRange retrieves all the items in a range, starting from the index 'start'.
// It will return
// - at most 'count' items,
// - at least 1 item (even if exceeding the maxBytes), but will otherwise
//   return as many items as fit into maxBytes.
func (d *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
    return nil, errNotSupported
}

// ReadAncients applies the provided AncientReader function
func (d *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
    return errNotSupported
}

// TruncateHead satisfies the ethdb.AncientWriter interface.
// TruncateHead discards all but the first n ancient data from the ancient store.
func (d *Database) TruncateHead(n uint64) (uint64, error) {
    return 0, errNotSupported
}

// TruncateTail satisfies the ethdb.AncientWriter interface.
// TruncateTail discards the first n ancient data from the ancient store.
func (d *Database) TruncateTail(n uint64) (uint64, error) {
    return 0, errNotSupported
}

// Sync satisfies the ethdb.AncientWriter interface
// Sync flushes all in-memory ancient store data to disk
func (d *Database) Sync() error {
    return errNotSupported
}

// MigrateTable satisfies the ethdb.AncientWriter interface.
// MigrateTable processes and migrates entries of a given table to a new format.
func (d *Database) MigrateTable(string, func([]byte) ([]byte, error)) error {
    return errNotSupported
}

// NewSnapshot satisfies the ethdb.Snapshotter interface.
// NewSnapshot creates a database snapshot based on the current state.
func (d *Database) NewSnapshot() (ethdb.Snapshot, error) {
    return nil, errNotSupported
}

// AncientDatadir returns an error as we don't have a backing chain freezer.
func (d *Database) AncientDatadir() (string, error) {
    return "", errNotSupported
}
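For orientation (this sketch is not part of the diff): a minimal example of how the removed pgipfsethdb (postgres/v0) package was typically wired up, following the test suite that accompanies it. The Postgres DSN is hypothetical and assumes the ipld.blocks table exists; the cache settings and block number mirror the test fixtures.

package main

import (
    "fmt"
    "math/big"
    "time"

    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/rlp"
    "github.com/ipfs/go-cid"
    "github.com/jmoiron/sqlx"
    _ "github.com/lib/pq"

    pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v0"
)

func main() {
    // Hypothetical DSN; any Postgres with the ipld.blocks table will do.
    db, err := sqlx.Connect("postgres", "postgres://localhost:5432/vulcanize_testing?sslmode=disable")
    if err != nil {
        panic(err)
    }

    // Cache settings mirror the package's test suite.
    ethDB := pgipfsethdb.NewDatabase(db, pgipfsethdb.CacheConfig{
        Name:           "db",
        Size:           3000000, // 3MB
        ExpiryDuration: time.Hour,
    })

    // Puts record a block number alongside the data, so set it before writing.
    if pgDB, ok := ethDB.(*pgipfsethdb.Database); ok {
        pgDB.BlockNumber = big.NewInt(1337)
    }

    // Key the value by the CID derived from its keccak256 hash, as the tests do.
    header := types.Header{Number: big.NewInt(1337)}
    value, _ := rlp.EncodeToBytes(header)
    key, _ := pgipfsethdb.CIDFromKeccak256(header.Hash().Bytes(), cid.EthBlock)

    if err := ethDB.Put(key.Bytes(), value); err != nil {
        panic(err)
    }
    data, err := ethDB.Get(key.Bytes())
    fmt.Println(len(data), err)
}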
@ -1,133 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.

// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package pgipfsethdb_test

import (
    "math/big"
    "time"

    "github.com/cerc-io/ipfs-ethdb/v5/postgres/shared"

    "github.com/ipfs/go-cid"

    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/ethereum/go-ethereum/rlp"
    "github.com/jmoiron/sqlx"
    "github.com/mailgun/groupcache/v2"
    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"

    pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v0"
)

var (
    database        ethdb.Database
    db              *sqlx.DB
    err             error
    testBlockNumber = big.NewInt(1337)
    testHeader      = types.Header{Number: testBlockNumber}
    testValue, _    = rlp.EncodeToBytes(testHeader)
    testEthKey      = testHeader.Hash().Bytes()
    testCID, _      = pgipfsethdb.CIDFromKeccak256(testEthKey, cid.EthBlock)
)

var _ = Describe("Database", func() {
    BeforeEach(func() {
        db, err = shared.TestDB()
        Expect(err).ToNot(HaveOccurred())

        cacheConfig := pgipfsethdb.CacheConfig{
            Name:           "db",
            Size:           3000000, // 3MB
            ExpiryDuration: time.Hour,
        }

        database = pgipfsethdb.NewDatabase(db, cacheConfig)

        databaseWithBlock, ok := database.(*pgipfsethdb.Database)
        Expect(ok).To(BeTrue())
        (*databaseWithBlock).BlockNumber = testBlockNumber
    })
    AfterEach(func() {
        groupcache.DeregisterGroup("db")
        err = shared.ResetTestDB(db)
        Expect(err).ToNot(HaveOccurred())
        err = db.Close()
        Expect(err).ToNot(HaveOccurred())
    })

    Describe("Has", func() {
        It("returns false if a key-pair doesn't exist in the db", func() {
            has, err := database.Has(testCID.Bytes())
            Expect(err).ToNot(HaveOccurred())
            Expect(has).ToNot(BeTrue())
        })
        It("returns true if a key-pair exists in the db", func() {
            _, err = db.Exec("INSERT into ipld.blocks (key, data, block_number) VALUES ($1, $2, $3)", testCID.String(), testValue, testBlockNumber.Uint64())
            Expect(err).ToNot(HaveOccurred())
            has, err := database.Has(testCID.Bytes())
            Expect(err).ToNot(HaveOccurred())
            Expect(has).To(BeTrue())
        })
    })

    Describe("Get", func() {
        It("throws an err if the key-pair doesn't exist in the db", func() {
            _, err = database.Get(testCID.Bytes())
            Expect(err).To(HaveOccurred())
            Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
        })
        It("returns the value associated with the key, if the pair exists", func() {
            _, err = db.Exec("INSERT into ipld.blocks (key, data, block_number) VALUES ($1, $2, $3)", testCID.String(), testValue, testBlockNumber.Uint64())
            Expect(err).ToNot(HaveOccurred())
            val, err := database.Get(testCID.Bytes())
            Expect(err).ToNot(HaveOccurred())
            Expect(val).To(Equal(testValue))
        })
    })

    Describe("Put", func() {
        It("persists the key-value pair in the database", func() {
            _, err = database.Get(testCID.Bytes())
            Expect(err).To(HaveOccurred())
            Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))

            err = database.Put(testCID.Bytes(), testValue)
            Expect(err).ToNot(HaveOccurred())
            val, err := database.Get(testCID.Bytes())
            Expect(err).ToNot(HaveOccurred())
            Expect(val).To(Equal(testValue))
        })
    })

    Describe("Delete", func() {
        It("removes the key-value pair from the database", func() {
            err = database.Put(testCID.Bytes(), testValue)
            Expect(err).ToNot(HaveOccurred())
            val, err := database.Get(testCID.Bytes())
            Expect(err).ToNot(HaveOccurred())
            Expect(val).To(Equal(testValue))

            err = database.Delete(testCID.Bytes())
            Expect(err).ToNot(HaveOccurred())
            _, err = database.Get(testCID.Bytes())
            Expect(err).To(HaveOccurred())
            Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
        })
    })
})
@ -1,91 +0,0 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.

// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package pgipfsethdb

import (
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/ipfs/go-cid"
    "github.com/jmoiron/sqlx"
)

var _ ethdb.Iterator = &Iterator{}

// Iterator is the type that satisfies the ethdb.Iterator interface for PG-IPFS Ethereum data using a direct Postgres connection
// The Iteratee interface is used in Geth for various tests, trie/sync_bloom.go (for fast sync),
// rawdb.InspectDatabase, and the new core/state/snapshot features.
// This should not be confused with trie.NodeIterator or state.NodeIterator (which can be constructed
// from the ethdb.KeyValueStore and ethdb.Database interfaces)
type Iterator struct {
    db                 *sqlx.DB
    currentKey, prefix []byte
    err                error
}

// NewIterator returns an ethdb.Iterator interface for PG-IPFS
func NewIterator(start, prefix []byte, db *sqlx.DB) ethdb.Iterator {
    return &Iterator{
        db:         db,
        prefix:     prefix,
        currentKey: start,
    }
}

// Next satisfies the ethdb.Iterator interface
// Next moves the iterator to the next key/value pair
// It returns whether the iterator is exhausted
func (i *Iterator) Next() bool {
    // this is complicated by the ipfs db keys not being the keccak256 hashes
    // go-ethereum usage of this method expects the iteration to occur over keccak256 keys
    panic("implement me: Next")
}

// Error satisfies the ethdb.Iterator interface
// Error returns any accumulated error
// Exhausting all the key/value pairs is not considered to be an error
func (i *Iterator) Error() error {
    return i.err
}

// Key satisfies the ethdb.Iterator interface
// Key returns the key of the current key/value pair, or nil if done
// The caller should not modify the contents of the returned slice
// and its contents may change on the next call to Next
func (i *Iterator) Key() []byte {
    return i.currentKey
}

// Value satisfies the ethdb.Iterator interface
// Value returns the value of the current key/value pair, or nil if done
// The caller should not modify the contents of the returned slice
// and its contents may change on the next call to Next
func (i *Iterator) Value() []byte {
    c, err := cid.Cast(i.currentKey)
    if err != nil {
        i.err = err
        return nil
    }
    var data []byte
    i.err = i.db.Get(&data, getPgStr, c)
    return data
}

// Release satisfies the ethdb.Iterator interface
// Release releases associated resources
// Release should always succeed and can be called multiple times without causing error
func (i *Iterator) Release() {
    i.db.Close()
}
@ -1,32 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.

// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package pgipfsethdb

import (
    "github.com/ipfs/go-cid"
    _ "github.com/lib/pq" //postgres driver
    "github.com/multiformats/go-multihash"
)

// CIDFromKeccak256 converts keccak256 hash bytes into a v1 cid
func CIDFromKeccak256(hash []byte, codecType uint64) (cid.Cid, error) {
    mh, err := multihash.Encode(hash, multihash.KECCAK_256)
    if err != nil {
        return cid.Cid{}, err
    }
    return cid.NewCidV1(codecType, mh), nil
}
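A short sketch (separate from the diff) of the keccak256-to-CID conversion this helper performs; the hashed payload below is arbitrary and only illustrative:

package main

import (
    "fmt"

    "github.com/ethereum/go-ethereum/crypto"
    "github.com/ipfs/go-cid"

    pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v0"
)

func main() {
    // A keccak256 hash of some (illustrative) RLP payload — the kind of key geth uses in leveldb.
    ethKey := crypto.Keccak256([]byte("some rlp-encoded node"))

    // Wrap it in a v1 CID with the eth-block codec (as the tests do);
    // the CID string is what ends up in the key column of ipld.blocks.
    c, err := pgipfsethdb.CIDFromKeccak256(ethKey, cid.EthBlock)
    if err != nil {
        panic(err)
    }
    fmt.Println(c.String())
}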
@ -1,29 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.

// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package pgipfsethdb_test

import (
    "testing"

    . "github.com/onsi/ginkgo"
    . "github.com/onsi/gomega"
)

func TestPGIPFSETHDB(t *testing.T) {
    RegisterFailHandler(Fail)
    RunSpecs(t, "PG-IPFS ethdb test")
}
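A note on running these specs: TestPGIPFSETHDB hooks Ginkgo into the standard testing package, so a plain `go test` invocation of this package runs the whole suite; no separate ginkgo CLI is required. A test Postgres reachable by shared.TestDB() is assumed.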
2 util.go
@ -29,7 +29,7 @@ func Keccak256ToCid(h []byte, codec uint64) (cid.Cid, error) {
     if err != nil {
         return cid.Cid{}, err
     }
-    return cid.NewCidV1(codec, buf), nil
+    return cid.NewCidV1(codec, multihash.Multihash(buf)), nil
 }

 // NewBlock takes a keccak256 hash key and the rlp []byte value it was derived from and creates an ipfs block object
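The only substantive change to util.go appears to be the final return: the raw []byte produced by multihash.Encode is now explicitly converted to multihash.Multihash before being passed to cid.NewCidV1. Since Multihash is defined as a byte slice, behaviour should be unchanged; this reads as a type-explicitness/clarity fix rather than a functional one.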