trie: remove dependency on ethdb
This removes the core/types -> leveldb dependency.
This commit is contained in:
		
							parent
							
								
									7731061903
								
							
						
					
					
						commit
						d3b751e4d9
					
				| @ -21,7 +21,6 @@ import ( | ||||
| 	"math/big" | ||||
| 
 | ||||
| 	"github.com/ethereum/go-ethereum/common" | ||||
| 	"github.com/ethereum/go-ethereum/ethdb" | ||||
| 	"github.com/ethereum/go-ethereum/rlp" | ||||
| 	"github.com/ethereum/go-ethereum/trie" | ||||
| ) | ||||
| @ -32,7 +31,7 @@ import ( | ||||
| type StateSync trie.TrieSync | ||||
| 
 | ||||
| // NewStateSync create a new state trie download scheduler.
 | ||||
| func NewStateSync(root common.Hash, database ethdb.Database) *StateSync { | ||||
| func NewStateSync(root common.Hash, database trie.DatabaseReader) *StateSync { | ||||
| 	var syncer *trie.TrieSync | ||||
| 
 | ||||
| 	callback := func(leaf []byte, parent common.Hash) error { | ||||
| @ -62,8 +61,8 @@ func (s *StateSync) Missing(max int) []common.Hash { | ||||
| // Process injects a batch of retrieved trie nodes data, returning if something
 | ||||
| // was committed to the database and also the index of an entry if processing of
 | ||||
| // it failed.
 | ||||
| func (s *StateSync) Process(list []trie.SyncResult) (bool, int, error) { | ||||
| 	return (*trie.TrieSync)(s).Process(list) | ||||
| func (s *StateSync) Process(list []trie.SyncResult, dbw trie.DatabaseWriter) (bool, int, error) { | ||||
| 	return (*trie.TrieSync)(s).Process(list, dbw) | ||||
| } | ||||
| 
 | ||||
| // Pending returns the number of state entries currently pending for download.
 | ||||
|  | ||||
| @ -138,7 +138,7 @@ func testIterativeStateSync(t *testing.T, batch int) { | ||||
| 			} | ||||
| 			results[i] = trie.SyncResult{Hash: hash, Data: data} | ||||
| 		} | ||||
| 		if _, index, err := sched.Process(results); err != nil { | ||||
| 		if _, index, err := sched.Process(results, dstDb); err != nil { | ||||
| 			t.Fatalf("failed to process result #%d: %v", index, err) | ||||
| 		} | ||||
| 		queue = append(queue[:0], sched.Missing(batch)...) | ||||
| @ -168,7 +168,7 @@ func TestIterativeDelayedStateSync(t *testing.T) { | ||||
| 			} | ||||
| 			results[i] = trie.SyncResult{Hash: hash, Data: data} | ||||
| 		} | ||||
| 		if _, index, err := sched.Process(results); err != nil { | ||||
| 		if _, index, err := sched.Process(results, dstDb); err != nil { | ||||
| 			t.Fatalf("failed to process result #%d: %v", index, err) | ||||
| 		} | ||||
| 		queue = append(queue[len(results):], sched.Missing(0)...) | ||||
| @ -206,7 +206,7 @@ func testIterativeRandomStateSync(t *testing.T, batch int) { | ||||
| 			results = append(results, trie.SyncResult{Hash: hash, Data: data}) | ||||
| 		} | ||||
| 		// Feed the retrieved results back and queue new tasks
 | ||||
| 		if _, index, err := sched.Process(results); err != nil { | ||||
| 		if _, index, err := sched.Process(results, dstDb); err != nil { | ||||
| 			t.Fatalf("failed to process result #%d: %v", index, err) | ||||
| 		} | ||||
| 		queue = make(map[common.Hash]struct{}) | ||||
| @ -249,7 +249,7 @@ func TestIterativeRandomDelayedStateSync(t *testing.T) { | ||||
| 			} | ||||
| 		} | ||||
| 		// Feed the retrieved results back and queue new tasks
 | ||||
| 		if _, index, err := sched.Process(results); err != nil { | ||||
| 		if _, index, err := sched.Process(results, dstDb); err != nil { | ||||
| 			t.Fatalf("failed to process result #%d: %v", index, err) | ||||
| 		} | ||||
| 		for _, hash := range sched.Missing(0) { | ||||
| @ -283,7 +283,7 @@ func TestIncompleteStateSync(t *testing.T) { | ||||
| 			results[i] = trie.SyncResult{Hash: hash, Data: data} | ||||
| 		} | ||||
| 		// Process each of the state nodes
 | ||||
| 		if _, index, err := sched.Process(results); err != nil { | ||||
| 		if _, index, err := sched.Process(results, dstDb); err != nil { | ||||
| 			t.Fatalf("failed to process result #%d: %v", index, err) | ||||
| 		} | ||||
| 		for _, result := range results { | ||||
|  | ||||
| @ -1123,15 +1123,20 @@ func (q *queue) deliverNodeData(results []trie.SyncResult, callback func(int, bo | ||||
| 			callback(i, progressed, errNoFetchesPending) | ||||
| 			return | ||||
| 		} | ||||
| 		if prog, _, err := q.stateScheduler.Process([]trie.SyncResult{result}); err != nil { | ||||
| 			// Processing a state result failed, bail out
 | ||||
| 
 | ||||
| 		batch := q.stateDatabase.NewBatch() | ||||
| 		prog, _, err := q.stateScheduler.Process([]trie.SyncResult{result}, batch) | ||||
| 		if err != nil { | ||||
| 			q.stateSchedLock.Unlock() | ||||
| 			callback(i, progressed, err) | ||||
| 			return | ||||
| 		} else if prog { | ||||
| 			progressed = true | ||||
| 		} | ||||
| 		if err = batch.Write(); err != nil { | ||||
| 			q.stateSchedLock.Unlock() | ||||
| 			callback(i, progressed, err) | ||||
| 		} | ||||
| 
 | ||||
| 		// Item processing succeeded, release the lock (temporarily)
 | ||||
| 		progressed = progressed || prog | ||||
| 		q.stateSchedLock.Unlock() | ||||
| 	} | ||||
| 	callback(len(results), progressed, nil) | ||||
|  | ||||
							
								
								
									
										24
									
								
								trie/sync.go
									
									
									
									
									
								
							
							
						
						
									
										24
									
								
								trie/sync.go
									
									
									
									
									
								
							| @ -21,7 +21,6 @@ import ( | ||||
| 	"fmt" | ||||
| 
 | ||||
| 	"github.com/ethereum/go-ethereum/common" | ||||
| 	"github.com/ethereum/go-ethereum/ethdb" | ||||
| 	"gopkg.in/karalabe/cookiejar.v2/collections/prque" | ||||
| ) | ||||
| 
 | ||||
| @ -58,13 +57,13 @@ type TrieSyncLeafCallback func(leaf []byte, parent common.Hash) error | ||||
| // unknown trie hashes to retrieve, accepts node data associated with said hashes
 | ||||
| // and reconstructs the trie step by step until all is done.
 | ||||
| type TrieSync struct { | ||||
| 	database ethdb.Database           // State database for storing all the assembled node data
 | ||||
| 	database DatabaseReader | ||||
| 	requests map[common.Hash]*request // Pending requests pertaining to a key hash
 | ||||
| 	queue    *prque.Prque             // Priority queue with the pending requests
 | ||||
| } | ||||
| 
 | ||||
| // NewTrieSync creates a new trie data download scheduler.
 | ||||
| func NewTrieSync(root common.Hash, database ethdb.Database, callback TrieSyncLeafCallback) *TrieSync { | ||||
| func NewTrieSync(root common.Hash, database DatabaseReader, callback TrieSyncLeafCallback) *TrieSync { | ||||
| 	ts := &TrieSync{ | ||||
| 		database: database, | ||||
| 		requests: make(map[common.Hash]*request), | ||||
| @ -145,7 +144,7 @@ func (s *TrieSync) Missing(max int) []common.Hash { | ||||
| // Process injects a batch of retrieved trie nodes data, returning if something
 | ||||
| // was committed to the database and also the index of an entry if processing of
 | ||||
| // it failed.
 | ||||
| func (s *TrieSync) Process(results []SyncResult) (bool, int, error) { | ||||
| func (s *TrieSync) Process(results []SyncResult, dbw DatabaseWriter) (bool, int, error) { | ||||
| 	committed := false | ||||
| 
 | ||||
| 	for i, item := range results { | ||||
| @ -157,7 +156,7 @@ func (s *TrieSync) Process(results []SyncResult) (bool, int, error) { | ||||
| 		// If the item is a raw entry request, commit directly
 | ||||
| 		if request.raw { | ||||
| 			request.data = item.Data | ||||
| 			s.commit(request, nil) | ||||
| 			s.commit(request, dbw) | ||||
| 			committed = true | ||||
| 			continue | ||||
| 		} | ||||
| @ -174,7 +173,7 @@ func (s *TrieSync) Process(results []SyncResult) (bool, int, error) { | ||||
| 			return committed, i, err | ||||
| 		} | ||||
| 		if len(requests) == 0 && request.deps == 0 { | ||||
| 			s.commit(request, nil) | ||||
| 			s.commit(request, dbw) | ||||
| 			committed = true | ||||
| 			continue | ||||
| 		} | ||||
| @ -266,16 +265,9 @@ func (s *TrieSync) children(req *request, object node) ([]*request, error) { | ||||
| // commit finalizes a retrieval request and stores it into the database. If any
 | ||||
| // of the referencing parent requests complete due to this commit, they are also
 | ||||
| // committed themselves.
 | ||||
| func (s *TrieSync) commit(req *request, batch ethdb.Batch) (err error) { | ||||
| 	// Create a new batch if none was specified
 | ||||
| 	if batch == nil { | ||||
| 		batch = s.database.NewBatch() | ||||
| 		defer func() { | ||||
| 			err = batch.Write() | ||||
| 		}() | ||||
| 	} | ||||
| func (s *TrieSync) commit(req *request, dbw DatabaseWriter) (err error) { | ||||
| 	// Write the node content to disk
 | ||||
| 	if err := batch.Put(req.hash[:], req.data); err != nil { | ||||
| 	if err := dbw.Put(req.hash[:], req.data); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	delete(s.requests, req.hash) | ||||
| @ -284,7 +276,7 @@ func (s *TrieSync) commit(req *request, batch ethdb.Batch) (err error) { | ||||
| 	for _, parent := range req.parents { | ||||
| 		parent.deps-- | ||||
| 		if parent.deps == 0 { | ||||
| 			if err := s.commit(parent, batch); err != nil { | ||||
| 			if err := s.commit(parent, dbw); err != nil { | ||||
| 				return err | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| @ -122,7 +122,7 @@ func testIterativeTrieSync(t *testing.T, batch int) { | ||||
| 			} | ||||
| 			results[i] = SyncResult{hash, data} | ||||
| 		} | ||||
| 		if _, index, err := sched.Process(results); err != nil { | ||||
| 		if _, index, err := sched.Process(results, dstDb); err != nil { | ||||
| 			t.Fatalf("failed to process result #%d: %v", index, err) | ||||
| 		} | ||||
| 		queue = append(queue[:0], sched.Missing(batch)...) | ||||
| @ -152,7 +152,7 @@ func TestIterativeDelayedTrieSync(t *testing.T) { | ||||
| 			} | ||||
| 			results[i] = SyncResult{hash, data} | ||||
| 		} | ||||
| 		if _, index, err := sched.Process(results); err != nil { | ||||
| 		if _, index, err := sched.Process(results, dstDb); err != nil { | ||||
| 			t.Fatalf("failed to process result #%d: %v", index, err) | ||||
| 		} | ||||
| 		queue = append(queue[len(results):], sched.Missing(10000)...) | ||||
| @ -190,7 +190,7 @@ func testIterativeRandomTrieSync(t *testing.T, batch int) { | ||||
| 			results = append(results, SyncResult{hash, data}) | ||||
| 		} | ||||
| 		// Feed the retrieved results back and queue new tasks
 | ||||
| 		if _, index, err := sched.Process(results); err != nil { | ||||
| 		if _, index, err := sched.Process(results, dstDb); err != nil { | ||||
| 			t.Fatalf("failed to process result #%d: %v", index, err) | ||||
| 		} | ||||
| 		queue = make(map[common.Hash]struct{}) | ||||
| @ -231,7 +231,7 @@ func TestIterativeRandomDelayedTrieSync(t *testing.T) { | ||||
| 			} | ||||
| 		} | ||||
| 		// Feed the retrieved results back and queue new tasks
 | ||||
| 		if _, index, err := sched.Process(results); err != nil { | ||||
| 		if _, index, err := sched.Process(results, dstDb); err != nil { | ||||
| 			t.Fatalf("failed to process result #%d: %v", index, err) | ||||
| 		} | ||||
| 		for _, result := range results { | ||||
| @ -272,7 +272,7 @@ func TestDuplicateAvoidanceTrieSync(t *testing.T) { | ||||
| 
 | ||||
| 			results[i] = SyncResult{hash, data} | ||||
| 		} | ||||
| 		if _, index, err := sched.Process(results); err != nil { | ||||
| 		if _, index, err := sched.Process(results, dstDb); err != nil { | ||||
| 			t.Fatalf("failed to process result #%d: %v", index, err) | ||||
| 		} | ||||
| 		queue = append(queue[:0], sched.Missing(0)...) | ||||
| @ -304,7 +304,7 @@ func TestIncompleteTrieSync(t *testing.T) { | ||||
| 			results[i] = SyncResult{hash, data} | ||||
| 		} | ||||
| 		// Process each of the trie nodes
 | ||||
| 		if _, index, err := sched.Process(results); err != nil { | ||||
| 		if _, index, err := sched.Process(results, dstDb); err != nil { | ||||
| 			t.Fatalf("failed to process result #%d: %v", index, err) | ||||
| 		} | ||||
| 		for _, result := range results { | ||||
|  | ||||
| @ -60,8 +60,12 @@ func init() { | ||||
| 
 | ||||
| // Database must be implemented by backing stores for the trie.
 | ||||
| type Database interface { | ||||
| 	DatabaseReader | ||||
| 	DatabaseWriter | ||||
| 	// Get returns the value for key from the database.
 | ||||
| } | ||||
| 
 | ||||
| // DatabaseReader wraps the Get method of a backing store for the trie.
 | ||||
| type DatabaseReader interface { | ||||
| 	Get(key []byte) (value []byte, err error) | ||||
| } | ||||
| 
 | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user