2020-05-30 17:09:43 +00:00
// VulcanizeDB
// Copyright © 2020 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
2020-05-30 17:20:00 +00:00
package ipfsethdb
2020-05-30 17:09:43 +00:00
import (
2020-06-29 21:11:29 +00:00
"errors"
2020-07-09 19:49:01 +00:00
"github.com/ethereum/go-ethereum/common"
2020-05-30 17:09:43 +00:00
"github.com/ethereum/go-ethereum/ethdb"
2020-06-29 21:11:29 +00:00
"github.com/hashicorp/golang-lru"
"github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
)
// EvictionWarningErr is the sentinel returned by Batch.Put and Batch.Delete when adding
// an entry pushed the batch past its capacity and an older queued entry was evicted.
var EvictionWarningErr = errors.New("warn: batch has exceeded capacity, data has been evicted")
2020-06-29 21:11:29 +00:00
// Batch is the type that satisfies the ethdb.Batch interface for IPFS Ethereum data using the ipfs blockservice interface
// This is ipfs-backing-datastore agnostic but must operate through a configured ipfs node (and so is subject to lockfile contention with e.g. an ipfs daemon)
// If blockservice block exchange is configured the blockservice can fetch data that are missing locally from IPFS peers
2020-05-30 17:09:43 +00:00
type Batch struct {
2020-06-29 21:11:29 +00:00
blockService blockservice . BlockService
putCache , deleteCache * lru . Cache
valueSize int
2020-05-30 17:09:43 +00:00
}
2020-06-29 21:11:29 +00:00
// NewBatch returns a ethdb.Batch interface for IPFS
func NewBatch ( bs blockservice . BlockService , capacity int ) ( ethdb . Batch , error ) {
putCache , err := lru . New ( capacity )
if err != nil {
return nil , err
}
deleteCache , err := lru . New ( capacity )
if err != nil {
return nil , err
2020-05-30 17:09:43 +00:00
}
2020-06-29 21:11:29 +00:00
return & Batch {
blockService : bs ,
putCache : putCache ,
deleteCache : deleteCache ,
} , nil
2020-05-30 17:09:43 +00:00
}
// Put satisfies the ethdb.Batch interface
// Put inserts the given value into the key-value data store
2020-06-29 21:11:29 +00:00
// Key is expected to be the keccak256 hash of value
// Returns an error when batch capacity has been exceeded and data was evicted
// It is up to ensure they do not exceed capacity
// The alternative is to check the cache len vs its capacity before inserting
// but this adds additional overhead to every Put/Delete
2020-05-30 17:09:43 +00:00
func ( b * Batch ) Put ( key [ ] byte , value [ ] byte ) ( err error ) {
2020-06-29 21:11:29 +00:00
b . valueSize += len ( value )
2020-07-09 19:49:01 +00:00
strKey := common . Bytes2Hex ( key )
if b . putCache . Add ( strKey , value ) {
2020-06-29 21:11:29 +00:00
return EvictionWarningErr
2020-05-30 17:09:43 +00:00
}
return nil
}
// Delete satisfies the ethdb.Batch interface
// Delete removes the key from the key-value data store
func ( b * Batch ) Delete ( key [ ] byte ) ( err error ) {
2020-07-09 19:49:01 +00:00
strKey := common . Bytes2Hex ( key )
if b . deleteCache . Add ( strKey , true ) {
2020-06-29 21:11:29 +00:00
return EvictionWarningErr
2020-05-30 17:09:43 +00:00
}
2020-06-29 21:11:29 +00:00
return nil
2020-05-30 17:09:43 +00:00
}
// ValueSize satisfies the ethdb.Batch interface
// ValueSize retrieves the amount of data queued up for writing
// The returned value is the total byte length of all data queued to write
func ( b * Batch ) ValueSize ( ) int {
2020-06-29 21:11:29 +00:00
return b . valueSize
2020-05-30 17:09:43 +00:00
}
// Write satisfies the ethdb.Batch interface
// Write flushes any accumulated data to disk
func ( b * Batch ) Write ( ) error {
2020-06-29 21:11:29 +00:00
puts := make ( [ ] blocks . Block , b . putCache . Len ( ) )
for i , key := range b . putCache . Keys ( ) {
val , _ := b . putCache . Get ( key ) // don't need to check "ok"s, the key is known and val is always []byte
2020-07-09 19:49:01 +00:00
b , err := NewBlock ( common . Hex2Bytes ( key . ( string ) ) , val . ( [ ] byte ) )
2020-06-29 21:11:29 +00:00
if err != nil {
return err
}
puts [ i ] = b
}
if err := b . blockService . AddBlocks ( puts ) ; err != nil {
return err
}
for _ , key := range b . deleteCache . Keys ( ) {
2020-07-09 19:49:01 +00:00
// we are using state codec because we don't know the codec and at this level the codec doesn't matter, the datastore key is multihash-only derived
2020-07-12 23:16:42 +00:00
c , err := Keccak256ToCid ( common . Hex2Bytes ( key . ( string ) ) , stateTrieCodec )
2020-06-29 21:11:29 +00:00
if err != nil {
return err
}
if err := b . blockService . DeleteBlock ( c ) ; err != nil {
return err
}
2020-05-30 17:09:43 +00:00
}
2020-06-29 21:11:29 +00:00
return nil
2020-05-30 17:09:43 +00:00
}
// Replay satisfies the ethdb.Batch interface
// Replay replays the batch contents
func ( b * Batch ) Replay ( w ethdb . KeyValueWriter ) error {
2020-06-29 21:11:29 +00:00
for _ , key := range b . putCache . Keys ( ) {
val , _ := b . putCache . Get ( key )
if err := w . Put ( key . ( [ ] byte ) , val . ( [ ] byte ) ) ; err != nil {
return err
}
}
for _ , key := range b . deleteCache . Keys ( ) {
if err := w . Delete ( key . ( [ ] byte ) ) ; err != nil {
return err
}
}
return nil
2020-05-30 17:09:43 +00:00
}
// Reset satisfies the ethdb.Batch interface
// Reset resets the batch for reuse
2020-06-03 20:42:11 +00:00
// This should be called after every write
2020-05-30 17:09:43 +00:00
func ( b * Batch ) Reset ( ) {
2020-06-29 21:11:29 +00:00
b . deleteCache . Purge ( )
b . putCache . Purge ( )
b . valueSize = 0
2020-05-30 17:10:48 +00:00
}