snapshotter ignores nodes not along a path along those derived from a list of account addresses if one is provided

This commit is contained in:
i-norden 2022-06-14 23:10:29 -05:00 committed by nabarun
parent b241bf05eb
commit 5b86cadeff

View File

@ -48,12 +48,13 @@ var (
// Service holds ethDB and stateDB to read data from lvldb and Publisher // Service holds ethDB and stateDB to read data from lvldb and Publisher
// to publish trie in postgres DB. // to publish trie in postgres DB.
type Service struct { type Service struct {
ethDB ethdb.Database watchingAddresses bool
stateDB state.Database ethDB ethdb.Database
ipfsPublisher Publisher stateDB state.Database
maxBatchSize uint ipfsPublisher Publisher
tracker iteratorTracker maxBatchSize uint
recoveryFile string tracker iteratorTracker
recoveryFile string
} }
func NewLevelDB(con *EthConfig) (ethdb.Database, error) { func NewLevelDB(con *EthConfig) (ethdb.Database, error) {
@ -78,11 +79,17 @@ func NewSnapshotService(edb ethdb.Database, pub Publisher, recoveryFile string)
} }
type SnapshotParams struct { type SnapshotParams struct {
Height uint64 WatchedAddresses map[common.Address]struct{}
Workers uint Height uint64
Workers uint
} }
func (s *Service) CreateSnapshot(params SnapshotParams) error { func (s *Service) CreateSnapshot(params SnapshotParams) error {
paths := make([][]byte, 0, len(params.WatchedAddresses))
for addr := range params.WatchedAddresses {
paths = append(paths, crypto.Keccak256(addr.Bytes()))
}
s.watchingAddresses = len(paths) > 0
// extract header from lvldb and publish to PG-IPFS // extract header from lvldb and publish to PG-IPFS
// hold onto the headerID so that we can link the state nodes to this header // hold onto the headerID so that we can link the state nodes to this header
log.Infof("Creating snapshot at height %d", params.Height) log.Infof("Creating snapshot at height %d", params.Height)
@ -144,21 +151,21 @@ func (s *Service) CreateSnapshot(params SnapshotParams) error {
}() }()
if len(iters) > 0 { if len(iters) > 0 {
return s.createSnapshotAsync(iters, headerID, new(big.Int).SetUint64(params.Height)) return s.createSnapshotAsync(iters, headerID, new(big.Int).SetUint64(params.Height), paths)
} else { } else {
return s.createSnapshot(iters[0], headerID, new(big.Int).SetUint64(params.Height)) return s.createSnapshot(iters[0], headerID, new(big.Int).SetUint64(params.Height), paths)
} }
} }
// Create snapshot up to head (ignores height param) // Create snapshot up to head (ignores height param)
func (s *Service) CreateLatestSnapshot(workers uint) error { func (s *Service) CreateLatestSnapshot(workers uint, watchedAddresses map[common.Address]struct{}) error {
log.Info("Creating snapshot at head") log.Info("Creating snapshot at head")
hash := rawdb.ReadHeadHeaderHash(s.ethDB) hash := rawdb.ReadHeadHeaderHash(s.ethDB)
height := rawdb.ReadHeaderNumber(s.ethDB, hash) height := rawdb.ReadHeaderNumber(s.ethDB, hash)
if height == nil { if height == nil {
return fmt.Errorf("unable to read header height for header hash %s", hash.String()) return fmt.Errorf("unable to read header height for header hash %s", hash.String())
} }
return s.CreateSnapshot(SnapshotParams{Height: *height, Workers: workers}) return s.CreateSnapshot(SnapshotParams{Height: *height, Workers: workers, WatchedAddresses: watchedAddresses})
} }
type nodeResult struct { type nodeResult struct {
@ -199,7 +206,15 @@ func resolveNode(it trie.NodeIterator, trieDB *trie.Database) (*nodeResult, erro
}, nil }, nil
} }
func (s *Service) createSnapshot(it trie.NodeIterator, headerID string, height *big.Int) error { func validPath(currentPath []byte, seekingPaths [][]byte) bool {
for _, seekingPath := range seekingPaths {
if bytes.HasPrefix(seekingPath, currentPath) {
return true
}
}
return false
}
func (s *Service) createSnapshot(it trie.NodeIterator, headerID string, height *big.Int, seekingPaths [][]byte) error {
tx, err := s.ipfsPublisher.BeginTx() tx, err := s.ipfsPublisher.BeginTx()
if err != nil { if err != nil {
return err return err
@ -212,6 +227,9 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string, height *
}() }()
for it.Next(true) { for it.Next(true) {
if s.watchingAddresses && !validPath(it.Path(), seekingPaths) {
continue
}
res, err := resolveNode(it, s.stateDB.TrieDB()) res, err := resolveNode(it, s.stateDB.TrieDB())
if err != nil { if err != nil {
return err return err
@ -274,14 +292,14 @@ func (s *Service) createSnapshot(it trie.NodeIterator, headerID string, height *
} }
// Full-trie concurrent snapshot // Full-trie concurrent snapshot
func (s *Service) createSnapshotAsync(iters []trie.NodeIterator, headerID string, height *big.Int) error { func (s *Service) createSnapshotAsync(iters []trie.NodeIterator, headerID string, height *big.Int, seekingPaths [][]byte) error {
errors := make(chan error) errors := make(chan error)
var wg sync.WaitGroup var wg sync.WaitGroup
for _, it := range iters { for _, it := range iters {
wg.Add(1) wg.Add(1)
go func(it trie.NodeIterator) { go func(it trie.NodeIterator) {
defer wg.Done() defer wg.Done()
if err := s.createSnapshot(it, headerID, height); err != nil { if err := s.createSnapshot(it, headerID, height, seekingPaths); err != nil {
errors <- err errors <- err
} }
}(it) }(it)