From 285b0f0db495b3310ea19e708c6c696068d6c467 Mon Sep 17 00:00:00 2001 From: i-norden Date: Mon, 20 Feb 2023 11:39:20 -0600 Subject: [PATCH 1/4] v0 and v1 subdirectories for postgres package; for working with the corresponding version of blockstore format --- postgres/{ => v0}/batch.go | 0 postgres/{ => v0}/batch_test.go | 0 postgres/{ => v0}/database.go | 0 postgres/{ => v0}/database_test.go | 0 postgres/{ => v0}/iterator.go | 0 postgres/{ => v0}/postgres_suite_test.go | 0 postgres/{ => v0}/util.go | 0 postgres/v1/batch.go | 108 +++++++ postgres/v1/batch_test.go | 133 +++++++++ postgres/v1/database.go | 361 +++++++++++++++++++++++ postgres/v1/database_test.go | 127 ++++++++ postgres/v1/iterator.go | 90 ++++++ postgres/v1/postgres_suite_test.go | 29 ++ postgres/v1/util.go | 49 +++ 14 files changed, 897 insertions(+) rename postgres/{ => v0}/batch.go (100%) rename postgres/{ => v0}/batch_test.go (100%) rename postgres/{ => v0}/database.go (100%) rename postgres/{ => v0}/database_test.go (100%) rename postgres/{ => v0}/iterator.go (100%) rename postgres/{ => v0}/postgres_suite_test.go (100%) rename postgres/{ => v0}/util.go (100%) create mode 100644 postgres/v1/batch.go create mode 100644 postgres/v1/batch_test.go create mode 100644 postgres/v1/database.go create mode 100644 postgres/v1/database_test.go create mode 100644 postgres/v1/iterator.go create mode 100644 postgres/v1/postgres_suite_test.go create mode 100644 postgres/v1/util.go diff --git a/postgres/batch.go b/postgres/v0/batch.go similarity index 100% rename from postgres/batch.go rename to postgres/v0/batch.go diff --git a/postgres/batch_test.go b/postgres/v0/batch_test.go similarity index 100% rename from postgres/batch_test.go rename to postgres/v0/batch_test.go diff --git a/postgres/database.go b/postgres/v0/database.go similarity index 100% rename from postgres/database.go rename to postgres/v0/database.go diff --git a/postgres/database_test.go b/postgres/v0/database_test.go similarity index 100% rename from postgres/database_test.go rename to postgres/v0/database_test.go diff --git a/postgres/iterator.go b/postgres/v0/iterator.go similarity index 100% rename from postgres/iterator.go rename to postgres/v0/iterator.go diff --git a/postgres/postgres_suite_test.go b/postgres/v0/postgres_suite_test.go similarity index 100% rename from postgres/postgres_suite_test.go rename to postgres/v0/postgres_suite_test.go diff --git a/postgres/util.go b/postgres/v0/util.go similarity index 100% rename from postgres/util.go rename to postgres/v0/util.go diff --git a/postgres/v1/batch.go b/postgres/v1/batch.go new file mode 100644 index 0000000..0099567 --- /dev/null +++ b/postgres/v1/batch.go @@ -0,0 +1,108 @@ +// VulcanizeDB +// Copyright © 2020 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +package pgipfsethdb + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/ethdb" + "github.com/jmoiron/sqlx" +) + +var _ ethdb.Batch = &Batch{} + +// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection +type Batch struct { + db *sqlx.DB + tx *sqlx.Tx + valueSize int + + blockNumber *big.Int +} + +// NewBatch returns a ethdb.Batch interface for PG-IPFS +func NewBatch(db *sqlx.DB, tx *sqlx.Tx, blockNumber *big.Int) ethdb.Batch { + b := &Batch{ + db: db, + tx: tx, + blockNumber: blockNumber, + } + if tx == nil { + b.Reset() + } + return b +} + +// Put satisfies the ethdb.Batch interface +// Put inserts the given value into the key-value data store +// Key is expected to be the keccak256 hash of value +func (b *Batch) Put(key []byte, value []byte) (err error) { + mhKey, err := MultihashKeyFromKeccak256(key) + if err != nil { + return err + } + if _, err = b.tx.Exec(putPgStr, mhKey, value, b.blockNumber.Uint64()); err != nil { + return err + } + b.valueSize += len(value) + return nil +} + +// Delete satisfies the ethdb.Batch interface +// Delete removes the key from the key-value data store +func (b *Batch) Delete(key []byte) (err error) { + mhKey, err := MultihashKeyFromKeccak256(key) + if err != nil { + return err + } + _, err = b.tx.Exec(deletePgStr, mhKey) + return err +} + +// ValueSize satisfies the ethdb.Batch interface +// ValueSize retrieves the amount of data queued up for writing +// The returned value is the total byte length of all data queued to write +func (b *Batch) ValueSize() int { + return b.valueSize +} + +// Write satisfies the ethdb.Batch interface +// Write flushes any accumulated data to disk +func (b *Batch) Write() error { + if b.tx == nil { + return nil + } + return b.tx.Commit() +} + +// Replay satisfies the ethdb.Batch interface +// Replay replays the batch contents +func (b *Batch) Replay(w ethdb.KeyValueWriter) error { + return errNotSupported +} + +// Reset satisfies the ethdb.Batch interface +// Reset resets the batch for reuse +// This should be called after every write +func (b *Batch) Reset() { + var err error + b.tx, err = b.db.Beginx() + if err != nil { + panic(err) + } + b.valueSize = 0 +} diff --git a/postgres/v1/batch_test.go b/postgres/v1/batch_test.go new file mode 100644 index 0000000..ba4ca20 --- /dev/null +++ b/postgres/v1/batch_test.go @@ -0,0 +1,133 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgipfsethdb_test + +import ( + "math/big" + "time" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/rlp" + "github.com/mailgun/groupcache/v2" + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" + + pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres" +) + +var ( + batch ethdb.Batch + testHeader2 = types.Header{Number: big.NewInt(2)} + testValue2, _ = rlp.EncodeToBytes(testHeader2) + testEthKey2 = testHeader2.Hash().Bytes() +) + +var _ = Describe("Batch", func() { + BeforeEach(func() { + db, err = pgipfsethdb.TestDB() + Expect(err).ToNot(HaveOccurred()) + + cacheConfig := pgipfsethdb.CacheConfig{ + Name: "db", + Size: 3000000, // 3MB + ExpiryDuration: time.Hour, + } + + database = pgipfsethdb.NewDatabase(db, cacheConfig) + + databaseWithBlock, ok := database.(*pgipfsethdb.Database) + Expect(ok).To(BeTrue()) + (*databaseWithBlock).BlockNumber = testBlockNumber + + batch = database.NewBatch() + }) + AfterEach(func() { + groupcache.DeregisterGroup("db") + err = pgipfsethdb.ResetTestDB(db) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("Put/Write", func() { + It("adds the key-value pair to the batch", func() { + _, err = database.Get(testEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + _, err = database.Get(testEthKey2) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + + err = batch.Put(testEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + err = batch.Put(testEthKey2, testValue2) + Expect(err).ToNot(HaveOccurred()) + err = batch.Write() + Expect(err).ToNot(HaveOccurred()) + + val, err := database.Get(testEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + val2, err := database.Get(testEthKey2) + Expect(err).ToNot(HaveOccurred()) + Expect(val2).To(Equal(testValue2)) + }) + }) + + Describe("Delete/Reset/Write", func() { + It("deletes the key-value pair in the batch", func() { + err = batch.Put(testEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + err = batch.Put(testEthKey2, testValue2) + Expect(err).ToNot(HaveOccurred()) + err = batch.Write() + Expect(err).ToNot(HaveOccurred()) + + batch.Reset() + err = batch.Delete(testEthKey) + Expect(err).ToNot(HaveOccurred()) + err = batch.Delete(testEthKey2) + Expect(err).ToNot(HaveOccurred()) + err = batch.Write() + Expect(err).ToNot(HaveOccurred()) + + _, err = database.Get(testEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + _, err = database.Get(testEthKey2) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + }) + }) + + Describe("ValueSize/Reset", func() { + It("returns the size of data in the batch queued for write", func() { + err = batch.Put(testEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + err = batch.Put(testEthKey2, testValue2) + Expect(err).ToNot(HaveOccurred()) + err = batch.Write() + Expect(err).ToNot(HaveOccurred()) + + size := batch.ValueSize() + Expect(size).To(Equal(len(testValue) + len(testValue2))) + + batch.Reset() + size = batch.ValueSize() + Expect(size).To(Equal(0)) + }) + }) +}) diff --git a/postgres/v1/database.go b/postgres/v1/database.go new file mode 100644 index 0000000..c21da02 --- /dev/null +++ b/postgres/v1/database.go @@ -0,0 +1,361 @@ +// VulcanizeDB +// Copyright © 2020 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. 
+ +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgipfsethdb + +import ( + "context" + "database/sql" + "errors" + "fmt" + "math/big" + "strconv" + "strings" + "time" + + "github.com/ethereum/go-ethereum/ethdb" + "github.com/jmoiron/sqlx" + "github.com/mailgun/groupcache/v2" + log "github.com/sirupsen/logrus" +) + +var errNotSupported = errors.New("this operation is not supported") + +var ( + hasPgStr = "SELECT exists(select 1 from public.blocks WHERE key = $1 LIMIT 1)" + getPgStr = "SELECT data FROM public.blocks WHERE key = $1 LIMIT 1" + putPgStr = "INSERT INTO public.blocks (key, data, block_number) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING" + deletePgStr = "DELETE FROM public.blocks WHERE key = $1" + dbSizePgStr = "SELECT pg_database_size(current_database())" +) + +var _ ethdb.Database = &Database{} + +// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection +type Database struct { + db *sqlx.DB + cache *groupcache.Group + + BlockNumber *big.Int +} + +func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) { + return 0, errNotSupported +} + +type CacheConfig struct { + Name string + Size int + ExpiryDuration time.Duration +} + +// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS +func NewKeyValueStore(db *sqlx.DB, cacheConfig CacheConfig) ethdb.KeyValueStore { + database := Database{db: db} + database.InitCache(cacheConfig) + + return &database +} + +// NewDatabase returns a ethdb.Database interface for PG-IPFS +func NewDatabase(db *sqlx.DB, cacheConfig CacheConfig) ethdb.Database { + database := Database{db: db} + database.InitCache(cacheConfig) + + return &database +} + +func (d *Database) InitCache(cacheConfig CacheConfig) { + d.cache = groupcache.NewGroup(cacheConfig.Name, int64(cacheConfig.Size), groupcache.GetterFunc( + func(_ context.Context, id string, dest groupcache.Sink) error { + val, err := d.dbGet(id) + + if err != nil { + return err + } + + // Set the value in the groupcache, with expiry + if err := dest.SetBytes(val, time.Now().Add(cacheConfig.ExpiryDuration)); err != nil { + return err + } + + return nil + }, + )) +} + +func (d *Database) GetCacheStats() groupcache.Stats { + return d.cache.Stats +} + +// Has satisfies the ethdb.KeyValueReader interface +// Has retrieves if a key is present in the key-value data store +func (d *Database) Has(key []byte) (bool, error) { + mhKey, err := MultihashKeyFromKeccak256(key) + if err != nil { + return false, err + } + var exists bool + return exists, d.db.Get(&exists, hasPgStr, mhKey) +} + +// Get retrieves the given key if it's present in the key-value data store +func (d *Database) dbGet(key string) ([]byte, error) { + var data []byte + err := d.db.Get(&data, getPgStr, key) + if err == sql.ErrNoRows { + log.Warn("Database miss for key", key) + } + + return data, err +} + +// Get satisfies the ethdb.KeyValueReader interface +// Get retrieves the given key if it's present in the key-value data store +func (d *Database) Get(key []byte) ([]byte, error) { + mhKey, err := MultihashKeyFromKeccak256(key) + if err != nil { + return nil, err + } + + ctx, 
cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + defer cancel() + + var data []byte + return data, d.cache.Get(ctx, mhKey, groupcache.AllocatingByteSliceSink(&data)) +} + +// Put satisfies the ethdb.KeyValueWriter interface +// Put inserts the given value into the key-value data store +// Key is expected to be the keccak256 hash of value +func (d *Database) Put(key []byte, value []byte) error { + mhKey, err := MultihashKeyFromKeccak256(key) + if err != nil { + return err + } + _, err = d.db.Exec(putPgStr, mhKey, value, d.BlockNumber.Uint64()) + return err +} + +// Delete satisfies the ethdb.KeyValueWriter interface +// Delete removes the key from the key-value data store +func (d *Database) Delete(key []byte) error { + mhKey, err := MultihashKeyFromKeccak256(key) + if err != nil { + return err + } + + _, err = d.db.Exec(deletePgStr, mhKey) + if err != nil { + return err + } + + // Remove from cache. + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + defer cancel() + err = d.cache.Remove(ctx, mhKey) + + return err +} + +// DatabaseProperty enum type +type DatabaseProperty int + +const ( + Unknown DatabaseProperty = iota + Size + Idle + InUse + MaxIdleClosed + MaxLifetimeClosed + MaxOpenConnections + OpenConnections + WaitCount + WaitDuration +) + +// DatabasePropertyFromString helper function +func DatabasePropertyFromString(property string) (DatabaseProperty, error) { + switch strings.ToLower(property) { + case "size": + return Size, nil + case "idle": + return Idle, nil + case "inuse": + return InUse, nil + case "maxidleclosed": + return MaxIdleClosed, nil + case "maxlifetimeclosed": + return MaxLifetimeClosed, nil + case "maxopenconnections": + return MaxOpenConnections, nil + case "openconnections": + return OpenConnections, nil + case "waitcount": + return WaitCount, nil + case "waitduration": + return WaitDuration, nil + default: + return Unknown, fmt.Errorf("unknown database property") + } +} + +// Stat satisfies the ethdb.Stater interface +// Stat returns a particular internal stat of the database +func (d *Database) Stat(property string) (string, error) { + prop, err := DatabasePropertyFromString(property) + if err != nil { + return "", err + } + switch prop { + case Size: + var byteSize string + return byteSize, d.db.Get(&byteSize, dbSizePgStr) + case Idle: + return strconv.Itoa(d.db.Stats().Idle), nil + case InUse: + return strconv.Itoa(d.db.Stats().InUse), nil + case MaxIdleClosed: + return strconv.FormatInt(d.db.Stats().MaxIdleClosed, 10), nil + case MaxLifetimeClosed: + return strconv.FormatInt(d.db.Stats().MaxLifetimeClosed, 10), nil + case MaxOpenConnections: + return strconv.Itoa(d.db.Stats().MaxOpenConnections), nil + case OpenConnections: + return strconv.Itoa(d.db.Stats().OpenConnections), nil + case WaitCount: + return strconv.FormatInt(d.db.Stats().WaitCount, 10), nil + case WaitDuration: + return d.db.Stats().WaitDuration.String(), nil + default: + return "", fmt.Errorf("unhandled database property") + } +} + +// Compact satisfies the ethdb.Compacter interface +// Compact flattens the underlying data store for the given key range +func (d *Database) Compact(start []byte, limit []byte) error { + return errNotSupported +} + +// NewBatch satisfies the ethdb.Batcher interface +// NewBatch creates a write-only database that buffers changes to its host db +// until a final write is called +func (d *Database) NewBatch() ethdb.Batch { + return NewBatch(d.db, nil, d.BlockNumber) +} + +// NewBatchWithSize satisfies the 
ethdb.Batcher interface. +// NewBatchWithSize creates a write-only database batch with pre-allocated buffer. +func (d *Database) NewBatchWithSize(size int) ethdb.Batch { + return NewBatch(d.db, nil, d.BlockNumber) +} + +// NewIterator satisfies the ethdb.Iteratee interface +// it creates a binary-alphabetical iterator over a subset +// of database content with a particular key prefix, starting at a particular +// initial key (or after, if it does not exist). +// +// Note: This method assumes that the prefix is NOT part of the start, so there's +// no need for the caller to prepend the prefix to the start +func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator { + return NewIterator(start, prefix, d.db) +} + +// Close satisfies the io.Closer interface +// Close closes the db connection +func (d *Database) Close() error { + return d.db.DB.Close() +} + +// HasAncient satisfies the ethdb.AncientReader interface +// HasAncient returns an indicator whether the specified data exists in the ancient store +func (d *Database) HasAncient(kind string, number uint64) (bool, error) { + return false, errNotSupported +} + +// Ancient satisfies the ethdb.AncientReader interface +// Ancient retrieves an ancient binary blob from the append-only immutable files +func (d *Database) Ancient(kind string, number uint64) ([]byte, error) { + return nil, errNotSupported +} + +// Ancients satisfies the ethdb.AncientReader interface +// Ancients returns the ancient item numbers in the ancient store +func (d *Database) Ancients() (uint64, error) { + return 0, errNotSupported +} + +// Tail satisfies the ethdb.AncientReader interface. +// Tail returns the number of first stored item in the freezer. +func (d *Database) Tail() (uint64, error) { + return 0, errNotSupported +} + +// AncientSize satisfies the ethdb.AncientReader interface +// AncientSize returns the ancient size of the specified category +func (d *Database) AncientSize(kind string) (uint64, error) { + return 0, errNotSupported +} + +// AncientRange retrieves all the items in a range, starting from the index 'start'. +// It will return +// - at most 'count' items, +// - at least 1 item (even if exceeding the maxBytes), but will otherwise +// return as many items as fit into maxBytes. +func (d *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) { + return nil, errNotSupported +} + +// ReadAncients applies the provided AncientReader function +func (d *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) { + return errNotSupported +} + +// TruncateHead satisfies the ethdb.AncientWriter interface. +// TruncateHead discards all but the first n ancient data from the ancient store. +func (d *Database) TruncateHead(n uint64) error { + return errNotSupported +} + +// TruncateTail satisfies the ethdb.AncientWriter interface. +// TruncateTail discards the first n ancient data from the ancient store. +func (d *Database) TruncateTail(n uint64) error { + return errNotSupported +} + +// Sync satisfies the ethdb.AncientWriter interface +// Sync flushes all in-memory ancient store data to disk +func (d *Database) Sync() error { + return errNotSupported +} + +// MigrateTable satisfies the ethdb.AncientWriter interface. +// MigrateTable processes and migrates entries of a given table to a new format. +func (d *Database) MigrateTable(string, func([]byte) ([]byte, error)) error { + return errNotSupported +} + +// NewSnapshot satisfies the ethdb.Snapshotter interface. 
+// NewSnapshot creates a database snapshot based on the current state. +func (d *Database) NewSnapshot() (ethdb.Snapshot, error) { + return nil, errNotSupported +} + +// AncientDatadir returns an error as we don't have a backing chain freezer. +func (d *Database) AncientDatadir() (string, error) { + return "", errNotSupported +} diff --git a/postgres/v1/database_test.go b/postgres/v1/database_test.go new file mode 100644 index 0000000..b929561 --- /dev/null +++ b/postgres/v1/database_test.go @@ -0,0 +1,127 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgipfsethdb_test + +import ( + "math/big" + "time" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/rlp" + "github.com/jmoiron/sqlx" + "github.com/mailgun/groupcache/v2" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres" +) + +var ( + database ethdb.Database + db *sqlx.DB + err error + testBlockNumber = big.NewInt(1337) + testHeader = types.Header{Number: testBlockNumber} + testValue, _ = rlp.EncodeToBytes(testHeader) + testEthKey = testHeader.Hash().Bytes() + testMhKey, _ = pgipfsethdb.MultihashKeyFromKeccak256(testEthKey) +) + +var _ = Describe("Database", func() { + BeforeEach(func() { + db, err = pgipfsethdb.TestDB() + Expect(err).ToNot(HaveOccurred()) + + cacheConfig := pgipfsethdb.CacheConfig{ + Name: "db", + Size: 3000000, // 3MB + ExpiryDuration: time.Hour, + } + + database = pgipfsethdb.NewDatabase(db, cacheConfig) + + databaseWithBlock, ok := database.(*pgipfsethdb.Database) + Expect(ok).To(BeTrue()) + (*databaseWithBlock).BlockNumber = testBlockNumber + }) + AfterEach(func() { + groupcache.DeregisterGroup("db") + err = pgipfsethdb.ResetTestDB(db) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("Has", func() { + It("returns false if a key-pair doesn't exist in the db", func() { + has, err := database.Has(testEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(has).ToNot(BeTrue()) + }) + It("returns true if a key-pair exists in the db", func() { + _, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64()) + Expect(err).ToNot(HaveOccurred()) + has, err := database.Has(testEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(has).To(BeTrue()) + }) + }) + + Describe("Get", func() { + It("throws an err if the key-pair doesn't exist in the db", func() { + _, err = database.Get(testEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + }) + It("returns the value associated with the key, if the pair exists", func() { + _, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64()) + Expect(err).ToNot(HaveOccurred()) + val, err := 
database.Get(testEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + }) + }) + + Describe("Put", func() { + It("persists the key-value pair in the database", func() { + _, err = database.Get(testEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + + err = database.Put(testEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + }) + }) + + Describe("Delete", func() { + It("removes the key-value pair from the database", func() { + err = database.Put(testEthKey, testValue) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testEthKey) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + + err = database.Delete(testEthKey) + Expect(err).ToNot(HaveOccurred()) + _, err = database.Get(testEthKey) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + }) + }) +}) diff --git a/postgres/v1/iterator.go b/postgres/v1/iterator.go new file mode 100644 index 0000000..0d3a0a4 --- /dev/null +++ b/postgres/v1/iterator.go @@ -0,0 +1,90 @@ +// VulcanizeDB +// Copyright © 2020 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgipfsethdb + +import ( + "github.com/ethereum/go-ethereum/ethdb" + "github.com/jmoiron/sqlx" +) + +var _ ethdb.Iterator = &Iterator{} + +// Iterator is the type that satisfies the ethdb.Iterator interface for PG-IPFS Ethereum data using a direct Postgres connection +// Iteratee interface is used in Geth for various tests, trie/sync_bloom.go (for fast sync), +// rawdb.InspectDatabase, and the new core/state/snapshot features. 
+// This should not be confused with trie.NodeIterator or state.NodeIteraor (which can be constructed +// from the ethdb.KeyValueStoreand ethdb.Database interfaces) +type Iterator struct { + db *sqlx.DB + currentKey, prefix []byte + err error +} + +// NewIterator returns an ethdb.Iterator interface for PG-IPFS +func NewIterator(start, prefix []byte, db *sqlx.DB) ethdb.Iterator { + return &Iterator{ + db: db, + prefix: prefix, + currentKey: start, + } +} + +// Next satisfies the ethdb.Iterator interface +// Next moves the iterator to the next key/value pair +// It returns whether the iterator is exhausted +func (i *Iterator) Next() bool { + // this is complicated by the ipfs db keys not being the keccak256 hashes + // go-ethereum usage of this method expects the iteration to occur over keccak256 keys + panic("implement me: Next") +} + +// Error satisfies the ethdb.Iterator interface +// Error returns any accumulated error +// Exhausting all the key/value pairs is not considered to be an error +func (i *Iterator) Error() error { + return i.err +} + +// Key satisfies the ethdb.Iterator interface +// Key returns the key of the current key/value pair, or nil if done +// The caller should not modify the contents of the returned slice +// and its contents may change on the next call to Next +func (i *Iterator) Key() []byte { + return i.currentKey +} + +// Value satisfies the ethdb.Iterator interface +// Value returns the value of the current key/value pair, or nil if done +// The caller should not modify the contents of the returned slice +// and its contents may change on the next call to Next +func (i *Iterator) Value() []byte { + mhKey, err := MultihashKeyFromKeccak256(i.currentKey) + if err != nil { + i.err = err + return nil + } + var data []byte + i.err = i.db.Get(&data, getPgStr, mhKey) + return data +} + +// Release satisfies the ethdb.Iterator interface +// Release releases associated resources +// Release should always succeed and can be called multiple times without causing error +func (i *Iterator) Release() { + i.db.Close() +} diff --git a/postgres/v1/postgres_suite_test.go b/postgres/v1/postgres_suite_test.go new file mode 100644 index 0000000..fa1f767 --- /dev/null +++ b/postgres/v1/postgres_suite_test.go @@ -0,0 +1,29 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgipfsethdb_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . 
"github.com/onsi/gomega" +) + +func TestPGIPFSETHDB(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "PG-IPFS ethdb test") +} diff --git a/postgres/v1/util.go b/postgres/v1/util.go new file mode 100644 index 0000000..eedd8fd --- /dev/null +++ b/postgres/v1/util.go @@ -0,0 +1,49 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgipfsethdb + +import ( + blockstore "github.com/ipfs/go-ipfs-blockstore" + dshelp "github.com/ipfs/go-ipfs-ds-help" + "github.com/jmoiron/sqlx" + _ "github.com/lib/pq" //postgres driver + "github.com/multiformats/go-multihash" +) + +// MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash db key string +func MultihashKeyFromKeccak256(h []byte) (string, error) { + mh, err := multihash.Encode(h, multihash.KECCAK_256) + if err != nil { + return "", err + } + dbKey := dshelp.MultihashToDsKey(mh) + return blockstore.BlockPrefix.String() + dbKey.String(), nil +} + +// TestDB connect to the testing database +// it assumes the database has the IPFS public.blocks table present +// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table +func TestDB() (*sqlx.DB, error) { + connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable" + return sqlx.Connect("postgres", connectStr) +} + +// ResetTestDB drops all rows in the test db public.blocks table +func ResetTestDB(db *sqlx.DB) error { + _, err := db.Exec("TRUNCATE public.blocks CASCADE") + return err +} From 844cda11074021c76080374f461bfd0dc7079e12 Mon Sep 17 00:00:00 2001 From: i-norden Date: Mon, 20 Feb 2023 12:21:52 -0600 Subject: [PATCH 2/4] shared util --- postgres/shared/util.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 postgres/shared/util.go diff --git a/postgres/shared/util.go b/postgres/shared/util.go new file mode 100644 index 0000000..1a26852 --- /dev/null +++ b/postgres/shared/util.go @@ -0,0 +1,33 @@ +// VulcanizeDB +// Copyright © 2023 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . 
+ +package shared + +import "github.com/jmoiron/sqlx" + +// TestDB connect to the testing database +// it assumes the database has the IPFS public.blocks table present +// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table +func TestDB() (*sqlx.DB, error) { + connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable" + return sqlx.Connect("postgres", connectStr) +} + +// ResetTestDB drops all rows in the test db public.blocks table +func ResetTestDB(db *sqlx.DB) error { + _, err := db.Exec("TRUNCATE public.blocks CASCADE") + return err +} From 5451c3c225b68b1b9fb2b90cadcdd44c12c6cf8e Mon Sep 17 00:00:00 2001 From: i-norden Date: Mon, 20 Feb 2023 12:22:17 -0600 Subject: [PATCH 3/4] refactor v1 pkg to work with new dep paths --- postgres/v1/batch_test.go | 8 +++++--- postgres/v1/database_test.go | 8 +++++--- postgres/v1/util.go | 15 --------------- 3 files changed, 10 insertions(+), 21 deletions(-) diff --git a/postgres/v1/batch_test.go b/postgres/v1/batch_test.go index ba4ca20..1d559b8 100644 --- a/postgres/v1/batch_test.go +++ b/postgres/v1/batch_test.go @@ -20,6 +20,8 @@ import ( "math/big" "time" + "github.com/cerc-io/ipfs-ethdb/v4/postgres/shared" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rlp" @@ -27,7 +29,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres" + pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres/v1" ) var ( @@ -39,7 +41,7 @@ var ( var _ = Describe("Batch", func() { BeforeEach(func() { - db, err = pgipfsethdb.TestDB() + db, err = shared.TestDB() Expect(err).ToNot(HaveOccurred()) cacheConfig := pgipfsethdb.CacheConfig{ @@ -58,7 +60,7 @@ var _ = Describe("Batch", func() { }) AfterEach(func() { groupcache.DeregisterGroup("db") - err = pgipfsethdb.ResetTestDB(db) + err = shared.ResetTestDB(db) Expect(err).ToNot(HaveOccurred()) }) diff --git a/postgres/v1/database_test.go b/postgres/v1/database_test.go index b929561..5af1d93 100644 --- a/postgres/v1/database_test.go +++ b/postgres/v1/database_test.go @@ -20,6 +20,8 @@ import ( "math/big" "time" + "github.com/cerc-io/ipfs-ethdb/v4/postgres/shared" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rlp" @@ -28,7 +30,7 @@ import ( . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" - pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres" + pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres/v1" ) var ( @@ -44,7 +46,7 @@ var ( var _ = Describe("Database", func() { BeforeEach(func() { - db, err = pgipfsethdb.TestDB() + db, err = shared.TestDB() Expect(err).ToNot(HaveOccurred()) cacheConfig := pgipfsethdb.CacheConfig{ @@ -61,7 +63,7 @@ var _ = Describe("Database", func() { }) AfterEach(func() { groupcache.DeregisterGroup("db") - err = pgipfsethdb.ResetTestDB(db) + err = shared.ResetTestDB(db) Expect(err).ToNot(HaveOccurred()) }) diff --git a/postgres/v1/util.go b/postgres/v1/util.go index eedd8fd..e65b102 100644 --- a/postgres/v1/util.go +++ b/postgres/v1/util.go @@ -19,7 +19,6 @@ package pgipfsethdb import ( blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" - "github.com/jmoiron/sqlx" _ "github.com/lib/pq" //postgres driver "github.com/multiformats/go-multihash" ) @@ -33,17 +32,3 @@ func MultihashKeyFromKeccak256(h []byte) (string, error) { dbKey := dshelp.MultihashToDsKey(mh) return blockstore.BlockPrefix.String() + dbKey.String(), nil } - -// TestDB connect to the testing database -// it assumes the database has the IPFS public.blocks table present -// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table -func TestDB() (*sqlx.DB, error) { - connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable" - return sqlx.Connect("postgres", connectStr) -} - -// ResetTestDB drops all rows in the test db public.blocks table -func ResetTestDB(db *sqlx.DB) error { - _, err := db.Exec("TRUNCATE public.blocks CASCADE") - return err -} From d2176e69af5ca174f19768d378c3760b12d0eae6 Mon Sep 17 00:00:00 2001 From: i-norden Date: Mon, 20 Feb 2023 12:26:40 -0600 Subject: [PATCH 4/4] update postgres v0 subpkg and tests to work with cid keys --- postgres/v0/batch.go | 23 ++++++++++++++------- postgres/v0/batch_test.go | 39 ++++++++++++++++++++---------------- postgres/v0/database.go | 37 ++++++++++++++++++---------------- postgres/v0/database_test.go | 38 +++++++++++++++++++---------------- postgres/v0/iterator.go | 5 +++-- postgres/v0/util.go | 29 ++++++--------------------- 6 files changed, 88 insertions(+), 83 deletions(-) diff --git a/postgres/v0/batch.go b/postgres/v0/batch.go index 0099567..0a83747 100644 --- a/postgres/v0/batch.go +++ b/postgres/v0/batch.go @@ -19,6 +19,8 @@ package pgipfsethdb import ( "math/big" + "github.com/ipfs/go-cid" + "github.com/ethereum/go-ethereum/ethdb" "github.com/jmoiron/sqlx" ) @@ -49,13 +51,20 @@ func NewBatch(db *sqlx.DB, tx *sqlx.Tx, blockNumber *big.Int) ethdb.Batch { // Put satisfies the ethdb.Batch interface // Put inserts the given value into the key-value data store -// Key is expected to be the keccak256 hash of value -func (b *Batch) Put(key []byte, value []byte) (err error) { - mhKey, err := MultihashKeyFromKeccak256(key) +// Key is expected to be a fully formulated cid key +// TODO: note, now that we expected a cid we could route to the "cids" tables based on prefix instead of to public.blocks +// but is it better to handle this routing here, or use a completely different interface since we already have to refactor +// at levels above this package in order to pass in cids instead of raw keccak256 hashes +func (b *Batch) Put(cidBytes []byte, value []byte) (err error) { + // cast and resolve strings from cid.Cast + // this will assert that we have a correctly formatted CID + // and will handle the different 
string encodings for v0 and v1 CIDs + // (note that this v0 vs v1 is different from the blockstore v0 vs v1) + c, err := cid.Cast(cidBytes) if err != nil { return err } - if _, err = b.tx.Exec(putPgStr, mhKey, value, b.blockNumber.Uint64()); err != nil { + if _, err = b.tx.Exec(putPgStr, c.String(), value, b.blockNumber.Uint64()); err != nil { return err } b.valueSize += len(value) @@ -64,12 +73,12 @@ func (b *Batch) Put(key []byte, value []byte) (err error) { // Delete satisfies the ethdb.Batch interface // Delete removes the key from the key-value data store -func (b *Batch) Delete(key []byte) (err error) { - mhKey, err := MultihashKeyFromKeccak256(key) +func (b *Batch) Delete(cidBytes []byte) (err error) { + c, err := cid.Cast(cidBytes) if err != nil { return err } - _, err = b.tx.Exec(deletePgStr, mhKey) + _, err = b.tx.Exec(deletePgStr, c.String()) return err } diff --git a/postgres/v0/batch_test.go b/postgres/v0/batch_test.go index ba4ca20..f8e496e 100644 --- a/postgres/v0/batch_test.go +++ b/postgres/v0/batch_test.go @@ -20,6 +20,10 @@ import ( "math/big" "time" + "github.com/cerc-io/ipfs-ethdb/v4/postgres/shared" + + "github.com/ipfs/go-cid" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rlp" @@ -27,7 +31,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres" + pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres/v0" ) var ( @@ -35,11 +39,12 @@ var ( testHeader2 = types.Header{Number: big.NewInt(2)} testValue2, _ = rlp.EncodeToBytes(testHeader2) testEthKey2 = testHeader2.Hash().Bytes() + testCID2, _ = pgipfsethdb.CIDFromKeccak256(testEthKey2, cid.EthBlock) ) var _ = Describe("Batch", func() { BeforeEach(func() { - db, err = pgipfsethdb.TestDB() + db, err = shared.TestDB() Expect(err).ToNot(HaveOccurred()) cacheConfig := pgipfsethdb.CacheConfig{ @@ -58,30 +63,30 @@ var _ = Describe("Batch", func() { }) AfterEach(func() { groupcache.DeregisterGroup("db") - err = pgipfsethdb.ResetTestDB(db) + err = shared.ResetTestDB(db) Expect(err).ToNot(HaveOccurred()) }) Describe("Put/Write", func() { It("adds the key-value pair to the batch", func() { - _, err = database.Get(testEthKey) + _, err = database.Get(testCID.Bytes()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) - _, err = database.Get(testEthKey2) + _, err = database.Get(testCID2.Bytes()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) - err = batch.Put(testEthKey, testValue) + err = batch.Put(testCID.Bytes(), testValue) Expect(err).ToNot(HaveOccurred()) - err = batch.Put(testEthKey2, testValue2) + err = batch.Put(testCID2.Bytes(), testValue2) Expect(err).ToNot(HaveOccurred()) err = batch.Write() Expect(err).ToNot(HaveOccurred()) - val, err := database.Get(testEthKey) + val, err := database.Get(testCID.Bytes()) Expect(err).ToNot(HaveOccurred()) Expect(val).To(Equal(testValue)) - val2, err := database.Get(testEthKey2) + val2, err := database.Get(testCID2.Bytes()) Expect(err).ToNot(HaveOccurred()) Expect(val2).To(Equal(testValue2)) }) @@ -89,25 +94,25 @@ var _ = Describe("Batch", func() { Describe("Delete/Reset/Write", func() { It("deletes the key-value pair in the batch", func() { - err = batch.Put(testEthKey, testValue) + err = batch.Put(testCID.Bytes(), testValue) Expect(err).ToNot(HaveOccurred()) - err = batch.Put(testEthKey2, testValue2) + err = batch.Put(testCID2.Bytes(), 
testValue2) Expect(err).ToNot(HaveOccurred()) err = batch.Write() Expect(err).ToNot(HaveOccurred()) batch.Reset() - err = batch.Delete(testEthKey) + err = batch.Delete(testCID.Bytes()) Expect(err).ToNot(HaveOccurred()) - err = batch.Delete(testEthKey2) + err = batch.Delete(testCID2.Bytes()) Expect(err).ToNot(HaveOccurred()) err = batch.Write() Expect(err).ToNot(HaveOccurred()) - _, err = database.Get(testEthKey) + _, err = database.Get(testCID.Bytes()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) - _, err = database.Get(testEthKey2) + _, err = database.Get(testCID2.Bytes()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) }) @@ -115,9 +120,9 @@ var _ = Describe("Batch", func() { Describe("ValueSize/Reset", func() { It("returns the size of data in the batch queued for write", func() { - err = batch.Put(testEthKey, testValue) + err = batch.Put(testCID.Bytes(), testValue) Expect(err).ToNot(HaveOccurred()) - err = batch.Put(testEthKey2, testValue2) + err = batch.Put(testCID2.Bytes(), testValue2) Expect(err).ToNot(HaveOccurred()) err = batch.Write() Expect(err).ToNot(HaveOccurred()) diff --git a/postgres/v0/database.go b/postgres/v0/database.go index c21da02..4fc59a3 100644 --- a/postgres/v0/database.go +++ b/postgres/v0/database.go @@ -26,6 +26,8 @@ import ( "strings" "time" + "github.com/ipfs/go-cid" + "github.com/ethereum/go-ethereum/ethdb" "github.com/jmoiron/sqlx" "github.com/mailgun/groupcache/v2" @@ -102,14 +104,14 @@ func (d *Database) GetCacheStats() groupcache.Stats { } // Has satisfies the ethdb.KeyValueReader interface -// Has retrieves if a key is present in the key-value data store -func (d *Database) Has(key []byte) (bool, error) { - mhKey, err := MultihashKeyFromKeccak256(key) +// Has retrieves if a cid is present in the key-value data store +func (d *Database) Has(cidBytes []byte) (bool, error) { + c, err := cid.Cast(cidBytes) if err != nil { return false, err } var exists bool - return exists, d.db.Get(&exists, hasPgStr, mhKey) + return exists, d.db.Get(&exists, hasPgStr, c.String()) } // Get retrieves the given key if it's present in the key-value data store @@ -124,9 +126,9 @@ func (d *Database) dbGet(key string) ([]byte, error) { } // Get satisfies the ethdb.KeyValueReader interface -// Get retrieves the given key if it's present in the key-value data store -func (d *Database) Get(key []byte) ([]byte, error) { - mhKey, err := MultihashKeyFromKeccak256(key) +// Get retrieves the given cid if it's present in the key-value data store +func (d *Database) Get(cidBytes []byte) ([]byte, error) { + c, err := cid.Cast(cidBytes) if err != nil { return nil, err } @@ -135,30 +137,31 @@ func (d *Database) Get(key []byte) ([]byte, error) { defer cancel() var data []byte - return data, d.cache.Get(ctx, mhKey, groupcache.AllocatingByteSliceSink(&data)) + return data, d.cache.Get(ctx, c.String(), groupcache.AllocatingByteSliceSink(&data)) } // Put satisfies the ethdb.KeyValueWriter interface // Put inserts the given value into the key-value data store -// Key is expected to be the keccak256 hash of value -func (d *Database) Put(key []byte, value []byte) error { - mhKey, err := MultihashKeyFromKeccak256(key) +// Key is expected to be a fully formulated cis of value +func (d *Database) Put(cidBytes []byte, value []byte) error { + c, err := cid.Cast(cidBytes) if err != nil { return err } - _, err = d.db.Exec(putPgStr, mhKey, value, d.BlockNumber.Uint64()) + _, err = d.db.Exec(putPgStr, 
c.String(), value, d.BlockNumber.Uint64()) return err } // Delete satisfies the ethdb.KeyValueWriter interface -// Delete removes the key from the key-value data store -func (d *Database) Delete(key []byte) error { - mhKey, err := MultihashKeyFromKeccak256(key) +// Delete removes the cid from the key-value data store +func (d *Database) Delete(cidBytes []byte) error { + c, err := cid.Cast(cidBytes) if err != nil { return err } + cidString := c.String() - _, err = d.db.Exec(deletePgStr, mhKey) + _, err = d.db.Exec(deletePgStr, cidString) if err != nil { return err } @@ -166,7 +169,7 @@ func (d *Database) Delete(key []byte) error { // Remove from cache. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) defer cancel() - err = d.cache.Remove(ctx, mhKey) + err = d.cache.Remove(ctx, cidString) return err } diff --git a/postgres/v0/database_test.go b/postgres/v0/database_test.go index b929561..8736222 100644 --- a/postgres/v0/database_test.go +++ b/postgres/v0/database_test.go @@ -20,6 +20,10 @@ import ( "math/big" "time" + "github.com/cerc-io/ipfs-ethdb/v4/postgres/shared" + + "github.com/ipfs/go-cid" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rlp" @@ -28,7 +32,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres" + pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres/v0" ) var ( @@ -39,12 +43,12 @@ var ( testHeader = types.Header{Number: testBlockNumber} testValue, _ = rlp.EncodeToBytes(testHeader) testEthKey = testHeader.Hash().Bytes() - testMhKey, _ = pgipfsethdb.MultihashKeyFromKeccak256(testEthKey) + testCID, _ = pgipfsethdb.CIDFromKeccak256(testEthKey, cid.EthBlock) ) var _ = Describe("Database", func() { BeforeEach(func() { - db, err = pgipfsethdb.TestDB() + db, err = shared.TestDB() Expect(err).ToNot(HaveOccurred()) cacheConfig := pgipfsethdb.CacheConfig{ @@ -61,20 +65,20 @@ var _ = Describe("Database", func() { }) AfterEach(func() { groupcache.DeregisterGroup("db") - err = pgipfsethdb.ResetTestDB(db) + err = shared.ResetTestDB(db) Expect(err).ToNot(HaveOccurred()) }) Describe("Has", func() { It("returns false if a key-pair doesn't exist in the db", func() { - has, err := database.Has(testEthKey) + has, err := database.Has(testCID.Bytes()) Expect(err).ToNot(HaveOccurred()) Expect(has).ToNot(BeTrue()) }) It("returns true if a key-pair exists in the db", func() { - _, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64()) + _, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testCID.String(), testValue, testBlockNumber.Uint64()) Expect(err).ToNot(HaveOccurred()) - has, err := database.Has(testEthKey) + has, err := database.Has(testCID.Bytes()) Expect(err).ToNot(HaveOccurred()) Expect(has).To(BeTrue()) }) @@ -82,14 +86,14 @@ var _ = Describe("Database", func() { Describe("Get", func() { It("throws an err if the key-pair doesn't exist in the db", func() { - _, err = database.Get(testEthKey) + _, err = database.Get(testCID.Bytes()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) }) It("returns the value associated with the key, if the pair exists", func() { - _, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64()) + _, err = db.Exec("INSERT into public.blocks 
(key, data, block_number) VALUES ($1, $2, $3)", testCID.String(), testValue, testBlockNumber.Uint64()) Expect(err).ToNot(HaveOccurred()) - val, err := database.Get(testEthKey) + val, err := database.Get(testCID.Bytes()) Expect(err).ToNot(HaveOccurred()) Expect(val).To(Equal(testValue)) }) @@ -97,13 +101,13 @@ var _ = Describe("Database", func() { Describe("Put", func() { It("persists the key-value pair in the database", func() { - _, err = database.Get(testEthKey) + _, err = database.Get(testCID.Bytes()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) - err = database.Put(testEthKey, testValue) + err = database.Put(testCID.Bytes(), testValue) Expect(err).ToNot(HaveOccurred()) - val, err := database.Get(testEthKey) + val, err := database.Get(testCID.Bytes()) Expect(err).ToNot(HaveOccurred()) Expect(val).To(Equal(testValue)) }) @@ -111,15 +115,15 @@ var _ = Describe("Database", func() { Describe("Delete", func() { It("removes the key-value pair from the database", func() { - err = database.Put(testEthKey, testValue) + err = database.Put(testCID.Bytes(), testValue) Expect(err).ToNot(HaveOccurred()) - val, err := database.Get(testEthKey) + val, err := database.Get(testCID.Bytes()) Expect(err).ToNot(HaveOccurred()) Expect(val).To(Equal(testValue)) - err = database.Delete(testEthKey) + err = database.Delete(testCID.Bytes()) Expect(err).ToNot(HaveOccurred()) - _, err = database.Get(testEthKey) + _, err = database.Get(testCID.Bytes()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) }) diff --git a/postgres/v0/iterator.go b/postgres/v0/iterator.go index 0d3a0a4..e5c8e64 100644 --- a/postgres/v0/iterator.go +++ b/postgres/v0/iterator.go @@ -18,6 +18,7 @@ package pgipfsethdb import ( "github.com/ethereum/go-ethereum/ethdb" + "github.com/ipfs/go-cid" "github.com/jmoiron/sqlx" ) @@ -72,13 +73,13 @@ func (i *Iterator) Key() []byte { // The caller should not modify the contents of the returned slice // and its contents may change on the next call to Next func (i *Iterator) Value() []byte { - mhKey, err := MultihashKeyFromKeccak256(i.currentKey) + c, err := cid.Cast(i.currentKey) if err != nil { i.err = err return nil } var data []byte - i.err = i.db.Get(&data, getPgStr, mhKey) + i.err = i.db.Get(&data, getPgStr, c) return data } diff --git a/postgres/v0/util.go b/postgres/v0/util.go index eedd8fd..85700bf 100644 --- a/postgres/v0/util.go +++ b/postgres/v0/util.go @@ -17,33 +17,16 @@ package pgipfsethdb import ( - blockstore "github.com/ipfs/go-ipfs-blockstore" - dshelp "github.com/ipfs/go-ipfs-ds-help" - "github.com/jmoiron/sqlx" + "github.com/ipfs/go-cid" _ "github.com/lib/pq" //postgres driver "github.com/multiformats/go-multihash" ) -// MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash db key string -func MultihashKeyFromKeccak256(h []byte) (string, error) { - mh, err := multihash.Encode(h, multihash.KECCAK_256) +// CIDFromKeccak256 converts keccak256 hash bytes into a v1 cid +func CIDFromKeccak256(hash []byte, codecType uint64) (cid.Cid, error) { + mh, err := multihash.Encode(hash, multihash.KECCAK_256) if err != nil { - return "", err + return cid.Cid{}, err } - dbKey := dshelp.MultihashToDsKey(mh) - return blockstore.BlockPrefix.String() + dbKey.String(), nil -} - -// TestDB connect to the testing database -// it assumes the database has the IPFS public.blocks table present -// DO NOT use a production db for the test db, as it will remove all 
contents of the public.blocks table -func TestDB() (*sqlx.DB, error) { - connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable" - return sqlx.Connect("postgres", connectStr) -} - -// ResetTestDB drops all rows in the test db public.blocks table -func ResetTestDB(db *sqlx.DB) error { - _, err := db.Exec("TRUNCATE public.blocks CASCADE") - return err + return cid.NewCidV1(codecType, mh), nil }
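
After this series the two subpackages key public.blocks differently: postgres/v1 keeps the original blockstore-style key (a keccak256 multihash wrapped in a datastore key under the /blocks prefix), while postgres/v0, per PATCH 4/4, expects callers to hand it fully formed CIDs and stores the CID string. The following is a minimal sketch of how the two key strings derive from the same keccak256 hash; it assumes only the go-cid, go-multihash, go-ipfs-blockstore, and go-ipfs-ds-help packages the patches already import, plus go-ethereum's crypto package just to produce a hash, and is illustrative rather than part of the series.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ipfs/go-cid"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	dshelp "github.com/ipfs/go-ipfs-ds-help"
	"github.com/multiformats/go-multihash"
)

func main() {
	value := []byte("rlp-encoded header bytes")
	hash := crypto.Keccak256(value) // geth keys blocks by keccak256(value)

	mh, err := multihash.Encode(hash, multihash.KECCAK_256)
	if err != nil {
		panic(err)
	}

	// postgres/v1 key (MultihashKeyFromKeccak256): /blocks-prefixed datastore key
	v1Key := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(mh).String()

	// postgres/v0 key after PATCH 4/4 (CIDFromKeccak256): a CIDv1 with an Ethereum codec;
	// callers pass c.Bytes() through the ethdb interfaces and c.String() is what gets stored
	c := cid.NewCidV1(cid.EthBlock, mh)

	fmt.Println("v1 key:", v1Key)
	fmt.Println("v0 key:", c.String())
}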
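
Because Put and NewBatch read BlockNumber for the block_number column, a caller has to set it on the concrete *Database before writing, exactly as the tests do via a type assertion. A hypothetical end-to-end use of the v1 package might look like the sketch below; the connection string, cache sizing, and block number are placeholder values mirroring the tests, not requirements of the series.

package main

import (
	"fmt"
	"math/big"
	"time"

	"github.com/ethereum/go-ethereum/crypto"
	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq"

	pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres/v1"
)

func main() {
	// Any Postgres database with the IPFS public.blocks table; placeholder DSN.
	db, err := sqlx.Connect("postgres", "postgresql://localhost:5432/vulcanize_testing?sslmode=disable")
	if err != nil {
		panic(err)
	}

	ethDB := pgipfsethdb.NewDatabase(db, pgipfsethdb.CacheConfig{
		Name:           "db",
		Size:           3000000, // 3MB groupcache
		ExpiryDuration: time.Hour,
	})

	// Put/NewBatch dereference BlockNumber, so set it on the concrete type first.
	if pgDB, ok := ethDB.(*pgipfsethdb.Database); ok {
		pgDB.BlockNumber = big.NewInt(1337)
	}

	value := []byte("rlp-encoded header bytes")
	key := crypto.Keccak256(value) // v1 keys are keccak256 hashes, as in geth

	if err := ethDB.Put(key, value); err != nil {
		panic(err)
	}
	got, err := ethDB.Get(key) // reads are served through the groupcache group named above
	if err != nil {
		panic(err)
	}
	fmt.Printf("round-tripped %d bytes\n", len(got))

	// Batch writes accumulate in a single Postgres transaction until Write commits it.
	batch := ethDB.NewBatch()
	if err := batch.Put(key, value); err != nil {
		panic(err)
	}
	if err := batch.Write(); err != nil {
		panic(err)
	}
}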
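
The reworked v0 package routes everything through CID byte keys instead: a caller that previously handed it a keccak256 hash now builds a CID with CIDFromKeccak256 and passes c.Bytes() to Has/Get/Put/Delete, and Delete additionally evicts c.String() from the cache group. A short sketch under the same assumptions as above (placeholder DSN and block number, cid.EthBlock chosen as the codec just as in the updated tests):

package main

import (
	"fmt"
	"math/big"
	"time"

	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ipfs/go-cid"
	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq"

	pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres/v0"
)

func main() {
	db, err := sqlx.Connect("postgres", "postgresql://localhost:5432/vulcanize_testing?sslmode=disable")
	if err != nil {
		panic(err)
	}

	ethDB := pgipfsethdb.NewDatabase(db, pgipfsethdb.CacheConfig{
		Name:           "db",
		Size:           3000000,
		ExpiryDuration: time.Hour,
	})
	if pgDB, ok := ethDB.(*pgipfsethdb.Database); ok {
		pgDB.BlockNumber = big.NewInt(1337)
	}

	value := []byte("rlp-encoded header bytes")
	hash := crypto.Keccak256(value)

	// v0 now expects a fully formed CID; the codec tells readers how to interpret the block.
	c, err := pgipfsethdb.CIDFromKeccak256(hash, cid.EthBlock)
	if err != nil {
		panic(err)
	}

	if err := ethDB.Put(c.Bytes(), value); err != nil {
		panic(err)
	}
	has, err := ethDB.Has(c.Bytes())
	if err != nil {
		panic(err)
	}
	fmt.Println("stored under", c.String(), "present:", has)

	// Delete removes the row and evicts the CID string from the groupcache group.
	if err := ethDB.Delete(c.Bytes()); err != nil {
		panic(err)
	}
}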