From d2176e69af5ca174f19768d378c3760b12d0eae6 Mon Sep 17 00:00:00 2001 From: i-norden Date: Mon, 20 Feb 2023 12:26:40 -0600 Subject: [PATCH] update postgres v0 subpkg and tests to work with cid keys --- postgres/v0/batch.go | 23 ++++++++++++++------- postgres/v0/batch_test.go | 39 ++++++++++++++++++++---------------- postgres/v0/database.go | 37 ++++++++++++++++++---------------- postgres/v0/database_test.go | 38 +++++++++++++++++++---------------- postgres/v0/iterator.go | 5 +++-- postgres/v0/util.go | 29 ++++++--------------------- 6 files changed, 88 insertions(+), 83 deletions(-) diff --git a/postgres/v0/batch.go b/postgres/v0/batch.go index 0099567..0a83747 100644 --- a/postgres/v0/batch.go +++ b/postgres/v0/batch.go @@ -19,6 +19,8 @@ package pgipfsethdb import ( "math/big" + "github.com/ipfs/go-cid" + "github.com/ethereum/go-ethereum/ethdb" "github.com/jmoiron/sqlx" ) @@ -49,13 +51,20 @@ func NewBatch(db *sqlx.DB, tx *sqlx.Tx, blockNumber *big.Int) ethdb.Batch { // Put satisfies the ethdb.Batch interface // Put inserts the given value into the key-value data store -// Key is expected to be the keccak256 hash of value -func (b *Batch) Put(key []byte, value []byte) (err error) { - mhKey, err := MultihashKeyFromKeccak256(key) +// Key is expected to be a fully formulated cid key +// TODO: note, now that we expected a cid we could route to the "cids" tables based on prefix instead of to public.blocks +// but is it better to handle this routing here, or use a completely different interface since we already have to refactor +// at levels above this package in order to pass in cids instead of raw keccak256 hashes +func (b *Batch) Put(cidBytes []byte, value []byte) (err error) { + // cast and resolve strings from cid.Cast + // this will assert that we have a correctly formatted CID + // and will handle the different string encodings for v0 and v1 CIDs + // (note that this v0 vs v1 is different from the blockstore v0 vs v1) + c, err := cid.Cast(cidBytes) if err != nil { return err } - if _, err = b.tx.Exec(putPgStr, mhKey, value, b.blockNumber.Uint64()); err != nil { + if _, err = b.tx.Exec(putPgStr, c.String(), value, b.blockNumber.Uint64()); err != nil { return err } b.valueSize += len(value) @@ -64,12 +73,12 @@ func (b *Batch) Put(key []byte, value []byte) (err error) { // Delete satisfies the ethdb.Batch interface // Delete removes the key from the key-value data store -func (b *Batch) Delete(key []byte) (err error) { - mhKey, err := MultihashKeyFromKeccak256(key) +func (b *Batch) Delete(cidBytes []byte) (err error) { + c, err := cid.Cast(cidBytes) if err != nil { return err } - _, err = b.tx.Exec(deletePgStr, mhKey) + _, err = b.tx.Exec(deletePgStr, c.String()) return err } diff --git a/postgres/v0/batch_test.go b/postgres/v0/batch_test.go index ba4ca20..f8e496e 100644 --- a/postgres/v0/batch_test.go +++ b/postgres/v0/batch_test.go @@ -20,6 +20,10 @@ import ( "math/big" "time" + "github.com/cerc-io/ipfs-ethdb/v4/postgres/shared" + + "github.com/ipfs/go-cid" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rlp" @@ -27,7 +31,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres" + pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres/v0" ) var ( @@ -35,11 +39,12 @@ var ( testHeader2 = types.Header{Number: big.NewInt(2)} testValue2, _ = rlp.EncodeToBytes(testHeader2) testEthKey2 = testHeader2.Hash().Bytes() + testCID2, _ = pgipfsethdb.CIDFromKeccak256(testEthKey2, cid.EthBlock) ) var _ = Describe("Batch", func() { BeforeEach(func() { - db, err = pgipfsethdb.TestDB() + db, err = shared.TestDB() Expect(err).ToNot(HaveOccurred()) cacheConfig := pgipfsethdb.CacheConfig{ @@ -58,30 +63,30 @@ var _ = Describe("Batch", func() { }) AfterEach(func() { groupcache.DeregisterGroup("db") - err = pgipfsethdb.ResetTestDB(db) + err = shared.ResetTestDB(db) Expect(err).ToNot(HaveOccurred()) }) Describe("Put/Write", func() { It("adds the key-value pair to the batch", func() { - _, err = database.Get(testEthKey) + _, err = database.Get(testCID.Bytes()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) - _, err = database.Get(testEthKey2) + _, err = database.Get(testCID2.Bytes()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) - err = batch.Put(testEthKey, testValue) + err = batch.Put(testCID.Bytes(), testValue) Expect(err).ToNot(HaveOccurred()) - err = batch.Put(testEthKey2, testValue2) + err = batch.Put(testCID2.Bytes(), testValue2) Expect(err).ToNot(HaveOccurred()) err = batch.Write() Expect(err).ToNot(HaveOccurred()) - val, err := database.Get(testEthKey) + val, err := database.Get(testCID.Bytes()) Expect(err).ToNot(HaveOccurred()) Expect(val).To(Equal(testValue)) - val2, err := database.Get(testEthKey2) + val2, err := database.Get(testCID2.Bytes()) Expect(err).ToNot(HaveOccurred()) Expect(val2).To(Equal(testValue2)) }) @@ -89,25 +94,25 @@ var _ = Describe("Batch", func() { Describe("Delete/Reset/Write", func() { It("deletes the key-value pair in the batch", func() { - err = batch.Put(testEthKey, testValue) + err = batch.Put(testCID.Bytes(), testValue) Expect(err).ToNot(HaveOccurred()) - err = batch.Put(testEthKey2, testValue2) + err = batch.Put(testCID2.Bytes(), testValue2) Expect(err).ToNot(HaveOccurred()) err = batch.Write() Expect(err).ToNot(HaveOccurred()) batch.Reset() - err = batch.Delete(testEthKey) + err = batch.Delete(testCID.Bytes()) Expect(err).ToNot(HaveOccurred()) - err = batch.Delete(testEthKey2) + err = batch.Delete(testCID2.Bytes()) Expect(err).ToNot(HaveOccurred()) err = batch.Write() Expect(err).ToNot(HaveOccurred()) - _, err = database.Get(testEthKey) + _, err = database.Get(testCID.Bytes()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) - _, err = database.Get(testEthKey2) + _, err = database.Get(testCID2.Bytes()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) }) @@ -115,9 +120,9 @@ var _ = Describe("Batch", func() { Describe("ValueSize/Reset", func() { It("returns the size of data in the batch queued for write", func() { - err = batch.Put(testEthKey, testValue) + err = batch.Put(testCID.Bytes(), testValue) Expect(err).ToNot(HaveOccurred()) - err = batch.Put(testEthKey2, testValue2) + err = batch.Put(testCID2.Bytes(), testValue2) Expect(err).ToNot(HaveOccurred()) err = batch.Write() Expect(err).ToNot(HaveOccurred()) diff --git a/postgres/v0/database.go b/postgres/v0/database.go index c21da02..4fc59a3 100644 --- a/postgres/v0/database.go +++ b/postgres/v0/database.go @@ -26,6 +26,8 @@ import ( "strings" "time" + "github.com/ipfs/go-cid" + "github.com/ethereum/go-ethereum/ethdb" "github.com/jmoiron/sqlx" "github.com/mailgun/groupcache/v2" @@ -102,14 +104,14 @@ func (d *Database) GetCacheStats() groupcache.Stats { } // Has satisfies the ethdb.KeyValueReader interface -// Has retrieves if a key is present in the key-value data store -func (d *Database) Has(key []byte) (bool, error) { - mhKey, err := MultihashKeyFromKeccak256(key) +// Has retrieves if a cid is present in the key-value data store +func (d *Database) Has(cidBytes []byte) (bool, error) { + c, err := cid.Cast(cidBytes) if err != nil { return false, err } var exists bool - return exists, d.db.Get(&exists, hasPgStr, mhKey) + return exists, d.db.Get(&exists, hasPgStr, c.String()) } // Get retrieves the given key if it's present in the key-value data store @@ -124,9 +126,9 @@ func (d *Database) dbGet(key string) ([]byte, error) { } // Get satisfies the ethdb.KeyValueReader interface -// Get retrieves the given key if it's present in the key-value data store -func (d *Database) Get(key []byte) ([]byte, error) { - mhKey, err := MultihashKeyFromKeccak256(key) +// Get retrieves the given cid if it's present in the key-value data store +func (d *Database) Get(cidBytes []byte) ([]byte, error) { + c, err := cid.Cast(cidBytes) if err != nil { return nil, err } @@ -135,30 +137,31 @@ func (d *Database) Get(key []byte) ([]byte, error) { defer cancel() var data []byte - return data, d.cache.Get(ctx, mhKey, groupcache.AllocatingByteSliceSink(&data)) + return data, d.cache.Get(ctx, c.String(), groupcache.AllocatingByteSliceSink(&data)) } // Put satisfies the ethdb.KeyValueWriter interface // Put inserts the given value into the key-value data store -// Key is expected to be the keccak256 hash of value -func (d *Database) Put(key []byte, value []byte) error { - mhKey, err := MultihashKeyFromKeccak256(key) +// Key is expected to be a fully formulated cis of value +func (d *Database) Put(cidBytes []byte, value []byte) error { + c, err := cid.Cast(cidBytes) if err != nil { return err } - _, err = d.db.Exec(putPgStr, mhKey, value, d.BlockNumber.Uint64()) + _, err = d.db.Exec(putPgStr, c.String(), value, d.BlockNumber.Uint64()) return err } // Delete satisfies the ethdb.KeyValueWriter interface -// Delete removes the key from the key-value data store -func (d *Database) Delete(key []byte) error { - mhKey, err := MultihashKeyFromKeccak256(key) +// Delete removes the cid from the key-value data store +func (d *Database) Delete(cidBytes []byte) error { + c, err := cid.Cast(cidBytes) if err != nil { return err } + cidString := c.String() - _, err = d.db.Exec(deletePgStr, mhKey) + _, err = d.db.Exec(deletePgStr, cidString) if err != nil { return err } @@ -166,7 +169,7 @@ func (d *Database) Delete(key []byte) error { // Remove from cache. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*500) defer cancel() - err = d.cache.Remove(ctx, mhKey) + err = d.cache.Remove(ctx, cidString) return err } diff --git a/postgres/v0/database_test.go b/postgres/v0/database_test.go index b929561..8736222 100644 --- a/postgres/v0/database_test.go +++ b/postgres/v0/database_test.go @@ -20,6 +20,10 @@ import ( "math/big" "time" + "github.com/cerc-io/ipfs-ethdb/v4/postgres/shared" + + "github.com/ipfs/go-cid" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rlp" @@ -28,7 +32,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres" + pgipfsethdb "github.com/cerc-io/ipfs-ethdb/v4/postgres/v0" ) var ( @@ -39,12 +43,12 @@ var ( testHeader = types.Header{Number: testBlockNumber} testValue, _ = rlp.EncodeToBytes(testHeader) testEthKey = testHeader.Hash().Bytes() - testMhKey, _ = pgipfsethdb.MultihashKeyFromKeccak256(testEthKey) + testCID, _ = pgipfsethdb.CIDFromKeccak256(testEthKey, cid.EthBlock) ) var _ = Describe("Database", func() { BeforeEach(func() { - db, err = pgipfsethdb.TestDB() + db, err = shared.TestDB() Expect(err).ToNot(HaveOccurred()) cacheConfig := pgipfsethdb.CacheConfig{ @@ -61,20 +65,20 @@ var _ = Describe("Database", func() { }) AfterEach(func() { groupcache.DeregisterGroup("db") - err = pgipfsethdb.ResetTestDB(db) + err = shared.ResetTestDB(db) Expect(err).ToNot(HaveOccurred()) }) Describe("Has", func() { It("returns false if a key-pair doesn't exist in the db", func() { - has, err := database.Has(testEthKey) + has, err := database.Has(testCID.Bytes()) Expect(err).ToNot(HaveOccurred()) Expect(has).ToNot(BeTrue()) }) It("returns true if a key-pair exists in the db", func() { - _, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64()) + _, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testCID.String(), testValue, testBlockNumber.Uint64()) Expect(err).ToNot(HaveOccurred()) - has, err := database.Has(testEthKey) + has, err := database.Has(testCID.Bytes()) Expect(err).ToNot(HaveOccurred()) Expect(has).To(BeTrue()) }) @@ -82,14 +86,14 @@ var _ = Describe("Database", func() { Describe("Get", func() { It("throws an err if the key-pair doesn't exist in the db", func() { - _, err = database.Get(testEthKey) + _, err = database.Get(testCID.Bytes()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) }) It("returns the value associated with the key, if the pair exists", func() { - _, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testMhKey, testValue, testBlockNumber.Uint64()) + _, err = db.Exec("INSERT into public.blocks (key, data, block_number) VALUES ($1, $2, $3)", testCID.String(), testValue, testBlockNumber.Uint64()) Expect(err).ToNot(HaveOccurred()) - val, err := database.Get(testEthKey) + val, err := database.Get(testCID.Bytes()) Expect(err).ToNot(HaveOccurred()) Expect(val).To(Equal(testValue)) }) @@ -97,13 +101,13 @@ var _ = Describe("Database", func() { Describe("Put", func() { It("persists the key-value pair in the database", func() { - _, err = database.Get(testEthKey) + _, err = database.Get(testCID.Bytes()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) - err = database.Put(testEthKey, testValue) + err = database.Put(testCID.Bytes(), testValue) Expect(err).ToNot(HaveOccurred()) - val, err := database.Get(testEthKey) + val, err := database.Get(testCID.Bytes()) Expect(err).ToNot(HaveOccurred()) Expect(val).To(Equal(testValue)) }) @@ -111,15 +115,15 @@ var _ = Describe("Database", func() { Describe("Delete", func() { It("removes the key-value pair from the database", func() { - err = database.Put(testEthKey, testValue) + err = database.Put(testCID.Bytes(), testValue) Expect(err).ToNot(HaveOccurred()) - val, err := database.Get(testEthKey) + val, err := database.Get(testCID.Bytes()) Expect(err).ToNot(HaveOccurred()) Expect(val).To(Equal(testValue)) - err = database.Delete(testEthKey) + err = database.Delete(testCID.Bytes()) Expect(err).ToNot(HaveOccurred()) - _, err = database.Get(testEthKey) + _, err = database.Get(testCID.Bytes()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("sql: no rows in result set")) }) diff --git a/postgres/v0/iterator.go b/postgres/v0/iterator.go index 0d3a0a4..e5c8e64 100644 --- a/postgres/v0/iterator.go +++ b/postgres/v0/iterator.go @@ -18,6 +18,7 @@ package pgipfsethdb import ( "github.com/ethereum/go-ethereum/ethdb" + "github.com/ipfs/go-cid" "github.com/jmoiron/sqlx" ) @@ -72,13 +73,13 @@ func (i *Iterator) Key() []byte { // The caller should not modify the contents of the returned slice // and its contents may change on the next call to Next func (i *Iterator) Value() []byte { - mhKey, err := MultihashKeyFromKeccak256(i.currentKey) + c, err := cid.Cast(i.currentKey) if err != nil { i.err = err return nil } var data []byte - i.err = i.db.Get(&data, getPgStr, mhKey) + i.err = i.db.Get(&data, getPgStr, c) return data } diff --git a/postgres/v0/util.go b/postgres/v0/util.go index eedd8fd..85700bf 100644 --- a/postgres/v0/util.go +++ b/postgres/v0/util.go @@ -17,33 +17,16 @@ package pgipfsethdb import ( - blockstore "github.com/ipfs/go-ipfs-blockstore" - dshelp "github.com/ipfs/go-ipfs-ds-help" - "github.com/jmoiron/sqlx" + "github.com/ipfs/go-cid" _ "github.com/lib/pq" //postgres driver "github.com/multiformats/go-multihash" ) -// MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash db key string -func MultihashKeyFromKeccak256(h []byte) (string, error) { - mh, err := multihash.Encode(h, multihash.KECCAK_256) +// CIDFromKeccak256 converts keccak256 hash bytes into a v1 cid +func CIDFromKeccak256(hash []byte, codecType uint64) (cid.Cid, error) { + mh, err := multihash.Encode(hash, multihash.KECCAK_256) if err != nil { - return "", err + return cid.Cid{}, err } - dbKey := dshelp.MultihashToDsKey(mh) - return blockstore.BlockPrefix.String() + dbKey.String(), nil -} - -// TestDB connect to the testing database -// it assumes the database has the IPFS public.blocks table present -// DO NOT use a production db for the test db, as it will remove all contents of the public.blocks table -func TestDB() (*sqlx.DB, error) { - connectStr := "postgresql://localhost:5432/vulcanize_testing?sslmode=disable" - return sqlx.Connect("postgres", connectStr) -} - -// ResetTestDB drops all rows in the test db public.blocks table -func ResetTestDB(db *sqlx.DB) error { - _, err := db.Exec("TRUNCATE public.blocks CASCADE") - return err + return cid.NewCidV1(codecType, mh), nil }