package pss import ( "context" "encoding/binary" "encoding/json" "errors" "fmt" "io/ioutil" "os" "strconv" "strings" "sync" "testing" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/simulations" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/swarm/network" "github.com/ethereum/go-ethereum/swarm/network/simulation" "github.com/ethereum/go-ethereum/swarm/pot" "github.com/ethereum/go-ethereum/swarm/state" ) // needed to make the enode id of the receiving node available to the handler for triggers type handlerContextFunc func(*testData, *adapters.NodeConfig) *handler // struct to notify reception of messages to simulation driver // TODO To make code cleaner: // - consider a separate pss unwrap to message event in sim framework (this will make eventual message propagation analysis with pss easier/possible in the future) // - consider also test api calls to inspect handling results of messages type handlerNotification struct { id enode.ID serial uint64 } type testData struct { mu sync.Mutex sim *simulation.Simulation handlerDone bool // set to true on termination of the simulation run requiredMessages int allowedMessages int messageCount int kademlias map[enode.ID]*network.Kademlia nodeAddrs map[enode.ID][]byte // make predictable overlay addresses from the generated random enode ids recipients map[int][]enode.ID // for logging output only allowed map[int][]enode.ID // allowed recipients expectedMsgs map[enode.ID][]uint64 // message serials we expect respective nodes to receive allowedMsgs map[enode.ID][]uint64 // message serials we expect respective nodes to receive senders map[int]enode.ID // originating nodes of the messages (intention is to choose as far as possible from the receiving neighborhood) handlerC chan handlerNotification // passes message from pss message handler to simulation driver doneC chan struct{} // terminates the handler channel listener errC chan error // error to pass to main sim thread msgC chan handlerNotification // message receipt notification to main sim thread msgs [][]byte // recipient addresses of messages } var ( pof = pot.DefaultPof(256) // generate messages and index them topic = BytesToTopic([]byte{0xf3, 0x9e, 0x06, 0x82}) ) func (d *testData) getMsgCount() int { d.mu.Lock() defer d.mu.Unlock() return d.messageCount } func (d *testData) incrementMsgCount() int { d.mu.Lock() defer d.mu.Unlock() d.messageCount++ return d.messageCount } func (d *testData) isDone() bool { d.mu.Lock() defer d.mu.Unlock() return d.handlerDone } func (d *testData) setDone() { d.mu.Lock() defer d.mu.Unlock() d.handlerDone = true } func getCmdParams(t *testing.T) (int, int) { args := strings.Split(t.Name(), "/") msgCount, err := strconv.ParseInt(args[2], 10, 16) if err != nil { t.Fatal(err) } nodeCount, err := strconv.ParseInt(args[1], 10, 16) if err != nil { t.Fatal(err) } return int(msgCount), int(nodeCount) } func readSnapshot(t *testing.T, nodeCount int) simulations.Snapshot { f, err := os.Open(fmt.Sprintf("testdata/snapshot_%d.json", nodeCount)) if err != nil { t.Fatal(err) } defer f.Close() jsonbyte, err := ioutil.ReadAll(f) if err != nil { t.Fatal(err) } var snap simulations.Snapshot err = json.Unmarshal(jsonbyte, &snap) if err != nil { t.Fatal(err) } return snap } func newTestData() *testData { return &testData{ kademlias: make(map[enode.ID]*network.Kademlia), nodeAddrs: make(map[enode.ID][]byte), recipients: make(map[int][]enode.ID), allowed: make(map[int][]enode.ID), expectedMsgs: make(map[enode.ID][]uint64), allowedMsgs: make(map[enode.ID][]uint64), senders: make(map[int]enode.ID), handlerC: make(chan handlerNotification), doneC: make(chan struct{}), errC: make(chan error), msgC: make(chan handlerNotification), } } func (d *testData) init(msgCount int) { log.Debug("TestProxNetwork start") for _, nodeId := range d.sim.NodeIDs() { d.nodeAddrs[nodeId] = nodeIDToAddr(nodeId) } for i := 0; i < int(msgCount); i++ { msgAddr := pot.RandomAddress() // we choose message addresses randomly d.msgs = append(d.msgs, msgAddr.Bytes()) smallestPo := 256 var targets []enode.ID var closestPO int // loop through all nodes and find the required and allowed recipients of each message // (for more information, please see the comment to the main test function) for _, nod := range d.sim.Net.GetNodes() { po, _ := pof(d.msgs[i], d.nodeAddrs[nod.ID()], 0) depth := d.kademlias[nod.ID()].NeighbourhoodDepth() // only nodes with closest IDs (wrt the msg address) will be required recipients if po > closestPO { closestPO = po targets = nil targets = append(targets, nod.ID()) } else if po == closestPO { targets = append(targets, nod.ID()) } if po >= depth { d.allowedMessages++ d.allowed[i] = append(d.allowed[i], nod.ID()) d.allowedMsgs[nod.ID()] = append(d.allowedMsgs[nod.ID()], uint64(i)) } // a node with the smallest PO (wrt msg) will be the sender, // in order to increase the distance the msg must travel if po < smallestPo { smallestPo = po d.senders[i] = nod.ID() } } d.requiredMessages += len(targets) for _, id := range targets { d.recipients[i] = append(d.recipients[i], id) d.expectedMsgs[id] = append(d.expectedMsgs[id], uint64(i)) } log.Debug("nn for msg", "targets", len(d.recipients[i]), "msgidx", i, "msg", common.Bytes2Hex(msgAddr[:8]), "sender", d.senders[i], "senderpo", smallestPo) } log.Debug("msgs to receive", "count", d.requiredMessages) } // Here we test specific functionality of the pss, setting the prox property of // the handler. The tests generate a number of messages with random addresses. // Then, for each message it calculates which nodes have the msg address // within its nearest neighborhood depth, and stores those nodes as possible // recipients. Those nodes that are the closest to the message address (nodes // belonging to the deepest PO wrt the msg address) are stored as required // recipients. The difference between allowed and required recipients results // from the fact that the nearest neighbours are not necessarily reciprocal. // Upon sending the messages, the test verifies that the respective message is // passed to the message handlers of these required recipients. The test fails // if a message is handled by recipient which is not listed among the allowed // recipients of this particular message. It also fails after timeout, if not // all the required recipients have received their respective messages. // // For example, if proximity order of certain msg address is 4, and node X // has PO=5 wrt the message address, and nodes Y and Z have PO=6, then: // nodes Y and Z will be considered required recipients of the msg, // whereas nodes X, Y and Z will be allowed recipients. func TestProxNetwork(t *testing.T) { t.Run("16/16", testProxNetwork) } // params in run name: nodes/msgs func TestProxNetworkLong(t *testing.T) { if !*longrunning { t.Skip("run with --longrunning flag to run extensive network tests") } t.Run("8/100", testProxNetwork) t.Run("16/100", testProxNetwork) t.Run("32/100", testProxNetwork) t.Run("64/100", testProxNetwork) t.Run("128/100", testProxNetwork) } func testProxNetwork(t *testing.T) { tstdata := newTestData() msgCount, nodeCount := getCmdParams(t) handlerContextFuncs := make(map[Topic]handlerContextFunc) handlerContextFuncs[topic] = nodeMsgHandler services := newProxServices(tstdata, true, handlerContextFuncs, tstdata.kademlias) tstdata.sim = simulation.New(services) defer tstdata.sim.Close() err := tstdata.sim.UploadSnapshot(fmt.Sprintf("testdata/snapshot_%d.json", nodeCount)) if err != nil { t.Fatal(err) } ctx, cancel := context.WithTimeout(context.Background(), time.Second*120) defer cancel() snap := readSnapshot(t, nodeCount) err = tstdata.sim.WaitTillSnapshotRecreated(ctx, snap) if err != nil { t.Fatalf("failed to recreate snapshot: %s", err) } tstdata.init(msgCount) // initialize the test data wrapper := func(c context.Context, _ *simulation.Simulation) error { return testRoutine(tstdata, c) } result := tstdata.sim.Run(ctx, wrapper) // call the main test function if result.Error != nil { // context deadline exceeded // however, it might just mean that not all possible messages are received // now we must check if all required messages are received cnt := tstdata.getMsgCount() log.Debug("TestProxNetwork finnished", "rcv", cnt) if cnt < tstdata.requiredMessages { t.Fatal(result.Error) } } t.Logf("completed %d", result.Duration) } func (tstdata *testData) sendAllMsgs() { for i, msg := range tstdata.msgs { log.Debug("sending msg", "idx", i, "from", tstdata.senders[i]) nodeClient, err := tstdata.sim.Net.GetNode(tstdata.senders[i]).Client() if err != nil { tstdata.errC <- err } var uvarByte [8]byte binary.PutUvarint(uvarByte[:], uint64(i)) nodeClient.Call(nil, "pss_sendRaw", hexutil.Encode(msg), hexutil.Encode(topic[:]), hexutil.Encode(uvarByte[:])) } log.Debug("all messages sent") } // testRoutine is the main test function, called by Simulation.Run() func testRoutine(tstdata *testData, ctx context.Context) error { go handlerChannelListener(tstdata, ctx) go tstdata.sendAllMsgs() received := 0 // collect incoming messages and terminate with corresponding status when message handler listener ends for { select { case err := <-tstdata.errC: return err case hn := <-tstdata.msgC: received++ log.Debug("msg received", "msgs_received", received, "total_expected", tstdata.requiredMessages, "id", hn.id, "serial", hn.serial) if received == tstdata.allowedMessages { close(tstdata.doneC) return nil } } } return nil } func handlerChannelListener(tstdata *testData, ctx context.Context) { for { select { case <-tstdata.doneC: // graceful exit tstdata.setDone() tstdata.errC <- nil return case <-ctx.Done(): // timeout or cancel tstdata.setDone() tstdata.errC <- ctx.Err() return // incoming message from pss message handler case handlerNotification := <-tstdata.handlerC: // check if recipient has already received all its messages and notify to fail the test if so aMsgs := tstdata.allowedMsgs[handlerNotification.id] if len(aMsgs) == 0 { tstdata.setDone() tstdata.errC <- fmt.Errorf("too many messages received by recipient %x", handlerNotification.id) return } // check if message serial is in expected messages for this recipient and notify to fail the test if not idx := -1 for i, msg := range aMsgs { if handlerNotification.serial == msg { idx = i break } } if idx == -1 { tstdata.setDone() tstdata.errC <- fmt.Errorf("message %d received by wrong recipient %v", handlerNotification.serial, handlerNotification.id) return } // message is ok, so remove that message serial from the recipient expectation array and notify the main sim thread aMsgs[idx] = aMsgs[len(aMsgs)-1] aMsgs = aMsgs[:len(aMsgs)-1] tstdata.msgC <- handlerNotification } } } func nodeMsgHandler(tstdata *testData, config *adapters.NodeConfig) *handler { return &handler{ f: func(msg []byte, p *p2p.Peer, asymmetric bool, keyid string) error { cnt := tstdata.incrementMsgCount() log.Debug("nodeMsgHandler rcv", "cnt", cnt) // using simple serial in message body, makes it easy to keep track of who's getting what serial, c := binary.Uvarint(msg) if c <= 0 { log.Crit(fmt.Sprintf("corrupt message received by %x (uvarint parse returned %d)", config.ID, c)) } if tstdata.isDone() { return errors.New("handlers aborted") // terminate if simulation is over } // pass message context to the listener in the simulation tstdata.handlerC <- handlerNotification{ id: config.ID, serial: serial, } return nil }, caps: &handlerCaps{ raw: true, // we use raw messages for simplicity prox: true, }, } } // an adaptation of the same services setup as in pss_test.go // replaces pss_test.go when those tests are rewritten to the new swarm/network/simulation package func newProxServices(tstdata *testData, allowRaw bool, handlerContextFuncs map[Topic]handlerContextFunc, kademlias map[enode.ID]*network.Kademlia) map[string]simulation.ServiceFunc { stateStore := state.NewInmemoryStore() kademlia := func(id enode.ID) *network.Kademlia { if k, ok := kademlias[id]; ok { return k } params := network.NewKadParams() params.MaxBinSize = 3 params.MinBinSize = 1 params.MaxRetries = 1000 params.RetryExponent = 2 params.RetryInterval = 1000000 kademlias[id] = network.NewKademlia(id[:], params) return kademlias[id] } return map[string]simulation.ServiceFunc{ "bzz": func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) { // normally translation of enode id to swarm address is concealed by the network package // however, we need to keep track of it in the test driver as well. // if the translation in the network package changes, that can cause these tests to unpredictably fail // therefore we keep a local copy of the translation here addr := network.NewAddr(ctx.Config.Node()) addr.OAddr = nodeIDToAddr(ctx.Config.Node().ID()) hp := network.NewHiveParams() hp.Discovery = false config := &network.BzzConfig{ OverlayAddr: addr.Over(), UnderlayAddr: addr.Under(), HiveParams: hp, } return network.NewBzz(config, kademlia(ctx.Config.ID), stateStore, nil, nil), nil, nil }, "pss": func(ctx *adapters.ServiceContext, b *sync.Map) (node.Service, func(), error) { // execadapter does not exec init() initTest() // create keys in whisper and set up the pss object ctxlocal, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() keys, err := wapi.NewKeyPair(ctxlocal) privkey, err := w.GetPrivateKey(keys) pssp := NewPssParams().WithPrivateKey(privkey) pssp.AllowRaw = allowRaw pskad := kademlia(ctx.Config.ID) ps, err := NewPss(pskad, pssp) if err != nil { return nil, nil, err } b.Store(simulation.BucketKeyKademlia, pskad) // register the handlers we've been passed var deregisters []func() for tpc, hndlrFunc := range handlerContextFuncs { deregisters = append(deregisters, ps.Register(&tpc, hndlrFunc(tstdata, ctx.Config))) } // if handshake mode is set, add the controller // TODO: This should be hooked to the handshake test file if useHandshake { SetHandshakeController(ps, NewHandshakeParams()) } // we expose some api calls for cheating ps.addAPI(rpc.API{ Namespace: "psstest", Version: "0.3", Service: NewAPITest(ps), Public: false, }) // return Pss and cleanups return ps, func() { // run the handler deregister functions in reverse order for i := len(deregisters); i > 0; i-- { deregisters[i-1]() } }, nil }, } } // makes sure we create the addresses the same way in driver and service setup func nodeIDToAddr(id enode.ID) []byte { return id.Bytes() }