3 Commits fcd8a25233 ... 021880d2e2

Author SHA1 Message Date
  Jakub Mitoraj 021880d2e2 reorder networking services 5 months ago
  Jakub Mitoraj d5b13baf38 continue rework networking 5 months ago
  Jakub Mitoraj ecd50cfdd3 fix communication via gnome proxy 5 months ago

+ 6 - 18
src/lib.rs

@@ -4,7 +4,7 @@ use prelude::Decrypter;
 pub use std::net::IpAddr;
 use swarm_consensus::GnomeId;
 use swarm_consensus::NetworkSettings;
-use swarm_consensus::Request;
+// use swarm_consensus::Request;
 // use std::time::Duration;
 use swarm_consensus::start;
 // use swarm_consensus::GnomeId;
@@ -16,8 +16,9 @@ mod crypto;
 mod data_conversion;
 mod networking;
 use networking::run_networking_tasks;
+use std::sync::mpsc::channel;
 use std::sync::mpsc::Receiver;
-use std::sync::mpsc::{channel, Sender};
+use swarm_consensus::NotificationBundle;
 
 pub mod prelude {
     pub use crate::crypto::{
@@ -37,15 +38,7 @@ pub mod prelude {
 pub fn create_manager_and_receiver(
     gnome_id: GnomeId,
     network_settings: Option<NetworkSettings>,
-) -> (
-    Manager,
-    Receiver<(
-        String,
-        Sender<Request>,
-        Sender<u32>,
-        Receiver<NetworkSettings>,
-    )>,
-) {
+) -> (Manager, Receiver<NotificationBundle>) {
     let (networking_sender, networking_receiver) = channel();
     // let network_settings = None;
     let mgr = start(gnome_id, network_settings, networking_sender);
@@ -59,18 +52,13 @@ pub async fn activate_gnome(
     port: u16,
     buffer_size_bytes: u32,
     uplink_bandwith_bytes_sec: u32,
-    receiver: Receiver<(
-        String,
-        Sender<Request>,
-        Sender<u32>,
-        Receiver<NetworkSettings>,
-    )>,
+    receiver: Receiver<NotificationBundle>,
     decrypter: Decrypter,
     pub_key_pem: String,
 ) {
     run_networking_tasks(
         // gnome_id,
-        ip,
+        // ip,
         // broadcast,
         port,
         buffer_size_bytes,

+ 81 - 72
src/networking/client.rs

@@ -10,13 +10,12 @@ use crate::networking::subscription::Subscription;
 use crate::prelude::Decrypter;
 use crate::prelude::Encrypter;
 use async_std::net::UdpSocket;
-use async_std::task::{spawn, yield_now};
-use std::net::{IpAddr, SocketAddr};
+use async_std::task::spawn;
+use std::net::SocketAddr;
 use std::sync::mpsc::{channel, Receiver, Sender};
 use swarm_consensus::GnomeId;
 
 pub async fn run_client(
-    host_ip: IpAddr,
     mut receiver: Receiver<Subscription>,
     sender: Sender<Subscription>,
     // req_sender: Sender<Vec<u8>>,
@@ -33,7 +32,9 @@ pub async fn run_client(
     //     so that in operates on dedicated socket it receives as argument
 
     println!("SKT CLIENT");
-    let result = UdpSocket::bind(SocketAddr::new(host_ip, 0)).await;
+    // let result = UdpSocket::bind(SocketAddr::new(host_ip, 0)).await;
+    let client_addr: SocketAddr = SocketAddr::new("0.0.0.0".parse().unwrap(), 0);
+    let result = UdpSocket::bind(client_addr).await;
     if result.is_err() {
         println!("SKT couldn't bind to address");
         return;
@@ -86,83 +87,91 @@ async fn establish_secure_connection(
     // ) -> Receiver<[u8; 32]> {
 ) {
     let mut remote_gnome_id: GnomeId = GnomeId(0);
-    let mut session_key: SessionKey = SessionKey::from_key(&[0; 32]);
-    let mut remote_addr: SocketAddr = "0.0.0.0:0".parse().unwrap();
+    let session_key: SessionKey; // = SessionKey::from_key(&[0; 32]);
+    let mut remote_addr: SocketAddr; // = "0.0.0.0:0".parse().unwrap();
+    let mut count;
     let mut recv_buf = [0u8; 1100];
+    loop {
+        let recv_result = socket.recv_from(&mut recv_buf).await;
+        if recv_result.is_ok() {
+            (count, remote_addr) = recv_result.unwrap();
+            if count > 1 {
+                break;
+            }
+        }
+    }
 
-    let recv_result = socket.recv_from(&mut recv_buf).await;
     // let mut decoded_key: Option<[u8; 32]> = None;
     // println!("Dec key: {:?}", decoded_key);
-    if let Ok((count, remote_adr)) = recv_result {
-        remote_addr = remote_adr;
-        println!("Received {}bytes", count);
-        let decoded_key = decrypter.decrypt(&recv_buf[..count]);
+    // if let Ok((count, remote_adr)) = recv_result {
+    //     remote_addr = remote_adr;
+    println!("Received {} bytes", count);
+    let decoded_key = decrypter.decrypt(&recv_buf[..count]);
 
-        // let _res = req_sender.send(Vec::from(&recv_buf[..count]));
-        // println!("Sent decode request: {:?}", _res);
-        // loop {
-        //     let response = resp_receiver.try_recv();
-        //     if let Ok(symmetric_key) = response {
-        //         // match subs_resp {
-        //         //     Subscription::KeyDecoded(symmetric_key) => {
-        //         //         // decoded_port = port;
-        //         decoded_key = Some(symmetric_key);
-        //         break;
-        //         //     }
-        //         //     Subscription::DecodeFailure => {
-        //         //         println!("Failed decoding symmetric key!");
-        //         //         break;
-        //         //     }
-        //         //     _ => println!("Unexpected message: {:?}", subs_resp),
-        //         // }
-        //     } else {
-        //         // println!("rec: {:?}", response);
-        //     }
-        //     yield_now().await
-        // }
-        if let Ok(sym_key) = decoded_key {
-            println!("Got session key: {:?}", sym_key);
-            session_key = SessionKey::from_key(&sym_key.try_into().unwrap());
-        } else {
-            println!("Unable to decode key");
-            // return resp_receiver;
-            return;
-        }
+    // let _res = req_sender.send(Vec::from(&recv_buf[..count]));
+    // println!("Sent decode request: {:?}", _res);
+    // loop {
+    //     let response = resp_receiver.try_recv();
+    //     if let Ok(symmetric_key) = response {
+    //         // match subs_resp {
+    //         //     Subscription::KeyDecoded(symmetric_key) => {
+    //         //         // decoded_port = port;
+    //         decoded_key = Some(symmetric_key);
+    //         break;
+    //         //     }
+    //         //     Subscription::DecodeFailure => {
+    //         //         println!("Failed decoding symmetric key!");
+    //         //         break;
+    //         //     }
+    //         //     _ => println!("Unexpected message: {:?}", subs_resp),
+    //         // }
+    //     } else {
+    //         // println!("rec: {:?}", response);
+    //     }
+    //     yield_now().await
+    // }
+    if let Ok(sym_key) = decoded_key {
+        println!("Got session key: {:?}", sym_key);
+        session_key = SessionKey::from_key(&sym_key.try_into().unwrap());
+    } else {
+        println!("Unable to decode key");
+        // return resp_receiver;
+        return;
+    }
 
-        let dedicated_socket =
-            UdpSocket::bind(SocketAddr::new(socket.local_addr().unwrap().ip(), 0))
-                .await
-                .unwrap();
-        dedicated_socket.connect(remote_addr).await.unwrap();
-        let _ = dedicated_socket.send(&[0u8]).await;
+    let dedicated_socket = UdpSocket::bind(SocketAddr::new(socket.local_addr().unwrap().ip(), 0))
+        .await
+        .unwrap();
+    dedicated_socket.connect(remote_addr).await.unwrap();
+    let _ = dedicated_socket.send(&[0u8]).await;
 
-        let mut recv_buf = [0u8; 1100];
-        let recv_result = dedicated_socket.recv(&mut recv_buf).await;
-        if let Ok(count) = recv_result {
-            println!("Received {}bytes", count);
-            let decr_res = session_key.decrypt(&recv_buf[..count]);
-            if let Ok(remote_pubkey_pem) = decr_res {
-                let remote_id_pub_key_pem = std::str::from_utf8(&remote_pubkey_pem).unwrap();
-                let encr = Encrypter::create_from_data(remote_id_pub_key_pem).unwrap();
-                remote_gnome_id = GnomeId(encr.hash());
-                println!("Remote GnomeId: {}", remote_gnome_id);
-                println!(
-                    "Decrypted PEM using session key:\n {:?}",
-                    remote_id_pub_key_pem
-                );
-            }
+    let mut recv_buf = [0u8; 1100];
+    let recv_result = dedicated_socket.recv(&mut recv_buf).await;
+    if let Ok(count) = recv_result {
+        println!("Received {} bytes", count);
+        let decr_res = session_key.decrypt(&recv_buf[..count]);
+        if let Ok(remote_pubkey_pem) = decr_res {
+            let remote_id_pub_key_pem = std::str::from_utf8(&remote_pubkey_pem).unwrap();
+            let encr = Encrypter::create_from_data(remote_id_pub_key_pem).unwrap();
+            remote_gnome_id = GnomeId(encr.hash());
+            println!("Remote GnomeId: {}", remote_gnome_id);
+            println!(
+                "Decrypted PEM using session key:\n {:?}",
+                remote_id_pub_key_pem
+            );
         }
-
-        spawn(prepare_and_serve(
-            dedicated_socket,
-            remote_gnome_id,
-            session_key,
-            swarm_names,
-            sender.clone(),
-            pipes_sender.clone(),
-        ));
-        return;
     }
+
+    spawn(prepare_and_serve(
+        dedicated_socket,
+        remote_gnome_id,
+        session_key,
+        swarm_names,
+        sender.clone(),
+        pipes_sender.clone(),
+    ));
+    return;
+    // }
     // resp_receiver
 }
 

+ 85 - 15
src/networking/common.rs

@@ -2,7 +2,7 @@ use crate::networking::stun::{
     build_request, stun_decode, stun_send, StunChangeRequest, StunMessage,
 };
 use crate::networking::subscription::Subscription;
-use async_std::net::UdpSocket;
+use async_std::net::{IpAddr, Ipv4Addr, UdpSocket};
 use async_std::task::{sleep, yield_now};
 use bytes::{BufMut, BytesMut};
 use futures::{
@@ -133,20 +133,77 @@ pub fn create_a_neighbor_for_each_swarm(
 // what type of NAT we are behind.
 pub async fn are_we_behind_a_nat(socket: &UdpSocket) -> Result<(bool, SocketAddr), String> {
     let request = build_request(None);
-    let _send_result = stun_send(socket, request, None, None).await;
-    let mut bytes: [u8; 128] = [0; 128];
-    let recv_result = socket.recv_from(&mut bytes).await;
-    if let Ok((_count, _from)) = recv_result {
+    let ip = IpAddr::V4(Ipv4Addr::new(108, 177, 15, 127));
+    let port = 3478;
+    let _send_result = stun_send(socket, request, Some(ip), Some(port)).await;
+    // let mut bytes: [u8; 128] = [0; 128];
+
+    let t1 = time_out(Duration::from_secs(3), None).fuse();
+    // TODO: serv pairs of sender-receiver
+    let t2 = wait_for_response(socket, SocketAddr::new(ip, port)).fuse();
+
+    pin_mut!(t1, t2);
+
+    let (received, bytes) = select! {
+        _result1 = t1 =>  (false,[0;128]),
+        result2 = t2 => (true,result2),
+    };
+    //new above
+    // let recv_result = socket.recv_from(&mut bytes).await;
+    // if let Ok((_count, _from)) = recv_result {
+    if received {
         let msg = stun_decode(&bytes);
         // println!("Received {} bytes from {:?}:\n{:?}", count, from, msg);
         let mapped_address = msg.mapped_address().unwrap();
-        if mapped_address == socket.local_addr().unwrap() {
+        if UdpSocket::bind((mapped_address.ip(), 0)).await.is_ok() {
+            // if mapped_address == socket.local_addr().unwrap() {
             Ok((false, mapped_address))
         } else {
             Ok((true, mapped_address))
         }
     } else {
-        Err(format!("Did not receive STUN response: {:?}", recv_result))
+        Err("Timed out while waiting for STUN response".to_string())
+    }
+}
+
+pub async fn wait_for_bytes(socket: &UdpSocket) {
+    // println!("waiting for bytes");
+    let mut bytes: [u8; 10] = [0; 10];
+    loop {
+        let recv_res = socket.recv_from(&mut bytes).await;
+        // println!("Recv: {:?}", recv_res);
+        if let Ok((count, from)) = recv_res {
+            // println!("Recv: {} from {:?}", bytes[0], from);
+            if count == 1 && bytes[0] == 1 {
+                return;
+            }
+        }
+    }
+}
+
+async fn wait_for_response(socket: &UdpSocket, addr: SocketAddr) -> [u8; 128] {
+    // println!("waiting for bytes");
+    let mut bytes: [u8; 128] = [0; 128];
+    loop {
+        let _recv_res = socket.recv_from(&mut bytes).await;
+
+        match _recv_res {
+            Ok((_count, from)) => {
+                if addr == from {
+                    // println!("Socket recv: {:?} {:?}", _recv_res, bytes);
+                    return bytes;
+                }
+            }
+            _ => continue,
+        }
+
+        // println!("Recv: {:?}", recv_res);
+        //     if let Ok((count, from)) = recv_res {
+        //         // println!("Recv: {} from {:?}", bytes[0], from);
+        //         if count == 1 && bytes[0] == 1 {
+        //             return;
+        //         }
+        //     }
     }
 }
 // This procedure for NAT identification has two stages:
@@ -189,7 +246,7 @@ pub async fn identify_nat(socket: &UdpSocket) -> Nat {
         yield_now().await;
     }
     if received_responses.is_empty() {
-        println!("NAT type: Symmetric");
+        print!("NAT type: Symmetric ");
         Nat::Symmetric
     } else {
         let mut first_response_received = false;
@@ -202,13 +259,13 @@ pub async fn identify_nat(socket: &UdpSocket) -> Nat {
             }
         }
         if first_response_received {
-            println!("NAT type: FullCone");
+            print!("NAT type: FullCone ");
             Nat::FullCone
         } else if second_response_received {
-            println!("NAT type: AddressRestrictedCone");
+            print!("NAT type: AddressRestrictedCone ");
             Nat::AddressRestrictedCone
         } else {
-            println!("NAT type: Unknown");
+            print!("NAT type: Unknown ");
             Nat::Unknown
         }
     }
@@ -223,6 +280,19 @@ async fn receive_response(socket: &UdpSocket) -> StunMessage {
     response
 }
 
+pub async fn time_out(mut time: Duration, sender: Option<Sender<()>>) {
+    let time_step = Duration::from_millis(100);
+    while time > Duration::ZERO {
+        print!(".");
+        time -= time_step;
+        sleep(time_step).await;
+    }
+    println!("Timed out after: {:?}", time);
+    if let Some(sender) = sender {
+        let _ = sender.send(());
+    }
+}
+
 // In order to find how ports are assigned we send four messages to STUN server:
 // First one is identical to the very first request we have sent to STUN server
 // and can be skipped
@@ -255,7 +325,7 @@ pub async fn discover_port_allocation_rule(socket: &UdpSocket) -> (PortAllocatio
         let msg = stun_decode(&bytes);
         if let Some(changed_address) = msg.changed_address() {
             let m_addr_1 = msg.mapped_address().unwrap();
-            println!("Mapped address 1: {:?}", m_addr_1);
+            // println!("Mapped address 1: {:?}", m_addr_1);
             let port_1 = m_addr_1.port();
             stun_send(
                 socket,
@@ -268,7 +338,7 @@ pub async fn discover_port_allocation_rule(socket: &UdpSocket) -> (PortAllocatio
             let _recv_result = socket.recv_from(&mut bytes).await;
             let msg = stun_decode(&bytes);
             let m_addr_2 = msg.mapped_address().unwrap();
-            println!("Mapped address 2: {:?}", m_addr_2);
+            // println!("Mapped address 2: {:?}", m_addr_2);
             let port_2 = m_addr_2.port();
             stun_send(
                 socket,
@@ -281,7 +351,7 @@ pub async fn discover_port_allocation_rule(socket: &UdpSocket) -> (PortAllocatio
             let _recv_result = socket.recv_from(&mut bytes).await;
             let msg = stun_decode(&bytes);
             let m_addr_3 = msg.mapped_address().unwrap();
-            println!("Mapped address 3: {:?}", m_addr_3);
+            // println!("Mapped address 3: {:?}", m_addr_3);
             let port_3 = m_addr_3.port();
             stun_send(
                 socket,
@@ -294,7 +364,7 @@ pub async fn discover_port_allocation_rule(socket: &UdpSocket) -> (PortAllocatio
             let _recv_result = socket.recv_from(&mut bytes).await;
             let msg = stun_decode(&bytes);
             let m_addr_4 = msg.mapped_address().unwrap();
-            println!("Mapped address 4: {:?}", m_addr_4);
+            // println!("Mapped address 4: {:?}", m_addr_4);
             let port_4 = m_addr_4.port();
             let p1_and_p2_eq = port_1 == port_2;
             let p3_and_p4_eq = port_3 == port_4;

+ 94 - 53
src/networking/direct_punch.rs

@@ -1,18 +1,21 @@
-use super::common::are_we_behind_a_nat;
+// use super::common::are_we_behind_a_nat;
 use super::common::discover_network_settings;
 use super::token::Token;
-use crate::networking::{holepunch::punch_it, subscription::Subscription};
-use crate::prelude::{Decrypter, Encrypter};
-use crate::GnomeId;
+use crate::networking::{
+    holepunch::{punch_it, start_communication},
+    subscription::Subscription,
+};
+use crate::prelude::Decrypter;
+// use crate::GnomeId;
 use async_std::net::UdpSocket;
 use async_std::task::{spawn, yield_now};
 use std::collections::HashMap;
-use std::net::IpAddr;
+use std::net::{IpAddr, Ipv4Addr};
 use std::sync::mpsc::{channel, Receiver, Sender};
-use swarm_consensus::{Nat, NetworkSettings, PortAllocationRule, Request};
+use swarm_consensus::{NetworkSettings, Request};
 
 pub async fn direct_punching_service(
-    host_ip: IpAddr,
+    // host_ip: IpAddr,
     sub_sender: Sender<Subscription>,
     decrypter: Decrypter,
     pipes_sender: Sender<(Sender<Token>, Receiver<Token>)>,
@@ -20,9 +23,6 @@ pub async fn direct_punching_service(
     pub_key_pem: String,
 ) {
     println!("Waiting for direct connect requests.");
-    let loc_encr = Encrypter::create_from_data(&pub_key_pem).unwrap();
-    let gnome_id = GnomeId(loc_encr.hash());
-    drop(loc_encr);
 
     // TODO: here we need to write a new procedure.
     // It all depends on NAT and port assignment rule,
@@ -56,20 +56,23 @@ pub async fn direct_punching_service(
         HashMap::with_capacity(10);
     let (send_other, recv_other) = channel();
     let (send_my, recv_my) = channel();
+    // println!("before sm spawn");
     spawn(socket_maintainer(
-        host_ip,
+        // host_ip,
         pub_key_pem.clone(),
-        gnome_id,
+        // gnome_id,
         sub_sender.clone(),
         decrypter.clone(),
         pipes_sender.clone(),
         send_my,
         recv_other,
     ));
+    // println!("after sm spawn");
 
     let mut waiting_for_my_settings = false;
     let mut request_sender: Option<Sender<Request>> = None;
     loop {
+        // print!("dps");
         if let Ok((swarm_name, req_sender, net_set_recv)) = pipes_receiver.try_recv() {
             swarms.insert(swarm_name, (req_sender, net_set_recv));
         }
@@ -105,69 +108,107 @@ pub async fn direct_punching_service(
 }
 
 async fn socket_maintainer(
-    host_ip: IpAddr,
+    // host_ip: IpAddr,
     pub_key_pem: String,
-    gnome_id: GnomeId,
+    // gnome_id: GnomeId,
     sub_sender: Sender<Subscription>,
     decrypter: Decrypter,
     pipes_sender: Sender<(Sender<Token>, Receiver<Token>)>,
     send_my: Sender<NetworkSettings>,
     recv_other: Receiver<(String, NetworkSettings)>,
 ) {
-    let bind_port = 0;
-    let bind_addr = (host_ip, bind_port);
+    // println!("SM start");
+    // let bind_port = 0;
+    // let bind_addr = (host_ip, bind_port);
+    // let bind_addr = (IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0u16);
+    let bind_addr = (IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0u16);
     let mut socket = UdpSocket::bind(bind_addr).await.unwrap();
     let mut my_settings = discover_network_settings(&mut socket).await;
 
     // TODO: race two tasks: either timeout or receive NetworkSettings from pipe
+    //       if timeout - we need to refresh the socket by sending to STUN server
     loop {
+        // print!("sm");
         let recv_result = recv_other.try_recv();
         if let Ok((swarm_name, other_settings)) = recv_result {
             // TODO: discover with stun server
-            spawn(punch_it(
+            println!("recvd other!");
+            let _ = send_my.send(my_settings);
+            spawn(punch_and_communicate(
                 socket,
-                swarm_name.clone(),
+                bind_addr,
                 pub_key_pem.clone(),
-                gnome_id,
+                // gnome_id.clone(),
+                sub_sender.clone(),
                 decrypter.clone(),
-                // my_settings.port_range,
-                // other_settings.port_range,
-                (my_settings, other_settings),
-                // req_sender.clone(),
                 pipes_sender.clone(),
-                sub_sender.clone(),
+                swarm_name,
+                (my_settings, other_settings),
             ));
-            match my_settings.port_allocation {
-                (PortAllocationRule::AddressSensitive, value) => {
-                    if value > 0 {
-                        my_settings.pub_port += value as u16
-                    } else {
-                        my_settings.pub_port -= value.abs() as u16
-                    }
-                }
-                (PortAllocationRule::PortSensitive, value) => {
-                    if value > 0 {
-                        my_settings.pub_port += value as u16
-                    } else {
-                        my_settings.pub_port -= value.abs() as u16
-                    }
-                }
-                _ => {}
-            }
-            let _ = send_my.send(my_settings);
             socket = UdpSocket::bind(bind_addr).await.unwrap();
-            if my_settings.nat_type != Nat::None {
-                let behind_a_nat = are_we_behind_a_nat(&socket).await;
-                if let Ok((_is_there_nat, public_addr)) = behind_a_nat {
-                    let ip = public_addr.ip();
-                    my_settings.pub_ip = ip;
-                    let port = public_addr.port();
-                    my_settings.pub_port = port;
-                }
-            } else {
-                my_settings.pub_port = socket.local_addr().unwrap().port();
-            }
+            my_settings = discover_network_settings(&mut socket).await;
+            // if my_settings.nat_type != Nat::None {
+            //     let behind_a_nat = are_we_behind_a_nat(&socket).await;
+            //     if let Ok((_is_there_nat, public_addr)) = behind_a_nat {
+            //         let ip = public_addr.ip();
+            //         my_settings.pub_ip = ip;
+            //         let port = public_addr.port();
+            //         my_settings.pub_port = port;
+            //     }
+            // } else {
+            //     my_settings.pub_port = socket.local_addr().unwrap().port();
+            // }
         }
         yield_now().await;
     }
 }
+
+async fn punch_and_communicate(
+    socket: UdpSocket,
+    bind_addr: (IpAddr, u16),
+    pub_key_pem: String,
+    // gnome_id: GnomeId,
+    sub_sender: Sender<Subscription>,
+    decrypter: Decrypter,
+    pipes_sender: Sender<(Sender<Token>, Receiver<Token>)>,
+    swarm_name: String,
+    (my_settings, other_settings): (NetworkSettings, NetworkSettings),
+) {
+    println!("DPunch {:?} and {:?}", bind_addr, other_settings);
+    // let (socket_sender, socket_receiver) = channel();
+    // spawn(punch_it(
+    let punch_it_result = punch_it(
+        socket,
+        // socket_sender,
+        // my_settings.port_range,
+        // other_settings.port_range,
+        (my_settings, other_settings),
+        // req_sender.clone(),
+    )
+    .await;
+    // let socket_recv_result = socket_receiver.recv();
+    if let Some(dedicated_socket) = punch_it_result {
+        spawn(start_communication(
+            dedicated_socket,
+            swarm_name.clone(),
+            pub_key_pem.clone(),
+            // gnome_id,
+            decrypter.clone(),
+            pipes_sender.clone(),
+            sub_sender.clone(),
+        ));
+        // return;
+    };
+
+    // if let (
+    //     PortAllocationRule::AddressSensitive | PortAllocationRule::PortSensitive,
+    //     value,
+    // ) = my_settings.port_allocation
+    // {
+    //     if value > 0 {
+    //         my_settings.pub_port += value as u16
+    //     } else {
+    //         my_settings.pub_port -= (value as i16).unsigned_abs()
+    //     }
+    // };
+}

+ 456 - 216
src/networking/holepunch.rs

@@ -1,6 +1,7 @@
 use super::Token;
 use crate::crypto::{generate_symmetric_key, SessionKey};
 use crate::networking::client::prepare_and_serve;
+use crate::networking::common::{discover_network_settings, time_out, wait_for_bytes};
 use crate::networking::subscription::Subscription;
 use crate::prelude::{Decrypter, Encrypter};
 use async_std::net::UdpSocket;
@@ -10,13 +11,13 @@ use futures::{
     pin_mut,
     select,
 };
-use std::collections::VecDeque;
+use std::collections::{HashMap, VecDeque};
 use std::net::{IpAddr, SocketAddr};
 use std::sync::mpsc::{channel, TryRecvError};
 use std::sync::mpsc::{Receiver, Sender};
 use std::time::Duration;
-use swarm_consensus::Nat;
 use swarm_consensus::{GnomeId, NetworkSettings};
+use swarm_consensus::{Nat, PortAllocationRule};
 
 // let puncher = "tudbut.de:4277";
 // let swarm_name = "irdm".to_string();
@@ -41,8 +42,6 @@ pub async fn holepunch(
     puncher: &str,
     host_ip: IpAddr,
     sub_sender: Sender<Subscription>,
-    // req_sender: Sender<Vec<u8>>,
-    // mut resp_receiver: Receiver<[u8; 32]>,
     decrypter: Decrypter,
     pipes_sender: Sender<(Sender<Token>, Receiver<Token>)>,
     receiver: Receiver<String>,
@@ -51,8 +50,7 @@ pub async fn holepunch(
     // ) -> (SocketAddr, UdpSocket, bool) {
     println!("Holepunch started");
     let mut magic_ports = MagicPorts::new();
-    let loc_encr = Encrypter::create_from_data(&pub_key_pem).unwrap();
-    let gnome_id = GnomeId(loc_encr.hash());
+    let sleep_duration = Duration::from_millis(50);
 
     loop {
         let recv_result = receiver.try_recv();
@@ -61,7 +59,8 @@ pub async fn holepunch(
             match err {
                 TryRecvError::Disconnected => break,
                 _ => {
-                    yield_now().await;
+                    sleep(sleep_duration).await;
+                    // yield_now().await;
                     continue;
                 }
             }
@@ -69,173 +68,332 @@ pub async fn holepunch(
             // return;
         }
         let swarm_name: String = recv_result.unwrap();
-        println!("Establishing connection with helper...");
         let bind_port = magic_ports.next();
-        let bind_addr = (host_ip, bind_port);
-        let holepunch = UdpSocket::bind(bind_addr)
-            .await
-            .expect("unable to create socket");
-        // holepunch
-        //     .connect(puncher)
-        //     .await
-        //     .expect("unable to connect to helper");
-        let bytes = swarm_name.as_bytes();
-        let mut buf = vec![0_u8; 200];
-        buf[..bytes.len().min(200)].copy_from_slice(&bytes[..bytes.len().min(200)]);
-        // Initial request
-        holepunch
-            .send_to(&buf, puncher)
-            .await
-            .expect("unable to talk to helper");
-        println!("Waiting for UDPunch server to respond");
-        holepunch
-            .recv_from(&mut buf)
-            .await
-            .expect("unable to receive from helper");
-        // println!("Holepunch responded: {:?}", buf);
-        // TODO: here we should temporarily send some invalid packet to proxy
-        // let next_port = holepunch.local_addr().unwrap().port().saturating_add(1);
-        // let holepunch = UdpSocket::bind((host_ip, next_port))
-        //     .await
-        //     .expect("unable to create second socket");
-        // // Second request
-        // holepunch
-        //     .send_to(&buf, puncher)
-        //     .await
-        //     .expect("unable to talk to helper");
-        // println!("Waiting for UDPunch server to respond");
-        // holepunch
-        //     .recv_from(&mut buf)
-        //     .await
-        //     .expect("unable to receive from helper");
-        // // buf should now contain our partner's address data.
-        println!("Holepunch responded: {:?}", buf);
-        buf.retain(|e| *e != 0);
-        let remote_addr = String::from_utf8_lossy(buf.as_slice()).to_string();
-        let remote_1_addr = remote_addr
-            .split(':')
-            .next()
-            .unwrap()
-            .parse::<IpAddr>()
-            .expect("Unable to parse remote address");
-        let remote_1_port = remote_addr
-            .split(':')
-            .nth(1)
-            .unwrap()
-            .parse::<u16>()
-            .expect("Unable to parse remote port");
-        // // Third request
-        // let holepunch = UdpSocket::bind((host_ip, next_port.saturating_add(1)))
-        //     .await
-        //     .expect("unable to create third socket");
-        // holepunch
-        //     .send_to(&buf, puncher)
-        //     .await
-        //     .expect("unable to talk to helper");
-        // println!("Waiting for UDPunch server to respond");
-        // holepunch
-        //     .recv_from(&mut buf)
-        //     .await
-        //     .expect("unable to receive from helper");
-        // println!("Holepunch responded: {:?}", buf);
-        // // buf should now contain our partner's address data.
-        // buf.retain(|e| *e != 0);
-        // let remote_addr = String::from_utf8_lossy(buf.as_slice()).to_string();
-        // let remote_2_addr = remote_addr
-        //     .split(':')
-        //     .next()
-        //     .unwrap()
-        //     .parse::<IpAddr>()
-        //     .expect("Unable to parse remote address");
-        // let remote_2_port = remote_addr
-        //     .split(':')
-        //     .nth(1)
-        //     .unwrap()
-        //     .parse::<u16>()
-        //     .expect("Unable to parse remote port");
-        // // println!("to be bind addr: {}", bind_addr);
-        // let delta_port = remote_2_port - remote_1_port;
-        // let remote_e_port = remote_2_port + delta_port;
-        let remote_controls_port = magic_ports.is_magic(remote_1_port);
-        println!(
-            "Holepunching {} (magic?: {:?}) and :{} (you) for {}.",
-            remote_addr,
-            remote_controls_port,
-            holepunch.local_addr().unwrap().port(),
-            swarm_name
-        );
-
-        // // TODO: end this brutality, follow the rigtheous path
-        // let (remote_port_start, remote_port_end) = if remote_controls_port {
-        //     (remote_1_port, remote_1_port + 50) //.collect::<std::vec::Vec<u16>>();
-        //                                         // (remote_e_port, remote_e_port + 50) //.collect::<std::vec::Vec<u16>>();
-        //                                         // VecDeque::from(vec)
-        // } else {
-        //     // let vec =
-        //     (remote_1_port - 25, remote_1_port + 25) //.collect::<std::vec::Vec<u16>>();
-        //                                              // (remote_e_port - 25, remote_e_port + 25) //.collect::<std::vec::Vec<u16>>();
-        //                                              // VecDeque::from(vec)
-        // };
-        spawn(punch_it(
-            holepunch,
+        spawn(holepunch_task(
+            // host_ip,
+            puncher.to_string(),
+            sub_sender.clone(),
+            decrypter.clone(),
+            pipes_sender.clone(),
+            pub_key_pem.clone(),
             swarm_name,
+            (host_ip, bind_port),
+        ));
+    }
+}
+
+async fn holepunch_task(
+    // host_ip: IpAddr,
+    puncher: String,
+    sub_sender: Sender<Subscription>,
+    decrypter: Decrypter,
+    pipes_sender: Sender<(Sender<Token>, Receiver<Token>)>,
+    pub_key_pem: String,
+    swarm_name: String,
+    mut bind_addr: (IpAddr, u16),
+) {
+    let magic_ports = MagicPorts::new();
+    println!("Round '1'");
+    // TODO: read only magic_ports!
+    let mut holepunch = UdpSocket::bind(bind_addr)
+        .await
+        .expect("unable to create socket");
+
+    let my_settings = discover_network_settings(&mut holepunch).await;
+    let remote_socket_opt =
+        ask_proxy_for_remote_socket(&holepunch, puncher.clone(), swarm_name.clone()).await;
+    if remote_socket_opt.is_none() {
+        println!("Unable to receive remote socket addr");
+        return;
+    }
+    let (remote_1_addr, remote_1_port) = remote_socket_opt.unwrap();
+    let remote_controls_port = magic_ports.is_magic(remote_1_port);
+    let (remote_nat, remote_port_rule) = if remote_controls_port {
+        (
+            Nat::SymmetricWithPortControl,
+            (PortAllocationRule::FullCone, 0),
+        )
+    } else {
+        //TODO: we do not know PortAllocationRule
+        (Nat::Unknown, (PortAllocationRule::PortSensitive, 1))
+    };
+    println!(
+        "Holepunching {} (magic?: {:?}) and :{} (you) for {}.",
+        remote_1_addr,
+        remote_controls_port,
+        holepunch.local_addr().unwrap().port(),
+        swarm_name
+    );
+    let punch_it_result = punch_it(
+        holepunch,
+        (
+            my_settings,
+            NetworkSettings {
+                pub_ip: remote_1_addr,
+                pub_port: remote_1_port,
+                nat_type: remote_nat,
+                port_allocation: remote_port_rule,
+            },
+        ),
+    )
+    .await;
+    if let Some(dedicated_socket) = punch_it_result {
+        spawn(start_communication(
+            dedicated_socket,
+            swarm_name.clone(),
             pub_key_pem.clone(),
-            gnome_id,
+            // gnome_id,
             decrypter.clone(),
-            // (bind_port + 1, bind_port + 50),
-            // (remote_port_start, remote_port_end),
-            // (remote_2_addr, remote_e_port),
-            (
-                NetworkSettings::default(),
-                NetworkSettings {
-                    pub_ip: remote_1_addr,
-                    pub_port: remote_1_port,
-                    ..Default::default()
-                },
-            ),
-            // (remote_1_addr, remote_1_port),
-            // req_sender.clone(),
             pipes_sender.clone(),
             sub_sender.clone(),
         ));
+        return;
+    };
+
+    println!("Round '2'");
+    // 1. We need to define a common phrase for two endpoints - based on both ip addresses
+    // Since we know our public IP and remote public IP
+    let my_pub_ip = my_settings.pub_ip.to_string();
+    let his_pub_ip = remote_1_addr.to_string();
+    let mut common_phrase;
+    if my_pub_ip > his_pub_ip {
+        common_phrase = my_pub_ip;
+        common_phrase.push_str(&his_pub_ip);
+    } else {
+        common_phrase = his_pub_ip;
+        common_phrase.push_str(&my_pub_ip);
+    }
+    // 2. Then we create a new socket and send via that socket a request to connect with
+    // other host via that common phrase
+    bind_addr.1 -= 3;
+    holepunch = UdpSocket::bind(bind_addr)
+        .await
+        .expect("unable to create socket");
 
-        // println!("SADR: {:?}", s_addr);
-        // holepunch
-        //     .set_read_timeout(Some(Duration::from_secs(5)))
-        //     .unwrap();
-        // holepunch
-        //     .set_write_timeout(Some(Duration::from_secs(1)))
-        //     .unwrap();
-        // println!("Holepunch and connection successful.");
-        // let mut my_external_addr = vec![0; 200];
-        // let _result = holepunch.recv(&mut my_external_addr).await;
-        // my_external_addr.retain(|e| *e != 0);
-        // if !my_external_addr.is_empty() {
-        //     let my_addr = String::from_utf8_lossy(&my_external_addr).to_string();
-        //     println!("Received external addr: {}", my_addr);
-        //     let my_e_port = my_addr
-        //         .split(':')
-        //         .nth(1)
-        //         .unwrap()
-        //         .parse::<u16>()
-        //         .expect("Unable to parse external port");
-        //     let _ = holepunch.send(bind_addr.as_bytes()).await;
-        //     // println!("Sent: {:?}", bind_addr);
-        //     if holepunch.local_addr().expect("Dunno my local addr").port() == my_e_port {
-        //         println!("I can define my external port number!");
-        //     }
-        //     let _result = holepunch.recv(&mut my_external_addr).await;
-        //     let should_be_server = my_e_port < remote_adr.port();
-        //     (
-        //         // holepunch.local_addr().unwrap(),
-        //         remote_adr,
-        //         holepunch,
-        //         should_be_server,
-        //     )
-        // } else {
-        //     panic!("Did not receive data from remote host");
-        // }
+    let remote_socket_opt =
+        ask_proxy_for_remote_socket(&holepunch, puncher.clone(), common_phrase.clone()).await;
+    if remote_socket_opt.is_none() {
+        println!("Unable to receive remote socket addr");
+        return;
+    }
+    // 3. We should receive a public IP and PORT of remote host
+    let (remote_1_addr, remote_1_port) = remote_socket_opt.unwrap();
+    println!("First remote socket: {:?}:{}", remote_1_addr, remote_1_port);
+    // 4. We repeat steps 1, 2, 3 and now we have two remote socket addresses.
+    bind_addr.1 += 1;
+    holepunch = UdpSocket::bind(bind_addr)
+        .await
+        .expect("unable to create socket");
+    common_phrase.push_str("bar");
+    let remote_socket_opt =
+        ask_proxy_for_remote_socket(&holepunch, puncher.clone(), common_phrase).await;
+    if remote_socket_opt.is_none() {
+        println!("Unable to receive remote socket addr");
+        return;
     }
+    let (remote_2_addr, remote_2_port) = remote_socket_opt.unwrap();
+    println!(
+        "Second remote socket: {:?}:{}",
+        remote_2_addr, remote_2_port
+    );
+
+    // 5. We calculate delta_port based on two received public ports from proxy
+    let delta_port = remote_2_port - remote_1_port;
+    println!("Delta port: {}", delta_port);
+    // 6. We add delta_port to last port number we received from proxy
+    let target_remote_port = remote_2_port + delta_port;
+    // 7. We create a new socket and try to punch through to remote host using
+    //    known remote public IP and PORT calculated in point 6.
+    bind_addr.1 += 1;
+    holepunch = UdpSocket::bind(bind_addr)
+        .await
+        .expect("unable to create socket");
+    // 8. We pass that socket to punch_it function and await it.
+    let punch_it_result = punch_it(
+        holepunch,
+        (
+            my_settings,
+            NetworkSettings {
+                pub_ip: remote_1_addr,
+                pub_port: target_remote_port,
+                nat_type: remote_nat,
+                port_allocation: remote_port_rule, //TODO maybe we can learn some more on this?
+            },
+        ),
+    )
+    .await;
+    if let Some(dedicated_socket) = punch_it_result {
+        spawn(start_communication(
+            dedicated_socket,
+            swarm_name.clone(),
+            pub_key_pem.clone(),
+            decrypter.clone(),
+            pipes_sender.clone(),
+            sub_sender.clone(),
+        ));
+        return;
+    };
+
+    println!("Round '3'");
+    let my_port_min = bind_addr.1 + 2;
+    let his_port_min = target_remote_port + delta_port;
+    let his_port_max = target_remote_port + (50 * delta_port);
+    let timeout = Duration::from_secs(120);
+    let punch_it_result = cluster_punch_it(
+        bind_addr.0,
+        remote_1_addr,
+        my_port_min,
+        50,
+        (his_port_min, delta_port, his_port_max),
+        timeout,
+    )
+    .await;
+    if let Some(dedicated_socket) = punch_it_result {
+        spawn(start_communication(
+            dedicated_socket,
+            swarm_name.clone(),
+            pub_key_pem.clone(),
+            decrypter.clone(),
+            pipes_sender.clone(),
+            sub_sender.clone(),
+        ));
+        // return;
+    };
+
+    // }
+    // // TODO: end this brutality, follow the rigtheous path
+    // let (remote_port_start, remote_port_end) = if remote_controls_port {
+    //     (remote_1_port, remote_1_port + 50) //.collect::<std::vec::Vec<u16>>();
+    //                                         // (remote_e_port, remote_e_port + 50) //.collect::<std::vec::Vec<u16>>();
+    //                                         // VecDeque::from(vec)
+    // } else {
+    //     // let vec =
+    //     (remote_1_port - 25, remote_1_port + 25) //.collect::<std::vec::Vec<u16>>();
+    //                                              // (remote_e_port - 25, remote_e_port + 25) //.collect::<std::vec::Vec<u16>>();
+    //                                              // VecDeque::from(vec)
+    // };
+    // println!("SADR: {:?}", s_addr);
+    // holepunch
+    //     .set_read_timeout(Some(Duration::from_secs(5)))
+    //     .unwrap();
+    // holepunch
+    //     .set_write_timeout(Some(Duration::from_secs(1)))
+    //     .unwrap();
+    // println!("Holepunch and connection successful.");
+    // let mut my_external_addr = vec![0; 200];
+    // let _result = holepunch.recv(&mut my_external_addr).await;
+    // my_external_addr.retain(|e| *e != 0);
+    // if !my_external_addr.is_empty() {
+    //     let my_addr = String::from_utf8_lossy(&my_external_addr).to_string();
+    //     println!("Received external addr: {}", my_addr);
+    //     let my_e_port = my_addr
+    //         .split(':')
+    //         .nth(1)
+    //         .unwrap()
+    //         .parse::<u16>()
+    //         .expect("Unable to parse external port");
+    //     let _ = holepunch.send(bind_addr.as_bytes()).await;
+    //     // println!("Sent: {:?}", bind_addr);
+    //     if holepunch.local_addr().expect("Dunno my local addr").port() == my_e_port {
+    //         println!("I can define my external port number!");
+    //     }
+    //     let _result = holepunch.recv(&mut my_external_addr).await;
+    //     let should_be_server = my_e_port < remote_adr.port();
+    //     (
+    //         // holepunch.local_addr().unwrap(),
+    //         remote_adr,
+    //         holepunch,
+    //         should_be_server,
+    //     )
+    // } else {
+    //     panic!("Did not receive data from remote host");
+    // }
+    // }
+}
+
+async fn ask_proxy_for_remote_socket(
+    holepunch: &UdpSocket,
+    puncher: String,
+    swarm_name: String,
+) -> Option<(IpAddr, u16)> {
+    let bytes = swarm_name.as_bytes();
+    let mut buf = vec![0_u8; 200];
+    buf[..bytes.len().min(200)].copy_from_slice(&bytes[..bytes.len().min(200)]);
+    // Initial request
+    holepunch
+        .send_to(&buf, puncher)
+        .await
+        .expect("unable to talk to helper");
+    println!("Waiting for UDPunch server to respond");
+    holepunch
+        .recv_from(&mut buf)
+        .await
+        .expect("unable to receive from helper");
+    // println!("Holepunch responded: {:?}", buf);
+    // TODO: here we should temporarily send some invalid packet to proxy
+    // let next_port = holepunch.local_addr().unwrap().port().saturating_add(1);
+    // let holepunch = UdpSocket::bind((host_ip, next_port))
+    //     .await
+    //     .expect("unable to create second socket");
+    // // Second request
+    // holepunch
+    //     .send_to(&buf, puncher)
+    //     .await
+    //     .expect("unable to talk to helper");
+    // println!("Waiting for UDPunch server to respond");
+    // holepunch
+    //     .recv_from(&mut buf)
+    //     .await
+    //     .expect("unable to receive from helper");
+    // // buf should now contain our partner's address data.
+    // println!("Holepunch responded: {:?}", buf);
+    buf.retain(|e| *e != 0);
+    let remote_addr = String::from_utf8_lossy(buf.as_slice()).to_string();
+    let remote_1_addr = remote_addr
+        .split(':')
+        .next()
+        .unwrap()
+        .parse::<IpAddr>()
+        .expect("Unable to parse remote address");
+    let remote_1_port = remote_addr
+        .split(':')
+        .nth(1)
+        .unwrap()
+        .parse::<u16>()
+        .expect("Unable to parse remote port");
+    Some((remote_1_addr, remote_1_port))
+    // // Third request
+    // let holepunch = UdpSocket::bind((host_ip, next_port.saturating_add(1)))
+    //     .await
+    //     .expect("unable to create third socket");
+    // holepunch
+    //     .send_to(&buf, puncher)
+    //     .await
+    //     .expect("unable to talk to helper");
+    // println!("Waiting for UDPunch server to respond");
+    // holepunch
+    //     .recv_from(&mut buf)
+    //     .await
+    //     .expect("unable to receive from helper");
+    // println!("Holepunch responded: {:?}", buf);
+    // // buf should now contain our partner's address data.
+    // buf.retain(|e| *e != 0);
+    // let remote_addr = String::from_utf8_lossy(buf.as_slice()).to_string();
+    // let remote_2_addr = remote_addr
+    //     .split(':')
+    //     .next()
+    //     .unwrap()
+    //     .parse::<IpAddr>()
+    //     .expect("Unable to parse remote address");
+    // let remote_2_port = remote_addr
+    //     .split(':')
+    //     .nth(1)
+    //     .unwrap()
+    //     .parse::<u16>()
+    //     .expect("Unable to parse remote port");
+    // // println!("to be bind addr: {}", bind_addr);
+    // let delta_port = remote_2_port - remote_1_port;
+    // let remote_e_port = remote_2_port + delta_port;
 }
 
 struct MagicPorts(VecDeque<u16>);
@@ -319,23 +477,23 @@ async fn try_communicate(socket: &UdpSocket, remote_addr: SocketAddr) -> Result<
 async fn probe_socket(
     socket: UdpSocket,
     remote_addr: SocketAddr,
-    // port_list: VecDeque<u16>,
     sender: Sender<(UdpSocket, Option<SocketAddr>)>,
 ) {
-    println!("running probe socket");
+    // println!("running probe socket");
     let sleep_time = Duration::from_millis(100);
-    let wait_time = Duration::from_millis(4100);
+    let wait_time = Duration::from_millis(1100);
+    print!("sent ");
     for i in (1..10).rev() {
         let _ = socket.send_to(&[i as u8], remote_addr).await;
-        println!("sent {}", i);
+        print!("{} ", i);
         sleep(sleep_time).await;
     }
     let mut socket_found = false;
     let mut timed_out = false;
+    // println!("waiting for response");
     while !socket_found && !timed_out {
-        println!("still not found");
         let t1 = wait_for_bytes(&socket).fuse();
-        let t2 = time_out(wait_time).fuse();
+        let t2 = time_out(wait_time, None).fuse();
 
         pin_mut!(t1, t2);
 
@@ -344,17 +502,17 @@ async fn probe_socket(
             _result2 = t2 => (false,true),
         };
     }
-    if socket_found {
-        println!("socket found!");
-    } else {
-        println!("timed out");
-    } // let sleep_time = Duration::from_millis(rand::random::<u8>() as u64 + 200);
-      // loop {
-      //     let new_port = port_list.pop_front().unwrap();
-      //     remote_addr.set_port(new_port);
-      //     port_list.push_back(new_port);
-      //     let t1 = timeout(&sleep_time).fuse();
-      //     let t2 = try_communicate(&socket, remote_addr).fuse();
+    // if socket_found {
+    //     println!("socket found!");
+    // } else {
+    //     println!("timed out");
+    // } // let sleep_time = Duration::from_millis(rand::random::<u8>() as u64 + 200);
+    // loop {
+    //     let new_port = port_list.pop_front().unwrap();
+    //     remote_addr.set_port(new_port);
+    //     port_list.push_back(new_port);
+    //     let t1 = timeout(&sleep_time).fuse();
+    //     let t2 = try_communicate(&socket, remote_addr).fuse();
 
     //     pin_mut!(t1, t2);
 
@@ -390,25 +548,6 @@ async fn probe_socket(
     }
 }
 
-async fn wait_for_bytes(socket: &UdpSocket) {
-    println!("waiting for bytes");
-    let mut bytes: [u8; 10] = [0; 10];
-    loop {
-        let recv_res = socket.recv_from(&mut bytes).await;
-        println!("Recv: {:?}", recv_res);
-        if let Ok((count, from)) = recv_res {
-            println!("Recv: {} from {:?}", bytes[0], from);
-            if count == 1 && bytes[0] == 1 {
-                return;
-            }
-        }
-    }
-}
-
-async fn time_out(time: Duration) {
-    sleep(time).await
-}
-
 async fn punch_back(
     socket: &UdpSocket,
     remote_addr: SocketAddr,
@@ -436,17 +575,12 @@ async fn punch_back(
 // establishment depending on our and their NetworkSettings
 pub async fn punch_it(
     socket: UdpSocket,
-    swarm_name: String,
-    pub_key_pem: String,
-    gnome_id: GnomeId,
-    decrypter: Decrypter,
+    // socket_sender: Sender<Option<UdpSocket>>,
     (my_settings, other_settings): (NetworkSettings, NetworkSettings),
-    pipes_sender: Sender<(Sender<Token>, Receiver<Token>)>,
-    sub_sender: Sender<Subscription>,
-) {
+) -> Option<UdpSocket> {
     let mut remote_adr = SocketAddr::new(other_settings.pub_ip, other_settings.pub_port);
     println!(
-        "punching: {:?}:{:?}",
+        "Punching: {:?}:{:?}",
         other_settings.pub_ip, other_settings.pub_port
     );
     let (sender, reciever) = channel();
@@ -458,12 +592,17 @@ pub async fn punch_it(
         responsive_socket_result = reciever.try_recv();
         match responsive_socket_result {
             Ok((socket, None)) => {
-                println!("none");
-                if my_settings.nat_at_most_address_sensitive()
+                // println!("none");
+                // if my_settings.nat_at_most_address_sensitive()
+                if my_settings.port_allocation_predictable()
                     && other_settings.port_allocation_predictable()
                 {
+                    println!("\nHit me babe one more time");
                     // TODO: we try to cover case when NAT assigned next port
                     // while we were trying to initialize communication
+                    let mut local_addr = socket.local_addr().unwrap();
+                    local_addr.set_port(local_addr.port() + 1);
+                    let socket = UdpSocket::bind(local_addr).await.unwrap();
                     let next_port = other_settings.port_increment(remote_adr.port());
                     remote_adr.set_port(next_port);
                     spawn(probe_socket(socket, remote_adr, sender.clone()));
@@ -472,6 +611,7 @@ pub async fn punch_it(
                     break;
                 }
             }
+
             Ok((socket, Some(remote_addr))) => {
                 println!("Got a sock!");
                 let conn_result = socket.connect(remote_addr).await;
@@ -488,11 +628,108 @@ pub async fn punch_it(
         yield_now().await;
     }
     drop(reciever);
-    if dedicated_socket.is_none() {
-        println!("Failed to communicate with remote gnome");
-        return;
+    // if dedicated_socket.is_none() {
+    //     println!("Failed to communicate with remote gnome");
+    //     return;
+    // }else{
+    // let _ = socket_sender.send(dedicated_socket);
+    dedicated_socket
+}
+
+pub async fn cluster_punch_it(
+    my_ip: IpAddr,
+    his_ip: IpAddr,
+    my_port_min: u16,
+    my_count: u16,
+    (his_port_min, his_step, his_port_max): (u16, u16, u16),
+    timeout_sec: Duration,
+) -> Option<UdpSocket> {
+    println!("Performing clusterpunch");
+    println!(
+        "my sockets: {:?} ports: {}-{}",
+        my_ip,
+        my_port_min,
+        my_port_min + my_count
+    );
+    println!(
+        "his sockets: {:?} ports: {}-{}",
+        his_ip, his_port_min, his_port_max
+    );
+    let mut next_remote_port: HashMap<u16, u16> = HashMap::new();
+    let mut his_current = his_port_min;
+    let (send, recv) = channel();
+    for i in 0..my_count {
+        let a_socket = UdpSocket::bind((my_ip, my_port_min + i)).await.unwrap();
+        spawn(probe_socket(
+            a_socket,
+            SocketAddr::new(his_ip, his_current),
+            send.clone(),
+        ));
+        his_current += his_step;
+        if his_current > his_port_max {
+            his_current = his_port_min;
+        }
+        next_remote_port.insert(my_port_min + i, his_current);
+    }
+
+    // TODO: we need to create multiple socket instances: 50 maybe 100
+    // for each instance we need to track what remote we are currently trying to reach
+    // if a given socket has timed out, we increment the target socket port by step and
+    // probe again, if port > max, we start from min
+    // we repeat until timeout if given
+    let sleep_duration = Duration::from_millis(50);
+    let mut dedicated_socket = None;
+    let (t_send, timeout) = channel();
+    spawn(time_out(timeout_sec, Some(t_send)));
+    while timeout.try_recv().is_err() {
+        let recv_result = recv.try_recv();
+        if recv_result.is_err() {
+            sleep(sleep_duration).await;
+            continue;
+        }
+        match recv_result {
+            Ok((socket, None)) => {
+                let local_port = socket.local_addr().unwrap().port();
+                let mut his_next = next_remote_port.remove(&local_port).unwrap();
+                spawn(probe_socket(
+                    socket,
+                    SocketAddr::new(his_ip, his_next),
+                    send.clone(),
+                ));
+                his_next += his_step;
+                if his_next > his_port_max {
+                    his_next = his_port_min;
+                }
+                next_remote_port.insert(local_port, his_next);
+            }
+            Ok((socket, Some(remote_addr))) => {
+                let _ = socket.connect(remote_addr).await;
+                dedicated_socket = Some(socket);
+                break;
+            }
+            other => {
+                println!("Unexpected probe result: {:?}", other);
+            }
+        }
     }
-    let dedicated_socket = dedicated_socket.unwrap();
+    drop(recv);
+    drop(timeout);
+    dedicated_socket
+}
+
+pub async fn start_communication(
+    dedicated_socket: UdpSocket,
+    swarm_name: String,
+    pub_key_pem: String,
+    decrypter: Decrypter,
+    pipes_sender: Sender<(Sender<Token>, Receiver<Token>)>,
+    sub_sender: Sender<Subscription>,
+) {
+    // TODO: above is just one of possible procedures used for holepunching
+    // below we need to wrap it into an async fn and call it when
+    // above procedure or similar returns a punched socket and remote address
+    // that are ready for message exchange
+    // let dedicated_socket = dedicated_socket.unwrap();
     let remote_addr = dedicated_socket.peer_addr().unwrap();
     println!("We did it: {:?}", remote_addr);
     // if s_addr.is_err() {
@@ -505,7 +742,7 @@ pub async fn punch_it(
     // sleep(sleep_time).await;
     println!(
         "Send PUB key to: {:?} from: {:?} result: {:?}",
-        remote_adr,
+        remote_addr, //changed here
         dedicated_socket.local_addr().unwrap(),
         _send_result
     );
@@ -537,6 +774,9 @@ pub async fn punch_it(
 
     let key = generate_symmetric_key();
     let mut session_key = SessionKey::from_key(&key);
+    let loc_encr = Encrypter::create_from_data(&pub_key_pem).unwrap();
+    let gnome_id = GnomeId(loc_encr.hash());
+    drop(loc_encr);
     if remote_gnome_id.0 < gnome_id.0 {
         // TODO maybe send remote external IP here?
         let bytes_to_send = Vec::from(&key);

+ 83 - 73
src/networking/mod.rs

@@ -9,18 +9,17 @@ mod stun;
 mod subscription;
 mod token;
 use self::client::run_client;
-// use self::common::are_we_behind_a_nat;
-// use self::common::discover_port_allocation_rule;
+use self::common::are_we_behind_a_nat;
 use self::server::run_server;
 use self::sock::serve_socket;
 use self::subscription::subscriber;
 use self::token::{token_dispenser, Token};
+use crate::crypto::Decrypter;
 use async_std::net::UdpSocket;
 use async_std::task::spawn;
-use holepunch::holepunch;
-use std::sync::mpsc::{channel, Receiver, Sender};
-use swarm_consensus::{NetworkSettings, Request};
-// use swarm_consensus::Message;
+use std::net::SocketAddr;
+use std::sync::mpsc::{channel, Receiver};
+use swarm_consensus::NotificationBundle;
 
 // #[derive(Debug)]
 // enum ConnError {
@@ -35,30 +34,18 @@ use swarm_consensus::{NetworkSettings, Request};
 //         }
 //     }
 // }
-
 // impl Error for ConnError {}
 
-use std::net::{IpAddr, SocketAddr};
-
-use crate::crypto::Decrypter;
-// use crate::networking::common::identify_nat;
-
 pub async fn run_networking_tasks(
-    host_ip: IpAddr,
+    // host_ip: IpAddr,
     server_port: u16,
     buffer_size_bytes: u32,
     uplink_bandwith_bytes_sec: u32,
-    notification_receiver: Receiver<(
-        String,
-        Sender<Request>,
-        Sender<u32>,
-        Receiver<NetworkSettings>,
-    )>,
+    notification_receiver: Receiver<NotificationBundle>,
     decrypter: Decrypter,
     pub_key_pem: String,
 ) {
     let server_addr: SocketAddr = SocketAddr::new("0.0.0.0".parse().unwrap(), server_port);
-    // let server_addr: SocketAddr = SocketAddr::new(host_ip, server_port);
     let bind_result = UdpSocket::bind(server_addr).await;
     let (sub_send_one, sub_recv_one) = channel();
     let (sub_send_two, sub_recv_two) = channel();
@@ -67,32 +54,18 @@ pub async fn run_networking_tasks(
     let (token_pipes_sender, token_pipes_receiver) = channel();
     let (send_pair, recv_pair) = channel();
     spawn(subscriber(
-        host_ip,
+        // host_ip,
         sub_send_one,
-        decrypter.clone(),
-        token_pipes_sender.clone(),
-        pub_key_pem.clone(),
+        // decrypter.clone(),
+        // token_pipes_sender.clone(),
+        // pub_key_pem.clone(),
         sub_recv_two,
-        sub_send_two.clone(),
+        // sub_send_two.clone(),
         notification_receiver,
         token_dispenser_send,
         holepunch_sender,
         send_pair,
     ));
-    spawn(direct_punching_service(
-        host_ip,
-        sub_send_two.clone(),
-        // req_sender.clone(),
-        // resp_receiver,
-        decrypter.clone(),
-        token_pipes_sender.clone(),
-        recv_pair,
-        // receiver,
-        pub_key_pem.clone(),
-        // swarm_name.clone(),
-        // net_set_recv,
-        // None,
-    ));
     spawn(token_dispenser(
         buffer_size_bytes,
         uplink_bandwith_bytes_sec,
@@ -101,55 +74,92 @@ pub async fn run_networking_tasks(
         token_dispenser_recv,
     ));
 
-    // let (decode_req_send, decode_req_recv) = channel();
-    // let (decode_resp_send, decode_resp_recv) = channel();
-    // println!("bifor");
-    // spawn(decrypter_service(
-    //     decrypter,
-    //     decode_resp_send,
-    //     decode_req_recv,
-    // ));
-    // println!("after");
     if let Ok(socket) = bind_result {
-        let puncher = "tudbut.de:4277";
-        spawn(holepunch(
-            puncher,
-            host_ip,
+        spawn(run_server(
+            // host_ip,
+            socket,
             sub_send_two.clone(),
-            // decode_req_send,
-            // decode_resp_recv,
-            decrypter.clone(),
+            sub_recv_one,
             token_pipes_sender.clone(),
-            holepunch_receiver,
             pub_key_pem.clone(),
-        ));
-
-        run_server(
-            host_ip,
-            socket,
-            sub_send_two,
-            sub_recv_one,
-            token_pipes_sender,
-            pub_key_pem,
-        )
-        .await;
+        ))
+        // .await;
     } else {
-        run_client(
+        spawn(run_client(
             // server_addr,
             // socket,
-            host_ip,
+            // host_ip,
             sub_recv_one,
-            sub_send_two,
+            sub_send_two.clone(),
             // token_send_two,
             // token_recv,
             decrypter.clone(),
             // decode_req_send,
             // decode_resp_recv,
+            token_pipes_sender.clone(),
+            pub_key_pem.clone(),
+        ))
+        // .await;
+    };
+    //TODO: We need to organize how and which networking services get started.
+    // 1. We always need to run basic services like token_dispenser and subscriber.
+    // 2. We also always need to run_client and try to run_server.
+    // 2. We need te establish if we have a public_ip.
+    let behind_nat_result = are_we_behind_a_nat(
+        &UdpSocket::bind(SocketAddr::new("0.0.0.0".parse().unwrap(), 0))
+            .await
+            .unwrap(),
+    )
+    .await;
+    let we_are_behind_nat;
+    // let mut our_public_ip = None;
+    if let Ok((behind_a_nat, _public_addr)) = behind_nat_result {
+        we_are_behind_nat = behind_a_nat;
+        // our_public_ip = Some(public_addr.ip());
+    } else {
+        we_are_behind_nat = true;
+    }
+    // 3. a) If we are not behind a NAT then we are done.
+    //    b) Other way we are behind a NAT.
+    if we_are_behind_nat {
+        // In case we are behind a NAT we need to run direct_punch and holepunch
+        // Both of those services need a sophisticated procedure for connection establishment.
+        spawn(direct_punching_service(
+            // host_ip,
+            sub_send_two,
+            // req_sender.clone(),
+            // resp_receiver,
+            decrypter.clone(),
             token_pipes_sender,
+            recv_pair,
+            // receiver,
             pub_key_pem,
-        )
-        .await;
-    };
+            // swarm_name.clone(),
+            // net_set_recv,
+            // None,
+        ));
+        // let puncher = "tudbut.de:4277";
+        // spawn(holepunch(
+        //     puncher,
+        //     host_ip,
+        //     sub_send_two.clone(),
+        //     // decode_req_send,
+        //     // decode_resp_recv,
+        //     decrypter.clone(),
+        //     token_pipes_sender.clone(),
+        //     holepunch_receiver,
+        //     pub_key_pem.clone(),
+        // ));
+    }
+    // let (decode_req_send, decode_req_recv) = channel();
+    // let (decode_resp_send, decode_resp_recv) = channel();
+    // println!("bifor");
+    // spawn(decrypter_service(
+    //     decrypter,
+    //     decode_resp_send,
+    //     decode_req_recv,
+    // ));
+    // println!("after");
 }
 
 // async fn decrypter_service(

+ 82 - 79
src/networking/server.rs

@@ -10,12 +10,12 @@ use crate::networking::subscription::Subscription;
 use crate::prelude::Encrypter;
 use async_std::net::UdpSocket;
 use async_std::task::spawn;
-use std::net::{IpAddr, SocketAddr};
+use std::net::SocketAddr;
 use std::sync::mpsc::{channel, Receiver, Sender};
 use swarm_consensus::GnomeId;
 
 pub async fn run_server(
-    host_ip: IpAddr,
+    // host_ip: IpAddr,
     socket: UdpSocket,
     sub_sender: Sender<Subscription>,
     mut sub_receiver: Receiver<Subscription>,
@@ -26,7 +26,7 @@ pub async fn run_server(
     println!("- - - - - - - - SERVER - - - - - - - -");
     println!("- Listens on: {:?}   -", socket.local_addr().unwrap());
     println!("--------------------------------------");
-    println!("My Pubkey PEM:\n {:?}", pub_key_pem);
+    // println!("My Pubkey PEM:\n {:?}", pub_key_pem);
     let loc_encr = Encrypter::create_from_data(&pub_key_pem).unwrap();
     let gnome_id = GnomeId(loc_encr.hash());
     println!("My GnomeId: {}", gnome_id);
@@ -34,7 +34,7 @@ pub async fn run_server(
         let mut remote_gnome_id: GnomeId = GnomeId(0);
         let mut session_key: SessionKey = SessionKey::from_key(&[0; 32]);
         let optional_sock = establish_secure_connection(
-            host_ip,
+            // host_ip,
             &socket,
             &mut remote_gnome_id,
             &mut session_key,
@@ -65,20 +65,26 @@ pub async fn run_server(
 }
 
 async fn establish_secure_connection(
-    host_ip: IpAddr,
+    // host_ip: IpAddr,
     socket: &UdpSocket,
     remote_gnome_id: &mut GnomeId,
     session_key: &mut SessionKey,
     pub_key_pem: &str,
 ) -> Option<UdpSocket> {
     let mut bytes = [0u8; 1100];
-    // let mut bytes = buf.split();
-    let result = socket.recv_from(&mut bytes).await;
-    if result.is_err() {
-        println!("Failed to receive data on socket: {:?}", result);
-        return None;
+    let mut count;
+    let mut remote_addr;
+    loop {
+        let result = socket.recv_from(&mut bytes).await;
+        if result.is_err() {
+            println!("Failed to receive data on socket: {:?}", result);
+            return None;
+        }
+        (count, remote_addr) = result.unwrap();
+        if count > 1 {
+            break;
+        }
     }
-    let (count, remote_addr) = result.unwrap();
     println!("SKT Received {} bytes", count);
     let id_pub_key_pem = std::str::from_utf8(&bytes[..count]).unwrap();
     let result = Encrypter::create_from_data(id_pub_key_pem);
@@ -88,7 +94,9 @@ async fn establish_secure_connection(
     }
     let encr = result.unwrap();
 
-    let dedicated_socket = UdpSocket::bind(SocketAddr::new(host_ip, 0)).await.unwrap();
+    let dedicated_socket = UdpSocket::bind(SocketAddr::new(socket.local_addr().unwrap().ip(), 0))
+        .await
+        .unwrap();
     // let dedicated_port = dedicated_socket.local_addr().unwrap().port();
     // let mut bytes_to_send = Vec::from(dedicated_port.to_be_bytes());
     let key = generate_symmetric_key();
@@ -113,15 +121,6 @@ async fn establish_secure_connection(
 
     *session_key = SessionKey::from_key(&key);
 
-    //TODO: put here new socket creation and work on that socket from now on
-    // let dedi_sock_option =
-    //     create_dedicated_socket(socket, dedicated_socket, host_ip, remote_addr, session_key).await;
-    // if dedi_sock_option.is_none() {
-    //     println!("Unable to create dedicated socket to Neighbor");
-    //     return None;
-    // }
-    // let dedicated_socket = dedi_sock_option.unwrap();
-
     let mut r_buf = [0u8; 32];
     let r_res = dedicated_socket.recv_from(&mut r_buf).await;
     if r_res.is_err() {
@@ -129,7 +128,11 @@ async fn establish_secure_connection(
         return None;
     }
     let (_count, remote_addr) = r_res.unwrap();
-    dedicated_socket.connect(remote_addr).await.unwrap();
+    let conn_result = dedicated_socket.connect(remote_addr).await;
+    if conn_result.is_err() {
+        println!("Unable to connect dedicated socket: {:?}", conn_result);
+        return None;
+    }
 
     let my_encrypted_pubkey = session_key.encrypt(pub_key_pem.as_bytes());
     let res2 = dedicated_socket.send(&my_encrypted_pubkey).await;
@@ -144,63 +147,63 @@ async fn establish_secure_connection(
     Some(dedicated_socket)
 }
 
-async fn create_dedicated_socket(
-    socket: &UdpSocket,
-    dedicated_socket: UdpSocket,
-    // host_ip: IpAddr,
-    mut remote_addr: SocketAddr,
-    session_key: &SessionKey,
-) -> Option<UdpSocket> {
-    // let send_result = socket
-    //     .send_to(
-    //         dedicated_socket
-    //             .local_addr()
-    //             .unwrap()
-    //             .to_string()
-    //             .as_bytes(),
-    //         remote_addr,
-    //     )
-    //     .await;
-    // if send_result.is_err() {
-    //     println!("Unable to send new socket addr: {:?}", send_result);
-    //     return None;
-    // }
-    let mut bytes = [0; 128];
-    let recv_result = socket.recv_from(&mut bytes).await;
-
-    if let Ok((count, rem_addr)) = recv_result {
-        println!("Received {} bytes: {:?}", count, bytes);
-        let decryp_res = session_key.decrypt(&bytes[..count]);
-        if decryp_res.is_err() {
-            println!(
-                "Unable to decrypt new socket addr with session key:\n{:?}",
-                decryp_res
-            );
-            return None;
-        }
-
-        let recv_str_res = String::from_utf8(decryp_res.unwrap());
-        if recv_str_res.is_err() {
-            println!("Unable to parse String: {:?}", recv_str_res);
-            return None;
-        }
-        let port_res = recv_str_res.unwrap().parse::<u16>();
-        if port_res.is_err() {
-            println!("Failed constructing u16 port number: {:?}", port_res);
-            return None;
-        }
-        let port = port_res.unwrap();
-        remote_addr = SocketAddr::new(rem_addr.ip(), port);
-    }
-    let conn_result = dedicated_socket.connect(remote_addr).await;
-    if conn_result.is_err() {
-        println!("Unable to connect dedicated socket: {:?}", conn_result);
-        return None;
-    }
-
-    println!("SKT Connected to client");
-    Some(dedicated_socket)
-}
+// async fn create_dedicated_socket(
+//     socket: &UdpSocket,
+//     dedicated_socket: UdpSocket,
+//     // host_ip: IpAddr,
+//     mut remote_addr: SocketAddr,
+//     session_key: &SessionKey,
+// ) -> Option<UdpSocket> {
+//     // let send_result = socket
+//     //     .send_to(
+//     //         dedicated_socket
+//     //             .local_addr()
+//     //             .unwrap()
+//     //             .to_string()
+//     //             .as_bytes(),
+//     //         remote_addr,
+//     //     )
+//     //     .await;
+//     // if send_result.is_err() {
+//     //     println!("Unable to send new socket addr: {:?}", send_result);
+//     //     return None;
+//     // }
+//     let mut bytes = [0; 128];
+//     let recv_result = socket.recv_from(&mut bytes).await;
+
+//     if let Ok((count, rem_addr)) = recv_result {
+//         println!("Received {} bytes: {:?}", count, bytes);
+//         let decryp_res = session_key.decrypt(&bytes[..count]);
+//         if decryp_res.is_err() {
+//             println!(
+//                 "Unable to decrypt new socket addr with session key:\n{:?}",
+//                 decryp_res
+//             );
+//             return None;
+//         }
+
+//         let recv_str_res = String::from_utf8(decryp_res.unwrap());
+//         if recv_str_res.is_err() {
+//             println!("Unable to parse String: {:?}", recv_str_res);
+//             return None;
+//         }
+//         let port_res = recv_str_res.unwrap().parse::<u16>();
+//         if port_res.is_err() {
+//             println!("Failed constructing u16 port number: {:?}", port_res);
+//             return None;
+//         }
+//         let port = port_res.unwrap();
+//         remote_addr = SocketAddr::new(rem_addr.ip(), port);
+//     }
+//     let conn_result = dedicated_socket.connect(remote_addr).await;
+//     if conn_result.is_err() {
+//         println!("Unable to connect dedicated socket: {:?}", conn_result);
+//         return None;
+//     }
+
+//     println!("SKT Connected to client");
+//     Some(dedicated_socket)
+// }
 
 async fn prepare_and_serve(
     dedicated_socket: UdpSocket,

+ 2 - 2
src/networking/stun.rs

@@ -204,13 +204,13 @@ pub async fn stun_send(sock: &UdpSocket, msg: StunMessage, ip: Option<IpAddr>, p
     let ip = if ip.is_some() {
         ip.unwrap()
     } else {
-        IpAddr::V4(Ipv4Addr::new(193, 43, 148, 37))
+        IpAddr::V4(Ipv4Addr::new(108, 177, 15, 127))
     };
     let port = if port.is_some() { port.unwrap() } else { 3478 };
     let to: SocketAddr = SocketAddr::new(ip, port);
     let buf = msg.bytes();
     let _result = sock.send_to(&buf, to).await;
-    println!("Send to stun: {:?}", to);
+    // println!("Send to stun: {:?}", to);
 }
 
 pub type Length = u16;

+ 19 - 27
src/networking/subscription.rs

@@ -1,12 +1,9 @@
 // use crate::crypto::Decrypter;
-use crate::prelude::Decrypter;
-use async_std::task::{spawn, yield_now};
+use async_std::task::yield_now;
 use std::collections::HashMap;
 use std::net::IpAddr;
 use std::sync::mpsc::{Receiver, Sender};
-use swarm_consensus::{Nat, Neighbor, NetworkSettings, Request};
-
-use super::token::Token;
+use swarm_consensus::{Nat, Neighbor, NetworkSettings, NotificationBundle, Request};
 
 #[derive(Debug)]
 pub enum Subscription {
@@ -22,19 +19,14 @@ pub enum Subscription {
 }
 
 pub async fn subscriber(
-    host_ip: IpAddr,
+    // host_ip: IpAddr,
     sub_sender: Sender<Subscription>,
-    decrypter: Decrypter,
-    pipes_sender: Sender<(Sender<Token>, Receiver<Token>)>,
-    pub_key_pem: String,
+    // decrypter: Decrypter,
+    // pipes_sender: Sender<(Sender<Token>, Receiver<Token>)>,
+    // pub_key_pem: String,
     sub_receiver: Receiver<Subscription>,
-    sub_sender_two: Sender<Subscription>,
-    notification_receiver: Receiver<(
-        String,
-        Sender<Request>,
-        Sender<u32>,
-        Receiver<NetworkSettings>,
-    )>,
+    // sub_sender_two: Sender<Subscription>,
+    notification_receiver: Receiver<NotificationBundle>,
     token_dispenser_send: Sender<Sender<u32>>,
     holepunch_sender: Sender<String>,
     direct_punch_sender: Sender<(String, Sender<Request>, Receiver<NetworkSettings>)>,
@@ -44,29 +36,29 @@ pub async fn subscriber(
     println!("Subscriber service started");
     let mut notify_holepunch = true;
     loop {
-        // println!("loop");
+        // print!("sub");
         let recv_result = notification_receiver.try_recv();
         match recv_result {
-            Ok((swarm_name, req_sender, band_sender, net_set_recv)) => {
+            Ok(notif_bundle) => {
                 // TODO: only one punching service for all swarms!
                 let _ = direct_punch_sender.send((
-                    swarm_name.clone(),
-                    req_sender.clone(),
-                    net_set_recv,
+                    notif_bundle.swarm_name.clone(),
+                    notif_bundle.request_sender.clone(),
+                    notif_bundle.network_settings_receiver,
                 ));
-                swarms.insert(swarm_name.clone(), req_sender);
-                names.push(swarm_name.clone());
+                swarms.insert(notif_bundle.swarm_name.clone(), notif_bundle.request_sender);
+                names.push(notif_bundle.swarm_name.clone());
                 // TODO: inform existing sockets about new subscription
-                println!("Added swarm: {}", swarm_name);
+                println!("Added swarm: {}", notif_bundle.swarm_name);
                 // TODO: serve err results
-                let _ = sub_sender.send(Subscription::Added(swarm_name.clone()));
+                let _ = sub_sender.send(Subscription::Added(notif_bundle.swarm_name.clone()));
                 if notify_holepunch {
-                    let h_res = holepunch_sender.send(swarm_name);
+                    let h_res = holepunch_sender.send(notif_bundle.swarm_name);
                     if h_res.is_err() {
                         notify_holepunch = false;
                     }
                 }
-                let _ = token_dispenser_send.send(band_sender);
+                let _ = token_dispenser_send.send(notif_bundle.token_sender);
                 // TODO: sockets should be able to respond if they want to join
             }
             Err(std::sync::mpsc::TryRecvError::Disconnected) => {