ethstats: implement block history retrievals
This commit is contained in:
		
							parent
							
								
									13614f4e1c
								
							
						
					
					
						commit
						b2c226cb7d
					
				| @ -42,6 +42,10 @@ import ( | ||||
| 	"golang.org/x/net/websocket" | ||||
| ) | ||||
| 
 | ||||
| // historyUpdateRange is the number of blocks a node should report upon login or
 | ||||
| // history request.
 | ||||
| const historyUpdateRange = 50 | ||||
| 
 | ||||
| // Service implements an Ethereum netstats reporting daemon that pushes local
 | ||||
| // chain statistics up to a monitoring server.
 | ||||
| type Service struct { | ||||
| @ -54,6 +58,9 @@ type Service struct { | ||||
| 	node string // Name of the node to display on the monitoring page
 | ||||
| 	pass string // Password to authorize access to the monitoring page
 | ||||
| 	host string // Remote address of the monitoring service
 | ||||
| 
 | ||||
| 	pongCh chan struct{} // Pong notifications are fed into this channel
 | ||||
| 	histCh chan []uint64 // History request block numbers are fed into this channel
 | ||||
| } | ||||
| 
 | ||||
| // New returns a monitoring service ready for stats reporting.
 | ||||
| @ -66,11 +73,13 @@ func New(url string, ethServ *eth.Ethereum, lesServ *les.LightEthereum) (*Servic | ||||
| 	} | ||||
| 	// Assemble and return the stats service
 | ||||
| 	return &Service{ | ||||
| 		eth:  ethServ, | ||||
| 		les:  lesServ, | ||||
| 		node: parts[1], | ||||
| 		pass: parts[3], | ||||
| 		host: parts[4], | ||||
| 		eth:    ethServ, | ||||
| 		les:    lesServ, | ||||
| 		node:   parts[1], | ||||
| 		pass:   parts[3], | ||||
| 		host:   parts[4], | ||||
| 		pongCh: make(chan struct{}), | ||||
| 		histCh: make(chan []uint64, 1), | ||||
| 	}, nil | ||||
| } | ||||
| 
 | ||||
| @ -135,22 +144,34 @@ func (s *Service) loop() { | ||||
| 			time.Sleep(10 * time.Second) | ||||
| 			continue | ||||
| 		} | ||||
| 		if err = s.report(in, out); err != nil { | ||||
| 		go s.readLoop(conn, in) | ||||
| 
 | ||||
| 		// Send the initial stats so our node looks decent from the get go
 | ||||
| 		if err = s.report(out); err != nil { | ||||
| 			glog.V(logger.Warn).Infof("Initial stats report failed: %v", err) | ||||
| 			conn.Close() | ||||
| 			continue | ||||
| 		} | ||||
| 		if err = s.reportHistory(out, nil); err != nil { | ||||
| 			glog.V(logger.Warn).Infof("History report failed: %v", err) | ||||
| 			conn.Close() | ||||
| 			continue | ||||
| 		} | ||||
| 		// Keep sending status updates until the connection breaks
 | ||||
| 		fullReport := time.NewTicker(15 * time.Second) | ||||
| 
 | ||||
| 		for err == nil { | ||||
| 			select { | ||||
| 			case <-fullReport.C: | ||||
| 				if err = s.report(in, out); err != nil { | ||||
| 				if err = s.report(out); err != nil { | ||||
| 					glog.V(logger.Warn).Infof("Full stats report failed: %v", err) | ||||
| 				} | ||||
| 			case head := <-headSub.Chan(): | ||||
| 				if head == nil { // node stopped
 | ||||
| 			case list := <-s.histCh: | ||||
| 				if err = s.reportHistory(out, list); err != nil { | ||||
| 					glog.V(logger.Warn).Infof("Block history report failed: %v", err) | ||||
| 				} | ||||
| 			case head, ok := <-headSub.Chan(): | ||||
| 				if !ok { // node stopped
 | ||||
| 					conn.Close() | ||||
| 					return | ||||
| 				} | ||||
| @ -160,8 +181,8 @@ func (s *Service) loop() { | ||||
| 				if err = s.reportPending(out); err != nil { | ||||
| 					glog.V(logger.Warn).Infof("Post-block transaction stats report failed: %v", err) | ||||
| 				} | ||||
| 			case ev := <-txSub.Chan(): | ||||
| 				if ev == nil { // node stopped
 | ||||
| 			case _, ok := <-txSub.Chan(): | ||||
| 				if !ok { // node stopped
 | ||||
| 					conn.Close() | ||||
| 					return | ||||
| 				} | ||||
| @ -183,6 +204,76 @@ func (s *Service) loop() { | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // readLoop loops as long as the connection is alive and retrieves data packets
 | ||||
| // from the network socket. If any of them match an active request, it forwards
 | ||||
| // it, if they themselves are requests it initiates a reply, and lastly it drops
 | ||||
| // unknown packets.
 | ||||
| func (s *Service) readLoop(conn *websocket.Conn, in *json.Decoder) { | ||||
| 	// If the read loop exists, close the connection
 | ||||
| 	defer conn.Close() | ||||
| 
 | ||||
| 	for { | ||||
| 		// Retrieve the next generic network packet and bail out on error
 | ||||
| 		var msg map[string][]interface{} | ||||
| 		if err := in.Decode(&msg); err != nil { | ||||
| 			glog.V(logger.Warn).Infof("Failed to decode stats server message: %v", err) | ||||
| 			return | ||||
| 		} | ||||
| 		if len(msg["emit"]) == 0 { | ||||
| 			glog.V(logger.Warn).Infof("Stats server sent non-broadcast: %v", msg) | ||||
| 			return | ||||
| 		} | ||||
| 		command, ok := msg["emit"][0].(string) | ||||
| 		if !ok { | ||||
| 			glog.V(logger.Warn).Infof("Invalid stats server message type: %v", msg["emit"][0]) | ||||
| 			return | ||||
| 		} | ||||
| 		// If the message is a ping reply, deliver (someone must be listening!)
 | ||||
| 		if len(msg["emit"]) == 2 && command == "node-pong" { | ||||
| 			select { | ||||
| 			case s.pongCh <- struct{}{}: | ||||
| 				// Pong delivered, continue listening
 | ||||
| 				continue | ||||
| 			default: | ||||
| 				// Ping routine dead, abort
 | ||||
| 				glog.V(logger.Warn).Infof("Stats server pinger seems to have died") | ||||
| 				return | ||||
| 			} | ||||
| 		} | ||||
| 		// If the message is a history request, forward to the event processor
 | ||||
| 		if len(msg["emit"]) == 2 && command == "history" { | ||||
| 			// Make sure the request is valid and doesn't crash us
 | ||||
| 			request, ok := msg["emit"][1].(map[string]interface{}) | ||||
| 			if !ok { | ||||
| 				glog.V(logger.Warn).Infof("Invalid history request: %v", msg["emit"][1]) | ||||
| 				return | ||||
| 			} | ||||
| 			list, ok := request["list"].([]interface{}) | ||||
| 			if !ok { | ||||
| 				glog.V(logger.Warn).Infof("Invalid history block list: %v", request["list"]) | ||||
| 				return | ||||
| 			} | ||||
| 			// Convert the block number list to an integer list
 | ||||
| 			numbers := make([]uint64, len(list)) | ||||
| 			for i, num := range list { | ||||
| 				n, ok := num.(float64) | ||||
| 				if !ok { | ||||
| 					glog.V(logger.Warn).Infof("Invalid history block number: %v", num) | ||||
| 					return | ||||
| 				} | ||||
| 				numbers[i] = uint64(n) | ||||
| 			} | ||||
| 			select { | ||||
| 			case s.histCh <- numbers: | ||||
| 				continue | ||||
| 			default: | ||||
| 			} | ||||
| 		} | ||||
| 		// Report anything else and continue
 | ||||
| 		glog.V(logger.Info).Infof("Unknown stats message: %v", msg) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // nodeInfo is the collection of metainformation about a node that is displayed
 | ||||
| // on the monitoring page.
 | ||||
| type nodeInfo struct { | ||||
| @ -195,6 +286,7 @@ type nodeInfo struct { | ||||
| 	Os       string `json:"os"` | ||||
| 	OsVer    string `json:"os_v"` | ||||
| 	Client   string `json:"client"` | ||||
| 	History  bool   `json:"canUpdateHistory"` | ||||
| } | ||||
| 
 | ||||
| // authMsg is the authentication infos needed to login to a monitoring server.
 | ||||
| @ -229,6 +321,7 @@ func (s *Service) login(in *json.Decoder, out *json.Encoder) error { | ||||
| 			Os:       runtime.GOOS, | ||||
| 			OsVer:    runtime.GOARCH, | ||||
| 			Client:   "0.1.1", | ||||
| 			History:  true, | ||||
| 		}, | ||||
| 		Secret: s.pass, | ||||
| 	} | ||||
| @ -249,8 +342,8 @@ func (s *Service) login(in *json.Decoder, out *json.Encoder) error { | ||||
| // report collects all possible data to report and send it to the stats server.
 | ||||
| // This should only be used on reconnects or rarely to avoid overloading the
 | ||||
| // server. Use the individual methods for reporting subscribed events.
 | ||||
| func (s *Service) report(in *json.Decoder, out *json.Encoder) error { | ||||
| 	if err := s.reportLatency(in, out); err != nil { | ||||
| func (s *Service) report(out *json.Encoder) error { | ||||
| 	if err := s.reportLatency(out); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	if err := s.reportBlock(out, nil); err != nil { | ||||
| @ -267,7 +360,7 @@ func (s *Service) report(in *json.Decoder, out *json.Encoder) error { | ||||
| 
 | ||||
| // reportLatency sends a ping request to the server, measures the RTT time and
 | ||||
| // finally sends a latency update.
 | ||||
| func (s *Service) reportLatency(in *json.Decoder, out *json.Encoder) error { | ||||
| func (s *Service) reportLatency(out *json.Encoder) error { | ||||
| 	// Send the current time to the ethstats server
 | ||||
| 	start := time.Now() | ||||
| 
 | ||||
| @ -281,9 +374,12 @@ func (s *Service) reportLatency(in *json.Decoder, out *json.Encoder) error { | ||||
| 		return err | ||||
| 	} | ||||
| 	// Wait for the pong request to arrive back
 | ||||
| 	var pong map[string][]interface{} | ||||
| 	if err := in.Decode(&pong); err != nil || len(pong["emit"]) != 2 || pong["emit"][0].(string) != "node-pong" { | ||||
| 		return errors.New("unexpected ping reply") | ||||
| 	select { | ||||
| 	case <-s.pongCh: | ||||
| 		// Pong delivered, report the latency
 | ||||
| 	case <-time.After(3 * time.Second): | ||||
| 		// Ping timeout, abort
 | ||||
| 		return errors.New("ping timed out") | ||||
| 	} | ||||
| 	// Send back the measured latency
 | ||||
| 	latency := map[string][]interface{}{ | ||||
| @ -336,9 +432,26 @@ func (s uncleStats) MarshalJSON() ([]byte, error) { | ||||
| 
 | ||||
| // reportBlock retrieves the current chain head and repors it to the stats server.
 | ||||
| func (s *Service) reportBlock(out *json.Encoder, block *types.Block) error { | ||||
| 	// Gather the head block infos from the local blockchain
 | ||||
| 	// Assemble the block stats report and send it to the server
 | ||||
| 	stats := map[string]interface{}{ | ||||
| 		"id":    s.node, | ||||
| 		"block": s.assembleBlockStats(block), | ||||
| 	} | ||||
| 	report := map[string][]interface{}{ | ||||
| 		"emit": []interface{}{"block", stats}, | ||||
| 	} | ||||
| 	if err := out.Encode(report); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // assembleBlockStats retrieves any required metadata to report a single block
 | ||||
| // and assembles the block stats. If block is nil, the current head is processed.
 | ||||
| func (s *Service) assembleBlockStats(block *types.Block) *blockStats { | ||||
| 	// Gather the block infos from the local blockchain
 | ||||
| 	var ( | ||||
| 		head   *types.Header | ||||
| 		header *types.Header | ||||
| 		td     *big.Int | ||||
| 		txs    []*types.Transaction | ||||
| 		uncles []*types.Header | ||||
| @ -348,38 +461,77 @@ func (s *Service) reportBlock(out *json.Encoder, block *types.Block) error { | ||||
| 		if block == nil { | ||||
| 			block = s.eth.BlockChain().CurrentBlock() | ||||
| 		} | ||||
| 		head = block.Header() | ||||
| 		td = s.eth.BlockChain().GetTd(head.Hash(), head.Number.Uint64()) | ||||
| 		header = block.Header() | ||||
| 		td = s.eth.BlockChain().GetTd(header.Hash(), header.Number.Uint64()) | ||||
| 
 | ||||
| 		txs = block.Transactions() | ||||
| 		uncles = block.Uncles() | ||||
| 	} else { | ||||
| 		// Light nodes would need on-demand lookups for transactions/uncles, skip
 | ||||
| 		if block != nil { | ||||
| 			head = block.Header() | ||||
| 			header = block.Header() | ||||
| 		} else { | ||||
| 			header = s.les.BlockChain().CurrentHeader() | ||||
| 		} | ||||
| 		td = s.les.BlockChain().GetTd(header.Hash(), header.Number.Uint64()) | ||||
| 	} | ||||
| 	// Assemble and return the block stats
 | ||||
| 	return &blockStats{ | ||||
| 		Number:    header.Number, | ||||
| 		Hash:      header.Hash(), | ||||
| 		Timestamp: header.Time, | ||||
| 		Miner:     header.Coinbase, | ||||
| 		GasUsed:   new(big.Int).Set(header.GasUsed), | ||||
| 		GasLimit:  new(big.Int).Set(header.GasLimit), | ||||
| 		Diff:      header.Difficulty.String(), | ||||
| 		TotalDiff: td.String(), | ||||
| 		Txs:       txs, | ||||
| 		Uncles:    uncles, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // reportHistory retrieves the most recent batch of blocks and reports it to the
 | ||||
| // stats server.
 | ||||
| func (s *Service) reportHistory(out *json.Encoder, list []uint64) error { | ||||
| 	// Figure out the indexes that need reporting
 | ||||
| 	indexes := make([]uint64, 0, historyUpdateRange) | ||||
| 	if len(list) > 0 { | ||||
| 		// Specific indexes requested, send them back in particular
 | ||||
| 		for _, idx := range list { | ||||
| 			indexes = append(indexes, idx) | ||||
| 		} | ||||
| 	} else { | ||||
| 		// No indexes requested, send back the top ones
 | ||||
| 		var head *types.Header | ||||
| 		if s.eth != nil { | ||||
| 			head = s.eth.BlockChain().CurrentHeader() | ||||
| 		} else { | ||||
| 			head = s.les.BlockChain().CurrentHeader() | ||||
| 		} | ||||
| 		td = s.les.BlockChain().GetTd(head.Hash(), head.Number.Uint64()) | ||||
| 		start := head.Number.Int64() - historyUpdateRange | ||||
| 		if start < 0 { | ||||
| 			start = 0 | ||||
| 		} | ||||
| 		for i := uint64(start); i <= head.Number.Uint64(); i++ { | ||||
| 			indexes = append(indexes, i) | ||||
| 		} | ||||
| 	} | ||||
| 	// Assemble the block stats report and send it to the server
 | ||||
| 	// Gather the batch of blocks to report
 | ||||
| 	history := make([]*blockStats, len(indexes)) | ||||
| 	for i, number := range indexes { | ||||
| 		if s.eth != nil { | ||||
| 			history[i] = s.assembleBlockStats(s.eth.BlockChain().GetBlockByNumber(number)) | ||||
| 		} else { | ||||
| 			history[i] = s.assembleBlockStats(types.NewBlockWithHeader(s.les.BlockChain().GetHeaderByNumber(number))) | ||||
| 		} | ||||
| 	} | ||||
| 	// Assemble the history report and send it to the server
 | ||||
| 	stats := map[string]interface{}{ | ||||
| 		"id": s.node, | ||||
| 		"block": &blockStats{ | ||||
| 			Number:    head.Number, | ||||
| 			Hash:      head.Hash(), | ||||
| 			Timestamp: head.Time, | ||||
| 			Miner:     head.Coinbase, | ||||
| 			GasUsed:   new(big.Int).Set(head.GasUsed), | ||||
| 			GasLimit:  new(big.Int).Set(head.GasLimit), | ||||
| 			Diff:      head.Difficulty.String(), | ||||
| 			TotalDiff: td.String(), | ||||
| 			Txs:       txs, | ||||
| 			Uncles:    uncles, | ||||
| 		}, | ||||
| 		"id":      s.node, | ||||
| 		"history": history, | ||||
| 	} | ||||
| 	report := map[string][]interface{}{ | ||||
| 		"emit": []interface{}{"block", stats}, | ||||
| 		"emit": []interface{}{"history", stats}, | ||||
| 	} | ||||
| 	if err := out.Encode(report); err != nil { | ||||
| 		return err | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user