cmd/swarm-smoke: check if chunks are at most prox host
swarm/network: measure how many chunks a node delivers (#1358)
This commit is contained in:
		
							parent
							
								
									a1cd7e6e92
								
							
						
					
					
						commit
						f8eb8fe64c
					
				| @ -19,10 +19,12 @@ package main | ||||
| import ( | ||||
| 	"bytes" | ||||
| 	"context" | ||||
| 	"encoding/hex" | ||||
| 	"fmt" | ||||
| 	"io/ioutil" | ||||
| 	"math/rand" | ||||
| 	"os" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"sync/atomic" | ||||
| 	"time" | ||||
| @ -30,6 +32,7 @@ import ( | ||||
| 	"github.com/ethereum/go-ethereum/log" | ||||
| 	"github.com/ethereum/go-ethereum/metrics" | ||||
| 	"github.com/ethereum/go-ethereum/rpc" | ||||
| 	"github.com/ethereum/go-ethereum/swarm/chunk" | ||||
| 	"github.com/ethereum/go-ethereum/swarm/storage" | ||||
| 	"github.com/ethereum/go-ethereum/swarm/testutil" | ||||
| 
 | ||||
| @ -88,6 +91,10 @@ func trackChunks(testData []byte, submitMetrics bool) error { | ||||
| 	var wg sync.WaitGroup | ||||
| 	wg.Add(len(hosts)) | ||||
| 
 | ||||
| 	var mu sync.Mutex                    // mutex protecting the allHostChunks and bzzAddrs maps
 | ||||
| 	allHostChunks := map[string]string{} // host->bitvector of presence for chunks
 | ||||
| 	bzzAddrs := map[string]string{}      // host->bzzAddr
 | ||||
| 
 | ||||
| 	for _, host := range hosts { | ||||
| 		host := host | ||||
| 		go func() { | ||||
| @ -96,6 +103,7 @@ func trackChunks(testData []byte, submitMetrics bool) error { | ||||
| 
 | ||||
| 			ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) | ||||
| 			defer cancel() | ||||
| 
 | ||||
| 			rpcClient, err := rpc.DialContext(ctx, httpHost) | ||||
| 			if rpcClient != nil { | ||||
| 				defer rpcClient.Close() | ||||
| @ -106,14 +114,25 @@ func trackChunks(testData []byte, submitMetrics bool) error { | ||||
| 				return | ||||
| 			} | ||||
| 
 | ||||
| 			var hostChunks string | ||||
| 			err = rpcClient.Call(&hostChunks, "bzz_has", addrs) | ||||
| 			hostChunks, err := getChunksBitVectorFromHost(rpcClient, addrs) | ||||
| 			if err != nil { | ||||
| 				log.Error("error calling rpc client", "err", err, "host", httpHost) | ||||
| 				log.Error("error getting chunks bit vector from host", "err", err, "host", httpHost) | ||||
| 				hasErr = true | ||||
| 				return | ||||
| 			} | ||||
| 
 | ||||
| 			bzzAddr, err := getBzzAddrFromHost(rpcClient) | ||||
| 			if err != nil { | ||||
| 				log.Error("error getting bzz addrs from host", "err", err, "host", httpHost) | ||||
| 				hasErr = true | ||||
| 				return | ||||
| 			} | ||||
| 
 | ||||
| 			mu.Lock() | ||||
| 			allHostChunks[host] = hostChunks | ||||
| 			bzzAddrs[host] = bzzAddr | ||||
| 			mu.Unlock() | ||||
| 
 | ||||
| 			yes, no := 0, 0 | ||||
| 			for _, val := range hostChunks { | ||||
| 				if val == '1' { | ||||
| @ -140,6 +159,8 @@ func trackChunks(testData []byte, submitMetrics bool) error { | ||||
| 
 | ||||
| 	wg.Wait() | ||||
| 
 | ||||
| 	checkChunksVsMostProxHosts(addrs, allHostChunks, bzzAddrs) | ||||
| 
 | ||||
| 	if !hasErr && submitMetrics { | ||||
| 		// remove the chunks stored on the uploader node
 | ||||
| 		globalYes -= len(addrs) | ||||
| @ -152,6 +173,82 @@ func trackChunks(testData []byte, submitMetrics bool) error { | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // getChunksBitVectorFromHost returns a bit vector of presence for a given slice of chunks from a given host
 | ||||
| func getChunksBitVectorFromHost(client *rpc.Client, addrs []storage.Address) (string, error) { | ||||
| 	var hostChunks string | ||||
| 
 | ||||
| 	err := client.Call(&hostChunks, "bzz_has", addrs) | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	return hostChunks, nil | ||||
| } | ||||
| 
 | ||||
| // getBzzAddrFromHost returns the bzzAddr for a given host
 | ||||
| func getBzzAddrFromHost(client *rpc.Client) (string, error) { | ||||
| 	var hive string | ||||
| 
 | ||||
| 	err := client.Call(&hive, "bzz_hive") | ||||
| 	if err != nil { | ||||
| 		return "", err | ||||
| 	} | ||||
| 
 | ||||
| 	// we make an ugly assumption about the output format of the hive.String() method
 | ||||
| 	// ideally we should replace this with an API call that returns the bzz addr for a given host,
 | ||||
| 	// but this also works for now (provided we don't change the hive.String() method, which we haven't in some time
 | ||||
| 	return strings.Split(strings.Split(hive, "\n")[3], " ")[10], nil | ||||
| } | ||||
| 
 | ||||
| // checkChunksVsMostProxHosts is checking:
 | ||||
| // 1. whether a chunk has been found at less than 2 hosts. Considering our NN size, this should not happen.
 | ||||
| // 2. if a chunk is not found at its closest node. This should also not happen.
 | ||||
| // Together with the --only-upload flag, we could run this smoke test and make sure that our syncing
 | ||||
| // functionality is correct (without even trying to retrieve the content).
 | ||||
| //
 | ||||
| // addrs - a slice with all uploaded chunk refs
 | ||||
| // allHostChunks - host->bit vector, showing what chunks are present on what hosts
 | ||||
| // bzzAddrs - host->bzz address, used when determining the most proximate host for a given chunk
 | ||||
| func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[string]string, bzzAddrs map[string]string) { | ||||
| 	for k, v := range bzzAddrs { | ||||
| 		log.Trace("bzzAddr", "bzz", v, "host", k) | ||||
| 	} | ||||
| 
 | ||||
| 	for i := range addrs { | ||||
| 		var foundAt int | ||||
| 		maxProx := -1 | ||||
| 		var maxProxHost string | ||||
| 		for host := range allHostChunks { | ||||
| 			if allHostChunks[host][i] == '1' { | ||||
| 				foundAt++ | ||||
| 			} | ||||
| 
 | ||||
| 			ba, err := hex.DecodeString(bzzAddrs[host]) | ||||
| 			if err != nil { | ||||
| 				panic(err) | ||||
| 			} | ||||
| 
 | ||||
| 			// calculate the host closest to any chunk
 | ||||
| 			prox := chunk.Proximity(addrs[i], ba) | ||||
| 			if prox > maxProx { | ||||
| 				maxProx = prox | ||||
| 				maxProxHost = host | ||||
| 			} | ||||
| 		} | ||||
| 
 | ||||
| 		if allHostChunks[maxProxHost][i] == '0' { | ||||
| 			log.Error("chunk not found at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost]) | ||||
| 		} else { | ||||
| 			log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost]) | ||||
| 		} | ||||
| 
 | ||||
| 		// if chunk found at less than 2 hosts
 | ||||
| 		if foundAt < 2 { | ||||
| 			log.Error("chunk found at less than two hosts", "foundAt", foundAt, "ref", addrs[i]) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func getAllRefs(testData []byte) (storage.AddressCollection, error) { | ||||
| 	datadir, err := ioutil.TempDir("", "chunk-debug") | ||||
| 	if err != nil { | ||||
|  | ||||
| @ -134,7 +134,7 @@ func NewPeer(peer *protocols.Peer, streamer *Registry) *Peer { | ||||
| func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, syncing bool) error { | ||||
| 	var msg interface{} | ||||
| 
 | ||||
| 	spanName := "send.chunk.delivery" | ||||
| 	metrics.GetOrRegisterCounter("peer.deliver", nil).Inc(1) | ||||
| 
 | ||||
| 	//we send different types of messages if delivery is for syncing or retrievals,
 | ||||
| 	//even if handling and content of the message are the same,
 | ||||
| @ -144,16 +144,13 @@ func (p *Peer) Deliver(ctx context.Context, chunk storage.Chunk, priority uint8, | ||||
| 			Addr:  chunk.Address(), | ||||
| 			SData: chunk.Data(), | ||||
| 		} | ||||
| 		spanName += ".syncing" | ||||
| 	} else { | ||||
| 		msg = &ChunkDeliveryMsgRetrieval{ | ||||
| 			Addr:  chunk.Address(), | ||||
| 			SData: chunk.Data(), | ||||
| 		} | ||||
| 		spanName += ".retrieval" | ||||
| 	} | ||||
| 
 | ||||
| 	ctx = context.WithValue(ctx, "stream_send_tag", nil) | ||||
| 	return p.SendPriority(ctx, msg, priority) | ||||
| } | ||||
| 
 | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user