rendezvous_mediator.rs 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720
  1. use std::{
  2. net::SocketAddr,
  3. sync::{
  4. atomic::{AtomicBool, Ordering},
  5. Arc,
  6. },
  7. time::Instant,
  8. };
  9. use uuid::Uuid;
  10. use hbb_common::{
  11. allow_err,
  12. anyhow::{self, bail},
  13. config::{self, keys::*, option2bool, Config, CONNECT_TIMEOUT, REG_INTERVAL, RENDEZVOUS_PORT},
  14. futures::future::join_all,
  15. log,
  16. protobuf::Message as _,
  17. proxy::Proxy,
  18. rendezvous_proto::*,
  19. sleep,
  20. socket_client::{self, connect_tcp, is_ipv4},
  21. tcp::FramedStream,
  22. tokio::{self, select, sync::Mutex, time::interval},
  23. udp::FramedSocket,
  24. AddrMangle, IntoTargetAddr, ResultType, TargetAddr,
  25. };
  26. use crate::{
  27. check_port,
  28. server::{check_zombie, new as new_server, ServerPtr},
  29. ui_interface::get_builtin_option,
  30. };
  31. type Message = RendezvousMessage;
  32. lazy_static::lazy_static! {
  33. static ref SOLVING_PK_MISMATCH: Arc<Mutex<String>> = Default::default();
  34. }
  35. static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);
  36. static MANUAL_RESTARTED: AtomicBool = AtomicBool::new(false);
  37. #[derive(Clone)]
  38. pub struct RendezvousMediator {
  39. addr: TargetAddr<'static>,
  40. host: String,
  41. host_prefix: String,
  42. keep_alive: i32,
  43. }
  44. impl RendezvousMediator {
  45. pub fn restart() {
  46. SHOULD_EXIT.store(true, Ordering::SeqCst);
  47. MANUAL_RESTARTED.store(true, Ordering::SeqCst);
  48. log::info!("server restart");
  49. }
  50. pub async fn start_all() {
  51. if config::is_outgoing_only() {
  52. loop {
  53. sleep(1.).await;
  54. }
  55. }
  56. crate::hbbs_http::sync::start();
  57. let mut nat_tested = false;
  58. check_zombie();
  59. let server = new_server();
  60. if Config::get_nat_type() == NatType::UNKNOWN_NAT as i32 {
  61. crate::test_nat_type();
  62. nat_tested = true;
  63. }
  64. if config::option2bool("stop-service", &Config::get_option("stop-service")) {
  65. crate::test_rendezvous_server();
  66. }
  67. let server_cloned = server.clone();
  68. tokio::spawn(async move {
  69. direct_server(server_cloned).await;
  70. });
  71. #[cfg(target_os = "android")]
  72. let start_lan_listening = true;
  73. #[cfg(not(any(target_os = "android", target_os = "ios")))]
  74. let start_lan_listening = crate::platform::is_installed();
  75. if start_lan_listening {
  76. std::thread::spawn(move || {
  77. allow_err!(super::lan::start_listening());
  78. });
  79. }
  80. // It is ok to run xdesktop manager when the headless function is not allowed.
  81. #[cfg(target_os = "linux")]
  82. if crate::is_server() {
  83. crate::platform::linux_desktop_manager::start_xdesktop();
  84. }
  85. scrap::codec::test_av1();
  86. loop {
  87. let conn_start_time = Instant::now();
  88. *SOLVING_PK_MISMATCH.lock().await = "".to_owned();
  89. if !config::option2bool("stop-service", &Config::get_option("stop-service"))
  90. && !crate::platform::installing_service()
  91. {
  92. if !nat_tested {
  93. crate::test_nat_type();
  94. nat_tested = true;
  95. }
  96. let mut futs = Vec::new();
  97. let servers = Config::get_rendezvous_servers();
  98. SHOULD_EXIT.store(false, Ordering::SeqCst);
  99. MANUAL_RESTARTED.store(false, Ordering::SeqCst);
  100. for host in servers.clone() {
  101. let server = server.clone();
  102. futs.push(tokio::spawn(async move {
  103. if let Err(err) = Self::start(server, host).await {
  104. log::error!("rendezvous mediator error: {err}");
  105. }
  106. // SHOULD_EXIT here is to ensure once one exits, the others also exit.
  107. SHOULD_EXIT.store(true, Ordering::SeqCst);
  108. }));
  109. }
  110. join_all(futs).await;
  111. } else {
  112. server.write().unwrap().close_connections();
  113. }
  114. Config::reset_online();
  115. if !MANUAL_RESTARTED.load(Ordering::SeqCst) {
  116. let elapsed = conn_start_time.elapsed().as_millis() as u64;
  117. if elapsed < CONNECT_TIMEOUT {
  118. sleep(((CONNECT_TIMEOUT - elapsed) / 1000) as _).await;
  119. }
  120. }
  121. }
  122. }
  /// Returns the part of `host` before its first `.`.
  ///
  /// If that part parses as an `i32` (e.g. the first octet of an IPv4 address),
  /// or `host` contains no `.`, the whole `host` is returned unchanged.
  fn get_host_prefix(host: &str) -> String {
      // `split` always yields at least one item, so the fallback is never taken.
      let first_label = host.split('.').next().unwrap_or(host);
      if first_label.parse::<i32>().is_ok() {
          host.to_owned()
      } else {
          first_label.to_owned()
      }
  }
  135. pub async fn start_udp(server: ServerPtr, host: String) -> ResultType<()> {
  136. let host = check_port(&host, RENDEZVOUS_PORT);
  137. let (mut socket, mut addr) = socket_client::new_udp_for(&host, CONNECT_TIMEOUT).await?;
  138. let mut rz = Self {
  139. addr: addr.clone(),
  140. host: host.clone(),
  141. host_prefix: Self::get_host_prefix(&host),
  142. keep_alive: crate::DEFAULT_KEEP_ALIVE,
  143. };
  144. let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT));
  145. const MIN_REG_TIMEOUT: i64 = 3_000;
  146. const MAX_REG_TIMEOUT: i64 = 30_000;
  147. let mut reg_timeout = MIN_REG_TIMEOUT;
  148. const MAX_FAILS1: i64 = 2;
  149. const MAX_FAILS2: i64 = 4;
  150. const DNS_INTERVAL: i64 = 60_000;
  151. let mut fails = 0;
  152. let mut last_register_resp: Option<Instant> = None;
  153. let mut last_register_sent: Option<Instant> = None;
  154. let mut last_dns_check = Instant::now();
  155. let mut old_latency = 0;
  156. let mut ema_latency = 0;
  157. loop {
  158. let mut update_latency = || {
  159. last_register_resp = Some(Instant::now());
  160. fails = 0;
  161. reg_timeout = MIN_REG_TIMEOUT;
  162. let mut latency = last_register_sent
  163. .map(|x| x.elapsed().as_micros() as i64)
  164. .unwrap_or(0);
  165. last_register_sent = None;
  166. if latency < 0 || latency > 1_000_000 {
  167. return;
  168. }
  169. if ema_latency == 0 {
  170. ema_latency = latency;
  171. } else {
  172. ema_latency = latency / 30 + (ema_latency * 29 / 30);
  173. latency = ema_latency;
  174. }
  175. let mut n = latency / 5;
  176. if n < 3000 {
  177. n = 3000;
  178. }
  179. if (latency - old_latency).abs() > n || old_latency <= 0 {
  180. Config::update_latency(&host, latency);
  181. log::debug!("Latency of {}: {}ms", host, latency as f64 / 1000.);
  182. old_latency = latency;
  183. }
  184. };
  185. select! {
  186. n = socket.next() => {
  187. match n {
  188. Some(Ok((bytes, _))) => {
  189. if let Ok(msg) = Message::parse_from_bytes(&bytes) {
  190. rz.handle_resp(msg.union, Sink::Framed(&mut socket, &addr), &server, &mut update_latency).await?;
  191. } else {
  192. log::debug!("Non-protobuf message bytes received: {:?}", bytes);
  193. }
  194. },
  195. Some(Err(e)) => bail!("Failed to receive next {}", e), // maybe socks5 tcp disconnected
  196. None => {
  197. bail!("Socket receive none. Maybe socks5 server is down.");
  198. },
  199. }
  200. },
  201. _ = timer.tick() => {
  202. if SHOULD_EXIT.load(Ordering::SeqCst) {
  203. break;
  204. }
  205. let now = Some(Instant::now());
  206. let expired = last_register_resp.map(|x| x.elapsed().as_millis() as i64 >= REG_INTERVAL).unwrap_or(true);
  207. let timeout = last_register_sent.map(|x| x.elapsed().as_millis() as i64 >= reg_timeout).unwrap_or(false);
  208. // temporarily disable exponential backoff for android before we add wakeup trigger to force connect in android
  209. #[cfg(not(any(target_os = "android", target_os = "ios")))]
  210. if crate::using_public_server() { // only turn on this for public server, may help DDNS self-hosting user.
  211. if timeout && reg_timeout < MAX_REG_TIMEOUT {
  212. reg_timeout += MIN_REG_TIMEOUT;
  213. }
  214. }
  215. if timeout || (last_register_sent.is_none() && expired) {
  216. if timeout {
  217. fails += 1;
  218. if fails >= MAX_FAILS2 {
  219. Config::update_latency(&host, -1);
  220. old_latency = 0;
  221. if last_dns_check.elapsed().as_millis() as i64 > DNS_INTERVAL {
  222. // in some case of network reconnect (dial IP network),
  223. // old UDP socket not work any more after network recover
  224. if let Some((s, new_addr)) = socket_client::rebind_udp_for(&rz.host).await? {
  225. socket = s;
  226. rz.addr = new_addr.clone();
  227. addr = new_addr;
  228. }
  229. last_dns_check = Instant::now();
  230. }
  231. } else if fails >= MAX_FAILS1 {
  232. Config::update_latency(&host, 0);
  233. old_latency = 0;
  234. }
  235. }
  236. rz.register_peer(Sink::Framed(&mut socket, &addr)).await?;
  237. last_register_sent = now;
  238. }
  239. }
  240. }
  241. }
  242. Ok(())
  243. }
  244. #[inline]
  245. async fn handle_resp(
  246. &mut self,
  247. msg: Option<rendezvous_message::Union>,
  248. sink: Sink<'_>,
  249. server: &ServerPtr,
  250. update_latency: &mut impl FnMut(),
  251. ) -> ResultType<()> {
  252. match msg {
  253. Some(rendezvous_message::Union::RegisterPeerResponse(rpr)) => {
  254. update_latency();
  255. if rpr.request_pk {
  256. log::info!("request_pk received from {}", self.host);
  257. self.register_pk(sink).await?;
  258. }
  259. }
  260. Some(rendezvous_message::Union::RegisterPkResponse(rpr)) => {
  261. update_latency();
  262. match rpr.result.enum_value() {
  263. Ok(register_pk_response::Result::OK) => {
  264. Config::set_key_confirmed(true);
  265. Config::set_host_key_confirmed(&self.host_prefix, true);
  266. *SOLVING_PK_MISMATCH.lock().await = "".to_owned();
  267. }
  268. Ok(register_pk_response::Result::UUID_MISMATCH) => {
  269. self.handle_uuid_mismatch(sink).await?;
  270. }
  271. _ => {
  272. log::error!("unknown RegisterPkResponse");
  273. }
  274. }
  275. if rpr.keep_alive > 0 {
  276. self.keep_alive = rpr.keep_alive * 1000;
  277. log::info!("keep_alive: {}ms", self.keep_alive);
  278. }
  279. }
  280. Some(rendezvous_message::Union::PunchHole(ph)) => {
  281. let rz = self.clone();
  282. let server = server.clone();
  283. tokio::spawn(async move {
  284. allow_err!(rz.handle_punch_hole(ph, server).await);
  285. });
  286. }
  287. Some(rendezvous_message::Union::RequestRelay(rr)) => {
  288. let rz = self.clone();
  289. let server = server.clone();
  290. tokio::spawn(async move {
  291. allow_err!(rz.handle_request_relay(rr, server).await);
  292. });
  293. }
  294. Some(rendezvous_message::Union::FetchLocalAddr(fla)) => {
  295. let rz = self.clone();
  296. let server = server.clone();
  297. tokio::spawn(async move {
  298. allow_err!(rz.handle_intranet(fla, server).await);
  299. });
  300. }
  301. Some(rendezvous_message::Union::ConfigureUpdate(cu)) => {
  302. let v0 = Config::get_rendezvous_servers();
  303. Config::set_option(
  304. "rendezvous-servers".to_owned(),
  305. cu.rendezvous_servers.join(","),
  306. );
  307. Config::set_serial(cu.serial);
  308. if v0 != Config::get_rendezvous_servers() {
  309. Self::restart();
  310. }
  311. }
  312. _ => {}
  313. }
  314. Ok(())
  315. }
  316. pub async fn start_tcp(server: ServerPtr, host: String) -> ResultType<()> {
  317. let host = check_port(&host, RENDEZVOUS_PORT);
  318. let mut conn = connect_tcp(host.clone(), CONNECT_TIMEOUT).await?;
  319. let key = crate::get_key(true).await;
  320. crate::secure_tcp(&mut conn, &key).await?;
  321. let mut rz = Self {
  322. addr: conn.local_addr().into_target_addr()?,
  323. host: host.clone(),
  324. host_prefix: Self::get_host_prefix(&host),
  325. keep_alive: crate::DEFAULT_KEEP_ALIVE,
  326. };
  327. let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT));
  328. let mut last_register_sent: Option<Instant> = None;
  329. let mut last_recv_msg = Instant::now();
  330. // we won't support connecting to multiple rendzvous servers any more, so we can use a global variable here.
  331. Config::set_host_key_confirmed(&host, false);
  332. loop {
  333. let mut update_latency = || {
  334. let latency = last_register_sent
  335. .map(|x| x.elapsed().as_micros() as i64)
  336. .unwrap_or(0);
  337. Config::update_latency(&host, latency);
  338. log::debug!("Latency of {}: {}ms", host, latency as f64 / 1000.);
  339. };
  340. select! {
  341. res = conn.next() => {
  342. last_recv_msg = Instant::now();
  343. let bytes = res.ok_or_else(|| anyhow::anyhow!("Rendezvous connection is reset by the peer"))??;
  344. if bytes.is_empty() {
  345. conn.send_bytes(bytes::Bytes::new()).await?;
  346. continue; // heartbeat
  347. }
  348. let msg = Message::parse_from_bytes(&bytes)?;
  349. rz.handle_resp(msg.union, Sink::Stream(&mut conn), &server, &mut update_latency).await?
  350. }
  351. _ = timer.tick() => {
  352. if SHOULD_EXIT.load(Ordering::SeqCst) {
  353. break;
  354. }
  355. // https://www.emqx.com/en/blog/mqtt-keep-alive
  356. if last_recv_msg.elapsed().as_millis() as u64 > rz.keep_alive as u64 * 3 / 2 {
  357. bail!("Rendezvous connection is timeout");
  358. }
  359. if (!Config::get_key_confirmed() ||
  360. !Config::get_host_key_confirmed(&host)) &&
  361. last_register_sent.map(|x| x.elapsed().as_millis() as i64).unwrap_or(REG_INTERVAL) >= REG_INTERVAL {
  362. rz.register_pk(Sink::Stream(&mut conn)).await?;
  363. last_register_sent = Some(Instant::now());
  364. }
  365. }
  366. }
  367. }
  368. Ok(())
  369. }
  370. pub async fn start(server: ServerPtr, host: String) -> ResultType<()> {
  371. log::info!("start rendezvous mediator of {}", host);
  372. //If the investment agent type is http or https, then tcp forwarding is enabled.
  373. let is_http_proxy = if let Some(conf) = Config::get_socks() {
  374. let proxy = Proxy::from_conf(&conf, None)?;
  375. proxy.is_http_or_https()
  376. } else {
  377. false
  378. };
  379. if (cfg!(debug_assertions) && option_env!("TEST_TCP").is_some())
  380. || is_http_proxy
  381. || get_builtin_option(config::keys::OPTION_DISABLE_UDP) == "Y"
  382. {
  383. Self::start_tcp(server, host).await
  384. } else {
  385. Self::start_udp(server, host).await
  386. }
  387. }
  388. async fn handle_request_relay(&self, rr: RequestRelay, server: ServerPtr) -> ResultType<()> {
  389. self.create_relay(
  390. rr.socket_addr.into(),
  391. rr.relay_server,
  392. rr.uuid,
  393. server,
  394. rr.secure,
  395. false,
  396. )
  397. .await
  398. }
  399. async fn create_relay(
  400. &self,
  401. socket_addr: Vec<u8>,
  402. relay_server: String,
  403. uuid: String,
  404. server: ServerPtr,
  405. secure: bool,
  406. initiate: bool,
  407. ) -> ResultType<()> {
  408. let peer_addr = AddrMangle::decode(&socket_addr);
  409. log::info!(
  410. "create_relay requested from {:?}, relay_server: {}, uuid: {}, secure: {}",
  411. peer_addr,
  412. relay_server,
  413. uuid,
  414. secure,
  415. );
  416. let mut socket = connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
  417. let mut msg_out = Message::new();
  418. let mut rr = RelayResponse {
  419. socket_addr: socket_addr.into(),
  420. version: crate::VERSION.to_owned(),
  421. ..Default::default()
  422. };
  423. if initiate {
  424. rr.uuid = uuid.clone();
  425. rr.relay_server = relay_server.clone();
  426. rr.set_id(Config::get_id());
  427. }
  428. msg_out.set_relay_response(rr);
  429. socket.send(&msg_out).await?;
  430. crate::create_relay_connection(
  431. server,
  432. relay_server,
  433. uuid,
  434. peer_addr,
  435. secure,
  436. is_ipv4(&self.addr),
  437. )
  438. .await;
  439. Ok(())
  440. }
  441. async fn handle_intranet(&self, fla: FetchLocalAddr, server: ServerPtr) -> ResultType<()> {
  442. let relay_server = self.get_relay_server(fla.relay_server.clone());
  443. // nat64, go relay directly, because current hbbs will crash if demangle ipv6 address
  444. if is_ipv4(&self.addr) && !config::is_disable_tcp_listen() && !Config::is_proxy() {
  445. if let Err(err) = self
  446. .handle_intranet_(fla.clone(), server.clone(), relay_server.clone())
  447. .await
  448. {
  449. log::debug!("Failed to handle intranet: {:?}, will try relay", err);
  450. } else {
  451. return Ok(());
  452. }
  453. }
  454. let uuid = Uuid::new_v4().to_string();
  455. self.create_relay(
  456. fla.socket_addr.into(),
  457. relay_server,
  458. uuid,
  459. server,
  460. true,
  461. true,
  462. )
  463. .await
  464. }
  465. async fn handle_intranet_(
  466. &self,
  467. fla: FetchLocalAddr,
  468. server: ServerPtr,
  469. relay_server: String,
  470. ) -> ResultType<()> {
  471. let peer_addr = AddrMangle::decode(&fla.socket_addr);
  472. log::debug!("Handle intranet from {:?}", peer_addr);
  473. let mut socket = connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
  474. let local_addr = socket.local_addr();
  475. // we saw invalid local_addr while using proxy, local_addr.ip() == "::1"
  476. let local_addr: SocketAddr =
  477. format!("{}:{}", local_addr.ip(), local_addr.port()).parse()?;
  478. let mut msg_out = Message::new();
  479. msg_out.set_local_addr(LocalAddr {
  480. id: Config::get_id(),
  481. socket_addr: AddrMangle::encode(peer_addr).into(),
  482. local_addr: AddrMangle::encode(local_addr).into(),
  483. relay_server,
  484. version: crate::VERSION.to_owned(),
  485. ..Default::default()
  486. });
  487. let bytes = msg_out.write_to_bytes()?;
  488. socket.send_raw(bytes).await?;
  489. crate::accept_connection(server.clone(), socket, peer_addr, true).await;
  490. Ok(())
  491. }
  492. async fn handle_punch_hole(&self, ph: PunchHole, server: ServerPtr) -> ResultType<()> {
  493. let relay_server = self.get_relay_server(ph.relay_server);
  494. if ph.nat_type.enum_value() == Ok(NatType::SYMMETRIC)
  495. || Config::get_nat_type() == NatType::SYMMETRIC as i32
  496. || config::is_disable_tcp_listen()
  497. {
  498. let uuid = Uuid::new_v4().to_string();
  499. return self
  500. .create_relay(
  501. ph.socket_addr.into(),
  502. relay_server,
  503. uuid,
  504. server,
  505. true,
  506. true,
  507. )
  508. .await;
  509. }
  510. let peer_addr = AddrMangle::decode(&ph.socket_addr);
  511. log::debug!("Punch hole to {:?}", peer_addr);
  512. let mut socket = {
  513. let socket = connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
  514. let local_addr = socket.local_addr();
  515. // key important here for punch hole to tell my gateway incoming peer is safe.
  516. // it can not be async here, because local_addr can not be reused, we must close the connection before use it again.
  517. allow_err!(socket_client::connect_tcp_local(peer_addr, Some(local_addr), 30).await);
  518. socket
  519. };
  520. let mut msg_out = Message::new();
  521. use hbb_common::protobuf::Enum;
  522. let nat_type = NatType::from_i32(Config::get_nat_type()).unwrap_or(NatType::UNKNOWN_NAT);
  523. msg_out.set_punch_hole_sent(PunchHoleSent {
  524. socket_addr: ph.socket_addr,
  525. id: Config::get_id(),
  526. relay_server,
  527. nat_type: nat_type.into(),
  528. version: crate::VERSION.to_owned(),
  529. ..Default::default()
  530. });
  531. let bytes = msg_out.write_to_bytes()?;
  532. socket.send_raw(bytes).await?;
  533. crate::accept_connection(server.clone(), socket, peer_addr, true).await;
  534. Ok(())
  535. }
  536. async fn register_pk(&mut self, socket: Sink<'_>) -> ResultType<()> {
  537. let mut msg_out = Message::new();
  538. let pk = Config::get_key_pair().1;
  539. let uuid = hbb_common::get_uuid();
  540. let id = Config::get_id();
  541. msg_out.set_register_pk(RegisterPk {
  542. id,
  543. uuid: uuid.into(),
  544. pk: pk.into(),
  545. ..Default::default()
  546. });
  547. socket.send(&msg_out).await?;
  548. Ok(())
  549. }
  550. async fn handle_uuid_mismatch(&mut self, socket: Sink<'_>) -> ResultType<()> {
  551. {
  552. let mut solving = SOLVING_PK_MISMATCH.lock().await;
  553. if solving.is_empty() || *solving == self.host {
  554. log::info!("UUID_MISMATCH received from {}", self.host);
  555. Config::set_key_confirmed(false);
  556. Config::update_id();
  557. *solving = self.host.clone();
  558. } else {
  559. return Ok(());
  560. }
  561. }
  562. self.register_pk(socket).await
  563. }
  564. async fn register_peer(&mut self, socket: Sink<'_>) -> ResultType<()> {
  565. let solving = SOLVING_PK_MISMATCH.lock().await;
  566. if !(solving.is_empty() || *solving == self.host) {
  567. return Ok(());
  568. }
  569. drop(solving);
  570. if !Config::get_key_confirmed() || !Config::get_host_key_confirmed(&self.host_prefix) {
  571. log::info!(
  572. "register_pk of {} due to key not confirmed",
  573. self.host_prefix
  574. );
  575. return self.register_pk(socket).await;
  576. }
  577. let id = Config::get_id();
  578. log::trace!(
  579. "Register my id {:?} to rendezvous server {:?}",
  580. id,
  581. self.addr,
  582. );
  583. let mut msg_out = Message::new();
  584. let serial = Config::get_serial();
  585. msg_out.set_register_peer(RegisterPeer {
  586. id,
  587. serial,
  588. ..Default::default()
  589. });
  590. socket.send(&msg_out).await?;
  591. Ok(())
  592. }
  593. fn get_relay_server(&self, provided_by_rendezvous_server: String) -> String {
  594. let mut relay_server = Config::get_option("relay-server");
  595. if relay_server.is_empty() {
  596. relay_server = provided_by_rendezvous_server;
  597. }
  598. if relay_server.is_empty() {
  599. relay_server = crate::increase_port(&self.host, 1);
  600. }
  601. relay_server
  602. }
  603. }
  604. fn get_direct_port() -> i32 {
  605. let mut port = Config::get_option("direct-access-port")
  606. .parse::<i32>()
  607. .unwrap_or(0);
  608. if port <= 0 {
  609. port = RENDEZVOUS_PORT + 2;
  610. }
  611. port
  612. }
  613. async fn direct_server(server: ServerPtr) {
  614. let mut listener = None;
  615. let mut port = 0;
  616. loop {
  617. let disabled = !option2bool(
  618. OPTION_DIRECT_SERVER,
  619. &Config::get_option(OPTION_DIRECT_SERVER),
  620. ) || option2bool("stop-service", &Config::get_option("stop-service"));
  621. if !disabled && listener.is_none() {
  622. port = get_direct_port();
  623. match hbb_common::tcp::listen_any(port as _).await {
  624. Ok(l) => {
  625. listener = Some(l);
  626. log::info!(
  627. "Direct server listening on: {:?}",
  628. listener.as_ref().map(|l| l.local_addr())
  629. );
  630. }
  631. Err(err) => {
  632. // to-do: pass to ui
  633. log::error!(
  634. "Failed to start direct server on port: {}, error: {}",
  635. port,
  636. err
  637. );
  638. loop {
  639. if port != get_direct_port() {
  640. break;
  641. }
  642. sleep(1.).await;
  643. }
  644. }
  645. }
  646. }
  647. if let Some(l) = listener.as_mut() {
  648. if disabled || port != get_direct_port() {
  649. log::info!("Exit direct access listen");
  650. listener = None;
  651. continue;
  652. }
  653. if let Ok(Ok((stream, addr))) = hbb_common::timeout(1000, l.accept()).await {
  654. stream.set_nodelay(true).ok();
  655. log::info!("direct access from {}", addr);
  656. let local_addr = stream
  657. .local_addr()
  658. .unwrap_or(Config::get_any_listen_addr(true));
  659. let server = server.clone();
  660. tokio::spawn(async move {
  661. allow_err!(
  662. crate::server::create_tcp_connection(
  663. server,
  664. hbb_common::Stream::from(stream, local_addr),
  665. addr,
  666. false,
  667. )
  668. .await
  669. );
  670. });
  671. } else {
  672. sleep(0.1).await;
  673. }
  674. } else {
  675. sleep(1.).await;
  676. }
  677. }
  678. }
  679. enum Sink<'a> {
  680. Framed(&'a mut FramedSocket, &'a TargetAddr<'a>),
  681. Stream(&'a mut FramedStream),
  682. }
  683. impl Sink<'_> {
  684. async fn send(self, msg: &Message) -> ResultType<()> {
  685. match self {
  686. Sink::Framed(socket, addr) => socket.send(msg, addr.to_owned()).await,
  687. Sink::Stream(stream) => stream.send(msg).await,
  688. }
  689. }
  690. }