diff --git a/net.cpp b/net.cpp index 6e9efff16..0a22a7e63 100644 --- a/net.cpp +++ b/net.cpp @@ -20,8 +20,8 @@ */ #include -#include #include +#include #include using namespace std; using namespace dev; @@ -29,58 +29,551 @@ using namespace dev::p2p; namespace ba = boost::asio; namespace bi = ba::ip; -class TestA: UDPSocketEvents, public Worker +/** + * Ping packet: Check if node is alive. + * PingNode is cached and regenerated after expiration - t, where t is timeout. + * + * signature: Signature of message. + * ipAddress: Our IP address. + * port: Our port. + * expiration: Triggers regeneration of packet. May also provide control over synchronization. + * + * Ping is used to implement evict. When a new node is seen for + * a given bucket which is full, the least-responsive node is pinged. + * If the pinged node doesn't respond then it is removed and the new + * node is inserted. + */ +struct PingNode: RLPDatagram +{ + bytes ipAddress; + uint16_t port; + uint64_t expiration; + + Signature signature; + +// void streamRLP(RLPStream& _s) const { _s.appendList(3); _s << ipAddress << port << expiration; } +}; + +struct Pong: RLPDatagram +{ + // todo: weak-signed pong + Address from; + uint64_t replyTo; /// expiration from PingNode + + void streamRLP(RLPStream& _s) const { _s.appendList(2); _s << from << replyTo; } +}; + +/** + * FindNeighbors Packet: Request k-nodes, closest to the target. + * FindNeighbors is cached and regenerated after expiration - t, where t is timeout. + * + * signature: Signature of message. + * target: Address of NodeId. The responding node will send back nodes closest to the target. + * expiration: Triggers regeneration of packet. May also provide control over synchronization. + * + */ +struct FindNeighbors: RLPDatagram +{ + h160 target; + uint64_t expiration; + + Signature signature; + + void streamRLP(RLPStream& _s) const { _s.appendList(2); _s << target << expiration; } +}; + +/** + * Node Packet: Multiple node packets are sent in response to FindNeighbors. + */ +struct Neighbors: RLPDatagram +{ + struct Node + { + bytes ipAddress; + uint16_t port; + NodeId node; +// void streamRLP(RLPStream& _s) const { _s.appendList(3); _s << ipAddress << port << node; } + }; + + std::set nodes; + h256 nonce; + + Signature signature; + +// void streamRLP(RLPStream& _s) const { _s.appendList(2); _s.appendList(nodes.size()); for (auto& n: nodes) n.streamRLP(_s); _s << nonce; } +}; + +/** + * NodeTable using S/Kademlia system for node discovery and preference. + * untouched buckets are refreshed if they have not been touched within an hour + * + * Thread-safety is ensured by modifying NodeEntry details via + * shared_ptr replacement instead of mutating values. + * + * @todo don't try to evict node if node isRequired. (support for makeRequired) + * @todo optimize (use tree for state (or set w/custom compare for cache)) + * @todo constructor support for m_node, m_secret + * @todo use s_bitsPerStep for find and refresh/ping + * @todo exclude bucket from refresh if we have node as peer + * @todo restore nodes + */ +class NodeTable: UDPSocketEvents, public std::enable_shared_from_this +{ + using nodeSocket = UDPSocket; + using timePoint = std::chrono::steady_clock::time_point; + + static unsigned const s_bucketSize = 16; // Denoted by k in [Kademlia]. Number of nodes stored in each bucket. +// const unsigned s_bitsPerStep = 5; // @todo Denoted by b in [Kademlia]. Bits by which address space will be divided for find responses. + static unsigned const s_alpha = 3; // Denoted by \alpha in [Kademlia]. Number of concurrent FindNeighbors requests. + const unsigned s_findTimout = 300; // How long to wait between find queries. +// const unsigned s_siblings = 5; // @todo Denoted by s in [S/Kademlia]. User-defined by sub-protocols. + const unsigned s_bucketRefresh = 3600; // Refresh interval prevents bucket from becoming stale. [Kademlia] + const unsigned s_bits = sizeof(Address); // Denoted by n. + const unsigned s_buckets = 8 * s_bits - 1; + const unsigned s_evictionCheckInterval = 75; // Interval by which eviction timeouts are checked. + const unsigned s_pingTimeout = 500; + static size_t const s_tableSize = Address::size * 8 - 1; // Address::size + +public: + static unsigned dist(Address const& _a, Address const& _b) { u160 d = _a ^ _b; unsigned ret; for (ret = 0; d >>= 1; ++ret) {}; return ret; } + +protected: + struct NodeDefaultEndpoint + { + NodeDefaultEndpoint(bi::udp::endpoint _udp): udp(_udp) {} + bi::udp::endpoint udp; + }; + + struct NodeEntry + { + NodeEntry(Address _id, Public _pubk, bi::udp::endpoint _udp): id(_id), pubk(_pubk), endpoint(NodeDefaultEndpoint(_udp)), distance(0) {} + NodeEntry(NodeEntry _src, Address _id, Public _pubk, bi::udp::endpoint _udp): id(_id), pubk(_pubk), endpoint(NodeDefaultEndpoint(_udp)), distance(dist(_src.id,_id)) {} + NodeEntry(NodeEntry _src, Address _id, Public _pubk, NodeDefaultEndpoint _gw): id(_id), pubk(_pubk), endpoint(_gw), distance(dist(_src.id,_id)) {} + Address id; + Public pubk; + NodeDefaultEndpoint endpoint; ///< How we've previously connected to this node. (must match node's reported endpoint) + const unsigned distance; + timePoint activePing; + }; + + struct NodeBucket + { + unsigned distance; + timePoint modified; + std::list> nodes; + }; + + using EvictionTimeout = std::pair,Address>; + +public: + NodeTable(ba::io_service& _io): + m_node(NodeEntry(Address(), Public(), bi::udp::endpoint())), + m_socket(new nodeSocket(_io, *this, 30300)), + m_socketPtr(m_socket.get()), + m_io(_io), + m_bucketRefreshTimer(m_io), + m_evictionCheckTimer(m_io) + { + for (unsigned i = 0; i < s_buckets; i++) + m_state[i].distance = i, m_state[i].modified = chrono::steady_clock::now() - chrono::seconds(1); + doRefreshBuckets(boost::system::error_code()); + } + + ~NodeTable() { + m_evictionCheckTimer.cancel(); + m_bucketRefreshTimer.cancel(); + m_socketPtr->disconnect(); + } + + void join() { doFindNode(m_node.id); } + + std::list
nodes() const + { + std::list
nodes; + Guard l(x_nodes); + for (auto& i: m_nodes) + nodes.push_back(i.second->id); + return std::move(nodes); + } + + NodeEntry operator[](Address _id) + { + Guard l(x_nodes); + return *m_nodes[_id]; + } + +protected: + void requestNeighbors(NodeEntry const& _node, Address _target) const + { + FindNeighbors p; + p.target = _target; + + p.to = _node.endpoint.udp; + p.seal(m_secret); + m_socketPtr->send(p); + } + + /// Dispatches udp requests in order to populate node table to be as close as possible to _node. + void doFindNode(Address _node, unsigned _round = 0, std::shared_ptr>> _tried = std::shared_ptr>>()) + { + if (!m_socketPtr->isOpen() || _round == 7) + return; + + auto nearest = findNearest(_node); + std::list> tried; + for (unsigned i = 0; i < nearest.size() && tried.size() < s_alpha; i++) + if (!_tried->count(nearest[i])) + { + tried.push_back(nearest[i]); + requestNeighbors(*nearest[i], _node); + } + else + continue; + + while (auto n = tried.front()) + { + _tried->insert(n); + tried.pop_front(); + } + + auto self(shared_from_this()); + m_evictionCheckTimer.expires_from_now(boost::posix_time::milliseconds(s_findTimout)); + m_evictionCheckTimer.async_wait([this, self, _node, _round, _tried](boost::system::error_code const& _ec) + { + if (_ec) + return; + doFindNode(_node, _round + 1, _tried); + }); + } + + std::vector> findNearest(Address _target) + { + // send s_alpha FindNeighbors packets to nodes we know, closest to target + unsigned head = dist(m_node.id, _target); + unsigned tail = (head - 1) % (s_tableSize - 1); + + // todo: optimize with tree + std::map>> found; + unsigned count = 0; + + // if d is 0, then we roll look forward, if last, we reverse, else, spread from d + if (head != 0 && tail != s_tableSize) + while (head != tail && count < s_bucketSize) + { + Guard l(x_state); + for (auto& n: m_state[head].nodes) + if (auto p = n.lock()) + { + if (count < s_bucketSize) + found[dist(_target, p->id)].push_back(p); + else + break; + } + + if (count < s_bucketSize && head) + for (auto& n: m_state[tail].nodes) + if (auto p = n.lock()) + { + if (count < s_bucketSize) + found[dist(_target, p->id)].push_back(p); + else + break; + } + head++; + tail = (tail - 1) % (s_tableSize - 1); + } + else if (head == 0) + while (head < s_bucketSize && count < s_bucketSize) + { + Guard l(x_state); + for (auto& n: m_state[head].nodes) + if (auto p = n.lock()) + { + if (count < s_bucketSize) + found[dist(_target, p->id)].push_back(p); + else + break; + } + head--; + } + else if (tail == s_tableSize - 1) + while (tail > 0 && count < s_bucketSize) + { + Guard l(x_state); + for (auto& n: m_state[tail].nodes) + if (auto p = n.lock()) + { + if (count < s_bucketSize) + found[dist(_target, p->id)].push_back(p); + else + break; + } + tail--; + } + + std::vector> ret; + for (auto& nodes: found) + for (auto& n: nodes.second) + ret.push_back(n); + return std::move(ret); + } + + void ping(bi::address _address, unsigned _port) const + { + PingNode p; + string ip = m_node.endpoint.udp.address().to_string(); + p.ipAddress = asBytes(ip); + p.port = m_node.endpoint.udp.port(); +// p.expiration; + p.seal(m_secret); + m_socketPtr->send(p); + } + + void ping(NodeEntry* _n) const + { + if (_n && _n->endpoint.udp.address().is_v4()) + ping(_n->endpoint.udp.address(), _n->endpoint.udp.port()); + } + + void evict(std::shared_ptr _leastSeen, std::shared_ptr _new) + { + if (!m_socketPtr->isOpen()) + return; + + Guard l(x_evictions); + m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id)); + if (m_evictions.size() == 1) + doCheckEvictions(boost::system::error_code()); + + m_evictions.push_back(EvictionTimeout(make_pair(_leastSeen->id,chrono::steady_clock::now()), _new->id)); + ping(_leastSeen.get()); + } + + void noteNode(Public _pubk, bi::udp::endpoint _endpoint) + { + Address id = right160(sha3(_pubk)); + std::shared_ptr node; + { + Guard l(x_nodes); + auto n = m_nodes.find(id); + if (n == m_nodes.end()) + { + m_nodes[id] = std::shared_ptr(new NodeEntry(m_node, id, _pubk, _endpoint)); + node = m_nodes[id]; + } + else + node = n->second; + } + + noteNode(node); + } + + void noteNode(std::shared_ptr _n) + { + std::shared_ptr contested; + { + NodeBucket s = bucket(_n.get()); + Guard l(x_state); + s.nodes.remove_if([&_n](std::weak_ptr n) + { + auto p = n.lock(); + if (!p || p == _n) + return true; + return false; + }); + + if (s.nodes.size() >= s_bucketSize) + { + contested = s.nodes.front().lock(); + if (!contested) + { + s.nodes.pop_front(); + s.nodes.push_back(_n); + } + } + else + s.nodes.push_back(_n); + } + + if (contested) + evict(contested, _n); + } + + void dropNode(std::shared_ptr _n) + { + NodeBucket s = bucket(_n.get()); + { + Guard l(x_state); + s.nodes.remove_if([&_n](std::weak_ptr n) { return n.lock() == _n; }); + } + Guard l(x_nodes); + m_nodes.erase(_n->id); + } + + NodeBucket const& bucket(NodeEntry* _n) const + { + return m_state[_n->distance]; + } + + void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet) + { + RLP rlp(_packet); + + + // whenever a pong is received, first check if it's in m_evictions, if so, remove it + Guard l(x_evictions); + } + + void onDisconnected(UDPSocketFace*) + { + + } + + void doCheckEvictions(boost::system::error_code const& _ec) + { + if (_ec || !m_socketPtr->isOpen()) + return; + + m_evictionCheckTimer.expires_from_now(boost::posix_time::milliseconds(s_evictionCheckInterval)); + auto self(shared_from_this()); + m_evictionCheckTimer.async_wait([this, self](boost::system::error_code const& _ec) + { + if (_ec) + return; + + bool evictionsRemain = false; + std::list> drop; + { + Guard l(x_evictions); + for (auto& e: m_evictions) + if (chrono::steady_clock::now() - e.first.second > chrono::milliseconds(s_pingTimeout)) + { + Guard l(x_nodes); + drop.push_back(m_nodes[e.second]); + } + evictionsRemain = m_evictions.size() - drop.size() > 0; + } + + for (auto& n: drop) + dropNode(n); + + if (evictionsRemain) + doCheckEvictions(boost::system::error_code()); + }); + } + + void doRefreshBuckets(boost::system::error_code const& _ec) + { + cout << "refreshing buckets" << endl; + if (_ec) + return; + + // first check if there are any pending evictions + + + bool connected = m_socketPtr->isOpen(); + bool refreshed = false; + if (connected) + { + Guard l(x_state); + for (auto& d: m_state) + if (chrono::steady_clock::now() - d.modified > chrono::seconds(s_bucketRefresh)) + while (!d.nodes.empty()) + { + auto n = d.nodes.front(); + if (auto p = n.lock()) + { + refreshed = true; + ping(p.get()); + break; + } + d.nodes.pop_front(); + } + } + + unsigned nextRefresh = connected ? (refreshed ? 200 : s_bucketRefresh*1000) : 10000; + auto runcb = [this](boost::system::error_code const& error) -> void { doRefreshBuckets(error); }; + m_bucketRefreshTimer.expires_from_now(boost::posix_time::milliseconds(nextRefresh)); + m_bucketRefreshTimer.async_wait(runcb); + } + +private: + NodeEntry m_node; ///< This node. + Secret m_secret; ///< This nodes secret key. + + mutable Mutex x_nodes; ///< Mutable for thread-safe copy in nodes() const. + std::map> m_nodes; ///< Address -> Node table (most common lookup path) + + Mutex x_state; + std::array m_state; ///< State table; logbinned nodes. + + Mutex x_evictions; + std::deque m_evictions; ///< Eviction timeouts. + + shared_ptr m_socket; ///< Shared pointer for our UDPSocket; ASIO requires shared_ptr. + nodeSocket* m_socketPtr; ///< Set to m_socket.get(). + ba::io_service& m_io; ///< Used by bucket refresh timer. + boost::asio::deadline_timer m_bucketRefreshTimer; ///< Timer which schedules and enacts bucket refresh. + boost::asio::deadline_timer m_evictionCheckTimer; ///< Timer for handling node evictions. +}; + +/** + * Only used for testing. Not useful beyond tests. + */ +class TestHost: public Worker { public: - TestA(): Worker("test",0), m_io(), m_socket(new UDPSocket(m_io, *this, 30300)) {} - ~TestA() { m_io.stop(); stopWorking(); } + TestHost(): Worker("test",0), m_io() {}; + ~TestHost() { m_io.stop(); stopWorking(); } + void start() { startWorking(); } + void doWork() { m_io.run(); } +protected: + ba::io_service m_io; +}; + +/** + * Only used for testing. Not useful beyond tests. + */ +class TestNodeHost: public TestHost +{ +public: + TestNodeHost(): m_nodes(m_io) {}; + ~TestNodeHost() { m_io.stop(); stopWorking(); } + void start() { startWorking(); } + void doWork() { m_io.run(); } + + NodeTable m_nodes; +}; + +class TestUDPSocket: UDPSocketEvents, public TestHost +{ +public: + TestUDPSocket(): m_socket(new UDPSocket(m_io, *this, 30300)) {} + ~TestUDPSocket() { m_io.stop(); stopWorking(); } void start() { startWorking(); } void doWork() { m_io.run(); } void onDisconnected(UDPSocketFace*) {}; - void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet) { if (_packet.toString() == "AAAA") success = true; } + void onReceived(UDPSocketFace*, bi::udp::endpoint const&, bytesConstRef _packet) { if (_packet.toString() == "AAAA") success = true; } - ba::io_service m_io; - shared_ptr> m_socket; + shared_ptr> m_socket; bool success = false; }; -//struct TestBProtocol: UDPSocketEvents -//{ -// void onDisconnected(UDPSocketFace*) {}; -// void onReceived(UDPSocketFace*, bi::udp::endpoint const& _from, bytesConstRef _packet) { cout << "received TestBProtocol" << endl; }; -//}; -// -//class TestB: TestBProtocol -//{ -//public: -// TestB(): m_io(), m_socket(m_io, *this, 30300) {} -////private: -// ba::io_service m_io; -// UDPSocket m_socket; -//}; -// -//class TestC -//{ -//public: -// TestC(): m_io(), m_socket(m_io, m_rpc, 30300) {} -////private: -// ba::io_service m_io; -// TestBProtocol m_rpc; -// UDPSocket m_socket; -//}; - BOOST_AUTO_TEST_SUITE(p2p) +BOOST_AUTO_TEST_CASE(kademlia) +{ + TestNodeHost nodeHost; + +} + BOOST_AUTO_TEST_CASE(test_txrx_one) { - UDPDatagram d; - d.to = boost::asio::ip::udp::endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 30300); - d.data = bytes({65,65,65,65}); - - TestA a; a.m_socket->connect(); a.start(); + UDPDatagram d(bi::udp::endpoint(boost::asio::ip::address::from_string("127.0.0.1"), 30300), bytes({65,65,65,65})); + TestUDPSocket a; a.m_socket->connect(); a.start(); a.m_socket->send(d); sleep(1); BOOST_REQUIRE_EQUAL(true, a.success);