Compare commits

..

15 Commits

Author SHA1 Message Date
e9420ff2b8
Merge pull request #59 from cerc-io/roy/v5-dev
Deregister groupcache on close
2023-05-31 16:24:24 +08:00
c438c158fc Deregister groupcache on close 2023-05-14 02:33:04 +08:00
Ian Norden
05559e2551
Merge pull request #58 from cerc-io/roy/v5-dev
Update to v5
2023-04-09 12:16:03 -05:00
i-norden
a1e3f601b1 revert using vdb fork of geth 2023-04-09 12:11:46 -05:00
4a4ae2f2ea use statediff indexer config; other nits 2023-04-01 00:13:20 +08:00
4695984e1d vulc => cerc 2023-04-01 00:13:20 +08:00
e015d4333d update docs 2023-04-01 00:13:11 +08:00
Ian Norden
2709f7b8a7
Merge pull request #56 from cerc-io/ian/v5
bump to v1.11.5
2023-03-30 16:26:04 -05:00
i-norden
0e49b87509 update pkg version 2023-03-30 16:13:16 -05:00
i-norden
360b0abbbc use stock v1.11.5 geth; use new cerc test db info 2023-03-30 16:11:05 -05:00
Ian Norden
4a52acb0bb
Merge pull request #50 from cerc-io/ian/v5
v0 blockstore compatible version
2023-03-30 15:51:59 -05:00
i-norden
d2176e69af update postgres v0 subpkg and tests to work with cid keys 2023-02-20 12:44:45 -06:00
i-norden
5451c3c225 refactor v1 pkg to work with new dep paths 2023-02-20 12:44:45 -06:00
i-norden
844cda1107 shared util 2023-02-20 12:44:45 -06:00
i-norden
285b0f0db4 v0 and v1 subdirectories for postgres package; for working with the corresponding version of blockstore format 2023-02-20 12:44:40 -06:00
22 changed files with 995 additions and 47 deletions

View File

@ -1,6 +1,6 @@
## ipfs-ethdb
[![Go Report Card](https://goreportcard.com/badge/github.com/vulcanize/ipfs-ethdb)](https://goreportcard.com/report/github.com/vulcanize/ipfs-ethdb)
[![Go Report Card](https://goreportcard.com/badge/github.com/cerc/ipfs-ethdb)](https://goreportcard.com/report/github.com/cerc-io/ipfs-ethdb)
> go-ethereum ethdb interfaces for Ethereum state data stored in IPFS
@ -11,7 +11,7 @@ interfacing with a state database. These interfaces are used to build higher-lev
which are used to perform the bulk of state related needs.
Ethereum data can be stored on IPFS, standard codecs for Etheruem data are defined in the [go-cid](https://github.com/ipfs/go-cid) library.
Using our [statediffing geth client](https://github.com/vulcanize/go-ethereum/releases/tag/v1.9.11-statediff-0.0.2) it is feasible to extract every single
Using our [statediffing geth client](https://github.com/cerc-io/go-ethereum/releases/tag/v1.9.11-statediff-0.0.2) it is feasible to extract every single
state and storage node and publish it to IPFS.
Geth stores state data in leveldb as key-value pairs between the keccak256 hash of the rlp-encoded object and the rlp-encoded object.
@ -50,7 +50,7 @@ import (
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/repo/fsrepo"
"github.com/jmoiron/sqlx"
"github.com/vulcanize/ipfs-ethdb/v4"
"github.com/cerc-io/ipfs-ethdb/v5"
)
func main() {

View File

@ -25,7 +25,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
ipfsethdb "github.com/cerc-io/ipfs-ethdb/v4"
ipfsethdb "github.com/cerc-io/ipfs-ethdb/v5"
)
var (

View File

@ -26,7 +26,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
ipfsethdb "github.com/cerc-io/ipfs-ethdb/v4"
ipfsethdb "github.com/cerc-io/ipfs-ethdb/v5"
)
var (

8
go.mod
View File

@ -1,4 +1,4 @@
module github.com/cerc-io/ipfs-ethdb/v4
module github.com/cerc-io/ipfs-ethdb/v5
go 1.19
@ -12,7 +12,7 @@ require (
github.com/ipfs/go-ipfs-ds-help v1.1.0
github.com/ipfs/go-ipfs-exchange-interface v0.2.0
github.com/jmoiron/sqlx v1.3.5
github.com/lib/pq v1.10.7
github.com/lib/pq v1.10.6
github.com/mailgun/groupcache/v2 v2.3.0
github.com/multiformats/go-multihash v0.1.0
github.com/onsi/ginkgo v1.16.5
@ -60,8 +60,8 @@ require (
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.16.0 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/net v0.6.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/net v0.4.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect

12
go.sum
View File

@ -153,8 +153,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw=
github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.6 h1:jbk+ZieJ0D7EVGJYpL9QTz7/YW6UHbmdnZWYyK5cdBs=
github.com/lib/pq v1.10.6/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mailgun/groupcache/v2 v2.3.0 h1:/Usq3VewXa8t+afFaUAY7g8N9cRNqBT5nDhECYwLcd8=
github.com/mailgun/groupcache/v2 v2.3.0/go.mod h1:tH8aMaTRIjFMJsmJ9p7Y5HGBj9hV/J9rKQ+/3dIXzNU=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
@ -263,8 +263,8 @@ golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc=
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/exp v0.0.0-20230206171751-46f607a40771 h1:xP7rWLUr1e1n2xkK5YB4LI0hPEy3LJC6Wk+D4pGlOJg=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
@ -281,8 +281,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.6.0 h1:L4ZwwTvKW9gr0ZMS1yrHD9GZhIuVjOBBnaKH+SPQK0Q=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU=
golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=

View File

@ -1,6 +1,6 @@
## ipfs-ethdb
IPFS has been [extended](https://github.com/vulcanize/go-ipfs/releases/tag/v0.4.22-alpha) to [use Postgres](https://github.com/vulcanize/go-ipfs-config/releases/tag/v0.0.8-alpha) as a backing [datastore](https://github.com/ipfs/go-ds-sql/tree/master/postgres).
IPFS has been [extended](https://github.com/cerc-io/go-ipfs/releases/tag/v0.4.22-alpha) to [use Postgres](https://github.com/cerc-io/go-ipfs-config/releases/tag/v0.0.8-alpha) as a backing [datastore](https://github.com/ipfs/go-ds-sql/tree/master/postgres).
Interfacing directly with the IPFS-backing Postgres database has some advantages over using the blockservice interface.
Namely, batching of IPFS writes with other Postgres writes and avoiding lock contention on the ipfs repository (lockfile located at the `IPFS_PATH`).
The downside is that we forgo the block-exchange capabilities of the blockservice, and are only able to fetch data contained in the local datastore.
@ -18,11 +18,11 @@ import (
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/trie"
"github.com/jmoiron/sqlx"
"github.com/vulcanize/ipfs-ethdb/v4/postgres"
"github.com/cerc-io/ipfs-ethdb/v5/postgres/v1"
)
func main() {
connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable"
connectStr := "postgresql://vdbm:password@localhost:8077/cerc_testing?sslmode=disable"
db, _ := sqlx.Connect("postgres", connectStr)
kvs := pgipfsethdb.NewKeyValueStore(db)
@ -31,7 +31,11 @@ func main() {
trieNodeIterator := t.NodeIterator([]byte{})
// do stuff with trie node iterator
database := pgipfsethdb.NewDatabase(db)
database := pgipfsethdb.NewDatabase(db, pgipfsethdb.CacheConfig{
Name: "db",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
})
stateDatabase := state.NewDatabase(database)
stateDB, _ := state.New(common.Hash{}, stateDatabase, nil)
stateDBNodeIterator := state.NewNodeIterator(stateDB)

42
postgres/shared/util.go Normal file
View File

@ -0,0 +1,42 @@
// VulcanizeDB
// Copyright © 2023 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package shared
import (
"github.com/jmoiron/sqlx"
)
/*
Hostname: "localhost",
Port: 8077,
DatabaseName: "cerc_testing",
Username: "vdbm",
Password: "password",
*/
// TestDB connect to the testing database
// it assumes the database has the IPFS ipld.blocks table present
// DO NOT use a production db for the test db, as it will remove all contents of the ipld.blocks table
func TestDB() (*sqlx.DB, error) {
connectStr := "postgresql://vdbm:password@localhost:8077/cerc_testing?sslmode=disable"
return sqlx.Connect("postgres", connectStr)
}
// ResetTestDB drops all rows in the test db ipld.blocks table
func ResetTestDB(db *sqlx.DB) error {
_, err := db.Exec("TRUNCATE ipld.blocks CASCADE")
return err
}

117
postgres/v0/batch.go Normal file
View File

@ -0,0 +1,117 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb
import (
"math/big"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
)
var _ ethdb.Batch = &Batch{}
// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection
type Batch struct {
db *sqlx.DB
tx *sqlx.Tx
valueSize int
blockNumber *big.Int
}
// NewBatch returns a ethdb.Batch interface for PG-IPFS
func NewBatch(db *sqlx.DB, tx *sqlx.Tx, blockNumber *big.Int) ethdb.Batch {
b := &Batch{
db: db,
tx: tx,
blockNumber: blockNumber,
}
if tx == nil {
b.Reset()
}
return b
}
// Put satisfies the ethdb.Batch interface
// Put inserts the given value into the key-value data store
// Key is expected to be a fully formulated cid key
// TODO: note, now that we expected a cid we could route to the "cids" tables based on prefix instead of to ipld.blocks
// but is it better to handle this routing here, or use a completely different interface since we already have to refactor
// at levels above this package in order to pass in cids instead of raw keccak256 hashes
func (b *Batch) Put(cidBytes []byte, value []byte) (err error) {
// cast and resolve strings from cid.Cast
// this will assert that we have a correctly formatted CID
// and will handle the different string encodings for v0 and v1 CIDs
// (note that this v0 vs v1 is different from the blockstore v0 vs v1)
c, err := cid.Cast(cidBytes)
if err != nil {
return err
}
if _, err = b.tx.Exec(putPgStr, c.String(), value, b.blockNumber.Uint64()); err != nil {
return err
}
b.valueSize += len(value)
return nil
}
// Delete satisfies the ethdb.Batch interface
// Delete removes the key from the key-value data store
func (b *Batch) Delete(cidBytes []byte) (err error) {
c, err := cid.Cast(cidBytes)
if err != nil {
return err
}
_, err = b.tx.Exec(deletePgStr, c.String())
return err
}
// ValueSize satisfies the ethdb.Batch interface
// ValueSize retrieves the amount of data queued up for writing
// The returned value is the total byte length of all data queued to write
func (b *Batch) ValueSize() int {
return b.valueSize
}
// Write satisfies the ethdb.Batch interface
// Write flushes any accumulated data to disk
func (b *Batch) Write() error {
if b.tx == nil {
return nil
}
return b.tx.Commit()
}
// Replay satisfies the ethdb.Batch interface
// Replay replays the batch contents
func (b *Batch) Replay(w ethdb.KeyValueWriter) error {
return errNotSupported
}
// Reset satisfies the ethdb.Batch interface
// Reset resets the batch for reuse
// This should be called after every write
func (b *Batch) Reset() {
var err error
b.tx, err = b.db.Beginx()
if err != nil {
panic(err)
}
b.valueSize = 0
}

138
postgres/v0/batch_test.go Normal file
View File

@ -0,0 +1,138 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb_test
import (
"math/big"
"time"
"github.com/cerc-io/ipfs-ethdb/v5/postgres/shared"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/mailgun/groupcache/v2"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v0"
)
var (
batch ethdb.Batch
testHeader2 = types.Header{Number: big.NewInt(2)}
testValue2, _ = rlp.EncodeToBytes(testHeader2)
testEthKey2 = testHeader2.Hash().Bytes()
testCID2, _ = pgipfsethdb.CIDFromKeccak256(testEthKey2, cid.EthBlock)
)
var _ = Describe("Batch", func() {
BeforeEach(func() {
db, err = shared.TestDB()
Expect(err).ToNot(HaveOccurred())
cacheConfig := pgipfsethdb.CacheConfig{
Name: "db",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
}
database = pgipfsethdb.NewDatabase(db, cacheConfig)
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
Expect(ok).To(BeTrue())
(*databaseWithBlock).BlockNumber = testBlockNumber
batch = database.NewBatch()
})
AfterEach(func() {
groupcache.DeregisterGroup("db")
err = shared.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
})
Describe("Put/Write", func() {
It("adds the key-value pair to the batch", func() {
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
_, err = database.Get(testCID2.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
err = batch.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
err = batch.Put(testCID2.Bytes(), testValue2)
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
val2, err := database.Get(testCID2.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val2).To(Equal(testValue2))
})
})
Describe("Delete/Reset/Write", func() {
It("deletes the key-value pair in the batch", func() {
err = batch.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
err = batch.Put(testCID2.Bytes(), testValue2)
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
batch.Reset()
err = batch.Delete(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
err = batch.Delete(testCID2.Bytes())
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
_, err = database.Get(testCID2.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
})
Describe("ValueSize/Reset", func() {
It("returns the size of data in the batch queued for write", func() {
err = batch.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
err = batch.Put(testCID2.Bytes(), testValue2)
Expect(err).ToNot(HaveOccurred())
err = batch.Write()
Expect(err).ToNot(HaveOccurred())
size := batch.ValueSize()
Expect(size).To(Equal(len(testValue) + len(testValue2)))
batch.Reset()
size = batch.ValueSize()
Expect(size).To(Equal(0))
})
})
})

365
postgres/v0/database.go Normal file
View File

@ -0,0 +1,365 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb
import (
"context"
"database/sql"
"errors"
"fmt"
"math/big"
"strconv"
"strings"
"time"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/jmoiron/sqlx"
"github.com/mailgun/groupcache/v2"
log "github.com/sirupsen/logrus"
)
var errNotSupported = errors.New("this operation is not supported")
var (
hasPgStr = "SELECT exists(select 1 from ipld.blocks WHERE key = $1 LIMIT 1)"
getPgStr = "SELECT data FROM ipld.blocks WHERE key = $1 LIMIT 1"
putPgStr = "INSERT INTO ipld.blocks (key, data, block_number) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"
deletePgStr = "DELETE FROM ipld.blocks WHERE key = $1"
dbSizePgStr = "SELECT pg_database_size(current_database())"
)
var _ ethdb.Database = &Database{}
// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection
type Database struct {
db *sqlx.DB
cache *groupcache.Group
BlockNumber *big.Int
}
func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) {
return 0, errNotSupported
}
type CacheConfig struct {
Name string
Size int
ExpiryDuration time.Duration
}
// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS
func NewKeyValueStore(db *sqlx.DB, cacheConfig CacheConfig) ethdb.KeyValueStore {
database := Database{db: db}
database.InitCache(cacheConfig)
return &database
}
// NewDatabase returns a ethdb.Database interface for PG-IPFS
func NewDatabase(db *sqlx.DB, cacheConfig CacheConfig) ethdb.Database {
database := Database{db: db}
database.InitCache(cacheConfig)
return &database
}
func (d *Database) InitCache(cacheConfig CacheConfig) {
d.cache = groupcache.NewGroup(cacheConfig.Name, int64(cacheConfig.Size), groupcache.GetterFunc(
func(_ context.Context, id string, dest groupcache.Sink) error {
val, err := d.dbGet(id)
if err != nil {
return err
}
// Set the value in the groupcache, with expiry
if err := dest.SetBytes(val, time.Now().Add(cacheConfig.ExpiryDuration)); err != nil {
return err
}
return nil
},
))
}
func (d *Database) GetCacheStats() groupcache.Stats {
return d.cache.Stats
}
// Has satisfies the ethdb.KeyValueReader interface
// Has retrieves if a cid is present in the key-value data store
func (d *Database) Has(cidBytes []byte) (bool, error) {
c, err := cid.Cast(cidBytes)
if err != nil {
return false, err
}
var exists bool
return exists, d.db.Get(&exists, hasPgStr, c.String())
}
// Get retrieves the given key if it's present in the key-value data store
func (d *Database) dbGet(key string) ([]byte, error) {
var data []byte
err := d.db.Get(&data, getPgStr, key)
if err == sql.ErrNoRows {
log.Warn("Database miss for key ", key)
}
return data, err
}
// Get satisfies the ethdb.KeyValueReader interface
// Get retrieves the given cid if it's present in the key-value data store
func (d *Database) Get(cidBytes []byte) ([]byte, error) {
c, err := cid.Cast(cidBytes)
if err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()
var data []byte
return data, d.cache.Get(ctx, c.String(), groupcache.AllocatingByteSliceSink(&data))
}
// Put satisfies the ethdb.KeyValueWriter interface
// Put inserts the given value into the key-value data store
// Key is expected to be a fully formulated cis of value
func (d *Database) Put(cidBytes []byte, value []byte) error {
c, err := cid.Cast(cidBytes)
if err != nil {
return err
}
_, err = d.db.Exec(putPgStr, c.String(), value, d.BlockNumber.Uint64())
return err
}
// Delete satisfies the ethdb.KeyValueWriter interface
// Delete removes the cid from the key-value data store
func (d *Database) Delete(cidBytes []byte) error {
c, err := cid.Cast(cidBytes)
if err != nil {
return err
}
cidString := c.String()
_, err = d.db.Exec(deletePgStr, cidString)
if err != nil {
return err
}
// Remove from cache.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()
err = d.cache.Remove(ctx, cidString)
return err
}
// DatabaseProperty enum type
type DatabaseProperty int
const (
Unknown DatabaseProperty = iota
Size
Idle
InUse
MaxIdleClosed
MaxLifetimeClosed
MaxOpenConnections
OpenConnections
WaitCount
WaitDuration
)
// DatabasePropertyFromString helper function
func DatabasePropertyFromString(property string) (DatabaseProperty, error) {
switch strings.ToLower(property) {
case "size":
return Size, nil
case "idle":
return Idle, nil
case "inuse":
return InUse, nil
case "maxidleclosed":
return MaxIdleClosed, nil
case "maxlifetimeclosed":
return MaxLifetimeClosed, nil
case "maxopenconnections":
return MaxOpenConnections, nil
case "openconnections":
return OpenConnections, nil
case "waitcount":
return WaitCount, nil
case "waitduration":
return WaitDuration, nil
default:
return Unknown, fmt.Errorf("unknown database property")
}
}
// Stat satisfies the ethdb.Stater interface
// Stat returns a particular internal stat of the database
func (d *Database) Stat(property string) (string, error) {
prop, err := DatabasePropertyFromString(property)
if err != nil {
return "", err
}
switch prop {
case Size:
var byteSize string
return byteSize, d.db.Get(&byteSize, dbSizePgStr)
case Idle:
return strconv.Itoa(d.db.Stats().Idle), nil
case InUse:
return strconv.Itoa(d.db.Stats().InUse), nil
case MaxIdleClosed:
return strconv.FormatInt(d.db.Stats().MaxIdleClosed, 10), nil
case MaxLifetimeClosed:
return strconv.FormatInt(d.db.Stats().MaxLifetimeClosed, 10), nil
case MaxOpenConnections:
return strconv.Itoa(d.db.Stats().MaxOpenConnections), nil
case OpenConnections:
return strconv.Itoa(d.db.Stats().OpenConnections), nil
case WaitCount:
return strconv.FormatInt(d.db.Stats().WaitCount, 10), nil
case WaitDuration:
return d.db.Stats().WaitDuration.String(), nil
default:
return "", fmt.Errorf("unhandled database property")
}
}
// Compact satisfies the ethdb.Compacter interface
// Compact flattens the underlying data store for the given key range
func (d *Database) Compact(start []byte, limit []byte) error {
return errNotSupported
}
// NewBatch satisfies the ethdb.Batcher interface
// NewBatch creates a write-only database that buffers changes to its host db
// until a final write is called
func (d *Database) NewBatch() ethdb.Batch {
return NewBatch(d.db, nil, d.BlockNumber)
}
// NewBatchWithSize satisfies the ethdb.Batcher interface.
// NewBatchWithSize creates a write-only database batch with pre-allocated buffer.
func (d *Database) NewBatchWithSize(size int) ethdb.Batch {
return NewBatch(d.db, nil, d.BlockNumber)
}
// NewIterator satisfies the ethdb.Iteratee interface
// it creates a binary-alphabetical iterator over a subset
// of database content with a particular key prefix, starting at a particular
// initial key (or after, if it does not exist).
//
// Note: This method assumes that the prefix is NOT part of the start, so there's
// no need for the caller to prepend the prefix to the start
func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator {
return NewIterator(start, prefix, d.db)
}
// Close satisfies the io.Closer interface.
// Close closes the db connection and deregisters from groupcache.
func (d *Database) Close() error {
groupcache.DeregisterGroup(d.cache.Name())
return d.db.DB.Close()
}
// HasAncient satisfies the ethdb.AncientReader interface
// HasAncient returns an indicator whether the specified data exists in the ancient store
func (d *Database) HasAncient(kind string, number uint64) (bool, error) {
return false, errNotSupported
}
// Ancient satisfies the ethdb.AncientReader interface
// Ancient retrieves an ancient binary blob from the append-only immutable files
func (d *Database) Ancient(kind string, number uint64) ([]byte, error) {
return nil, errNotSupported
}
// Ancients satisfies the ethdb.AncientReader interface
// Ancients returns the ancient item numbers in the ancient store
func (d *Database) Ancients() (uint64, error) {
return 0, errNotSupported
}
// Tail satisfies the ethdb.AncientReader interface.
// Tail returns the number of first stored item in the freezer.
func (d *Database) Tail() (uint64, error) {
return 0, errNotSupported
}
// AncientSize satisfies the ethdb.AncientReader interface
// AncientSize returns the ancient size of the specified category
func (d *Database) AncientSize(kind string) (uint64, error) {
return 0, errNotSupported
}
// AncientRange retrieves all the items in a range, starting from the index 'start'.
// It will return
// - at most 'count' items,
// - at least 1 item (even if exceeding the maxBytes), but will otherwise
// return as many items as fit into maxBytes.
func (d *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) {
return nil, errNotSupported
}
// ReadAncients applies the provided AncientReader function
func (d *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) {
return errNotSupported
}
// TruncateHead satisfies the ethdb.AncientWriter interface.
// TruncateHead discards all but the first n ancient data from the ancient store.
func (d *Database) TruncateHead(n uint64) error {
return errNotSupported
}
// TruncateTail satisfies the ethdb.AncientWriter interface.
// TruncateTail discards the first n ancient data from the ancient store.
func (d *Database) TruncateTail(n uint64) error {
return errNotSupported
}
// Sync satisfies the ethdb.AncientWriter interface
// Sync flushes all in-memory ancient store data to disk
func (d *Database) Sync() error {
return errNotSupported
}
// MigrateTable satisfies the ethdb.AncientWriter interface.
// MigrateTable processes and migrates entries of a given table to a new format.
func (d *Database) MigrateTable(string, func([]byte) ([]byte, error)) error {
return errNotSupported
}
// NewSnapshot satisfies the ethdb.Snapshotter interface.
// NewSnapshot creates a database snapshot based on the current state.
func (d *Database) NewSnapshot() (ethdb.Snapshot, error) {
return nil, errNotSupported
}
// AncientDatadir returns an error as we don't have a backing chain freezer.
func (d *Database) AncientDatadir() (string, error) {
return "", errNotSupported
}

View File

@ -0,0 +1,133 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb_test
import (
"math/big"
"time"
"github.com/cerc-io/ipfs-ethdb/v5/postgres/shared"
"github.com/ipfs/go-cid"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
"github.com/jmoiron/sqlx"
"github.com/mailgun/groupcache/v2"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v0"
)
var (
database ethdb.Database
db *sqlx.DB
err error
testBlockNumber = big.NewInt(1337)
testHeader = types.Header{Number: testBlockNumber}
testValue, _ = rlp.EncodeToBytes(testHeader)
testEthKey = testHeader.Hash().Bytes()
testCID, _ = pgipfsethdb.CIDFromKeccak256(testEthKey, cid.EthBlock)
)
var _ = Describe("Database", func() {
BeforeEach(func() {
db, err = shared.TestDB()
Expect(err).ToNot(HaveOccurred())
cacheConfig := pgipfsethdb.CacheConfig{
Name: "db",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
}
database = pgipfsethdb.NewDatabase(db, cacheConfig)
databaseWithBlock, ok := database.(*pgipfsethdb.Database)
Expect(ok).To(BeTrue())
(*databaseWithBlock).BlockNumber = testBlockNumber
})
AfterEach(func() {
groupcache.DeregisterGroup("db")
err = shared.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
err = db.Close()
Expect(err).ToNot(HaveOccurred())
})
Describe("Has", func() {
It("returns false if a key-pair doesn't exist in the db", func() {
has, err := database.Has(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(has).ToNot(BeTrue())
})
It("returns true if a key-pair exists in the db", func() {
_, err = db.Exec("INSERT into ipld.blocks (key, data, block_number) VALUES ($1, $2, $3)", testCID.String(), testValue, testBlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred())
has, err := database.Has(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(has).To(BeTrue())
})
})
Describe("Get", func() {
It("throws an err if the key-pair doesn't exist in the db", func() {
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
It("returns the value associated with the key, if the pair exists", func() {
_, err = db.Exec("INSERT into ipld.blocks (key, data, block_number) VALUES ($1, $2, $3)", testCID.String(), testValue, testBlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Put", func() {
It("persists the key-value pair in the database", func() {
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
err = database.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
})
})
Describe("Delete", func() {
It("removes the key-value pair from the database", func() {
err = database.Put(testCID.Bytes(), testValue)
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
Expect(val).To(Equal(testValue))
err = database.Delete(testCID.Bytes())
Expect(err).ToNot(HaveOccurred())
_, err = database.Get(testCID.Bytes())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
})
})

91
postgres/v0/iterator.go Normal file
View File

@ -0,0 +1,91 @@
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb
import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ipfs/go-cid"
"github.com/jmoiron/sqlx"
)
var _ ethdb.Iterator = &Iterator{}
// Iterator is the type that satisfies the ethdb.Iterator interface for PG-IPFS Ethereum data using a direct Postgres connection
// Iteratee interface is used in Geth for various tests, trie/sync_bloom.go (for fast sync),
// rawdb.InspectDatabase, and the new core/state/snapshot features.
// This should not be confused with trie.NodeIterator or state.NodeIteraor (which can be constructed
// from the ethdb.KeyValueStoreand ethdb.Database interfaces)
type Iterator struct {
db *sqlx.DB
currentKey, prefix []byte
err error
}
// NewIterator returns an ethdb.Iterator interface for PG-IPFS
func NewIterator(start, prefix []byte, db *sqlx.DB) ethdb.Iterator {
return &Iterator{
db: db,
prefix: prefix,
currentKey: start,
}
}
// Next satisfies the ethdb.Iterator interface
// Next moves the iterator to the next key/value pair
// It returns whether the iterator is exhausted
func (i *Iterator) Next() bool {
// this is complicated by the ipfs db keys not being the keccak256 hashes
// go-ethereum usage of this method expects the iteration to occur over keccak256 keys
panic("implement me: Next")
}
// Error satisfies the ethdb.Iterator interface
// Error returns any accumulated error
// Exhausting all the key/value pairs is not considered to be an error
func (i *Iterator) Error() error {
return i.err
}
// Key satisfies the ethdb.Iterator interface
// Key returns the key of the current key/value pair, or nil if done
// The caller should not modify the contents of the returned slice
// and its contents may change on the next call to Next
func (i *Iterator) Key() []byte {
return i.currentKey
}
// Value satisfies the ethdb.Iterator interface
// Value returns the value of the current key/value pair, or nil if done
// The caller should not modify the contents of the returned slice
// and its contents may change on the next call to Next
func (i *Iterator) Value() []byte {
c, err := cid.Cast(i.currentKey)
if err != nil {
i.err = err
return nil
}
var data []byte
i.err = i.db.Get(&data, getPgStr, c)
return data
}
// Release satisfies the ethdb.Iterator interface
// Release releases associated resources
// Release should always succeed and can be called multiple times without causing error
func (i *Iterator) Release() {
i.db.Close()
}

32
postgres/v0/util.go Normal file
View File

@ -0,0 +1,32 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb
import (
"github.com/ipfs/go-cid"
_ "github.com/lib/pq" //postgres driver
"github.com/multiformats/go-multihash"
)
// CIDFromKeccak256 converts keccak256 hash bytes into a v1 cid
func CIDFromKeccak256(hash []byte, codecType uint64) (cid.Cid, error) {
mh, err := multihash.Encode(hash, multihash.KECCAK_256)
if err != nil {
return cid.Cid{}, err
}
return cid.NewCidV1(codecType, mh), nil
}

View File

@ -20,6 +20,8 @@ import (
"math/big"
"time"
"github.com/cerc-io/ipfs-ethdb/v5/postgres/shared"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
@ -27,7 +29,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres"
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v1"
)
var (
@ -39,7 +41,7 @@ var (
var _ = Describe("Batch", func() {
BeforeEach(func() {
db, err = pgipfsethdb.TestDB()
db, err = shared.TestDB()
Expect(err).ToNot(HaveOccurred())
cacheConfig := pgipfsethdb.CacheConfig{
@ -58,7 +60,7 @@ var _ = Describe("Batch", func() {
})
AfterEach(func() {
groupcache.DeregisterGroup("db")
err = pgipfsethdb.ResetTestDB(db)
err = shared.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
})

View File

@ -35,11 +35,17 @@ import (
var errNotSupported = errors.New("this operation is not supported")
var (
hasPgStr = "SELECT exists(select 1 from public.blocks WHERE key = $1 LIMIT 1)"
getPgStr = "SELECT data FROM public.blocks WHERE key = $1 LIMIT 1"
putPgStr = "INSERT INTO public.blocks (key, data, block_number) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"
deletePgStr = "DELETE FROM public.blocks WHERE key = $1"
hasPgStr = "SELECT exists(select 1 from ipld.blocks WHERE key = $1 LIMIT 1)"
getPgStr = "SELECT data FROM ipld.blocks WHERE key = $1 LIMIT 1"
putPgStr = "INSERT INTO ipld.blocks (key, data, block_number) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"
deletePgStr = "DELETE FROM ipld.blocks WHERE key = $1"
dbSizePgStr = "SELECT pg_database_size(current_database())"
DefaultCacheConfig = CacheConfig{
Name: "db",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
}
)
var _ ethdb.Database = &Database{}

View File

@ -20,6 +20,8 @@ import (
"math/big"
"time"
"github.com/cerc-io/ipfs-ethdb/v5/postgres/shared"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
@ -28,7 +30,7 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres"
pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v5/postgres/v1"
)
var (
@ -44,7 +46,7 @@ var (
var _ = Describe("Database", func() {
BeforeEach(func() {
db, err = pgipfsethdb.TestDB()
db, err = shared.TestDB()
Expect(err).ToNot(HaveOccurred())
cacheConfig := pgipfsethdb.CacheConfig{
@ -61,7 +63,9 @@ var _ = Describe("Database", func() {
})
AfterEach(func() {
groupcache.DeregisterGroup("db")
err = pgipfsethdb.ResetTestDB(db)
err = shared.ResetTestDB(db)
Expect(err).ToNot(HaveOccurred())
err = db.Close()
Expect(err).ToNot(HaveOccurred())
})
@ -72,7 +76,7 @@ var _ = Describe("Database", func() {
Expect(has).ToNot(BeTrue())
})
It("returns true if a key-pair exists in the db", func() {
_, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
_, err = db.Exec("INSERT into ipld.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred())
has, err := database.Has(testEthKey)
Expect(err).ToNot(HaveOccurred())
@ -87,7 +91,7 @@ var _ = Describe("Database", func() {
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
})
It("returns the value associated with the key, if the pair exists", func() {
_, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
_, err = db.Exec("INSERT into ipld.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64())
Expect(err).ToNot(HaveOccurred())
val, err := database.Get(testEthKey)
Expect(err).ToNot(HaveOccurred())

View File

@ -0,0 +1,29 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package pgipfsethdb_test
import (
"testing"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestPGIPFSETHDB(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "PG-IPFS ethdb test")
}

View File

@ -19,7 +19,6 @@ package pgipfsethdb
import (
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq" //postgres driver
"github.com/multiformats/go-multihash"
)
@ -33,17 +32,3 @@ func MultihashKeyFromKeccak256(h []byte) (string, error) {
dbKey := dshelp.MultihashToDsKey(mh)
return blockstore.BlockPrefix.String() + dbKey.String(), nil
}
// TestDB connect to the testing database
// it assumes the database has the IPFS public.blocks table present
// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table
func TestDB() (*sqlx.DB, error) {
connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable"
return sqlx.Connect("postgres", connectStr)
}
// ResetTestDB drops all rows in the test db public.blocks table
func ResetTestDB(db *sqlx.DB) error {
_, err := db.Exec("TRUNCATE public.blocks CASCADE")
return err
}

View File

@ -29,7 +29,7 @@ func Keccak256ToCid(h []byte, codec uint64) (cid.Cid, error) {
if err != nil {
return cid.Cid{}, err
}
return cid.NewCidV1(codec, multihash.Multihash(buf)), nil
return cid.NewCidV1(codec, buf), nil
}
// NewBlock takes a keccak256 hash key and the rlp []byte value it was derived from and creates an ipfs block object