Limiting Geth to one transaction per block (#3)

* Limiting Geth to one transaction per block
* Adding TransitionBatchBuilder to build & submit rollup blocks
This commit is contained in:
Will Meister 2020-05-14 14:45:42 -05:00 committed by GitHub
parent aae5230baf
commit e14cfa5576
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 975 additions and 9 deletions

1
.gitignore vendored
View File

@ -34,6 +34,7 @@ profile.cov
# IdeaIDE
.idea
*.iml
# VS Code
.vscode

View File

@ -13,4 +13,10 @@ RUN apk add --no-cache ca-certificates
COPY --from=builder /go-ethereum/build/bin/geth /usr/local/bin/
EXPOSE 8545 8546 8547 30303 30303/udp
ENTRYPOINT ["geth"]
# ENTRYPOINT ["geth"]
COPY docker/entrypoint.sh /bin
RUN chmod +x /bin/entrypoint.sh
EXPOSE 9545
ENTRYPOINT ["sh", "/bin/entrypoint.sh"]

21
docker/entrypoint.sh Normal file
View File

@ -0,0 +1,21 @@
#!/bin/sh
## Passed in from environment variables:
# HOSTNAME=
# PORT=8545
# NETWORK_ID=108
CLEAR_DATA_FILE_PATH="${VOLUME_PATH}/.clear_data_key_${CLEAR_DATA_KEY}"
TARGET_GAS_LIMIT=${TARGET_GAS_LIMIT:-4294967295}
if [[ -n "$CLEAR_DATA_KEY" && ! -f "$CLEAR_DATA_FILE_PATH" ]]; then
echo "Detected change in CLEAR_DATA_KEY. Purging data."
rm -rf ${VOLUME_PATH}/*
rm -rf ${VOLUME_PATH}/.clear_data_key_*
echo "Local data cleared from '${VOLUME_PATH}/*'"
echo "Contents of volume dir: $(ls -alh $VOLUME_PATH)"
touch $CLEAR_DATA_FILE_PATH
fi
echo "Starting Geth..."
## Command to kick off geth
geth --dev --datadir $VOLUME_PATH --rpc --rpcaddr $HOSTNAME --rpcvhosts=* --rpcport $PORT --networkid $NETWORK_ID --rpcapi 'eth,net' --gasprice '0' --targetgaslimit $TARGET_GAS_LIMIT --nousb --gcmode=archive --verbosity "6"

View File

@ -20,10 +20,12 @@ package eth
import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/rollup"
"math/big"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
@ -207,7 +209,13 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
if checkpoint == nil {
checkpoint = params.TrustedCheckpoints[genesisHash]
}
if eth.protocolManager, err = NewProtocolManager(chainConfig, checkpoint, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil {
blockSubmitter := rollup.NewBlockSubmitter()
rollupBlockBuilder, e := rollup.NewTransitionBatchBuilder(chainDb, eth.blockchain, blockSubmitter, 5 * time.Minute, 100_000_000_000, 200)
if e != nil {
return nil, e
}
if eth.protocolManager, err = NewProtocolManager(chainConfig, checkpoint, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb, cacheLimit, config.Whitelist, rollupBlockBuilder); err != nil {
return nil, err
}
eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock)

View File

@ -20,6 +20,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/rollup"
"math"
"math/big"
"sync"
@ -77,6 +78,8 @@ type ProtocolManager struct {
blockchain *core.BlockChain
maxPeers int
rollupBlockBuilder *rollup.TransitionBatchBuilder
downloader *downloader.Downloader
fetcher *fetcher.Fetcher
peers *peerSet
@ -101,7 +104,7 @@ type ProtocolManager struct {
// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the Ethereum network.
func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash, rollupBuilder *rollup.TransitionBatchBuilder) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
networkID: networkID,
@ -115,6 +118,7 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
rollupBlockBuilder: rollupBuilder,
}
if mode == downloader.FullSync {
// The database seems empty as the current block is the genesis. Yet the fast
@ -267,6 +271,8 @@ func (pm *ProtocolManager) Stop() {
pm.txsSub.Unsubscribe() // quits txBroadcastLoop
pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
pm.rollupBlockBuilder.Stop()
// Quit the sync loop.
// After this send has completed, no new peers will be accepted.
pm.noMorePeers <- struct{}{}
@ -815,6 +821,7 @@ func (pm *ProtocolManager) minedBroadcastLoop() {
if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
pm.BroadcastBlock(ev.Block, true) // First propagate block to peers
pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
pm.rollupBlockBuilder.NewBlock(ev.Block)
}
}
}

View File

@ -18,6 +18,7 @@ package eth
import (
"fmt"
"github.com/ethereum/go-ethereum/rollup"
"math"
"math/big"
"math/rand"
@ -495,7 +496,12 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
if err != nil {
t.Fatalf("failed to create new blockchain: %v", err)
}
pm, err := NewProtocolManager(config, cht, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), ethash.NewFaker(), blockchain, db, 1, nil)
blockSubmitter := rollup.NewBlockSubmitter()
rollupBlockBuilder, err := rollup.NewTransitionBatchBuilder(db, blockchain, blockSubmitter, 5 * time.Minute, 9_000_000_000, 200)
if err != nil {
t.Fatalf("failed to create Rollup Block Builder: %v", err)
}
pm, err := NewProtocolManager(config, cht, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), ethash.NewFaker(), blockchain, db, 1, nil, rollupBlockBuilder)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
@ -582,7 +588,12 @@ func testBroadcastBlock(t *testing.T, totalPeers, broadcastExpected int) {
if err != nil {
t.Fatalf("failed to create new blockchain: %v", err)
}
pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db, 1, nil)
blockSubmitter := rollup.NewBlockSubmitter()
rollupBlockBuilder, err := rollup.NewTransitionBatchBuilder(db, blockchain, blockSubmitter, 5 * time.Minute, 9_000_000_000, 200)
if err != nil {
t.Fatalf("failed to create Rollup Block Builder: %v", err)
}
pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db, 1, nil, rollupBlockBuilder)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
@ -650,7 +661,12 @@ func TestBroadcastMalformedBlock(t *testing.T) {
if err != nil {
t.Fatalf("failed to create new blockchain: %v", err)
}
pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), engine, blockchain, db, 1, nil)
blockSubmitter := rollup.NewBlockSubmitter()
rollupBlockBuilder, err := rollup.NewTransitionBatchBuilder(db, blockchain, blockSubmitter, 5 * time.Minute, 9_000_000_000, 200)
if err != nil {
t.Fatalf("failed to create Rollup Block Builder: %v", err)
}
pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), engine, blockchain, db, 1, nil, rollupBlockBuilder)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}

View File

@ -23,10 +23,12 @@ import (
"crypto/ecdsa"
"crypto/rand"
"fmt"
"github.com/ethereum/go-ethereum/rollup"
"math/big"
"sort"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
@ -68,7 +70,12 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
if _, err := blockchain.InsertChain(chain); err != nil {
panic(err)
}
pm, err := NewProtocolManager(gspec.Config, nil, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, 1, nil)
blockSubmitter := rollup.NewBlockSubmitter()
rollupBlockBuilder, err := rollup.NewTransitionBatchBuilder(db, blockchain, blockSubmitter, 5 * time.Minute, 9_000_000_000, 200)
if err != nil {
panic(fmt.Errorf("failed to create Rollup Block Builder: %v", err)
}
pm, err := NewProtocolManager(gspec.Config, nil, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, 1, nil, rollupBlockBuilder)
if err != nil {
return nil, nil, err
}

View File

@ -18,6 +18,7 @@ package eth
import (
"fmt"
"github.com/ethereum/go-ethereum/rollup"
"math/big"
"sync"
"testing"
@ -180,8 +181,14 @@ func TestForkIDSplit(t *testing.T) {
blocksNoFork, _ = core.GenerateChain(configNoFork, genesisNoFork, engine, dbNoFork, 2, nil)
blocksProFork, _ = core.GenerateChain(configProFork, genesisProFork, engine, dbProFork, 2, nil)
ethNoFork, _ = NewProtocolManager(configNoFork, nil, downloader.FullSync, 1, new(event.TypeMux), new(testTxPool), engine, chainNoFork, dbNoFork, 1, nil)
ethProFork, _ = NewProtocolManager(configProFork, nil, downloader.FullSync, 1, new(event.TypeMux), new(testTxPool), engine, chainProFork, dbProFork, 1, nil)
blockSubmitterNoFork = rollup.NewBlockSubmitter()
blockSubmitterProFork = rollup.NewBlockSubmitter()
rollupBlockBuilderNoFork, _ = rollup.NewTransitionBatchBuilder(dbNoFork, chainNoFork, blockSubmitterNoFork, 5 * time.Minute, 9_000_000_000, 200)
rollupBlockBuilderProFork, _ = rollup.NewTransitionBatchBuilder(dbProFork, chainProFork, blockSubmitterProFork, 5 * time.Minute, 9_000_000_000, 200)
ethNoFork, _ = NewProtocolManager(configNoFork, nil, downloader.FullSync, 1, new(event.TypeMux), new(testTxPool), engine, chainNoFork, dbNoFork, 1, nil, rollupBlockBuilderNoFork)
ethProFork, _ = NewProtocolManager(configProFork, nil, downloader.FullSync, 1, new(event.TypeMux), new(testTxPool), engine, chainProFork, dbProFork, 1, nil, rollupBlockBuilderProFork)
)
ethNoFork.Start(1000)
ethProFork.Start(1000)

View File

@ -696,6 +696,10 @@ func (w *worker) updateSnapshot() {
}
func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {
// Make sure there's only one tx per block
if w.current != nil && len(w.current.txs) > 0 {
return nil, core.ErrGasLimitReached
}
snap := w.current.state.Snapshot()
receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig())

View File

@ -0,0 +1,342 @@
package rollup
import (
"encoding/binary"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"sync"
"time"
)
var (
logger = log.New(TransitionBatchBuilder{})
ErrTransactionLimitReached = errors.New("transaction limit reached")
ErrMoreThanOneTxInBlock = errors.New("block contains more than one transaction")
LastProcessedDBKey = []byte("lastProcessedRollupBlock")
)
type ActiveBatch struct {
firstBlockNumber uint64
lastBlockNumber uint64
gasUsed uint64
transitionBatch *TransitionBatch
}
func newActiveBatch(defaultTxCapacity int) *ActiveBatch {
return &ActiveBatch{
firstBlockNumber: 0,
lastBlockNumber: 0,
gasUsed: TransitionBatchGasBuffer,
transitionBatch: NewTransitionBatch(defaultTxCapacity),
}
}
// addBlock adds a Geth Block to the ActiveBatch in question, only if it fits.
// Cases in which it would not fit are if it would put the block above the configured
// max number of transactions or max block gas, resulting in
// ErrTransactionLimitReached and core.ErrGasLimitReached, respectively.
func (b *ActiveBatch) addBlock(block *types.Block, maxBlockGas uint64, maxBlockTransactions int) error {
if maxBlockTransactions < len(b.transitionBatch.transitions)+1 {
return ErrTransactionLimitReached
}
blockGasCost := GetBlockRollupGasUsage(block)
if maxBlockGas < b.gasUsed+blockGasCost {
return core.ErrGasLimitReached
}
b.transitionBatch.addBlock(block)
b.gasUsed += blockGasCost
if b.firstBlockNumber == 0 {
b.firstBlockNumber = block.NumberU64()
}
b.lastBlockNumber = block.NumberU64()
return nil
}
type TransitionBatchBuilder struct {
db ethdb.Database
blockProvider BlockStore
rollupBatchSubmitter RollupTransitionBatchSubmitter
pendingMu sync.RWMutex
newBlockCh chan *types.Block
maxTransitionBatchTime time.Duration
maxTransitionBatchGas uint64
maxTransitionBatchTransactions int
lastProcessedBlockNumber uint64
activeBatch *ActiveBatch
}
func NewTransitionBatchBuilder(db ethdb.Database, blockStore interface{}, rollupBlockSubmitter interface{}, maxBlockTime time.Duration, maxBlockGas uint64, maxBlockTransactions int) (*TransitionBatchBuilder, error) {
lastBlock, err := fetchLastProcessedBlockNumber(db)
if err != nil {
return nil, err
}
builder := &TransitionBatchBuilder{
db: db,
blockProvider: blockStore.(BlockStore),
rollupBatchSubmitter: rollupBlockSubmitter.(RollupTransitionBatchSubmitter),
newBlockCh: make(chan *types.Block, 10_000),
maxTransitionBatchTime: maxBlockTime,
maxTransitionBatchGas: maxBlockGas,
maxTransitionBatchTransactions: maxBlockTransactions,
lastProcessedBlockNumber: lastBlock,
activeBatch: newActiveBatch(maxBlockTransactions),
}
go builder.buildLoop(maxBlockTime)
return builder, nil
}
// NewBlock handles new blocks from Geth by adding them to the newBlockCh channel
// for processing and returning so as to not delay the caller.
func (b *TransitionBatchBuilder) NewBlock(block *types.Block) {
b.newBlockCh <- block
}
// Stop handles graceful shutdown of the TransitionBatchBuilder.
func (b *TransitionBatchBuilder) Stop() {
close(b.newBlockCh)
}
// buildLoop initiates TransitionBatch production and submission either based on
// a new Geth Block being received or the maxBlockTime being reached.
func (b *TransitionBatchBuilder) buildLoop(maxBlockTime time.Duration) {
lastProcessed := b.lastProcessedBlockNumber
if err := b.sync(); err != nil {
panic(fmt.Errorf("error syncing: %+v", err))
}
timer := time.NewTimer(maxBlockTime)
for {
select {
case block, ok := <-b.newBlockCh:
if !ok {
timer.Stop()
logger.Info("Closing transition batch builder new block channel. If not shutting down, this is an error")
return
}
built, err := b.handleNewBlock(block)
if err != nil {
panic(fmt.Errorf("error handling new block. Error: %v. Block: %+v", err, block))
}
if timer != nil && built {
timer.Reset(b.maxTransitionBatchTime)
}
case <-timer.C:
if lastProcessed != b.lastProcessedBlockNumber && b.activeBatch.firstBlockNumber != 0 {
if _, err := b.buildRollupBlock(true); err != nil {
panic(fmt.Errorf("error buidling block: %v", err))
}
}
lastProcessed = b.lastProcessedBlockNumber
timer.Reset(maxBlockTime)
}
}
}
// handleNewBlock processes a newly received Geth Block, ignoring old / future blocks
// and building and submitting TransitionBatches if the pending TransitionBatch is full.
func (b *TransitionBatchBuilder) handleNewBlock(block *types.Block) (bool, error) {
logger.Debug("handling new block in transition batch builder", "block", block)
if block.NumberU64() <= b.lastProcessedBlockNumber {
logger.Debug("handling old block -- ignoring", "block", block)
return false, nil
}
if block.NumberU64() > b.lastProcessedBlockNumber+1 {
logger.Error("received future block", "block", block, "expectedNumber", b.lastProcessedBlockNumber+1)
// TODO: add to queue and/or try to fetch blocks in between.
return false, nil
}
if txCount := len(block.Transactions()); txCount > 1 {
// should never happen
logger.Error("received block with more than one transaction", "block", block)
return false, ErrMoreThanOneTxInBlock
} else if txCount == 0 {
logger.Debug("handling empty block -- ignoring", "block", block)
b.lastProcessedBlockNumber = block.NumberU64()
return false, nil
}
switch err := b.addBlock(block); err {
case core.ErrGasLimitReached, ErrTransactionLimitReached:
if _, e := b.buildRollupBlock(false); e != nil {
logger.Error("unable to build transition batch", "error", e, "transition batch", b.activeBatch)
return false, e
}
if addErr := b.addBlock(block); addErr != nil {
// TODO: Retry and whatnot instead of instant panic
logger.Error("unable to build transition batch", "error", addErr, "transition batch", b.activeBatch)
return false, addErr
}
default:
if err != nil {
logger.Error("unrecognized error adding to transition batch in progress", "error", err, "transition batch", b.activeBatch)
return false, err
} else {
logger.Debug("successfully added block to transition batch in progress", "number", block.NumberU64())
}
}
built, err := b.tryBuildRollupBlock()
if err != nil {
logger.Error("error building block", "error", err, "block", block)
return false, err
}
return built, nil
}
// sync catches the TransitionBatchBuilder up to the Geth chain by fetching all Geth Blocks between
// its last processed Block and the current Block, building and submitting RollupBlocks if/when
// they are full.
func (b *TransitionBatchBuilder) sync() error {
logger.Info("syncing blocks in transition batch builder", "starting block", b.lastProcessedBlockNumber)
for {
blockNum := b.lastProcessedBlockNumber + uint64(1)
block := b.blockProvider.GetBlockByNumber(blockNum)
logger.Info("got block number", "number", blockNum, "block", block)
if block == nil {
logger.Info("done syncing blocks in transition batch builder", "number", b.lastProcessedBlockNumber)
return nil
}
if _, err := b.handleNewBlock(block); err != nil {
logger.Error("Error handling new block", "error", err)
return err
} else {
logger.Debug("successfully synced block", "number", blockNum, "last processed", b.lastProcessedBlockNumber)
}
}
}
// addBlock adds a Geth Block to the TransitionBatch if it fits. If not, it will return an error.
func (b *TransitionBatchBuilder) addBlock(block *types.Block) error {
b.pendingMu.Lock()
defer b.pendingMu.Unlock()
if err := b.activeBatch.addBlock(block, b.maxTransitionBatchGas, b.maxTransitionBatchTransactions); err != nil {
return err
}
b.lastProcessedBlockNumber = block.NumberU64()
return nil
}
// tryBuildRollupBlock builds and submits a TransitionBatch if the pending TransitionBatch is full.
func (b *TransitionBatchBuilder) tryBuildRollupBlock() (bool, error) {
txCount := len(b.activeBatch.transitionBatch.transitions)
gasAfterOneMoreTx := b.activeBatch.gasUsed + MinTxGas
if txCount < b.maxTransitionBatchTransactions && gasAfterOneMoreTx <= b.maxTransitionBatchGas {
logger.Debug("transition batch is not full, so not finalizing it", "txCount", txCount, "gasAfterOneMoreTx", gasAfterOneMoreTx)
return false, nil
}
logger.Debug("transition batch is full, finalizing it", "txCount", txCount, "gasAfterOneMoreTx", gasAfterOneMoreTx)
return b.buildRollupBlock(false)
}
// buildRollupBlock builds a TransitionBatch if the pending TransitionBatch is full or if force is true
// and the pending TransitionBatch is not empty.
func (b *TransitionBatchBuilder) buildRollupBlock(force bool) (bool, error) {
var toSubmit *ActiveBatch
b.pendingMu.Lock()
defer b.pendingMu.Unlock()
txCount := len(b.activeBatch.transitionBatch.transitions)
if force && txCount == 0 {
logger.Debug("transition batch is empty so not finalizing it, even though force = true")
return false, nil
}
if !force && txCount < b.maxTransitionBatchTransactions && b.activeBatch.gasUsed+MinTxGas <= b.maxTransitionBatchGas {
logger.Debug("transition batch is not full, so not finalizing it")
return false, nil
}
logger.Debug("building transition batch")
toSubmit = b.activeBatch
b.activeBatch = newActiveBatch(b.maxTransitionBatchTransactions)
if err := b.submitBlock(toSubmit); err != nil {
logger.Error("error submitting transition batch", "lastBlockNumber", toSubmit.lastBlockNumber, "error", err)
return false, err
}
logger.Debug("successfully built transition batch", "lastBlockNumber", toSubmit.lastBlockNumber)
return true, nil
}
// submitBlock submits a TransitionBatch to the RollupTransitionBatchSubmitter and updates the DB
// to indicate the last processed Geth Block included in the TransitionBatch.
func (b *TransitionBatchBuilder) submitBlock(block *ActiveBatch) error {
// TODO: Submit to chain & get hash
logger.Debug("submitting transition batch", "block", block)
if err := b.rollupBatchSubmitter.submit(block.transitionBatch); err != nil {
return err
}
if err := b.db.Put(LastProcessedDBKey, SerializeBlockNumber(block.lastBlockNumber)); err != nil {
logger.Error("error saving last processed transition batch", "block", block)
// TODO: Something here
}
logger.Debug("transition batch submitted", "block", block)
return nil
}
// fetchLastProcessedBlockNumber fetches the last processed Geth Block # from the DB.
func fetchLastProcessedBlockNumber(db ethdb.Database) (uint64, error) {
has, err := db.Has(LastProcessedDBKey)
if err != nil {
logger.Error("received error checking if LastProcessedDBKey exists in DB", "error", err)
return 0, err
}
if has {
lastProcessedBytes, e := db.Get(LastProcessedDBKey)
if e != nil {
logger.Error("error fetching LastProcessedDBKey from DB", "error", err)
return 0, err
}
lastProcessedBlock := DeserializeBlockNumber(lastProcessedBytes)
logger.Info("fetched last processed block from database", "number", lastProcessedBlock)
return lastProcessedBlock, nil
} else {
logger.Info("no last processed block found in the db -- returning 0")
return 0, nil
}
}
// SerializeBlockNumber serializes the number for DB storage
func SerializeBlockNumber(blockNumber uint64) []byte {
numberAsByteArray := make([]byte, 8)
binary.LittleEndian.PutUint64(numberAsByteArray, blockNumber)
return numberAsByteArray
}
// DeserializeBlockNumber deserializes the number from DB storage
func DeserializeBlockNumber(blockNumber []byte) uint64 {
return binary.LittleEndian.Uint64(blockNumber)
}
// GetBlockRollupGasUsage determines the amount of L1 gas the provided Geth Block will use
// when submitted to mainnet.
func GetBlockRollupGasUsage(block *types.Block) uint64 {
return params.SstoreSetGas + uint64(len(block.Transactions()[0].Data()))*params.TxDataNonZeroGasEIP2028
}

View File

@ -0,0 +1,491 @@
package rollup
import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
"math/big"
"testing"
"time"
)
var (
timeoutDuration = time.Millisecond * 100
testTxPoolConfig core.TxPoolConfig
cliqueChainConfig *params.ChainConfig
// Test accounts
testBankKey, _ = crypto.GenerateKey()
testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
testBankFunds = big.NewInt(1000000000000000000)
testUserKey, _ = crypto.GenerateKey()
testUserAddress = crypto.PubkeyToAddress(testUserKey.PublicKey)
)
func init() {
cliqueChainConfig = params.AllCliqueProtocolChanges
cliqueChainConfig.Clique = &params.CliqueConfig{
Period: 10,
Epoch: 30000,
}
}
type TestBlockStore struct {
blocks map[uint64]*types.Block
}
func newTestBlockStore(blocks []*types.Block) *TestBlockStore {
store := &TestBlockStore{blocks: make(map[uint64]*types.Block, len(blocks))}
for _, block := range blocks {
store.blocks[block.NumberU64()] = block
}
return store
}
func (t *TestBlockStore) GetBlockByNumber(number uint64) *types.Block {
if block, found := t.blocks[number]; found {
return block
}
return nil
}
type TestTransitionBatchSubmitter struct {
submittedTransitions []*TransitionBatch
submitCh chan *TransitionBatch
}
func newTestBlockSubmitter(submittedBlocks []*TransitionBatch, submitCh chan *TransitionBatch) *TestTransitionBatchSubmitter {
return &TestTransitionBatchSubmitter{
submittedTransitions: submittedBlocks,
submitCh: submitCh,
}
}
func (t *TestTransitionBatchSubmitter) submit(block *TransitionBatch) error {
t.submittedTransitions = append(t.submittedTransitions, block)
t.submitCh <- block
return nil
}
func createBlocks(number int, startIndex int, withTx bool) types.Blocks {
blocks := make(types.Blocks, number)
for i := 0; i < number; i++ {
header := &types.Header{Number: big.NewInt(int64(i + startIndex))}
txs := make(types.Transactions, 0)
if withTx {
tx, _ := types.SignTx(types.NewTransaction(uint64(i), testUserAddress, big.NewInt(1), params.TxGas, big.NewInt(0), nil), types.HomesteadSigner{}, testBankKey)
txs = append(txs, tx)
}
block := types.NewBlock(header, txs, make([]*types.Header, 0), make([]*types.Receipt, 0))
blocks[i] = block
}
return blocks
}
func assertTransitionFromBlock(t *testing.T, transition *Transition, block *types.Block) {
if transition.postState != block.Root() {
t.Fatal("expecting transitionBatch postState to equal block root", "postState", transition.postState, "block.Hash()", block.Root())
}
if transition.transaction.Hash() != block.Transactions()[0].Hash() {
t.Fatal("expecting transitionBatch tx hash to equal block tx hash", "transitionBatch tx", transition.transaction.Hash(), "block tx", block.Transactions()[0].Hash())
}
}
func newTestTransitionBatchBuilder(blockStore *TestBlockStore, batchSubmitter *TestTransitionBatchSubmitter, lastProcessedBlock uint64, maxBlockTime time.Duration, maxBlockGas uint64, maxBlockTransactions int) (*TransitionBatchBuilder, error) {
db := rawdb.NewMemoryDatabase()
if lastProcessedBlock != 0 {
if err := db.Put(LastProcessedDBKey, SerializeBlockNumber(lastProcessedBlock)); err != nil {
return nil, err
}
}
return NewTransitionBatchBuilder(db, blockStore, batchSubmitter, maxBlockTime, maxBlockGas, maxBlockTransactions)
}
func getSubmitChBlockStoreAndSubmitter() (chan *TransitionBatch, *TestBlockStore, *TestTransitionBatchSubmitter) {
submitCh := make(chan *TransitionBatch, 10)
return submitCh, newTestBlockStore(make([]*types.Block, 0)), newTestBlockSubmitter(make([]*TransitionBatch, 0), submitCh)
}
/***************
* Tests Start *
***************/
/********************
* Submission Tests *
********************/
// Single block submission tests
func TestBatchSubmissionMaxTransactions(t *testing.T) {
batchSubmitCh, blockStore, batchSubmitter := getSubmitChBlockStoreAndSubmitter()
blockBuilder, err := newTestTransitionBatchBuilder(blockStore, batchSubmitter, 0, time.Minute*1, 1_000_000_000, 1)
if err != nil {
t.Fatalf("unable to make test batch builder, error: %v", err)
}
blocks := createBlocks(1, 1, true)
blockBuilder.NewBlock(blocks[0])
timeout := time.After(timeoutDuration)
select {
case transitionBatch := <-batchSubmitCh:
assertTransitionFromBlock(t, transitionBatch.transitions[0], blocks[0])
if len(batchSubmitter.submittedTransitions) > 1 {
t.Fatal("Expected 1 batch to have been submitted", "numSubmitted", len(batchSubmitter.submittedTransitions))
}
case <-timeout:
t.Fatalf("test timeout")
}
}
func TestBlockLessThanMaxTransactions(t *testing.T) {
batchSubmitCh, blockStore, batchSubmitter := getSubmitChBlockStoreAndSubmitter()
blockBuilder, err := newTestTransitionBatchBuilder(blockStore, batchSubmitter, 0, time.Minute*1, 1_000_000_000, 2)
if err != nil {
t.Fatalf("unable to make test batch builder, error: %v", err)
}
blocks := createBlocks(1, 1, true)
blockBuilder.NewBlock(blocks[0])
timeout := time.After(timeoutDuration)
select {
case <-batchSubmitCh:
t.Fatalf("should not have submitted a block")
case <-timeout:
}
}
func TestBatchSubmissionMaxGas(t *testing.T) {
batchSubmitCh, blockStore, batchSubmitter := getSubmitChBlockStoreAndSubmitter()
blocks := createBlocks(1, 1, true)
gasLimit := GetBlockRollupGasUsage(blocks[0]) + TransitionBatchGasBuffer
blockBuilder, err := newTestTransitionBatchBuilder(blockStore, batchSubmitter, 0, time.Minute*1, gasLimit, 2)
if err != nil {
t.Fatalf("unable to make test batch builder, error: %v", err)
}
blockBuilder.NewBlock(blocks[0])
timeout := time.After(timeoutDuration)
select {
case transitionBatch := <-batchSubmitCh:
assertTransitionFromBlock(t, transitionBatch.transitions[0], blocks[0])
if len(batchSubmitter.submittedTransitions) > 1 {
t.Fatal("Expected 1 batch to have been submitted", "numSubmitted", len(batchSubmitter.submittedTransitions))
}
case <-timeout:
t.Fatalf("test timeout")
}
}
func TestBlockLessThanMaxGas(t *testing.T) {
batchSubmitCh, blockStore, batchSubmitter := getSubmitChBlockStoreAndSubmitter()
blocks := createBlocks(1, 1, true)
gasLimit := GetBlockRollupGasUsage(blocks[0]) + TransitionBatchGasBuffer + MinTxGas
blockBuilder, err := newTestTransitionBatchBuilder(blockStore, batchSubmitter, 0, time.Minute*1, gasLimit, 2)
if err != nil {
t.Fatalf("unable to make test batch builder, error: %v", err)
}
blockBuilder.NewBlock(blocks[0])
timeout := time.After(timeoutDuration)
select {
case <-batchSubmitCh:
t.Fatalf("should not have submitted a block")
case <-timeout:
}
}
// Multiple block submission tests
func TestMultipleBatchSubmissionMaxTransactions(t *testing.T) {
batchSubmitCh, blockStore, batchSubmitter := getSubmitChBlockStoreAndSubmitter()
blockBuilder, err := newTestTransitionBatchBuilder(blockStore, batchSubmitter, 0, time.Minute*1, 1_000_000_000, 1)
if err != nil {
t.Fatalf("unable to make test batch builder, error: %v", err)
}
blocks := createBlocks(2, 1, true)
blockBuilder.NewBlock(blocks[0])
blockBuilder.NewBlock(blocks[1])
timeout := time.After(timeoutDuration)
select {
case transitionBatch := <-batchSubmitCh:
assertTransitionFromBlock(t, transitionBatch.transitions[0], blocks[0])
time.Sleep(time.Microsecond * 10)
if len(batchSubmitter.submittedTransitions) != 2 {
t.Fatal("Expected 2 batch to have been submitted", "numSubmitted", len(batchSubmitter.submittedTransitions))
}
assertTransitionFromBlock(t, batchSubmitter.submittedTransitions[1].transitions[0], blocks[1])
case <-timeout:
t.Fatalf("test timeout")
}
}
func TestMultipleBlocksLessThanMaxTransactions(t *testing.T) {
batchSubmitCh, blockStore, batchSubmitter := getSubmitChBlockStoreAndSubmitter()
blockBuilder, err := newTestTransitionBatchBuilder(blockStore, batchSubmitter, 0, time.Minute*1, 1_000_000_000, 3)
if err != nil {
t.Fatalf("unable to make test batch builder, error: %v", err)
}
blocks := createBlocks(2, 1, true)
blockBuilder.NewBlock(blocks[0])
blockBuilder.NewBlock(blocks[1])
timeout := time.After(timeoutDuration)
select {
case <-batchSubmitCh:
t.Fatalf("should not have submitted a block")
case <-timeout:
}
}
func TestMultipleBatchSubmissionMaxGas(t *testing.T) {
batchSubmitCh, blockStore, batchSubmitter := getSubmitChBlockStoreAndSubmitter()
blocks := createBlocks(2, 1, true)
gasLimit := GetBlockRollupGasUsage(blocks[0]) + TransitionBatchGasBuffer
blockBuilder, err := newTestTransitionBatchBuilder(blockStore, batchSubmitter, 0, time.Minute*1, gasLimit, 3)
if err != nil {
t.Fatalf("unable to make test batch builder, error: %v", err)
}
blockBuilder.NewBlock(blocks[0])
blockBuilder.NewBlock(blocks[1])
timeout := time.After(timeoutDuration)
select {
case transitionBatch := <-batchSubmitCh:
assertTransitionFromBlock(t, transitionBatch.transitions[0], blocks[0])
time.Sleep(time.Microsecond * 10)
if len(batchSubmitter.submittedTransitions) != 2 {
t.Fatal("Expected 2 batch to have been submitted", "numSubmitted", len(batchSubmitter.submittedTransitions))
}
assertTransitionFromBlock(t, batchSubmitter.submittedTransitions[1].transitions[0], blocks[1])
case <-timeout:
t.Fatalf("test timeout")
}
}
func TestMultipleBlocksLessThanMaxGas(t *testing.T) {
batchSubmitCh, blockStore, batchSubmitter := getSubmitChBlockStoreAndSubmitter()
blocks := createBlocks(2, 1, true)
gasLimit := 2 * (GetBlockRollupGasUsage(blocks[0]) + TransitionBatchGasBuffer + MinTxGas)
blockBuilder, err := newTestTransitionBatchBuilder(blockStore, batchSubmitter, 0, time.Minute*1, gasLimit, 3)
if err != nil {
t.Fatalf("unable to make test batch builder, error: %v", err)
}
blockBuilder.NewBlock(blocks[0])
blockBuilder.NewBlock(blocks[1])
timeout := time.After(timeoutDuration)
select {
case <-batchSubmitCh:
t.Fatalf("should not have submitted a block")
case <-timeout:
}
}
// Empty block tests
func TestEmptyBlocksIgnored(t *testing.T) {
batchSubmitCh, blockStore, batchSubmitter := getSubmitChBlockStoreAndSubmitter()
blockBuilder, err := newTestTransitionBatchBuilder(blockStore, batchSubmitter, 0, time.Minute*1, 1_000_000_000, 1)
if err != nil {
t.Fatalf("unable to make test batch builder, error: %v", err)
}
blocks := createBlocks(2, 1, false)
blockBuilder.NewBlock(blocks[0])
blockBuilder.NewBlock(blocks[1])
timeout := time.After(timeoutDuration)
select {
case <-batchSubmitCh:
t.Fatalf("should not have submitted a block")
case <-timeout:
}
}
func TestEmptyBlocksIgnoredWithNonEmpty(t *testing.T) {
batchSubmitCh, blockStore, batchSubmitter := getSubmitChBlockStoreAndSubmitter()
blockBuilder, err := newTestTransitionBatchBuilder(blockStore, batchSubmitter, 0, time.Minute*1, 1_000_000_000, 1)
if err != nil {
t.Fatalf("unable to make test batch builder, error: %v", err)
}
emptyBlocks := createBlocks(2, 1, false)
blockBuilder.NewBlock(emptyBlocks[0])
blockBuilder.NewBlock(emptyBlocks[1])
nonEmpty := createBlocks(1, 3, true)[0]
blockBuilder.NewBlock(nonEmpty)
timeout := time.After(timeoutDuration)
select {
case transitionBatch := <-batchSubmitCh:
assertTransitionFromBlock(t, transitionBatch.transitions[0], nonEmpty)
if len(batchSubmitter.submittedTransitions) > 1 {
t.Fatal("Expected 1 batch to have been submitted", "numSubmitted", len(batchSubmitter.submittedTransitions))
}
case <-timeout:
t.Fatalf("test timeout")
}
}
// timer submission
func TestBatchSubmissionMaxTimeBetweenBlocks(t *testing.T) {
batchSubmitCh, blockStore, batchSubmitter := getSubmitChBlockStoreAndSubmitter()
blockBuilder, err := newTestTransitionBatchBuilder(blockStore, batchSubmitter, 0, time.Microsecond*1, 1_000_000_000, 10)
if err != nil {
t.Fatalf("unable to make test batch builder, error: %v", err)
}
blocks := createBlocks(2, 1, true)
blockBuilder.NewBlock(blocks[0])
blockBuilder.NewBlock(blocks[1])
timeout := time.After(timeoutDuration)
select {
case transitionBatch := <-batchSubmitCh:
assertTransitionFromBlock(t, transitionBatch.transitions[0], blocks[0])
time.Sleep(time.Microsecond * 10)
if len(batchSubmitter.submittedTransitions) != 2 && len(transitionBatch.transitions) != 2 {
t.Fatal("Expected 2 transitions to have been submitted", "blocksSubmitted", len(batchSubmitter.submittedTransitions), "transitionsInFirst", len(transitionBatch.transitions))
}
var secondTransition *Transition
switch true {
case len(batchSubmitter.submittedTransitions) == 2:
secondTransition = batchSubmitter.submittedTransitions[1].transitions[0]
case len(transitionBatch.transitions) == 2:
secondTransition = transitionBatch.transitions[1]
}
assertTransitionFromBlock(t, secondTransition, blocks[1])
case <-timeout:
t.Fatalf("test timeout")
}
}
func TestBatchSubmissionMaxTimeBetweenBlocksReset(t *testing.T) {
batchSubmitCh, blockStore, batchSubmitter := getSubmitChBlockStoreAndSubmitter()
blockBuilder, err := newTestTransitionBatchBuilder(blockStore, batchSubmitter, 0, time.Microsecond*1, 1_000_000_000, 10)
if err != nil {
t.Fatalf("unable to make test batch builder, error: %v", err)
}
blocks := createBlocks(2, 1, true)
blockBuilder.NewBlock(blocks[0])
timeout := time.After(timeoutDuration)
select {
case transitionBatch := <-batchSubmitCh:
assertTransitionFromBlock(t, transitionBatch.transitions[0], blocks[0])
if len(batchSubmitter.submittedTransitions) != 1 {
t.Fatal("Expected 1 batch to have been submitted", "blocksSubmitted", len(batchSubmitter.submittedTransitions))
}
case <-timeout:
t.Fatalf("test timeout")
}
blockBuilder.NewBlock(blocks[1])
select {
case transitionBatch := <-batchSubmitCh:
assertTransitionFromBlock(t, transitionBatch.transitions[0], blocks[1])
if len(batchSubmitter.submittedTransitions) != 2 {
t.Fatal("Expected 2 batches to have been submitted", "blocksSubmitted", len(batchSubmitter.submittedTransitions))
}
case <-timeout:
t.Fatalf("test timeout")
}
}
/***********************
* Existing Data Tests *
***********************/
func TestBatchSubmissionWithExistingData(t *testing.T) {
batchSubmitCh, blockStore, batchSubmitter := getSubmitChBlockStoreAndSubmitter()
blockBuilder, err := newTestTransitionBatchBuilder(blockStore, batchSubmitter, 1, time.Minute*1, 1_000_000_000, 1)
if err != nil {
t.Fatalf("unable to make test batch builder, error: %v", err)
}
blocks := createBlocks(1, 2, true)
blockBuilder.NewBlock(blocks[0])
timeout := time.After(timeoutDuration)
select {
case transitionBatch := <-batchSubmitCh:
assertTransitionFromBlock(t, transitionBatch.transitions[0], blocks[0])
if len(batchSubmitter.submittedTransitions) > 1 {
t.Fatal("Expected 1 batch to have been submitted", "numSubmitted", len(batchSubmitter.submittedTransitions))
}
case <-timeout:
t.Fatalf("test timeout")
}
}
func TestBatchSubmissionWithExistingDataNoRepeats(t *testing.T) {
batchSubmitCh, blockStore, batchSubmitter := getSubmitChBlockStoreAndSubmitter()
blockBuilder, err := newTestTransitionBatchBuilder(blockStore, batchSubmitter, 1, time.Minute*1, 1_000_000_000, 1)
if err != nil {
t.Fatalf("unable to make test batch builder, error: %v", err)
}
blocks := createBlocks(1, 1, true)
blockBuilder.NewBlock(blocks[0])
timeout := time.After(timeoutDuration)
select {
case <-batchSubmitCh:
t.Fatalf("block should not have been submitted")
case <-timeout:
}
}
func TestBatchSubmissionWithExistingDataNewBlocks(t *testing.T) {
existingBlocks := createBlocks(2, 1, true)
batchSubmitCh := make(chan *TransitionBatch, 10)
blockStore, batchSubmitter := newTestBlockStore(existingBlocks), newTestBlockSubmitter(make([]*TransitionBatch, 0), batchSubmitCh)
_, err := newTestTransitionBatchBuilder(blockStore, batchSubmitter, 1, time.Minute*1, 1_000_000_000, 1)
if err != nil {
t.Fatalf("unable to make test batch builder, error: %v", err)
}
timeout := time.After(timeoutDuration)
select {
case transitionBatch := <-batchSubmitCh:
assertTransitionFromBlock(t, transitionBatch.transitions[0], existingBlocks[1])
if len(batchSubmitter.submittedTransitions) != 1 {
t.Fatal("Expected 1 batch to have been submitted", "numSubmitted", len(batchSubmitter.submittedTransitions))
}
case <-timeout:
t.Fatalf("test timeout")
}
}

View File

@ -0,0 +1,14 @@
package rollup
type RollupTransitionBatchSubmitter interface {
submit(block *TransitionBatch) error
}
type TransitionBatchSubmitter struct{}
func NewBlockSubmitter() *TransitionBatchSubmitter {
return &TransitionBatchSubmitter{}
}
func (d *TransitionBatchSubmitter) submit(block *TransitionBatch) error {
return nil
}

42
rollup/types.go Normal file
View File

@ -0,0 +1,42 @@
package rollup
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
)
const (
MinTxBytes = uint64(100)
MinTxGas = MinTxBytes*params.TxDataNonZeroGasEIP2028 + params.SstoreSetGas
TransitionBatchGasBuffer = uint64(1_000_000)
)
type BlockStore interface {
GetBlockByNumber(number uint64) *types.Block
}
type Transition struct {
transaction *types.Transaction
postState common.Hash
}
func newTransition(tx *types.Transaction, postState common.Hash) *Transition {
return &Transition{
transaction: tx,
postState: postState,
}
}
type TransitionBatch struct {
transitions []*Transition
}
func NewTransitionBatch(defaultSize int) *TransitionBatch {
return &TransitionBatch{transitions: make([]*Transition, 0, defaultSize)}
}
// addBlock adds a Geth Block to the TransitionBatch. This is just its transaction and state root.
func (r *TransitionBatch) addBlock(block *types.Block) {
r.transitions = append(r.transitions, newTransition(block.Transactions()[0], block.Root()))
}