This commit is contained in:
Roy Crihfield 2023-08-30 11:09:53 +08:00
parent f8d68bd1d8
commit c0c5d97c41
3 changed files with 52 additions and 54 deletions

View File

@ -50,7 +50,7 @@ var (
nullCodeHash = crypto.Keccak256([]byte{})
zeroHash common.Hash
defaultSubtrieWorkers uint = 8
defaultSubtrieWorkers uint = 1
)
// Builder interface exposes the method for building a state diff between two blocks
@ -89,7 +89,7 @@ func syncedAppender[T any](to *[]T) func(T) error {
}
// NewBuilder is used to create a statediff builder
func NewBuilder(stateCache adapt.StateView) Builder {
func NewBuilder(stateCache adapt.StateView) *builder {
return &builder{
stateCache: stateCache,
subtrieWorkers: defaultSubtrieWorkers,
@ -190,39 +190,36 @@ func (sdb *builder) WriteStateDiffTracked(
}
if subiters != nil {
if len(subiters) != int(sdb.subtrieWorkers) {
return fmt.Errorf("expected to restore %d iterators, got %d", sdb.subtrieWorkers, len(subiters))
// Completed iterators are not saved by the tracker, so restoring fewer than configured is ok,
// but having too many is not expected.
if len(subiters) > int(sdb.subtrieWorkers) {
return fmt.Errorf("restored too many iterators: expected %d, got %d",
sdb.subtrieWorkers, len(subiters))
}
} else {
subitersA := iterutils.SubtrieIterators(triea.NodeIterator, uint(sdb.subtrieWorkers))
subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers))
for i := 0; i < int(sdb.subtrieWorkers); i++ {
it, aux := utils.NewSymmetricDifferenceIterator(subitersA[i], subitersB[i])
it = tracker.Tracked(it)
subiters = append(subiters, it)
auxes = append(auxes, aux)
subiters = iterutils.SubtrieIterators(makeIterator, uint(sdb.subtrieWorkers))
for i := range subiters {
subiters[i] = tracker.Tracked(subiters[i])
}
}
logger := log.New("hash", args.BlockHash, "number", args.BlockNumber)
// errgroup will cancel if any group fails
g, ctx := errgroup.WithContext(context.Background())
for i := uint(0); i < sdb.subtrieWorkers; i++ {
for i := range subiters {
func(subdiv uint) {
g.Go(func() error {
// a, b := subitersA[subdiv], subitersB[subdiv]
return sdb.processAccounts(ctx,
subiters[subdiv], auxes[subdiv],
params.watchedAddressesLeafPaths,
nodeSink, ipldSink, logger,
)
})
}(i)
}(uint(i))
}
return g.Wait()
}
// processAccounts processes account creations and deletions, and returns a set of updated
// existing accounts, indexed by leaf key.
// processAccounts processes account creations, deletions, and updates
func (sdb *builder) processAccounts(
ctx context.Context,
it trie.NodeIterator, aux *utils.SymmDiffAux,
@ -236,8 +233,7 @@ func (sdb *builder) processAccounts(
updates := make(accountUpdateMap)
// Cache the RLP of the previous node. When we hit a value node this will be the parent blob.
var prevBlob []byte
prevBlob = it.NodeBlob()
var prevBlob = it.NodeBlob()
for it.Next(true) {
select {
case <-ctx.Done():
@ -276,7 +272,7 @@ func (sdb *builder) processAccounts(
}
continue
}
// Node exists in the new trie
// Node exists in the new trie (B)
if it.Leaf() {
accountW, err := sdb.decodeStateLeaf(it, prevBlob)
if err != nil {
@ -296,42 +292,43 @@ func (sdb *builder) processAccounts(
return err
}
}
} else {
// New trie nodes will be written to blockstore only.
// Reminder: this includes leaf nodes, since the geth iterator.Leaf() actually
// signifies a "value" node.
if it.Hash() == zeroHash {
continue
}
// TODO - this can be handled when value node is (craeted?)
nodeVal := make([]byte, len(it.NodeBlob()))
copy(nodeVal, it.NodeBlob())
// if doing a selective diff, we need to ensure this is a watched path
if len(watchedAddressesLeafPaths) > 0 {
var elements []interface{}
if err := rlp.DecodeBytes(nodeVal, &elements); err != nil {
return err
}
ok, err := isLeaf(elements)
if err != nil {
return err
}
if ok {
partialPath := utils.CompactToHex(elements[0].([]byte))
valueNodePath := append(it.Path(), partialPath...)
if !isWatchedPath(watchedAddressesLeafPaths, valueNodePath) {
continue
}
}
}
if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(),
Content: nodeVal,
}); err != nil {
continue
}
// New inner trie nodes will be written to blockstore only.
// Reminder: this includes leaf nodes, since the geth iterator.Leaf() actually
// signifies a "value" node.
// TODO: A zero hash indicates what?
if it.Hash() == zeroHash {
continue
}
nodeVal := make([]byte, len(it.NodeBlob()))
copy(nodeVal, it.NodeBlob())
// if doing a selective diff, we need to ensure this is a watched path
if len(watchedAddressesLeafPaths) > 0 {
var elements []interface{}
if err := rlp.DecodeBytes(nodeVal, &elements); err != nil {
return err
}
prevBlob = nodeVal
ok, err := isLeaf(elements)
if err != nil {
return err
}
if ok {
partialPath := utils.CompactToHex(elements[0].([]byte))
valueNodePath := append(it.Path(), partialPath...)
if !isWatchedPath(watchedAddressesLeafPaths, valueNodePath) {
continue
}
}
}
if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(),
Content: nodeVal,
}); err != nil {
return err
}
prevBlob = nodeVal
}
for key, update := range updates {

View File

@ -43,7 +43,8 @@ var testHeaderTable = Table{
"mh_key",
"times_validated",
"coinbase",
)}
),
}
func TestTable(t *testing.T) {
headerUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)`

View File

@ -53,7 +53,7 @@ type StateLeafNode struct {
StorageDiff []StorageLeafNode
}
// StorageLeafNode holds the data for a single storage diff node leaf node
// StorageLeafNode holds the data for a single storage diff leaf node
type StorageLeafNode struct {
Removed bool
Value []byte