rm PrepareTxForBatch method
This commit is contained in:
parent
f4191c87e8
commit
efc37ef8d7
@ -280,10 +280,6 @@ func (p *publisher) PublishCode(height *big.Int, codeHash common.Hash, codeBytes
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *publisher) PrepareTxForBatch(tx snapt.Tx, maxBatchSize uint) (snapt.Tx, error) {
|
|
||||||
return tx, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// logNodeCounters periodically logs the number of node processed.
|
// logNodeCounters periodically logs the number of node processed.
|
||||||
func (p *publisher) logNodeCounters() {
|
func (p *publisher) logNodeCounters() {
|
||||||
t := time.NewTicker(logInterval)
|
t := time.NewTicker(logInterval)
|
||||||
|
@ -213,26 +213,6 @@ func (p *publisher) PublishCode(height *big.Int, codeHash common.Hash, codeBytes
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *publisher) PrepareTxForBatch(tx snapt.Tx, maxBatchSize uint) (snapt.Tx, error) {
|
|
||||||
var err error
|
|
||||||
// maximum batch size reached, commit the current transaction and begin a new transaction.
|
|
||||||
if maxBatchSize <= p.currBatchSize {
|
|
||||||
if err = tx.Commit(); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
snapTx, err := p.db.Begin(context.Background())
|
|
||||||
tx = pubTx{Tx: snapTx}
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
p.currBatchSize = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
return tx, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// logNodeCounters periodically logs the number of node processed.
|
// logNodeCounters periodically logs the number of node processed.
|
||||||
func (p *publisher) logNodeCounters() {
|
func (p *publisher) logNodeCounters() {
|
||||||
t := time.NewTicker(logInterval)
|
t := time.NewTicker(logInterval)
|
||||||
|
@ -53,7 +53,6 @@ type Service struct {
|
|||||||
ethDB ethdb.Database
|
ethDB ethdb.Database
|
||||||
stateDB state.Database
|
stateDB state.Database
|
||||||
ipfsPublisher Publisher
|
ipfsPublisher Publisher
|
||||||
maxBatchSize uint
|
|
||||||
tracker iteratorTracker
|
tracker iteratorTracker
|
||||||
recoveryFile string
|
recoveryFile string
|
||||||
}
|
}
|
||||||
@ -74,7 +73,6 @@ func NewSnapshotService(edb ethdb.Database, pub Publisher, recoveryFile string)
|
|||||||
ethDB: edb,
|
ethDB: edb,
|
||||||
stateDB: state.NewDatabase(edb),
|
stateDB: state.NewDatabase(edb),
|
||||||
ipfsPublisher: pub,
|
ipfsPublisher: pub,
|
||||||
maxBatchSize: defaultBatchSize,
|
|
||||||
recoveryFile: recoveryFile,
|
recoveryFile: recoveryFile,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
@ -399,11 +397,6 @@ func (s *Service) createNodeSnapshot(tx Tx, path []byte, it trie.NodeIterator, h
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
switch res.node.NodeType {
|
switch res.node.NodeType {
|
||||||
case Leaf:
|
case Leaf:
|
||||||
// if the node is a leaf, decode the account and publish the associated storage trie
|
// if the node is a leaf, decode the account and publish the associated storage trie
|
||||||
@ -486,11 +479,6 @@ func (s *Service) storageSnapshot(sr common.Hash, headerID string, height *big.I
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
tx, err = s.ipfsPublisher.PrepareTxForBatch(tx, s.maxBatchSize)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
var nodeData []byte
|
var nodeData []byte
|
||||||
nodeData, err = s.stateDB.TrieDB().Node(it.Hash())
|
nodeData, err = s.stateDB.TrieDB().Node(it.Hash())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -64,8 +64,6 @@ func TestCreateSnapshot(t *testing.T) {
|
|||||||
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header))
|
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header))
|
||||||
pub.EXPECT().BeginTx().Return(tx, nil).
|
pub.EXPECT().BeginTx().Return(tx, nil).
|
||||||
Times(workers)
|
Times(workers)
|
||||||
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).
|
|
||||||
AnyTimes()
|
|
||||||
tx.EXPECT().Commit().
|
tx.EXPECT().Commit().
|
||||||
Times(workers)
|
Times(workers)
|
||||||
pub.EXPECT().PublishStateNode(
|
pub.EXPECT().PublishStateNode(
|
||||||
@ -187,8 +185,6 @@ func TestAccountSelectiveSnapshot(t *testing.T) {
|
|||||||
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header))
|
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header))
|
||||||
pub.EXPECT().BeginTx().Return(tx, nil).
|
pub.EXPECT().BeginTx().Return(tx, nil).
|
||||||
Times(workers)
|
Times(workers)
|
||||||
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).
|
|
||||||
AnyTimes()
|
|
||||||
tx.EXPECT().Commit().
|
tx.EXPECT().Commit().
|
||||||
Times(workers)
|
Times(workers)
|
||||||
pub.EXPECT().PublishCode(gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Any(), gomock.Any(), gomock.Eq(tx)).
|
pub.EXPECT().PublishCode(gomock.Eq(fixt.Chain2_Block32_Header.Number), gomock.Any(), gomock.Any(), gomock.Eq(tx)).
|
||||||
@ -299,7 +295,6 @@ func TestRecovery(t *testing.T) {
|
|||||||
pub, tx := makeMocks(t)
|
pub, tx := makeMocks(t)
|
||||||
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header))
|
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header))
|
||||||
pub.EXPECT().BeginTx().Return(tx, nil).MaxTimes(workers)
|
pub.EXPECT().BeginTx().Return(tx, nil).MaxTimes(workers)
|
||||||
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
|
|
||||||
tx.EXPECT().Commit().MaxTimes(workers)
|
tx.EXPECT().Commit().MaxTimes(workers)
|
||||||
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
|
pub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
|
||||||
DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error {
|
DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error {
|
||||||
@ -346,7 +341,6 @@ func TestRecovery(t *testing.T) {
|
|||||||
recoveryPub, tx := makeMocks(t)
|
recoveryPub, tx := makeMocks(t)
|
||||||
recoveryPub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header))
|
recoveryPub.EXPECT().PublishHeader(gomock.Eq(&fixt.Block1_Header))
|
||||||
recoveryPub.EXPECT().BeginTx().Return(tx, nil).AnyTimes()
|
recoveryPub.EXPECT().BeginTx().Return(tx, nil).AnyTimes()
|
||||||
recoveryPub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
|
|
||||||
tx.EXPECT().Commit().AnyTimes()
|
tx.EXPECT().Commit().AnyTimes()
|
||||||
recoveryPub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
|
recoveryPub.EXPECT().PublishStateNode(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
|
||||||
DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error {
|
DoAndReturn(func(node *snapt.Node, _ string, _ *big.Int, _ snapt.Tx) error {
|
||||||
@ -432,7 +426,6 @@ func TestAccountSelectiveRecovery(t *testing.T) {
|
|||||||
pub, tx := makeMocks(t)
|
pub, tx := makeMocks(t)
|
||||||
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header))
|
pub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header))
|
||||||
pub.EXPECT().BeginTx().Return(tx, nil).Times(workers)
|
pub.EXPECT().BeginTx().Return(tx, nil).Times(workers)
|
||||||
pub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
|
|
||||||
tx.EXPECT().Commit().Times(workers)
|
tx.EXPECT().Commit().Times(workers)
|
||||||
pub.EXPECT().PublishStateNode(
|
pub.EXPECT().PublishStateNode(
|
||||||
gomock.Any(),
|
gomock.Any(),
|
||||||
@ -492,7 +485,6 @@ func TestAccountSelectiveRecovery(t *testing.T) {
|
|||||||
recoveryPub, tx := makeMocks(t)
|
recoveryPub, tx := makeMocks(t)
|
||||||
recoveryPub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header))
|
recoveryPub.EXPECT().PublishHeader(gomock.Eq(&fixt.Chain2_Block32_Header))
|
||||||
recoveryPub.EXPECT().BeginTx().Return(tx, nil).MaxTimes(workers)
|
recoveryPub.EXPECT().BeginTx().Return(tx, nil).MaxTimes(workers)
|
||||||
recoveryPub.EXPECT().PrepareTxForBatch(gomock.Any(), gomock.Any()).Return(tx, nil).AnyTimes()
|
|
||||||
tx.EXPECT().Commit().MaxTimes(workers)
|
tx.EXPECT().Commit().MaxTimes(workers)
|
||||||
recoveryPub.EXPECT().PublishStateNode(
|
recoveryPub.EXPECT().PublishStateNode(
|
||||||
gomock.Any(),
|
gomock.Any(),
|
||||||
|
@ -13,7 +13,6 @@ type Publisher interface {
|
|||||||
PublishStorageNode(node *Node, headerID string, height *big.Int, statePath []byte, tx Tx) error
|
PublishStorageNode(node *Node, headerID string, height *big.Int, statePath []byte, tx Tx) error
|
||||||
PublishCode(height *big.Int, codeHash common.Hash, codeBytes []byte, tx Tx) error
|
PublishCode(height *big.Int, codeHash common.Hash, codeBytes []byte, tx Tx) error
|
||||||
BeginTx() (Tx, error)
|
BeginTx() (Tx, error)
|
||||||
PrepareTxForBatch(tx Tx, batchSize uint) (Tx, error)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Tx interface {
|
type Tx interface {
|
||||||
|
Loading…
Reference in New Issue
Block a user