Patch for concurrent iterator & others (onto v1.11.6) #386

Closed
roysc wants to merge 1565 commits from v1.11.6-statediff-v5 into master
2 changed files with 29 additions and 21 deletions
Showing only changes of commit 0ebcd5f4af - Show all commits

View File

@ -24,7 +24,7 @@ const (
VersionMajor = 1 // Major version component of the current release VersionMajor = 1 // Major version component of the current release
VersionMinor = 11 // Minor version component of the current release VersionMinor = 11 // Minor version component of the current release
VersionPatch = 6 // Patch version component of the current release VersionPatch = 6 // Patch version component of the current release
VersionMeta = "statediff-5.0.3-alpha" // Version metadata to append to the version string VersionMeta = "statediff-5.0.4-alpha" // Version metadata to append to the version string
) )
// Version holds the textual version string. // Version holds the textual version string.

View File

@ -132,18 +132,18 @@ func (sdb *StateDiffBuilder) WriteStateDiffObject(args Args, params Params, outp
} }
logger := log.New("hash", args.BlockHash.Hex(), "number", args.BlockNumber) logger := log.New("hash", args.BlockHash.Hex(), "number", args.BlockNumber)
return sdb.BuildStateDiffWithIntermediateStateNodes(iterPairs, params, output, ipldOutput, logger) return sdb.BuildStateDiffWithIntermediateStateNodes(iterPairs, params, output, ipldOutput, logger, nil)
} }
func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs []IterPair, params Params, func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs []IterPair, params Params,
output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger) error { output types2.StateNodeSink, ipldOutput types2.IPLDSink, logger log.Logger, prefixPath []byte) error {
logger.Debug("statediff BEGIN BuildStateDiffWithIntermediateStateNodes") logger.Debug("statediff BEGIN BuildStateDiffWithIntermediateStateNodes")
defer metrics2.ReportAndUpdateDuration("statediff END BuildStateDiffWithIntermediateStateNodes", time.Now(), logger, metrics2.IndexerMetrics.BuildStateDiffWithIntermediateStateNodesTimer) defer metrics2.ReportAndUpdateDuration("statediff END BuildStateDiffWithIntermediateStateNodes", time.Now(), logger, metrics2.IndexerMetrics.BuildStateDiffWithIntermediateStateNodesTimer)
// collect a slice of all the nodes that were touched and exist at B (B-A) // collect a slice of all the nodes that were touched and exist at B (B-A)
// a map of their leafkey to all the accounts that were touched and exist at B // a map of their leafkey to all the accounts that were touched and exist at B
// and a slice of all the paths for the nodes in both of the above sets // and a slice of all the paths for the nodes in both of the above sets
diffAccountsAtB, err := sdb.createdAndUpdatedState( diffAccountsAtB, err := sdb.createdAndUpdatedState(
iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, ipldOutput, logger) iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, ipldOutput, logger, prefixPath)
if err != nil { if err != nil {
return fmt.Errorf("error collecting createdAndUpdatedNodes: %v", err) return fmt.Errorf("error collecting createdAndUpdatedNodes: %v", err)
} }
@ -152,7 +152,7 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs
// a map of their leafkey to all the accounts that were touched and exist at A // a map of their leafkey to all the accounts that were touched and exist at A
diffAccountsAtA, err := sdb.deletedOrUpdatedState( diffAccountsAtA, err := sdb.deletedOrUpdatedState(
iterPairs[1].Older, iterPairs[1].Newer, diffAccountsAtB, iterPairs[1].Older, iterPairs[1].Newer, diffAccountsAtB,
params.watchedAddressesLeafPaths, output, logger) params.watchedAddressesLeafPaths, output, logger, prefixPath)
if err != nil { if err != nil {
return fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err) return fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err)
} }
@ -191,7 +191,7 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs
// a mapping of their leafkeys to all the accounts that exist in a different state at B than A // a mapping of their leafkeys to all the accounts that exist in a different state at B than A
// and a slice of the paths for all of the nodes included in both // and a slice of the paths for all of the nodes included in both
func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
watchedAddressesLeafPaths [][]byte, output types2.IPLDSink, logger log.Logger) (types2.AccountMap, error) { watchedAddressesLeafPaths [][]byte, output types2.IPLDSink, logger log.Logger, prefixPath []byte) (types2.AccountMap, error) {
logger.Debug("statediff BEGIN createdAndUpdatedState") logger.Debug("statediff BEGIN createdAndUpdatedState")
defer metrics2.ReportAndUpdateDuration("statediff END createdAndUpdatedState", time.Now(), logger, metrics2.IndexerMetrics.CreatedAndUpdatedStateTimer) defer metrics2.ReportAndUpdateDuration("statediff END createdAndUpdatedState", time.Now(), logger, metrics2.IndexerMetrics.CreatedAndUpdatedStateTimer)
diffAccountsAtB := make(types2.AccountMap) diffAccountsAtB := make(types2.AccountMap)
@ -206,7 +206,7 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
// index values by leaf key // index values by leaf key
if it.Leaf() { if it.Leaf() {
// if it is a "value" node, we will index the value by leaf key // if it is a "value" node, we will index the value by leaf key
accountW, err := sdb.processStateValueNode(it, watchedAddressesLeafPaths) accountW, err := sdb.processStateValueNode(it, watchedAddressesLeafPaths, prefixPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -259,30 +259,38 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
} }
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" which is not something that actually exists in an MMPT // reminder: it.Leaf() == true when the iterator is positioned at a "value node" which is not something that actually exists in an MMPT
func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, watchedAddressesLeafPaths [][]byte) (*types2.AccountWrapper, error) { func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, watchedAddressesLeafPaths [][]byte, prefixPath []byte) (*types2.AccountWrapper, error) {
// skip if it is not a watched address // skip if it is not a watched address
// If we aren't watching any specific addresses, we are watching everything // If we aren't watching any specific addresses, we are watching everything
if len(watchedAddressesLeafPaths) > 0 && !isWatchedAddress(watchedAddressesLeafPaths, it.Path()) { if len(watchedAddressesLeafPaths) > 0 && !isWatchedAddress(watchedAddressesLeafPaths, it.Path()) {
return nil, nil return nil, nil
} }
// created vs updated is important for leaf nodes since we need to diff their storage
// so we need to map all changed accounts at B to their leafkey, since account can change pathes but not leafkey
var account types.StateAccount
accountRLP := make([]byte, len(it.LeafBlob()))
copy(accountRLP, it.LeafBlob())
if err := rlp.DecodeBytes(accountRLP, &account); err != nil {
return nil, fmt.Errorf("error decoding account for leaf value at leaf key %x\nerror: %v", it.LeafKey(), err)
}
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
// since this is a "value node", we need to move up to the "parent" node which is the actual leaf node // since this is a "value node", we need to move up to the "parent" node which is the actual leaf node
// it should be in the fastcache since it necessarily was recently accessed to reach the current node // it should be in the fastcache since it necessarily was recently accessed to reach the current node
parentNodeRLP, err := sdb.StateCache.TrieDB().Node(it.Parent()) parentNodeRLP, err := sdb.StateCache.TrieDB().Node(it.Parent())
if err != nil { if err != nil {
return nil, err return nil, err
} }
var nodeElements []interface{}
if err = rlp.DecodeBytes(parentNodeRLP, &nodeElements); err != nil {
return nil, err
}
parentSubPath := make([]byte, len(it.ParentPath()))
copy(parentSubPath, it.ParentPath())
parentPath := append(prefixPath, parentSubPath...)
partialPath := trie.CompactToHex(nodeElements[0].([]byte))
valueNodePath := append(parentPath, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:]
var account types.StateAccount
accountRLP := make([]byte, len(it.LeafBlob()))
copy(accountRLP, it.LeafBlob())
if err := rlp.DecodeBytes(accountRLP, &account); err != nil {
return nil, fmt.Errorf("error decoding account for leaf value at leaf key %x\nerror: %v", leafKey, err)
}
return &types2.AccountWrapper{ return &types2.AccountWrapper{
LeafKey: leafKey, LeafKey: leafKey,
Account: &account, Account: &account,
@ -293,7 +301,7 @@ func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, watched
// deletedOrUpdatedState returns a slice of all the pathes that are emptied at B // deletedOrUpdatedState returns a slice of all the pathes that are emptied at B
// and a mapping of their leafkeys to all the accounts that exist in a different state at A than B // and a mapping of their leafkeys to all the accounts that exist in a different state at A than B
func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffAccountsAtB types2.AccountMap, func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffAccountsAtB types2.AccountMap,
watchedAddressesLeafPaths [][]byte, output types2.StateNodeSink, logger log.Logger) (types2.AccountMap, error) { watchedAddressesLeafPaths [][]byte, output types2.StateNodeSink, logger log.Logger, prefixPath []byte) (types2.AccountMap, error) {
logger.Debug("statediff BEGIN deletedOrUpdatedState") logger.Debug("statediff BEGIN deletedOrUpdatedState")
defer metrics2.ReportAndUpdateDuration("statediff END deletedOrUpdatedState", time.Now(), logger, metrics2.IndexerMetrics.DeletedOrUpdatedStateTimer) defer metrics2.ReportAndUpdateDuration("statediff END deletedOrUpdatedState", time.Now(), logger, metrics2.IndexerMetrics.DeletedOrUpdatedStateTimer)
diffAccountAtA := make(types2.AccountMap) diffAccountAtA := make(types2.AccountMap)
@ -307,7 +315,7 @@ func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffA
} }
if it.Leaf() { if it.Leaf() {
accountW, err := sdb.processStateValueNode(it, watchedAddressesLeafPaths) accountW, err := sdb.processStateValueNode(it, watchedAddressesLeafPaths, prefixPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }