This change imports the Swarm protocol codebase. Compared to the 'swarm'
branch, a few mostly cosmetic changes had to be made:

* The various redundant log message prefixes are gone.
* All files now have LGPLv3 license headers.
* Minor code changes were needed to please go vet and make the tests pass on Windows.
* Further changes were required to adapt to the go-ethereum develop branch and its new Go APIs.

Some code has not (yet) been brought over:

* swarm/cmd/bzzhash: will reappear as cmd/bzzhash later
* swarm/cmd/bzzup.sh: will be reimplemented in cmd/bzzup
* swarm/cmd/makegenesis: will reappear somehow
* swarm/examples/album: will move to a separate repository
* swarm/examples/filemanager: ditto
* swarm/examples/files: will not be merged
* swarm/test/*: will not be merged
* swarm/services/swear: will reappear as contracts/swear when needed
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package network

import (
	"encoding/binary"
	"fmt"

	"github.com/ethereum/go-ethereum/logger"
	"github.com/ethereum/go-ethereum/logger/glog"
	"github.com/ethereum/go-ethereum/swarm/storage"
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/iterator"
)

const counterKeyPrefix = 0x01
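// Note on key namespaces (a summary of the scheme documented at dbRead below):
// keys in the shared request db use byte 0 as a tag, 0x00 for queued request
// entries and counterKeyPrefix (0x01) for the per-syncdb counter keys.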
/*
syncDb is a queueing service for outgoing deliveries: one instance is run
per priority queue for each peer.

A syncDb instance maintains an in-memory buffer (of capacity bufferSize).
Once the in-memory buffer is full, it switches to persisting items in the db,
and a dbRead iterator iterates through the items, keeping their order. Once
the db read catches up (there are no more items in the db), it switches back
to the in-memory buffer.

When syncdb is stopped, all items in the buffer are saved to the db.
*/
type syncDb struct {
	start          []byte               // this syncdb's starting index in the request db
	key            storage.Key          // remote peer's address key
	counterKey     []byte               // db key to persist the counter
	priority       uint                 // priority High|Medium|Low
	buffer         chan interface{}     // incoming request channel
	db             *storage.LDBDatabase // underlying db (TODO: should be an interface)
	done           chan bool            // chan to signal that goroutines have finished quitting
	quit           chan bool            // chan to signal quitting to goroutines
	total, dbTotal int                  // counts for one session
	batch          chan chan int        // channel for batch requests
	dbBatchSize    uint                 // number of items before a batch is saved
}
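// The mode transitions driven by bufferRead, sketched (a summary of the
// comment above, not part of the original source):
//
//	buffer mode --(buffer full)--------> db mode (dbRead delivers from the db)
//	db mode     --(db backlog drained)-> buffer mode
//	any mode    --(quit)---------------> db mode; flush buffer to db and exit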
// newSyncDb constructs a syncDb instance. It needs a shared request db
// (leveldb); priority is used in the index key. It uses an in-memory buffer
// and the leveldb for persistent storage; bufferSize and dbBatchSize are
// config parameters.
func newSyncDb(db *storage.LDBDatabase, key storage.Key, priority uint, bufferSize, dbBatchSize uint, deliver func(interface{}, chan bool) bool) *syncDb {
	start := make([]byte, 42)
	// start[0] stays 0x00, the request entry namespace (0x01 is the counter key prefix)
	start[1] = byte(priorities - priority)
	copy(start[2:34], key)

	counterKey := make([]byte, 34)
	counterKey[0] = counterKeyPrefix
	copy(counterKey[1:], start[1:34])

	syncdb := &syncDb{
		start:       start,
		key:         key,
		counterKey:  counterKey,
		priority:    priority,
		buffer:      make(chan interface{}, bufferSize),
		db:          db,
		done:        make(chan bool),
		quit:        make(chan bool),
		batch:       make(chan chan int),
		dbBatchSize: dbBatchSize,
	}
	glog.V(logger.Detail).Infof("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority)

	// start the main forever loop reading from the buffer
	go syncdb.bufferRead(deliver)
	return syncdb
}
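// A minimal usage sketch (hypothetical: the real caller is the syncer, which
// owns the deliver callback and the shared request db; networkSend, requestDb,
// peerAddr, chunk and the buffer/batch sizes are made up for illustration):
//
//	deliver := func(req interface{}, quit chan bool) bool {
//		select {
//		case <-quit:
//			return false // syncdb is stopping, req will be saved to the db
//		default:
//			return networkSend(req) // hypothetical p2p write
//		}
//	}
//	sdb := newSyncDb(requestDb, peerAddr, High, 512, 64, deliver)
//	sdb.buffer <- chunk // enqueue an outgoing store request
//	defer sdb.stop()    // flush pending items to the request db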
/*
bufferRead is a forever loop that takes care of delivering
outgoing store requests read from the incoming buffer.

Its argument is the deliver function, taking the item as first argument
and a quit channel as second. Closing of this channel is supposed to abort
all waiting for delivery (typically a network write).

The iteration switches between 2 modes:
* buffer mode reads the in-memory buffer and delivers the items directly
* db mode reads from the buffer and writes to the db; in parallel, another
routine is started that reads from the db and delivers items

If there is buffer contention in buffer mode (slow network, high upload
volume), syncdb switches to db mode and starts dbRead.
Once the db backlog is delivered, it reverts back to the in-memory buffer.

It is automatically started when syncdb is initialised.

It saves the buffer to the db upon receiving the quit signal (see syncDb#stop).
*/
func (self *syncDb) bufferRead(deliver func(interface{}, chan bool) bool) {
	var buffer, db chan interface{} // channels representing the two read modes
	var more bool
	var req interface{}
	var entry *syncDbEntry
	var inBatch, inDb int
	batch := new(leveldb.Batch)
	var dbSize chan int
	quit := self.quit
	counterValue := make([]byte, 8)

	// the counter is used for keeping the items in order, persisted to the db
	// start the counter where the db was at, 0 if not found
	data, err := self.db.Get(self.counterKey)
	var counter uint64
	if err == nil {
		counter = binary.BigEndian.Uint64(data)
		glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter)
	} else {
		glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter)
	}

LOOP:
	for {
		// waiting for the next item in the buffer, a quit signal or a batch request
		select {
		// buffer only closes when writing to db
		case req = <-buffer:
			// deliver the request: this is blocking on network write so
			// it is passed the quit channel as argument, so that it returns
			// if syncdb is stopped. In this case we need to save the item to the db
			more = deliver(req, self.quit)
			if !more {
				glog.V(logger.Debug).Infof("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total)
				// received quit signal, save the request currently awaiting delivery
				// by switching to db mode and closing the buffer
				buffer = nil
				db = self.buffer
				close(db)
				quit = nil // needs to block the quit case in select
				break      // break from select, this item will be written to the db
			}
			self.total++
			glog.V(logger.Detail).Infof("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total)
			// by the time deliver returns, there may have been new writes to the buffer;
			// if buffer contention is detected, switch to db mode, which drains
			// the buffer so no process will block on pushing store requests
			if len(buffer) == cap(buffer) {
				glog.V(logger.Debug).Infof("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total)
				buffer = nil
				db = self.buffer
			}
			continue LOOP

			// incoming entry to put into db
		case req, more = <-db:
			if !more {
				// only if quit was called and all of the buffer has been saved
				binary.BigEndian.PutUint64(counterValue, counter)
				batch.Put(self.counterKey, counterValue) // persist counter in batch
				self.writeSyncBatch(batch)               // save batch
				glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority)
				break LOOP
			}
			self.dbTotal++
			self.total++
			// otherwise break after select
		case dbSize = <-self.batch:
			// explicit request for batch
			if inBatch == 0 && quit != nil {
				// there have been no writes since the last batch, so the db is
				// depleted: switch to buffer mode
				glog.V(logger.Debug).Infof("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority)
				db = nil
				buffer = self.buffer
				dbSize <- 0 // indicates to the 'caller' that the batch has been written
				inDb = 0
				continue LOOP
			}
			binary.BigEndian.PutUint64(counterValue, counter)
			batch.Put(self.counterKey, counterValue)
			glog.V(logger.Debug).Infof("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue)
			batch = self.writeSyncBatch(batch)
			dbSize <- inBatch // indicates to the 'caller' that the batch has been written
			inBatch = 0
			continue LOOP

			// closing the syncDb#quit channel is used to signal all goroutines to quit
		case <-quit:
			// need to save the backlog, so switch to db mode
			db = self.buffer
			buffer = nil
			quit = nil
			glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority)
			close(db)
			continue LOOP
		}

		// we only get here if we are putting req into the db
		entry, err = self.newSyncDbEntry(req, counter)
		if err != nil {
			glog.V(logger.Warn).Infof("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err)
			continue LOOP
		}
		batch.Put(entry.key, entry.val)
		glog.V(logger.Detail).Infof("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter)
		// if we just switched to db mode and are not quitting, launch dbRead
		// in a parallel goroutine to send deliveries from the db
		if inDb == 0 && quit != nil {
			glog.V(logger.Detail).Infof("syncDb[%v/%v] start dbRead", self.key.Log(), self.priority)
			go self.dbRead(true, counter, deliver)
		}
		inDb++
		inBatch++
		counter++
		// need to save the batch if it gets too large (== dbBatchSize)
		if inBatch%int(self.dbBatchSize) == 0 {
			batch = self.writeSyncBatch(batch)
		}
	}
	glog.V(logger.Info).Infof("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter)
	close(self.done)
}
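// The batch handshake between dbRead (below) and the self.batch case above,
// as a sequence sketch (derived from the code, not part of the original source):
//
//	dbRead                           bufferRead
//	self.batch <- batchSizes   -->   dbSize = <-self.batch
//	                                 persist counter, write batch to db
//	cnt = <-batchSizes         <--   dbSize <- inBatch
//
// A reply of 0 means no items were written since the last batch: bufferRead
// has switched back to buffer mode and dbRead returns.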
// writeSyncBatch writes the batch to the db and returns a new batch object;
// if the write fails, the unsaved batch is returned so that no items are lost.
func (self *syncDb) writeSyncBatch(batch *leveldb.Batch) *leveldb.Batch {
	err := self.db.Write(batch)
	if err != nil {
		glog.V(logger.Warn).Infof("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err)
		return batch
	}
	return new(leveldb.Batch)
}
// syncDbEntry is an abstract type for db entries (TODO: could be a feature of Receipts)
type syncDbEntry struct {
	key, val []byte
}

func (self syncDbEntry) String() string {
	return fmt.Sprintf("key: %x, value: %x", self.key, self.val)
}
/*
	dbRead iterates over store requests to be sent over to the peer.
	This is mainly to prevent crashes due to network output buffer contention,
	as well as to make synchronisation resilient to disconnects.
	The messages are supposed to be sent in the p2p priority queue.

	The request db is shared between peers, but the domains for each syncdb
	are disjoint. dbkeys (42 bytes) are structured:
	* 0: 0x00 (0x01 reserved for the counter key)
	* 1: priorities - priority (so that high priority can be replayed first)
	* 2-33: peer's address
	* 34-41: syncdb counter to preserve order (this field is missing for the counter key)

	values (40 bytes) are:
	* 0-31: key
	* 32-39: request id

	dbRead needs a boolean to indicate whether on the first round all the
	historical records are synced; the second argument indicates the current
	db counter, and the third is the delivery function to apply.
*/
func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}, chan bool) bool) {
	key := make([]byte, 42)
	copy(key, self.start)
	binary.BigEndian.PutUint64(key[34:], counter)
	var batches, n, cnt, total int
	var more bool
	var entry *syncDbEntry
	var it iterator.Iterator
	var del *leveldb.Batch
	batchSizes := make(chan int)

	for {
		// if useBatches is false, cnt is not set
		if useBatches {
			// this could be called before all cnt items are sent out,
			// so that the loop is not blocking while delivering;
			// only relevant if cnt is large
			select {
			case self.batch <- batchSizes:
			case <-self.quit:
				return
			}
			// wait for the write to finish and get the item count in the next batch
			cnt = <-batchSizes
			batches++
			if cnt == 0 {
				// empty
				return
			}
		}
		it = self.db.NewIterator()
		it.Seek(key)
		if !it.Valid() {
			copy(key, self.start)
			useBatches = true
			continue
		}
		del = new(leveldb.Batch)
		glog.V(logger.Detail).Infof("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt)

		for n = 0; !useBatches || n < cnt; it.Next() {
			copy(key, it.Key())
			if len(key) == 0 || key[0] != 0 {
				copy(key, self.start)
				useBatches = true
				break
			}
			val := make([]byte, 40)
			copy(val, it.Value())
			entry = &syncDbEntry{key, val}
			// glog.V(logger.Detail).Infof("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total)
			more = fun(entry, self.quit)
			if !more {
				// quit received while waiting to deliver the entry; the entry will not be deleted
				glog.V(logger.Detail).Infof("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt)
				break
			}
			// since subsequent batches of the same db session are indexed incrementally,
			// deleting earlier batches can be delayed and parallelised;
			// this could be a batch delete when the db is idle (but adds complexity, esp. when quitting)
			del.Delete(key)
			n++
			total++
		}
		glog.V(logger.Debug).Infof("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total)
		self.db.Write(del) // this could be called async, only when the db is idle
		it.Release()
	}
}
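// A worked key/value example for the layout above (hypothetical values,
// assuming the syncer's priority constants Low, Medium, High = 0, 1, 2 and
// priorities == 3): for priority High, peer address p, counter 5, chunk key k
// and request id 7:
//
//	dbkey = 0x00 . 0x01 . p[0:32] . 0x0000000000000005   (42 bytes)
//	dbval = k[0:32] . 0x0000000000000007                 (40 bytes)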
// stop closes the quit channel and waits for bufferRead to finish.
func (self *syncDb) stop() {
	close(self.quit)
	<-self.done
}
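// Shutdown ordering (as implemented above): close(self.quit) makes deliver
// return false and unblocks dbRead; bufferRead then drains the buffer into a
// final db batch, persists the counter, and closes self.done.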
// newSyncDbEntry calculates a dbkey for the request, for the db to preserve order;
// see dbRead above for the db key structure.
// It is polymorphic in req; for accepted types, see syncer#addRequest.
func (self *syncDb) newSyncDbEntry(req interface{}, counter uint64) (entry *syncDbEntry, err error) {
	var key storage.Key
	var chunk *storage.Chunk
	var id uint64
	var ok bool
	var sreq *storeRequestMsgData

	if key, ok = req.(storage.Key); ok {
		id = generateId()
	} else if chunk, ok = req.(*storage.Chunk); ok {
		key = chunk.Key
		id = generateId()
	} else if sreq, ok = req.(*storeRequestMsgData); ok {
		key = sreq.Key
		id = sreq.Id
	} else if entry, ok = req.(*syncDbEntry); !ok {
		return nil, fmt.Errorf("type not allowed: %v (%T)", req, req)
	}

	// order by peer > priority > seqid
	// the value is the request id if one exists
	if entry == nil {
		dbkey := make([]byte, 42)
		dbval := make([]byte, 40)

		// encode key
		copy(dbkey[:], self.start[:34]) // namespace tag, priority and peer address
		binary.BigEndian.PutUint64(dbkey[34:], counter)
		// encode value
		copy(dbval, key[:])
		binary.BigEndian.PutUint64(dbval[32:], id)

		entry = &syncDbEntry{dbkey, dbval}
	}
	return
}
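// The accepted request types, as dispatched above (an illustrative sketch;
// hash, sreq and c are hypothetical values):
//
//	e, _ := sdb.newSyncDbEntry(storage.Key(hash), c)        // bare key: fresh request id
//	e, _ = sdb.newSyncDbEntry(&storage.Chunk{Key: hash}, c) // chunk: key taken from the chunk
//	e, _ = sdb.newSyncDbEntry(sreq, c)                      // *storeRequestMsgData: key and id preserved
//	e, _ = sdb.newSyncDbEntry(e, c)                         // *syncDbEntry: returned as is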