* build: enable unconvert linter - fixes #15453 - update code base for failing cases * cmd/puppeth: replace syscall.Stdin with os.Stdin.Fd() for unconvert linter
		
			
				
	
	
		
			777 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			777 lines
		
	
	
		
			25 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright 2016 The go-ethereum Authors
 | 
						|
// This file is part of the go-ethereum library.
 | 
						|
//
 | 
						|
// The go-ethereum library is free software: you can redistribute it and/or modify
 | 
						|
// it under the terms of the GNU Lesser General Public License as published by
 | 
						|
// the Free Software Foundation, either version 3 of the License, or
 | 
						|
// (at your option) any later version.
 | 
						|
//
 | 
						|
// The go-ethereum library is distributed in the hope that it will be useful,
 | 
						|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 | 
						|
// GNU Lesser General Public License for more details.
 | 
						|
//
 | 
						|
// You should have received a copy of the GNU Lesser General Public License
 | 
						|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
 | 
						|
 | 
						|
package network
 | 
						|
 | 
						|
import (
 | 
						|
	"encoding/binary"
 | 
						|
	"encoding/json"
 | 
						|
	"fmt"
 | 
						|
	"path/filepath"
 | 
						|
 | 
						|
	"github.com/ethereum/go-ethereum/log"
 | 
						|
	"github.com/ethereum/go-ethereum/swarm/storage"
 | 
						|
)
 | 
						|
 | 
						|
// syncer parameters (global, not peer specific) default values
 | 
						|
const (
 | 
						|
	requestDbBatchSize = 512  // size of batch before written to request db
 | 
						|
	keyBufferSize      = 1024 // size of buffer  for unsynced keys
 | 
						|
	syncBatchSize      = 128  // maximum batchsize for outgoing requests
 | 
						|
	syncBufferSize     = 128  // size of buffer  for delivery requests
 | 
						|
	syncCacheSize      = 1024 // cache capacity to store request queue in memory
 | 
						|
)
 | 
						|
 | 
						|
// priorities
 | 
						|
const (
 | 
						|
	Low        = iota // 0
 | 
						|
	Medium            // 1
 | 
						|
	High              // 2
 | 
						|
	priorities        // 3 number of priority levels
 | 
						|
)
 | 
						|
 | 
						|
// request types
 | 
						|
const (
 | 
						|
	DeliverReq   = iota // 0
 | 
						|
	PushReq             // 1
 | 
						|
	PropagateReq        // 2
 | 
						|
	HistoryReq          // 3
 | 
						|
	BacklogReq          // 4
 | 
						|
)
 | 
						|
 | 
						|
// json serialisable struct to record the syncronisation state between 2 peers
 | 
						|
type syncState struct {
 | 
						|
	*storage.DbSyncState // embeds the following 4 fields:
 | 
						|
	// Start      Key    // lower limit of address space
 | 
						|
	// Stop       Key    // upper limit of address space
 | 
						|
	// First      uint64 // counter taken from last sync state
 | 
						|
	// Last       uint64 // counter of remote peer dbStore at the time of last connection
 | 
						|
	SessionAt  uint64      // set at the time of connection
 | 
						|
	LastSeenAt uint64      // set at the time of connection
 | 
						|
	Latest     storage.Key // cursor of dbstore when last (continuously set by syncer)
 | 
						|
	Synced     bool        // true iff Sync is done up to the last disconnect
 | 
						|
	synced     chan bool   // signal that sync stage finished
 | 
						|
}
 | 
						|
 | 
						|
// wrapper of db-s to provide mockable custom local chunk store access to syncer
 | 
						|
type DbAccess struct {
 | 
						|
	db  *storage.DbStore
 | 
						|
	loc *storage.LocalStore
 | 
						|
}
 | 
						|
 | 
						|
func NewDbAccess(loc *storage.LocalStore) *DbAccess {
 | 
						|
	return &DbAccess{loc.DbStore.(*storage.DbStore), loc}
 | 
						|
}
 | 
						|
 | 
						|
// to obtain the chunks from key or request db entry only
 | 
						|
func (self *DbAccess) get(key storage.Key) (*storage.Chunk, error) {
 | 
						|
	return self.loc.Get(key)
 | 
						|
}
 | 
						|
 | 
						|
// current storage counter of chunk db
 | 
						|
func (self *DbAccess) counter() uint64 {
 | 
						|
	return self.db.Counter()
 | 
						|
}
 | 
						|
 | 
						|
// implemented by dbStoreSyncIterator
 | 
						|
type keyIterator interface {
 | 
						|
	Next() storage.Key
 | 
						|
}
 | 
						|
 | 
						|
// generator function for iteration by address range and storage counter
 | 
						|
func (self *DbAccess) iterator(s *syncState) keyIterator {
 | 
						|
	it, err := self.db.NewSyncIterator(*(s.DbSyncState))
 | 
						|
	if err != nil {
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	return keyIterator(it)
 | 
						|
}
 | 
						|
 | 
						|
func (self syncState) String() string {
 | 
						|
	if self.Synced {
 | 
						|
		return fmt.Sprintf(
 | 
						|
			"session started at: %v, last seen at: %v, latest key: %v",
 | 
						|
			self.SessionAt, self.LastSeenAt,
 | 
						|
			self.Latest.Log(),
 | 
						|
		)
 | 
						|
	} else {
 | 
						|
		return fmt.Sprintf(
 | 
						|
			"address: %v-%v, index: %v-%v, session started at: %v, last seen at: %v, latest key: %v",
 | 
						|
			self.Start.Log(), self.Stop.Log(),
 | 
						|
			self.First, self.Last,
 | 
						|
			self.SessionAt, self.LastSeenAt,
 | 
						|
			self.Latest.Log(),
 | 
						|
		)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// syncer parameters (global, not peer specific)
 | 
						|
type SyncParams struct {
 | 
						|
	RequestDbPath      string // path for request db (leveldb)
 | 
						|
	RequestDbBatchSize uint   // nuber of items before batch is saved to requestdb
 | 
						|
	KeyBufferSize      uint   // size of key buffer
 | 
						|
	SyncBatchSize      uint   // maximum batchsize for outgoing requests
 | 
						|
	SyncBufferSize     uint   // size of buffer for
 | 
						|
	SyncCacheSize      uint   // cache capacity to store request queue in memory
 | 
						|
	SyncPriorities     []uint // list of priority levels for req types 0-3
 | 
						|
	SyncModes          []bool // list of sync modes for  for req types 0-3
 | 
						|
}
 | 
						|
 | 
						|
// constructor with default values
 | 
						|
func NewSyncParams(bzzdir string) *SyncParams {
 | 
						|
	return &SyncParams{
 | 
						|
		RequestDbPath:      filepath.Join(bzzdir, "requests"),
 | 
						|
		RequestDbBatchSize: requestDbBatchSize,
 | 
						|
		KeyBufferSize:      keyBufferSize,
 | 
						|
		SyncBufferSize:     syncBufferSize,
 | 
						|
		SyncBatchSize:      syncBatchSize,
 | 
						|
		SyncCacheSize:      syncCacheSize,
 | 
						|
		SyncPriorities:     []uint{High, Medium, Medium, Low, Low},
 | 
						|
		SyncModes:          []bool{true, true, true, true, false},
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// syncer is the agent that manages content distribution/storage replication/chunk storeRequest forwarding
 | 
						|
type syncer struct {
 | 
						|
	*SyncParams                     // sync parameters
 | 
						|
	syncF           func() bool     // if syncing is needed
 | 
						|
	key             storage.Key     // remote peers address key
 | 
						|
	state           *syncState      // sync state for our dbStore
 | 
						|
	syncStates      chan *syncState // different stages of sync
 | 
						|
	deliveryRequest chan bool       // one of two triggers needed to send unsyncedKeys
 | 
						|
	newUnsyncedKeys chan bool       // one of two triggers needed to send unsynced keys
 | 
						|
	quit            chan bool       // signal to quit loops
 | 
						|
 | 
						|
	// DB related fields
 | 
						|
	dbAccess *DbAccess // access to dbStore
 | 
						|
 | 
						|
	// native fields
 | 
						|
	queues     [priorities]*syncDb                   // in-memory cache / queues for sync reqs
 | 
						|
	keys       [priorities]chan interface{}          // buffer for unsynced keys
 | 
						|
	deliveries [priorities]chan *storeRequestMsgData // delivery
 | 
						|
 | 
						|
	// bzz protocol instance outgoing message callbacks (mockable for testing)
 | 
						|
	unsyncedKeys func([]*syncRequest, *syncState) error // send unsyncedKeysMsg
 | 
						|
	store        func(*storeRequestMsgData) error       // send storeRequestMsg
 | 
						|
}
 | 
						|
 | 
						|
// a syncer instance is linked to each peer connection
 | 
						|
// constructor is called from protocol after successful handshake
 | 
						|
// the returned instance is attached to the peer and can be called
 | 
						|
// by the forwarder
 | 
						|
func newSyncer(
 | 
						|
	db *storage.LDBDatabase, remotekey storage.Key,
 | 
						|
	dbAccess *DbAccess,
 | 
						|
	unsyncedKeys func([]*syncRequest, *syncState) error,
 | 
						|
	store func(*storeRequestMsgData) error,
 | 
						|
	params *SyncParams,
 | 
						|
	state *syncState,
 | 
						|
	syncF func() bool,
 | 
						|
) (*syncer, error) {
 | 
						|
 | 
						|
	syncBufferSize := params.SyncBufferSize
 | 
						|
	keyBufferSize := params.KeyBufferSize
 | 
						|
	dbBatchSize := params.RequestDbBatchSize
 | 
						|
 | 
						|
	self := &syncer{
 | 
						|
		syncF:           syncF,
 | 
						|
		key:             remotekey,
 | 
						|
		dbAccess:        dbAccess,
 | 
						|
		syncStates:      make(chan *syncState, 20),
 | 
						|
		deliveryRequest: make(chan bool, 1),
 | 
						|
		newUnsyncedKeys: make(chan bool, 1),
 | 
						|
		SyncParams:      params,
 | 
						|
		state:           state,
 | 
						|
		quit:            make(chan bool),
 | 
						|
		unsyncedKeys:    unsyncedKeys,
 | 
						|
		store:           store,
 | 
						|
	}
 | 
						|
 | 
						|
	// initialising
 | 
						|
	for i := 0; i < priorities; i++ {
 | 
						|
		self.keys[i] = make(chan interface{}, keyBufferSize)
 | 
						|
		self.deliveries[i] = make(chan *storeRequestMsgData)
 | 
						|
		// initialise a syncdb instance for each priority queue
 | 
						|
		self.queues[i] = newSyncDb(db, remotekey, uint(i), syncBufferSize, dbBatchSize, self.deliver(uint(i)))
 | 
						|
	}
 | 
						|
	log.Info(fmt.Sprintf("syncer started: %v", state))
 | 
						|
	// launch chunk delivery service
 | 
						|
	go self.syncDeliveries()
 | 
						|
	// launch sync task manager
 | 
						|
	if self.syncF() {
 | 
						|
		go self.sync()
 | 
						|
	}
 | 
						|
	// process unsynced keys to broadcast
 | 
						|
	go self.syncUnsyncedKeys()
 | 
						|
 | 
						|
	return self, nil
 | 
						|
}
 | 
						|
 | 
						|
// metadata serialisation
 | 
						|
func encodeSync(state *syncState) (*json.RawMessage, error) {
 | 
						|
	data, err := json.MarshalIndent(state, "", " ")
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	meta := json.RawMessage(data)
 | 
						|
	return &meta, nil
 | 
						|
}
 | 
						|
 | 
						|
func decodeSync(meta *json.RawMessage) (*syncState, error) {
 | 
						|
	if meta == nil {
 | 
						|
		return nil, fmt.Errorf("unable to deserialise sync state from <nil>")
 | 
						|
	}
 | 
						|
	data := []byte(*(meta))
 | 
						|
	if len(data) == 0 {
 | 
						|
		return nil, fmt.Errorf("unable to deserialise sync state from <nil>")
 | 
						|
	}
 | 
						|
	state := &syncState{DbSyncState: &storage.DbSyncState{}}
 | 
						|
	err := json.Unmarshal(data, state)
 | 
						|
	return state, err
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 sync implements the syncing script
 | 
						|
 * first all items left in the request Db are replayed
 | 
						|
   * type = StaleSync
 | 
						|
   * Mode: by default once again via confirmation roundtrip
 | 
						|
   * Priority: the items are replayed as the proirity specified for StaleSync
 | 
						|
   * but within the order respects earlier priority level of request
 | 
						|
 * after all items are consumed for a priority level, the the respective
 | 
						|
  queue for delivery requests is open (this way new reqs not written to db)
 | 
						|
  (TODO: this should be checked)
 | 
						|
 * the sync state provided by the remote peer is used to sync history
 | 
						|
   * all the backlog from earlier (aborted) syncing is completed starting from latest
 | 
						|
   * if Last  < LastSeenAt then all items in between then process all
 | 
						|
     backlog from upto last disconnect
 | 
						|
   * if Last > 0 &&
 | 
						|
 | 
						|
 sync is called from the syncer constructor and is not supposed to be used externally
 | 
						|
*/
 | 
						|
func (self *syncer) sync() {
 | 
						|
	state := self.state
 | 
						|
	// sync finished
 | 
						|
	defer close(self.syncStates)
 | 
						|
 | 
						|
	// 0. first replay stale requests from request db
 | 
						|
	if state.SessionAt == 0 {
 | 
						|
		log.Debug(fmt.Sprintf("syncer[%v]: nothing to sync", self.key.Log()))
 | 
						|
		return
 | 
						|
	}
 | 
						|
	log.Debug(fmt.Sprintf("syncer[%v]: start replaying stale requests from request db", self.key.Log()))
 | 
						|
	for p := priorities - 1; p >= 0; p-- {
 | 
						|
		self.queues[p].dbRead(false, 0, self.replay())
 | 
						|
	}
 | 
						|
	log.Debug(fmt.Sprintf("syncer[%v]: done replaying stale requests from request db", self.key.Log()))
 | 
						|
 | 
						|
	// unless peer is synced sync unfinished history beginning on
 | 
						|
	if !state.Synced {
 | 
						|
		start := state.Start
 | 
						|
 | 
						|
		if !storage.IsZeroKey(state.Latest) {
 | 
						|
			// 1. there is unfinished earlier sync
 | 
						|
			state.Start = state.Latest
 | 
						|
			log.Debug(fmt.Sprintf("syncer[%v]: start syncronising backlog (unfinished sync: %v)", self.key.Log(), state))
 | 
						|
			// blocks while the entire history upto state is synced
 | 
						|
			self.syncState(state)
 | 
						|
			if state.Last < state.SessionAt {
 | 
						|
				state.First = state.Last + 1
 | 
						|
			}
 | 
						|
		}
 | 
						|
		state.Latest = storage.ZeroKey
 | 
						|
		state.Start = start
 | 
						|
		// 2. sync up to last disconnect1
 | 
						|
		if state.First < state.LastSeenAt {
 | 
						|
			state.Last = state.LastSeenAt
 | 
						|
			log.Debug(fmt.Sprintf("syncer[%v]: start syncronising history upto last disconnect at %v: %v", self.key.Log(), state.LastSeenAt, state))
 | 
						|
			self.syncState(state)
 | 
						|
			state.First = state.LastSeenAt
 | 
						|
		}
 | 
						|
		state.Latest = storage.ZeroKey
 | 
						|
 | 
						|
	} else {
 | 
						|
		// synchronisation starts at end of last session
 | 
						|
		state.First = state.LastSeenAt
 | 
						|
	}
 | 
						|
 | 
						|
	// 3. sync up to current session start
 | 
						|
	// if there have been new chunks since last session
 | 
						|
	if state.LastSeenAt < state.SessionAt {
 | 
						|
		state.Last = state.SessionAt
 | 
						|
		log.Debug(fmt.Sprintf("syncer[%v]: start syncronising history since last disconnect at %v up until session start at %v: %v", self.key.Log(), state.LastSeenAt, state.SessionAt, state))
 | 
						|
		// blocks until state syncing is finished
 | 
						|
		self.syncState(state)
 | 
						|
	}
 | 
						|
	log.Info(fmt.Sprintf("syncer[%v]: syncing all history complete", self.key.Log()))
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
// wait till syncronised block uptil state is synced
 | 
						|
func (self *syncer) syncState(state *syncState) {
 | 
						|
	self.syncStates <- state
 | 
						|
	select {
 | 
						|
	case <-state.synced:
 | 
						|
	case <-self.quit:
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// stop quits both request processor and saves the request cache to disk
 | 
						|
func (self *syncer) stop() {
 | 
						|
	close(self.quit)
 | 
						|
	log.Trace(fmt.Sprintf("syncer[%v]: stop and save sync request db backlog", self.key.Log()))
 | 
						|
	for _, db := range self.queues {
 | 
						|
		db.stop()
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// rlp serialisable sync request
 | 
						|
type syncRequest struct {
 | 
						|
	Key      storage.Key
 | 
						|
	Priority uint
 | 
						|
}
 | 
						|
 | 
						|
func (self *syncRequest) String() string {
 | 
						|
	return fmt.Sprintf("<Key: %v, Priority: %v>", self.Key.Log(), self.Priority)
 | 
						|
}
 | 
						|
 | 
						|
func (self *syncer) newSyncRequest(req interface{}, p int) (*syncRequest, error) {
 | 
						|
	key, _, _, _, err := parseRequest(req)
 | 
						|
	// TODO: if req has chunk, it should be put in a cache
 | 
						|
	// create
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
	return &syncRequest{key, uint(p)}, nil
 | 
						|
}
 | 
						|
 | 
						|
// serves historical items from the DB
 | 
						|
// * read is on demand, blocking unless history channel is read
 | 
						|
// * accepts sync requests (syncStates) to create new db iterator
 | 
						|
// * closes the channel one iteration finishes
 | 
						|
func (self *syncer) syncHistory(state *syncState) chan interface{} {
 | 
						|
	var n uint
 | 
						|
	history := make(chan interface{})
 | 
						|
	log.Debug(fmt.Sprintf("syncer[%v]: syncing history between %v - %v for chunk addresses %v - %v", self.key.Log(), state.First, state.Last, state.Start, state.Stop))
 | 
						|
	it := self.dbAccess.iterator(state)
 | 
						|
	if it != nil {
 | 
						|
		go func() {
 | 
						|
			// signal end of the iteration ended
 | 
						|
			defer close(history)
 | 
						|
		IT:
 | 
						|
			for {
 | 
						|
				key := it.Next()
 | 
						|
				if key == nil {
 | 
						|
					break IT
 | 
						|
				}
 | 
						|
				select {
 | 
						|
				// blocking until history channel is read from
 | 
						|
				case history <- key:
 | 
						|
					n++
 | 
						|
					log.Trace(fmt.Sprintf("syncer[%v]: history: %v (%v keys)", self.key.Log(), key.Log(), n))
 | 
						|
					state.Latest = key
 | 
						|
				case <-self.quit:
 | 
						|
					return
 | 
						|
				}
 | 
						|
			}
 | 
						|
			log.Debug(fmt.Sprintf("syncer[%v]: finished syncing history between %v - %v for chunk addresses %v - %v (at %v) (chunks = %v)", self.key.Log(), state.First, state.Last, state.Start, state.Stop, state.Latest, n))
 | 
						|
		}()
 | 
						|
	}
 | 
						|
	return history
 | 
						|
}
 | 
						|
 | 
						|
// triggers key syncronisation
 | 
						|
func (self *syncer) sendUnsyncedKeys() {
 | 
						|
	select {
 | 
						|
	case self.deliveryRequest <- true:
 | 
						|
	default:
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// assembles a new batch of unsynced keys
 | 
						|
// * keys are drawn from the key buffers in order of priority queue
 | 
						|
// * if the queues of priority for History (HistoryReq) or higher are depleted,
 | 
						|
//   historical data is used so historical items are lower priority within
 | 
						|
//   their priority group.
 | 
						|
// * Order of historical data is unspecified
 | 
						|
func (self *syncer) syncUnsyncedKeys() {
 | 
						|
	// send out new
 | 
						|
	var unsynced []*syncRequest
 | 
						|
	var more, justSynced bool
 | 
						|
	var keyCount, historyCnt int
 | 
						|
	var history chan interface{}
 | 
						|
 | 
						|
	priority := High
 | 
						|
	keys := self.keys[priority]
 | 
						|
	var newUnsyncedKeys, deliveryRequest chan bool
 | 
						|
	keyCounts := make([]int, priorities)
 | 
						|
	histPrior := self.SyncPriorities[HistoryReq]
 | 
						|
	syncStates := self.syncStates
 | 
						|
	state := self.state
 | 
						|
 | 
						|
LOOP:
 | 
						|
	for {
 | 
						|
 | 
						|
		var req interface{}
 | 
						|
		// select the highest priority channel to read from
 | 
						|
		// keys channels are buffered so the highest priority ones
 | 
						|
		// are checked first - integrity can only be guaranteed if writing
 | 
						|
		// is locked while selecting
 | 
						|
		if priority != High || len(keys) == 0 {
 | 
						|
			// selection is not needed if the High priority queue has items
 | 
						|
			keys = nil
 | 
						|
		PRIORITIES:
 | 
						|
			for priority = High; priority >= 0; priority-- {
 | 
						|
				// the first priority channel that is non-empty will be assigned to keys
 | 
						|
				if len(self.keys[priority]) > 0 {
 | 
						|
					log.Trace(fmt.Sprintf("syncer[%v]: reading request with	priority %v", self.key.Log(), priority))
 | 
						|
					keys = self.keys[priority]
 | 
						|
					break PRIORITIES
 | 
						|
				}
 | 
						|
				log.Trace(fmt.Sprintf("syncer[%v/%v]: queue: [%v, %v, %v]", self.key.Log(), priority, len(self.keys[High]), len(self.keys[Medium]), len(self.keys[Low])))
 | 
						|
				// if the input queue is empty on this level, resort to history if there is any
 | 
						|
				if uint(priority) == histPrior && history != nil {
 | 
						|
					log.Trace(fmt.Sprintf("syncer[%v]: reading history for %v", self.key.Log(), self.key))
 | 
						|
					keys = history
 | 
						|
					break PRIORITIES
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		// if peer ready to receive but nothing to send
 | 
						|
		if keys == nil && deliveryRequest == nil {
 | 
						|
			// if no items left and switch to waiting mode
 | 
						|
			log.Trace(fmt.Sprintf("syncer[%v]: buffers consumed. Waiting", self.key.Log()))
 | 
						|
			newUnsyncedKeys = self.newUnsyncedKeys
 | 
						|
		}
 | 
						|
 | 
						|
		// send msg iff
 | 
						|
		// * peer is ready to receive keys AND (
 | 
						|
		// * all queues and history are depleted OR
 | 
						|
		// * batch full OR
 | 
						|
		// * all history have been consumed, synced)
 | 
						|
		if deliveryRequest == nil &&
 | 
						|
			(justSynced ||
 | 
						|
				len(unsynced) > 0 && keys == nil ||
 | 
						|
				len(unsynced) == int(self.SyncBatchSize)) {
 | 
						|
			justSynced = false
 | 
						|
			// listen to requests
 | 
						|
			deliveryRequest = self.deliveryRequest
 | 
						|
			newUnsyncedKeys = nil // not care about data until next req comes in
 | 
						|
			// set sync to current counter
 | 
						|
			// (all nonhistorical outgoing traffic sheduled and persisted
 | 
						|
			state.LastSeenAt = self.dbAccess.counter()
 | 
						|
			state.Latest = storage.ZeroKey
 | 
						|
			log.Trace(fmt.Sprintf("syncer[%v]: sending %v", self.key.Log(), unsynced))
 | 
						|
			//  send the unsynced keys
 | 
						|
			stateCopy := *state
 | 
						|
			err := self.unsyncedKeys(unsynced, &stateCopy)
 | 
						|
			if err != nil {
 | 
						|
				log.Warn(fmt.Sprintf("syncer[%v]: unable to send unsynced keys: %v", self.key.Log(), err))
 | 
						|
			}
 | 
						|
			self.state = state
 | 
						|
			log.Debug(fmt.Sprintf("syncer[%v]: --> %v keys sent: (total: %v (%v), history: %v), sent sync state: %v", self.key.Log(), len(unsynced), keyCounts, keyCount, historyCnt, stateCopy))
 | 
						|
			unsynced = nil
 | 
						|
			keys = nil
 | 
						|
		}
 | 
						|
 | 
						|
		// process item and add it to the batch
 | 
						|
		select {
 | 
						|
		case <-self.quit:
 | 
						|
			break LOOP
 | 
						|
		case req, more = <-keys:
 | 
						|
			if keys == history && !more {
 | 
						|
				log.Trace(fmt.Sprintf("syncer[%v]: syncing history segment complete", self.key.Log()))
 | 
						|
				// history channel is closed, waiting for new state (called from sync())
 | 
						|
				syncStates = self.syncStates
 | 
						|
				state.Synced = true // this signals that the  current segment is complete
 | 
						|
				select {
 | 
						|
				case state.synced <- false:
 | 
						|
				case <-self.quit:
 | 
						|
					break LOOP
 | 
						|
				}
 | 
						|
				justSynced = true
 | 
						|
				history = nil
 | 
						|
			}
 | 
						|
		case <-deliveryRequest:
 | 
						|
			log.Trace(fmt.Sprintf("syncer[%v]: peer ready to receive", self.key.Log()))
 | 
						|
 | 
						|
			// this 1 cap channel can wake up the loop
 | 
						|
			// signaling that peer is ready to receive unsynced Keys
 | 
						|
			// the channel is set to nil any further writes will be ignored
 | 
						|
			deliveryRequest = nil
 | 
						|
 | 
						|
		case <-newUnsyncedKeys:
 | 
						|
			log.Trace(fmt.Sprintf("syncer[%v]: new unsynced keys available", self.key.Log()))
 | 
						|
			// this 1 cap channel can wake up the loop
 | 
						|
			// signals that data is available to send if peer is ready to receive
 | 
						|
			newUnsyncedKeys = nil
 | 
						|
			keys = self.keys[High]
 | 
						|
 | 
						|
		case state, more = <-syncStates:
 | 
						|
			// this resets the state
 | 
						|
			if !more {
 | 
						|
				state = self.state
 | 
						|
				log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) syncing complete upto %v)", self.key.Log(), priority, state))
 | 
						|
				state.Synced = true
 | 
						|
				syncStates = nil
 | 
						|
			} else {
 | 
						|
				log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) syncing history upto %v priority %v)", self.key.Log(), priority, state, histPrior))
 | 
						|
				state.Synced = false
 | 
						|
				history = self.syncHistory(state)
 | 
						|
				// only one history at a time, only allow another one once the
 | 
						|
				// history channel is closed
 | 
						|
				syncStates = nil
 | 
						|
			}
 | 
						|
		}
 | 
						|
		if req == nil {
 | 
						|
			continue LOOP
 | 
						|
		}
 | 
						|
 | 
						|
		log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) added to unsynced keys: %v", self.key.Log(), priority, req))
 | 
						|
		keyCounts[priority]++
 | 
						|
		keyCount++
 | 
						|
		if keys == history {
 | 
						|
			log.Trace(fmt.Sprintf("syncer[%v]: (priority %v) history item %v (synced = %v)", self.key.Log(), priority, req, state.Synced))
 | 
						|
			historyCnt++
 | 
						|
		}
 | 
						|
		if sreq, err := self.newSyncRequest(req, priority); err == nil {
 | 
						|
			// extract key from req
 | 
						|
			log.Trace(fmt.Sprintf("syncer[%v]: (priority %v): request %v (synced = %v)", self.key.Log(), priority, req, state.Synced))
 | 
						|
			unsynced = append(unsynced, sreq)
 | 
						|
		} else {
 | 
						|
			log.Warn(fmt.Sprintf("syncer[%v]: (priority %v): error creating request for %v: %v)", self.key.Log(), priority, req, err))
 | 
						|
		}
 | 
						|
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// delivery loop
 | 
						|
// takes into account priority, send store Requests with chunk (delivery)
 | 
						|
// idle blocking if no new deliveries in any of the queues
 | 
						|
func (self *syncer) syncDeliveries() {
 | 
						|
	var req *storeRequestMsgData
 | 
						|
	p := High
 | 
						|
	var deliveries chan *storeRequestMsgData
 | 
						|
	var msg *storeRequestMsgData
 | 
						|
	var err error
 | 
						|
	var c = [priorities]int{}
 | 
						|
	var n = [priorities]int{}
 | 
						|
	var total, success uint
 | 
						|
 | 
						|
	for {
 | 
						|
		deliveries = self.deliveries[p]
 | 
						|
		select {
 | 
						|
		case req = <-deliveries:
 | 
						|
			n[p]++
 | 
						|
			c[p]++
 | 
						|
		default:
 | 
						|
			if p == Low {
 | 
						|
				// blocking, depletion on all channels, no preference for priority
 | 
						|
				select {
 | 
						|
				case req = <-self.deliveries[High]:
 | 
						|
					n[High]++
 | 
						|
				case req = <-self.deliveries[Medium]:
 | 
						|
					n[Medium]++
 | 
						|
				case req = <-self.deliveries[Low]:
 | 
						|
					n[Low]++
 | 
						|
				case <-self.quit:
 | 
						|
					return
 | 
						|
				}
 | 
						|
				p = High
 | 
						|
			} else {
 | 
						|
				p--
 | 
						|
				continue
 | 
						|
			}
 | 
						|
		}
 | 
						|
		total++
 | 
						|
		msg, err = self.newStoreRequestMsgData(req)
 | 
						|
		if err != nil {
 | 
						|
			log.Warn(fmt.Sprintf("syncer[%v]: failed to create store request for %v: %v", self.key.Log(), req, err))
 | 
						|
		} else {
 | 
						|
			err = self.store(msg)
 | 
						|
			if err != nil {
 | 
						|
				log.Warn(fmt.Sprintf("syncer[%v]: failed to deliver %v: %v", self.key.Log(), req, err))
 | 
						|
			} else {
 | 
						|
				success++
 | 
						|
				log.Trace(fmt.Sprintf("syncer[%v]: %v successfully delivered", self.key.Log(), req))
 | 
						|
			}
 | 
						|
		}
 | 
						|
		if total%self.SyncBatchSize == 0 {
 | 
						|
			log.Debug(fmt.Sprintf("syncer[%v]: deliver Total: %v, Success: %v, High: %v/%v, Medium: %v/%v, Low %v/%v", self.key.Log(), total, success, c[High], n[High], c[Medium], n[Medium], c[Low], n[Low]))
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 addRequest handles requests for delivery
 | 
						|
 it accepts 4 types:
 | 
						|
 | 
						|
 * storeRequestMsgData: coming from netstore propagate response
 | 
						|
 * chunk: coming from forwarding (questionable: id?)
 | 
						|
 * key: from incoming syncRequest
 | 
						|
 * syncDbEntry: key,id encoded in db
 | 
						|
 | 
						|
 If sync mode is on for the type of request, then
 | 
						|
 it sends the request to the keys queue of the correct priority
 | 
						|
 channel buffered with capacity (SyncBufferSize)
 | 
						|
 | 
						|
 If sync mode is off then, requests are directly sent to deliveries
 | 
						|
*/
 | 
						|
func (self *syncer) addRequest(req interface{}, ty int) {
 | 
						|
	// retrieve priority for request type name int8
 | 
						|
 | 
						|
	priority := self.SyncPriorities[ty]
 | 
						|
	// sync mode for this type ON
 | 
						|
	if self.syncF() || ty == DeliverReq {
 | 
						|
		if self.SyncModes[ty] {
 | 
						|
			self.addKey(req, priority, self.quit)
 | 
						|
		} else {
 | 
						|
			self.addDelivery(req, priority, self.quit)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// addKey queues sync request for sync confirmation with given priority
 | 
						|
// ie the key will go out in an unsyncedKeys message
 | 
						|
func (self *syncer) addKey(req interface{}, priority uint, quit chan bool) bool {
 | 
						|
	select {
 | 
						|
	case self.keys[priority] <- req:
 | 
						|
		// this wakes up the unsynced keys loop if idle
 | 
						|
		select {
 | 
						|
		case self.newUnsyncedKeys <- true:
 | 
						|
		default:
 | 
						|
		}
 | 
						|
		return true
 | 
						|
	case <-quit:
 | 
						|
		return false
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// addDelivery queues delivery request for with given priority
 | 
						|
// ie the chunk will be delivered ASAP mod priority queueing handled by syncdb
 | 
						|
// requests are persisted across sessions for correct sync
 | 
						|
func (self *syncer) addDelivery(req interface{}, priority uint, quit chan bool) bool {
 | 
						|
	select {
 | 
						|
	case self.queues[priority].buffer <- req:
 | 
						|
		return true
 | 
						|
	case <-quit:
 | 
						|
		return false
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// doDelivery delivers the chunk for the request with given priority
 | 
						|
// without queuing
 | 
						|
func (self *syncer) doDelivery(req interface{}, priority uint, quit chan bool) bool {
 | 
						|
	msgdata, err := self.newStoreRequestMsgData(req)
 | 
						|
	if err != nil {
 | 
						|
		log.Warn(fmt.Sprintf("unable to deliver request %v: %v", msgdata, err))
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	select {
 | 
						|
	case self.deliveries[priority] <- msgdata:
 | 
						|
		return true
 | 
						|
	case <-quit:
 | 
						|
		return false
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// returns the delivery function for given priority
 | 
						|
// passed on to syncDb
 | 
						|
func (self *syncer) deliver(priority uint) func(req interface{}, quit chan bool) bool {
 | 
						|
	return func(req interface{}, quit chan bool) bool {
 | 
						|
		return self.doDelivery(req, priority, quit)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// returns the replay function passed on to syncDb
 | 
						|
// depending on sync mode settings for BacklogReq,
 | 
						|
// re	play of request db backlog sends items via confirmation
 | 
						|
// or directly delivers
 | 
						|
func (self *syncer) replay() func(req interface{}, quit chan bool) bool {
 | 
						|
	sync := self.SyncModes[BacklogReq]
 | 
						|
	priority := self.SyncPriorities[BacklogReq]
 | 
						|
	// sync mode for this type ON
 | 
						|
	if sync {
 | 
						|
		return func(req interface{}, quit chan bool) bool {
 | 
						|
			return self.addKey(req, priority, quit)
 | 
						|
		}
 | 
						|
	} else {
 | 
						|
		return func(req interface{}, quit chan bool) bool {
 | 
						|
			return self.doDelivery(req, priority, quit)
 | 
						|
		}
 | 
						|
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// given a request, extends it to a full storeRequestMsgData
 | 
						|
// polimorphic: see addRequest for the types accepted
 | 
						|
func (self *syncer) newStoreRequestMsgData(req interface{}) (*storeRequestMsgData, error) {
 | 
						|
 | 
						|
	key, id, chunk, sreq, err := parseRequest(req)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	if sreq == nil {
 | 
						|
		if chunk == nil {
 | 
						|
			var err error
 | 
						|
			chunk, err = self.dbAccess.get(key)
 | 
						|
			if err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		sreq = &storeRequestMsgData{
 | 
						|
			Id:    id,
 | 
						|
			Key:   chunk.Key,
 | 
						|
			SData: chunk.SData,
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return sreq, nil
 | 
						|
}
 | 
						|
 | 
						|
// parse request types and extracts, key, id, chunk, request if available
 | 
						|
// does not do chunk lookup !
 | 
						|
func parseRequest(req interface{}) (storage.Key, uint64, *storage.Chunk, *storeRequestMsgData, error) {
 | 
						|
	var key storage.Key
 | 
						|
	var entry *syncDbEntry
 | 
						|
	var chunk *storage.Chunk
 | 
						|
	var id uint64
 | 
						|
	var ok bool
 | 
						|
	var sreq *storeRequestMsgData
 | 
						|
	var err error
 | 
						|
 | 
						|
	if key, ok = req.(storage.Key); ok {
 | 
						|
		id = generateId()
 | 
						|
 | 
						|
	} else if entry, ok = req.(*syncDbEntry); ok {
 | 
						|
		id = binary.BigEndian.Uint64(entry.val[32:])
 | 
						|
		key = storage.Key(entry.val[:32])
 | 
						|
 | 
						|
	} else if chunk, ok = req.(*storage.Chunk); ok {
 | 
						|
		key = chunk.Key
 | 
						|
		id = generateId()
 | 
						|
 | 
						|
	} else if sreq, ok = req.(*storeRequestMsgData); ok {
 | 
						|
		key = sreq.Key
 | 
						|
	} else {
 | 
						|
		err = fmt.Errorf("type not allowed: %v (%T)", req, req)
 | 
						|
	}
 | 
						|
 | 
						|
	return key, id, chunk, sreq, err
 | 
						|
}
 |