diff --git a/postgres/shared/util.go b/postgres/shared/util.go new file mode 100644 index 0000000..1a26852 --- /dev/null +++ b/postgres/shared/util.go @@ -0,0 +1,33 @@ +// VulcanizeDB +// Copyright © 2023 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package shared + +import "github.com/jmoiron/sqlx" + +// TestDB connect to the testing database +// it assumes the database has the IPFS public.blocks table present +// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table +func TestDB() (*sqlx.DB, error) { + connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable" + return sqlx.Connect("postgres", connectStr) +} + +// ResetTestDB drops all rows in the test db public.blocks table +func ResetTestDB(db *sqlx.DB) error { + _, err := db.Exec("TRUNCATE public.blocks CASCADE") + return err +} diff --git a/postgres/v0/batch.go b/postgres/v0/batch.go new file mode 100644 index 0000000..0a83747 --- /dev/null +++ b/postgres/v0/batch.go @@ -0,0 +1,117 @@ +// VulcanizeDB +// Copyright © 2020 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgipfsethdb + +import ( + "math/big" + + "github.com/ipfs/go-cid" + + "github.com/ethereum/go-ethereum/ethdb" + "github.com/jmoiron/sqlx" +) + +var _ ethdb.Batch = &Batch{} + +// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection +type Batch struct { + db *sqlx.DB + tx *sqlx.Tx + valueSize int + + blockNumber *big.Int +} + +// NewBatch returns a ethdb.Batch interface for PG-IPFS +func NewBatch(db *sqlx.DB, tx *sqlx.Tx, blockNumber *big.Int) ethdb.Batch { + b := &Batch{ + db: db, + tx: tx, + blockNumber: blockNumber, + } + if tx == nil { + b.Reset() + } + return b +} + +// Put satisfies the ethdb.Batch interface +// Put inserts the given value into the key-value data store +// Key is expected to be a fully formulated cid key +// TODO: note, now that we expected a cid we could route to the "cids" tables based on prefix instead of to public.blocks +// but is it better to handle this routing here, or use a completely different interface since we already have to refactor +// at levels above this package in order to pass in cids instead of raw keccak256 hashes +func (b *Batch) Put(cidBytes []byte, value []byte) (err error) { + // cast and resolve strings from cid.Cast + // this will assert that we have a correctly formatted CID + // and will handle the different string encodings for v0 and v1 CIDs + // (note that this v0 vs v1 is different from the blockstore v0 vs v1) + c, err := cid.Cast(cidBytes) + if err != nil { + return err + } + if _, err = b.tx.Exec(putPgStr, c.String(), value, b.blockNumber.Uint64()); err != nil { + return err + } + b.valueSize += len(value) + return nil +} + +// Delete satisfies the ethdb.Batch interface +// Delete removes the key from the key-value data store +func (b *Batch) Delete(cidBytes []byte) (err error) { + c, err := cid.Cast(cidBytes) + if err != nil { + return err + } + _, err = b.tx.Exec(deletePgStr, c.String()) + return err +} + +// ValueSize satisfies the ethdb.Batch interface +// ValueSize retrieves the amount of data queued up for writing +// The returned value is the total byte length of all data queued to write +func (b *Batch) ValueSize() int { + return b.valueSize +} + +// Write satisfies the ethdb.Batch interface +// Write flushes any accumulated data to disk +func (b *Batch) Write() error { + if b.tx == nil { + return nil + } + return b.tx.Commit() +} + +// Replay satisfies the ethdb.Batch interface +// Replay replays the batch contents +func (b *Batch) Replay(w ethdb.KeyValueWriter) error { + return errNotSupported +} + +// Reset satisfies the ethdb.Batch interface +// Reset resets the batch for reuse +// This should be called after every write +func (b *Batch) Reset() { + var err error + b.tx, err = b.db.Beginx() + if err != nil { + panic(err) + } + b.valueSize = 0 +} diff --git a/postgres/v0/batch_test.go b/postgres/v0/batch_test.go new file mode 100644 index 0000000..f8e496e --- /dev/null +++ b/postgres/v0/batch_test.go @@ -0,0 +1,138 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgipfsethdb_test + +import ( + "math/big" + "time" + + "github.com/cerc-io/ipfs-ethdb/v4/postgres/shared" + + "github.com/ipfs/go-cid" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/rlp" + "github.com/mailgun/groupcache/v2" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres/v0" +) + +var ( + batch ethdb.Batch + testHeader2 = types.Header{Number: big.NewInt(2)} + testValue2, _ = rlp.EncodeToBytes(testHeader2) + testEthKey2 = testHeader2.Hash().Bytes() + testCID2, _ = pgipfsethdb.CIDFromKeccak256(testEthKey2, cid.EthBlock) +) + +var _ = Describe("Batch", func() { + BeforeEach(func() { + db, err = shared.TestDB() + Expect(err).ToNot(HaveOccurred()) + + cacheConfig := pgipfsethdb.CacheConfig{ + Name: "db", + Size: 3000000, // 3MB + ExpiryDuration: time.Hour, + } + + database = pgipfsethdb.NewDatabase(db, cacheConfig) + + databaseWithBlock, ok := database.(*pgipfsethdb.Database) + Expect(ok).To(BeTrue()) + (*databaseWithBlock).BlockNumber = testBlockNumber + + batch = database.NewBatch() + }) + AfterEach(func() { + groupcache.DeregisterGroup("db") + err = shared.ResetTestDB(db) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("Put/Write", func() { + It("adds the key-value pair to the batch", func() { + _, err = database.Get(testCID.Bytes()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + _, err = database.Get(testCID2.Bytes()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + + err = batch.Put(testCID.Bytes(), testValue) + Expect(err).ToNot(HaveOccurred()) + err = batch.Put(testCID2.Bytes(), testValue2) + Expect(err).ToNot(HaveOccurred()) + err = batch.Write() + Expect(err).ToNot(HaveOccurred()) + + val, err := database.Get(testCID.Bytes()) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + val2, err := database.Get(testCID2.Bytes()) + Expect(err).ToNot(HaveOccurred()) + Expect(val2).To(Equal(testValue2)) + }) + }) + + Describe("Delete/Reset/Write", func() { + It("deletes the key-value pair in the batch", func() { + err = batch.Put(testCID.Bytes(), testValue) + Expect(err).ToNot(HaveOccurred()) + err = batch.Put(testCID2.Bytes(), testValue2) + Expect(err).ToNot(HaveOccurred()) + err = batch.Write() + Expect(err).ToNot(HaveOccurred()) + + batch.Reset() + err = batch.Delete(testCID.Bytes()) + Expect(err).ToNot(HaveOccurred()) + err = batch.Delete(testCID2.Bytes()) + Expect(err).ToNot(HaveOccurred()) + err = batch.Write() + Expect(err).ToNot(HaveOccurred()) + + _, err = database.Get(testCID.Bytes()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + _, err = database.Get(testCID2.Bytes()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + }) + }) + + Describe("ValueSize/Reset", func() { + It("returns the size of data in the batch queued for write", func() { + err = batch.Put(testCID.Bytes(), testValue) + Expect(err).ToNot(HaveOccurred()) + err = batch.Put(testCID2.Bytes(), testValue2) + Expect(err).ToNot(HaveOccurred()) + err = batch.Write() + Expect(err).ToNot(HaveOccurred()) + + size := batch.ValueSize() + Expect(size).To(Equal(len(testValue) + len(testValue2))) + + batch.Reset() + size = batch.ValueSize() + Expect(size).To(Equal(0)) + }) + }) +}) diff --git a/postgres/v0/database.go b/postgres/v0/database.go new file mode 100644 index 0000000..4fc59a3 --- /dev/null +++ b/postgres/v0/database.go @@ -0,0 +1,364 @@ +// VulcanizeDB +// Copyright © 2020 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgipfsethdb + +import ( + "context" + "database/sql" + "errors" + "fmt" + "math/big" + "strconv" + "strings" + "time" + + "github.com/ipfs/go-cid" + + "github.com/ethereum/go-ethereum/ethdb" + "github.com/jmoiron/sqlx" + "github.com/mailgun/groupcache/v2" + log "github.com/sirupsen/logrus" +) + +var errNotSupported = errors.New("this operation is not supported") + +var ( + hasPgStr = "SELECT exists(select 1 from public.blocks WHERE key = $1 LIMIT 1)" + getPgStr = "SELECT data FROM public.blocks WHERE key = $1 LIMIT 1" + putPgStr = "INSERT INTO public.blocks (key, data, block_number) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING" + deletePgStr = "DELETE FROM public.blocks WHERE key = $1" + dbSizePgStr = "SELECT pg_database_size(current_database())" +) + +var _ ethdb.Database = &Database{} + +// Database is the type that satisfies the ethdb.Database and ethdb.KeyValueStore interfaces for PG-IPFS Ethereum data using a direct Postgres connection +type Database struct { + db *sqlx.DB + cache *groupcache.Group + + BlockNumber *big.Int +} + +func (d *Database) ModifyAncients(f func(ethdb.AncientWriteOp) error) (int64, error) { + return 0, errNotSupported +} + +type CacheConfig struct { + Name string + Size int + ExpiryDuration time.Duration +} + +// NewKeyValueStore returns a ethdb.KeyValueStore interface for PG-IPFS +func NewKeyValueStore(db *sqlx.DB, cacheConfig CacheConfig) ethdb.KeyValueStore { + database := Database{db: db} + database.InitCache(cacheConfig) + + return &database +} + +// NewDatabase returns a ethdb.Database interface for PG-IPFS +func NewDatabase(db *sqlx.DB, cacheConfig CacheConfig) ethdb.Database { + database := Database{db: db} + database.InitCache(cacheConfig) + + return &database +} + +func (d *Database) InitCache(cacheConfig CacheConfig) { + d.cache = groupcache.NewGroup(cacheConfig.Name, int64(cacheConfig.Size), groupcache.GetterFunc( + func(_ context.Context, id string, dest groupcache.Sink) error { + val, err := d.dbGet(id) + + if err != nil { + return err + } + + // Set the value in the groupcache, with expiry + if err := dest.SetBytes(val, time.Now().Add(cacheConfig.ExpiryDuration)); err != nil { + return err + } + + return nil + }, + )) +} + +func (d *Database) GetCacheStats() groupcache.Stats { + return d.cache.Stats +} + +// Has satisfies the ethdb.KeyValueReader interface +// Has retrieves if a cid is present in the key-value data store +func (d *Database) Has(cidBytes []byte) (bool, error) { + c, err := cid.Cast(cidBytes) + if err != nil { + return false, err + } + var exists bool + return exists, d.db.Get(&exists, hasPgStr, c.String()) +} + +// Get retrieves the given key if it's present in the key-value data store +func (d *Database) dbGet(key string) ([]byte, error) { + var data []byte + err := d.db.Get(&data, getPgStr, key) + if err == sql.ErrNoRows { + log.Warn("Database miss for key", key) + } + + return data, err +} + +// Get satisfies the ethdb.KeyValueReader interface +// Get retrieves the given cid if it's present in the key-value data store +func (d *Database) Get(cidBytes []byte) ([]byte, error) { + c, err := cid.Cast(cidBytes) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + defer cancel() + + var data []byte + return data, d.cache.Get(ctx, c.String(), groupcache.AllocatingByteSliceSink(&data)) +} + +// Put satisfies the ethdb.KeyValueWriter interface +// Put inserts the given value into the key-value data store +// Key is expected to be a fully formulated cis of value +func (d *Database) Put(cidBytes []byte, value []byte) error { + c, err := cid.Cast(cidBytes) + if err != nil { + return err + } + _, err = d.db.Exec(putPgStr, c.String(), value, d.BlockNumber.Uint64()) + return err +} + +// Delete satisfies the ethdb.KeyValueWriter interface +// Delete removes the cid from the key-value data store +func (d *Database) Delete(cidBytes []byte) error { + c, err := cid.Cast(cidBytes) + if err != nil { + return err + } + cidString := c.String() + + _, err = d.db.Exec(deletePgStr, cidString) + if err != nil { + return err + } + + // Remove from cache. + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) + defer cancel() + err = d.cache.Remove(ctx, cidString) + + return err +} + +// DatabaseProperty enum type +type DatabaseProperty int + +const ( + Unknown DatabaseProperty = iota + Size + Idle + InUse + MaxIdleClosed + MaxLifetimeClosed + MaxOpenConnections + OpenConnections + WaitCount + WaitDuration +) + +// DatabasePropertyFromString helper function +func DatabasePropertyFromString(property string) (DatabaseProperty, error) { + switch strings.ToLower(property) { + case "size": + return Size, nil + case "idle": + return Idle, nil + case "inuse": + return InUse, nil + case "maxidleclosed": + return MaxIdleClosed, nil + case "maxlifetimeclosed": + return MaxLifetimeClosed, nil + case "maxopenconnections": + return MaxOpenConnections, nil + case "openconnections": + return OpenConnections, nil + case "waitcount": + return WaitCount, nil + case "waitduration": + return WaitDuration, nil + default: + return Unknown, fmt.Errorf("unknown database property") + } +} + +// Stat satisfies the ethdb.Stater interface +// Stat returns a particular internal stat of the database +func (d *Database) Stat(property string) (string, error) { + prop, err := DatabasePropertyFromString(property) + if err != nil { + return "", err + } + switch prop { + case Size: + var byteSize string + return byteSize, d.db.Get(&byteSize, dbSizePgStr) + case Idle: + return strconv.Itoa(d.db.Stats().Idle), nil + case InUse: + return strconv.Itoa(d.db.Stats().InUse), nil + case MaxIdleClosed: + return strconv.FormatInt(d.db.Stats().MaxIdleClosed, 10), nil + case MaxLifetimeClosed: + return strconv.FormatInt(d.db.Stats().MaxLifetimeClosed, 10), nil + case MaxOpenConnections: + return strconv.Itoa(d.db.Stats().MaxOpenConnections), nil + case OpenConnections: + return strconv.Itoa(d.db.Stats().OpenConnections), nil + case WaitCount: + return strconv.FormatInt(d.db.Stats().WaitCount, 10), nil + case WaitDuration: + return d.db.Stats().WaitDuration.String(), nil + default: + return "", fmt.Errorf("unhandled database property") + } +} + +// Compact satisfies the ethdb.Compacter interface +// Compact flattens the underlying data store for the given key range +func (d *Database) Compact(start []byte, limit []byte) error { + return errNotSupported +} + +// NewBatch satisfies the ethdb.Batcher interface +// NewBatch creates a write-only database that buffers changes to its host db +// until a final write is called +func (d *Database) NewBatch() ethdb.Batch { + return NewBatch(d.db, nil, d.BlockNumber) +} + +// NewBatchWithSize satisfies the ethdb.Batcher interface. +// NewBatchWithSize creates a write-only database batch with pre-allocated buffer. +func (d *Database) NewBatchWithSize(size int) ethdb.Batch { + return NewBatch(d.db, nil, d.BlockNumber) +} + +// NewIterator satisfies the ethdb.Iteratee interface +// it creates a binary-alphabetical iterator over a subset +// of database content with a particular key prefix, starting at a particular +// initial key (or after, if it does not exist). +// +// Note: This method assumes that the prefix is NOT part of the start, so there's +// no need for the caller to prepend the prefix to the start +func (d *Database) NewIterator(prefix []byte, start []byte) ethdb.Iterator { + return NewIterator(start, prefix, d.db) +} + +// Close satisfies the io.Closer interface +// Close closes the db connection +func (d *Database) Close() error { + return d.db.DB.Close() +} + +// HasAncient satisfies the ethdb.AncientReader interface +// HasAncient returns an indicator whether the specified data exists in the ancient store +func (d *Database) HasAncient(kind string, number uint64) (bool, error) { + return false, errNotSupported +} + +// Ancient satisfies the ethdb.AncientReader interface +// Ancient retrieves an ancient binary blob from the append-only immutable files +func (d *Database) Ancient(kind string, number uint64) ([]byte, error) { + return nil, errNotSupported +} + +// Ancients satisfies the ethdb.AncientReader interface +// Ancients returns the ancient item numbers in the ancient store +func (d *Database) Ancients() (uint64, error) { + return 0, errNotSupported +} + +// Tail satisfies the ethdb.AncientReader interface. +// Tail returns the number of first stored item in the freezer. +func (d *Database) Tail() (uint64, error) { + return 0, errNotSupported +} + +// AncientSize satisfies the ethdb.AncientReader interface +// AncientSize returns the ancient size of the specified category +func (d *Database) AncientSize(kind string) (uint64, error) { + return 0, errNotSupported +} + +// AncientRange retrieves all the items in a range, starting from the index 'start'. +// It will return +// - at most 'count' items, +// - at least 1 item (even if exceeding the maxBytes), but will otherwise +// return as many items as fit into maxBytes. +func (d *Database) AncientRange(kind string, start, count, maxBytes uint64) ([][]byte, error) { + return nil, errNotSupported +} + +// ReadAncients applies the provided AncientReader function +func (d *Database) ReadAncients(fn func(ethdb.AncientReaderOp) error) (err error) { + return errNotSupported +} + +// TruncateHead satisfies the ethdb.AncientWriter interface. +// TruncateHead discards all but the first n ancient data from the ancient store. +func (d *Database) TruncateHead(n uint64) error { + return errNotSupported +} + +// TruncateTail satisfies the ethdb.AncientWriter interface. +// TruncateTail discards the first n ancient data from the ancient store. +func (d *Database) TruncateTail(n uint64) error { + return errNotSupported +} + +// Sync satisfies the ethdb.AncientWriter interface +// Sync flushes all in-memory ancient store data to disk +func (d *Database) Sync() error { + return errNotSupported +} + +// MigrateTable satisfies the ethdb.AncientWriter interface. +// MigrateTable processes and migrates entries of a given table to a new format. +func (d *Database) MigrateTable(string, func([]byte) ([]byte, error)) error { + return errNotSupported +} + +// NewSnapshot satisfies the ethdb.Snapshotter interface. +// NewSnapshot creates a database snapshot based on the current state. +func (d *Database) NewSnapshot() (ethdb.Snapshot, error) { + return nil, errNotSupported +} + +// AncientDatadir returns an error as we don't have a backing chain freezer. +func (d *Database) AncientDatadir() (string, error) { + return "", errNotSupported +} diff --git a/postgres/v0/database_test.go b/postgres/v0/database_test.go new file mode 100644 index 0000000..8736222 --- /dev/null +++ b/postgres/v0/database_test.go @@ -0,0 +1,131 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgipfsethdb_test + +import ( + "math/big" + "time" + + "github.com/cerc-io/ipfs-ethdb/v4/postgres/shared" + + "github.com/ipfs/go-cid" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/rlp" + "github.com/jmoiron/sqlx" + "github.com/mailgun/groupcache/v2" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres/v0" +) + +var ( + database ethdb.Database + db *sqlx.DB + err error + testBlockNumber = big.NewInt(1337) + testHeader = types.Header{Number: testBlockNumber} + testValue, _ = rlp.EncodeToBytes(testHeader) + testEthKey = testHeader.Hash().Bytes() + testCID, _ = pgipfsethdb.CIDFromKeccak256(testEthKey, cid.EthBlock) +) + +var _ = Describe("Database", func() { + BeforeEach(func() { + db, err = shared.TestDB() + Expect(err).ToNot(HaveOccurred()) + + cacheConfig := pgipfsethdb.CacheConfig{ + Name: "db", + Size: 3000000, // 3MB + ExpiryDuration: time.Hour, + } + + database = pgipfsethdb.NewDatabase(db, cacheConfig) + + databaseWithBlock, ok := database.(*pgipfsethdb.Database) + Expect(ok).To(BeTrue()) + (*databaseWithBlock).BlockNumber = testBlockNumber + }) + AfterEach(func() { + groupcache.DeregisterGroup("db") + err = shared.ResetTestDB(db) + Expect(err).ToNot(HaveOccurred()) + }) + + Describe("Has", func() { + It("returns false if a key-pair doesn't exist in the db", func() { + has, err := database.Has(testCID.Bytes()) + Expect(err).ToNot(HaveOccurred()) + Expect(has).ToNot(BeTrue()) + }) + It("returns true if a key-pair exists in the db", func() { + _, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testCID.String(), testValue, testBlockNumber.Uint64()) + Expect(err).ToNot(HaveOccurred()) + has, err := database.Has(testCID.Bytes()) + Expect(err).ToNot(HaveOccurred()) + Expect(has).To(BeTrue()) + }) + }) + + Describe("Get", func() { + It("throws an err if the key-pair doesn't exist in the db", func() { + _, err = database.Get(testCID.Bytes()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + }) + It("returns the value associated with the key, if the pair exists", func() { + _, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testCID.String(), testValue, testBlockNumber.Uint64()) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testCID.Bytes()) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + }) + }) + + Describe("Put", func() { + It("persists the key-value pair in the database", func() { + _, err = database.Get(testCID.Bytes()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + + err = database.Put(testCID.Bytes(), testValue) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testCID.Bytes()) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + }) + }) + + Describe("Delete", func() { + It("removes the key-value pair from the database", func() { + err = database.Put(testCID.Bytes(), testValue) + Expect(err).ToNot(HaveOccurred()) + val, err := database.Get(testCID.Bytes()) + Expect(err).ToNot(HaveOccurred()) + Expect(val).To(Equal(testValue)) + + err = database.Delete(testCID.Bytes()) + Expect(err).ToNot(HaveOccurred()) + _, err = database.Get(testCID.Bytes()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) + }) + }) +}) diff --git a/postgres/v0/iterator.go b/postgres/v0/iterator.go new file mode 100644 index 0000000..e5c8e64 --- /dev/null +++ b/postgres/v0/iterator.go @@ -0,0 +1,91 @@ +// VulcanizeDB +// Copyright © 2020 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgipfsethdb + +import ( + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ipfs/go-cid" + "github.com/jmoiron/sqlx" +) + +var _ ethdb.Iterator = &Iterator{} + +// Iterator is the type that satisfies the ethdb.Iterator interface for PG-IPFS Ethereum data using a direct Postgres connection +// Iteratee interface is used in Geth for various tests, trie/sync_bloom.go (for fast sync), +// rawdb.InspectDatabase, and the new core/state/snapshot features. +// This should not be confused with trie.NodeIterator or state.NodeIteraor (which can be constructed +// from the ethdb.KeyValueStoreand ethdb.Database interfaces) +type Iterator struct { + db *sqlx.DB + currentKey, prefix []byte + err error +} + +// NewIterator returns an ethdb.Iterator interface for PG-IPFS +func NewIterator(start, prefix []byte, db *sqlx.DB) ethdb.Iterator { + return &Iterator{ + db: db, + prefix: prefix, + currentKey: start, + } +} + +// Next satisfies the ethdb.Iterator interface +// Next moves the iterator to the next key/value pair +// It returns whether the iterator is exhausted +func (i *Iterator) Next() bool { + // this is complicated by the ipfs db keys not being the keccak256 hashes + // go-ethereum usage of this method expects the iteration to occur over keccak256 keys + panic("implement me: Next") +} + +// Error satisfies the ethdb.Iterator interface +// Error returns any accumulated error +// Exhausting all the key/value pairs is not considered to be an error +func (i *Iterator) Error() error { + return i.err +} + +// Key satisfies the ethdb.Iterator interface +// Key returns the key of the current key/value pair, or nil if done +// The caller should not modify the contents of the returned slice +// and its contents may change on the next call to Next +func (i *Iterator) Key() []byte { + return i.currentKey +} + +// Value satisfies the ethdb.Iterator interface +// Value returns the value of the current key/value pair, or nil if done +// The caller should not modify the contents of the returned slice +// and its contents may change on the next call to Next +func (i *Iterator) Value() []byte { + c, err := cid.Cast(i.currentKey) + if err != nil { + i.err = err + return nil + } + var data []byte + i.err = i.db.Get(&data, getPgStr, c) + return data +} + +// Release satisfies the ethdb.Iterator interface +// Release releases associated resources +// Release should always succeed and can be called multiple times without causing error +func (i *Iterator) Release() { + i.db.Close() +} diff --git a/postgres/postgres_suite_test.go b/postgres/v0/postgres_suite_test.go similarity index 100% rename from postgres/postgres_suite_test.go rename to postgres/v0/postgres_suite_test.go diff --git a/postgres/v0/util.go b/postgres/v0/util.go new file mode 100644 index 0000000..85700bf --- /dev/null +++ b/postgres/v0/util.go @@ -0,0 +1,32 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgipfsethdb + +import ( + "github.com/ipfs/go-cid" + _ "github.com/lib/pq" //postgres driver + "github.com/multiformats/go-multihash" +) + +// CIDFromKeccak256 converts keccak256 hash bytes into a v1 cid +func CIDFromKeccak256(hash []byte, codecType uint64) (cid.Cid, error) { + mh, err := multihash.Encode(hash, multihash.KECCAK_256) + if err != nil { + return cid.Cid{}, err + } + return cid.NewCidV1(codecType, mh), nil +} diff --git a/postgres/batch.go b/postgres/v1/batch.go similarity index 100% rename from postgres/batch.go rename to postgres/v1/batch.go diff --git a/postgres/batch_test.go b/postgres/v1/batch_test.go similarity index 95% rename from postgres/batch_test.go rename to postgres/v1/batch_test.go index ba4ca20..1d559b8 100644 --- a/postgres/batch_test.go +++ b/postgres/v1/batch_test.go @@ -20,6 +20,8 @@ import ( "math/big" "time" + "github.com/cerc-io/ipfs-ethdb/v4/postgres/shared" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rlp" @@ -27,7 +29,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres" + pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres/v1" ) var ( @@ -39,7 +41,7 @@ var ( var _ = Describe("Batch", func() { BeforeEach(func() { - db, err = pgipfsethdb.TestDB() + db, err = shared.TestDB() Expect(err).ToNot(HaveOccurred()) cacheConfig := pgipfsethdb.CacheConfig{ @@ -58,7 +60,7 @@ var _ = Describe("Batch", func() { }) AfterEach(func() { groupcache.DeregisterGroup("db") - err = pgipfsethdb.ResetTestDB(db) + err = shared.ResetTestDB(db) Expect(err).ToNot(HaveOccurred()) }) diff --git a/postgres/database.go b/postgres/v1/database.go similarity index 100% rename from postgres/database.go rename to postgres/v1/database.go diff --git a/postgres/database_test.go b/postgres/v1/database_test.go similarity index 95% rename from postgres/database_test.go rename to postgres/v1/database_test.go index b929561..5af1d93 100644 --- a/postgres/database_test.go +++ b/postgres/v1/database_test.go @@ -20,6 +20,8 @@ import ( "math/big" "time" + "github.com/cerc-io/ipfs-ethdb/v4/postgres/shared" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rlp" @@ -28,7 +30,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres" + pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres/v1" ) var ( @@ -44,7 +46,7 @@ var ( var _ = Describe("Database", func() { BeforeEach(func() { - db, err = pgipfsethdb.TestDB() + db, err = shared.TestDB() Expect(err).ToNot(HaveOccurred()) cacheConfig := pgipfsethdb.CacheConfig{ @@ -61,7 +63,7 @@ var _ = Describe("Database", func() { }) AfterEach(func() { groupcache.DeregisterGroup("db") - err = pgipfsethdb.ResetTestDB(db) + err = shared.ResetTestDB(db) Expect(err).ToNot(HaveOccurred()) }) diff --git a/postgres/iterator.go b/postgres/v1/iterator.go similarity index 100% rename from postgres/iterator.go rename to postgres/v1/iterator.go diff --git a/postgres/v1/postgres_suite_test.go b/postgres/v1/postgres_suite_test.go new file mode 100644 index 0000000..fa1f767 --- /dev/null +++ b/postgres/v1/postgres_suite_test.go @@ -0,0 +1,29 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package pgipfsethdb_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestPGIPFSETHDB(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "PG-IPFS ethdb test") +} diff --git a/postgres/util.go b/postgres/v1/util.go similarity index 69% rename from postgres/util.go rename to postgres/v1/util.go index eedd8fd..e65b102 100644 --- a/postgres/util.go +++ b/postgres/v1/util.go @@ -19,7 +19,6 @@ package pgipfsethdb import ( blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" - "github.com/jmoiron/sqlx" _ "github.com/lib/pq" //postgres driver "github.com/multiformats/go-multihash" ) @@ -33,17 +32,3 @@ func MultihashKeyFromKeccak256(h []byte) (string, error) { dbKey := dshelp.MultihashToDsKey(mh) return blockstore.BlockPrefix.String() + dbKey.String(), nil } - -// TestDB connect to the testing database -// it assumes the database has the IPFS public.blocks table present -// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table -func TestDB() (*sqlx.DB, error) { - connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable" - return sqlx.Connect("postgres", connectStr) -} - -// ResetTestDB drops all rows in the test db public.blocks table -func ResetTestDB(db *sqlx.DB) error { - _, err := db.Exec("TRUNCATE public.blocks CASCADE") - return err -}