forked from cerc-io/ipfs-ethdb
batch.Replay() method
This commit is contained in:
parent
400f64e7f9
commit
07a7a534da
@ -17,22 +17,25 @@
|
|||||||
package pgipfsethdb
|
package pgipfsethdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection
|
// Batch is the type that satisfies the ethdb.Batch interface for PG-IPFS Ethereum data using a direct Postgres connection
|
||||||
type Batch struct {
|
type Batch struct {
|
||||||
db *sqlx.DB
|
db *sqlx.DB
|
||||||
tx *sqlx.Tx
|
tx *sqlx.Tx
|
||||||
valueSize int
|
valueSize int
|
||||||
|
replayCache map[string][]byte
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBatch returns a ethdb.Batch interface for PG-IPFS
|
// NewBatch returns a ethdb.Batch interface for PG-IPFS
|
||||||
func NewBatch(db *sqlx.DB, tx *sqlx.Tx) ethdb.Batch {
|
func NewBatch(db *sqlx.DB, tx *sqlx.Tx) ethdb.Batch {
|
||||||
b := &Batch{
|
b := &Batch{
|
||||||
db: db,
|
db: db,
|
||||||
tx: tx,
|
tx: tx,
|
||||||
|
replayCache: make(map[string][]byte),
|
||||||
}
|
}
|
||||||
if tx == nil {
|
if tx == nil {
|
||||||
b.Reset()
|
b.Reset()
|
||||||
@ -55,6 +58,7 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
b.valueSize += len(value)
|
b.valueSize += len(value)
|
||||||
|
b.replayCache[common.Bytes2Hex(key)] = value
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -62,7 +66,11 @@ func (b *Batch) Put(key []byte, value []byte) (err error) {
|
|||||||
// Delete removes the key from the key-value data store
|
// Delete removes the key from the key-value data store
|
||||||
func (b *Batch) Delete(key []byte) (err error) {
|
func (b *Batch) Delete(key []byte) (err error) {
|
||||||
_, err = b.tx.Exec(deletePgStr, key)
|
_, err = b.tx.Exec(deletePgStr, key)
|
||||||
return err
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
delete(b.replayCache, common.Bytes2Hex(key))
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ValueSize satisfies the ethdb.Batch interface
|
// ValueSize satisfies the ethdb.Batch interface
|
||||||
@ -79,13 +87,27 @@ func (b *Batch) Write() error {
|
|||||||
if b.tx == nil {
|
if b.tx == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return b.tx.Commit()
|
if err := b.tx.Commit(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
b.replayCache = nil
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Replay satisfies the ethdb.Batch interface
|
// Replay satisfies the ethdb.Batch interface
|
||||||
// Replay replays the batch contents
|
// Replay replays the batch contents
|
||||||
func (b *Batch) Replay(w ethdb.KeyValueWriter) error {
|
func (b *Batch) Replay(w ethdb.KeyValueWriter) error {
|
||||||
return errNotSupported
|
if b.tx != nil {
|
||||||
|
b.tx.Rollback()
|
||||||
|
b.tx = nil
|
||||||
|
}
|
||||||
|
for key, value := range b.replayCache {
|
||||||
|
if err := w.Put(common.Hex2Bytes(key), value); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
b.replayCache = nil
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset satisfies the ethdb.Batch interface
|
// Reset satisfies the ethdb.Batch interface
|
||||||
@ -97,5 +119,6 @@ func (b *Batch) Reset() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
b.replayCache = make(map[string][]byte)
|
||||||
b.valueSize = 0
|
b.valueSize = 0
|
||||||
}
|
}
|
||||||
|
@ -115,4 +115,30 @@ var _ = Describe("Batch", func() {
|
|||||||
Expect(size).To(Equal(0))
|
Expect(size).To(Equal(0))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
Describe("Replay", func() {
|
||||||
|
It("returns the size of data in the batch queued for write", func() {
|
||||||
|
err = batch.Put(testKeccakEthKey, testValue)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
err = batch.Put(testKeccakEthKey2, testValue2)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
_, err = database.Get(testKeccakEthKey)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
|
||||||
|
_, err = database.Get(testKeccakEthKey2)
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
Expect(err.Error()).To(ContainSubstring("sql: no rows in result set"))
|
||||||
|
|
||||||
|
err = batch.Replay(database)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
|
||||||
|
val, err := database.Get(testKeccakEthKey)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(val).To(Equal(testValue))
|
||||||
|
val2, err := database.Get(testKeccakEthKey2)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(val2).To(Equal(testValue2))
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
Loading…
Reference in New Issue
Block a user