12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720 |
- use std::{
- net::SocketAddr,
- sync::{
- atomic::{AtomicBool, Ordering},
- Arc,
- },
- time::Instant,
- };
- use uuid::Uuid;
- use hbb_common::{
- allow_err,
- anyhow::{self, bail},
- config::{self, keys::*, option2bool, Config, CONNECT_TIMEOUT, REG_INTERVAL, RENDEZVOUS_PORT},
- futures::future::join_all,
- log,
- protobuf::Message as _,
- proxy::Proxy,
- rendezvous_proto::*,
- sleep,
- socket_client::{self, connect_tcp, is_ipv4},
- tcp::FramedStream,
- tokio::{self, select, sync::Mutex, time::interval},
- udp::FramedSocket,
- AddrMangle, IntoTargetAddr, ResultType, TargetAddr,
- };
- use crate::{
- check_port,
- server::{check_zombie, new as new_server, ServerPtr},
- ui_interface::get_builtin_option,
- };
- type Message = RendezvousMessage;
- lazy_static::lazy_static! {
- static ref SOLVING_PK_MISMATCH: Arc<Mutex<String>> = Default::default();
- }
- static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);
- static MANUAL_RESTARTED: AtomicBool = AtomicBool::new(false);
- #[derive(Clone)]
- pub struct RendezvousMediator {
- addr: TargetAddr<'static>,
- host: String,
- host_prefix: String,
- keep_alive: i32,
- }
- impl RendezvousMediator {
- pub fn restart() {
- SHOULD_EXIT.store(true, Ordering::SeqCst);
- MANUAL_RESTARTED.store(true, Ordering::SeqCst);
- log::info!("server restart");
- }
- pub async fn start_all() {
- if config::is_outgoing_only() {
- loop {
- sleep(1.).await;
- }
- }
- crate::hbbs_http::sync::start();
- let mut nat_tested = false;
- check_zombie();
- let server = new_server();
- if Config::get_nat_type() == NatType::UNKNOWN_NAT as i32 {
- crate::test_nat_type();
- nat_tested = true;
- }
- if config::option2bool("stop-service", &Config::get_option("stop-service")) {
- crate::test_rendezvous_server();
- }
- let server_cloned = server.clone();
- tokio::spawn(async move {
- direct_server(server_cloned).await;
- });
- #[cfg(target_os = "android")]
- let start_lan_listening = true;
- #[cfg(not(any(target_os = "android", target_os = "ios")))]
- let start_lan_listening = crate::platform::is_installed();
- if start_lan_listening {
- std::thread::spawn(move || {
- allow_err!(super::lan::start_listening());
- });
- }
- // It is ok to run xdesktop manager when the headless function is not allowed.
- #[cfg(target_os = "linux")]
- if crate::is_server() {
- crate::platform::linux_desktop_manager::start_xdesktop();
- }
- scrap::codec::test_av1();
- loop {
- let conn_start_time = Instant::now();
- *SOLVING_PK_MISMATCH.lock().await = "".to_owned();
- if !config::option2bool("stop-service", &Config::get_option("stop-service"))
- && !crate::platform::installing_service()
- {
- if !nat_tested {
- crate::test_nat_type();
- nat_tested = true;
- }
- let mut futs = Vec::new();
- let servers = Config::get_rendezvous_servers();
- SHOULD_EXIT.store(false, Ordering::SeqCst);
- MANUAL_RESTARTED.store(false, Ordering::SeqCst);
- for host in servers.clone() {
- let server = server.clone();
- futs.push(tokio::spawn(async move {
- if let Err(err) = Self::start(server, host).await {
- log::error!("rendezvous mediator error: {err}");
- }
- // SHOULD_EXIT here is to ensure once one exits, the others also exit.
- SHOULD_EXIT.store(true, Ordering::SeqCst);
- }));
- }
- join_all(futs).await;
- } else {
- server.write().unwrap().close_connections();
- }
- Config::reset_online();
- if !MANUAL_RESTARTED.load(Ordering::SeqCst) {
- let elapsed = conn_start_time.elapsed().as_millis() as u64;
- if elapsed < CONNECT_TIMEOUT {
- sleep(((CONNECT_TIMEOUT - elapsed) / 1000) as _).await;
- }
- }
- }
- }
/// Returns the key under which confirmation state for `host` is stored.
///
/// The key is the text before the first `.` of `host`. If that text parses as an
/// `i32` (as the first octet of a dotted IPv4 address does), the whole `host` is
/// returned instead. A `host` without any `.` is returned unchanged.
fn get_host_prefix(host: &str) -> String {
    // `split` always yields at least one item, so the fallback is never taken;
    // it only avoids an `unwrap` and costs no allocation.
    let first_label = host.split('.').next().unwrap_or(host);
    if first_label.parse::<i32>().is_ok() {
        host.to_owned()
    } else {
        first_label.to_owned()
    }
}
- pub async fn start_udp(server: ServerPtr, host: String) -> ResultType<()> {
- let host = check_port(&host, RENDEZVOUS_PORT);
- let (mut socket, mut addr) = socket_client::new_udp_for(&host, CONNECT_TIMEOUT).await?;
- let mut rz = Self {
- addr: addr.clone(),
- host: host.clone(),
- host_prefix: Self::get_host_prefix(&host),
- keep_alive: crate::DEFAULT_KEEP_ALIVE,
- };
- let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT));
- const MIN_REG_TIMEOUT: i64 = 3_000;
- const MAX_REG_TIMEOUT: i64 = 30_000;
- let mut reg_timeout = MIN_REG_TIMEOUT;
- const MAX_FAILS1: i64 = 2;
- const MAX_FAILS2: i64 = 4;
- const DNS_INTERVAL: i64 = 60_000;
- let mut fails = 0;
- let mut last_register_resp: Option<Instant> = None;
- let mut last_register_sent: Option<Instant> = None;
- let mut last_dns_check = Instant::now();
- let mut old_latency = 0;
- let mut ema_latency = 0;
- loop {
- let mut update_latency = || {
- last_register_resp = Some(Instant::now());
- fails = 0;
- reg_timeout = MIN_REG_TIMEOUT;
- let mut latency = last_register_sent
- .map(|x| x.elapsed().as_micros() as i64)
- .unwrap_or(0);
- last_register_sent = None;
- if latency < 0 || latency > 1_000_000 {
- return;
- }
- if ema_latency == 0 {
- ema_latency = latency;
- } else {
- ema_latency = latency / 30 + (ema_latency * 29 / 30);
- latency = ema_latency;
- }
- let mut n = latency / 5;
- if n < 3000 {
- n = 3000;
- }
- if (latency - old_latency).abs() > n || old_latency <= 0 {
- Config::update_latency(&host, latency);
- log::debug!("Latency of {}: {}ms", host, latency as f64 / 1000.);
- old_latency = latency;
- }
- };
- select! {
- n = socket.next() => {
- match n {
- Some(Ok((bytes, _))) => {
- if let Ok(msg) = Message::parse_from_bytes(&bytes) {
- rz.handle_resp(msg.union, Sink::Framed(&mut socket, &addr), &server, &mut update_latency).await?;
- } else {
- log::debug!("Non-protobuf message bytes received: {:?}", bytes);
- }
- },
- Some(Err(e)) => bail!("Failed to receive next {}", e), // maybe socks5 tcp disconnected
- None => {
- bail!("Socket receive none. Maybe socks5 server is down.");
- },
- }
- },
- _ = timer.tick() => {
- if SHOULD_EXIT.load(Ordering::SeqCst) {
- break;
- }
- let now = Some(Instant::now());
- let expired = last_register_resp.map(|x| x.elapsed().as_millis() as i64 >= REG_INTERVAL).unwrap_or(true);
- let timeout = last_register_sent.map(|x| x.elapsed().as_millis() as i64 >= reg_timeout).unwrap_or(false);
- // temporarily disable exponential backoff for android before we add wakeup trigger to force connect in android
- #[cfg(not(any(target_os = "android", target_os = "ios")))]
- if crate::using_public_server() { // only turn on this for public server, may help DDNS self-hosting user.
- if timeout && reg_timeout < MAX_REG_TIMEOUT {
- reg_timeout += MIN_REG_TIMEOUT;
- }
- }
- if timeout || (last_register_sent.is_none() && expired) {
- if timeout {
- fails += 1;
- if fails >= MAX_FAILS2 {
- Config::update_latency(&host, -1);
- old_latency = 0;
- if last_dns_check.elapsed().as_millis() as i64 > DNS_INTERVAL {
- // in some case of network reconnect (dial IP network),
- // old UDP socket not work any more after network recover
- if let Some((s, new_addr)) = socket_client::rebind_udp_for(&rz.host).await? {
- socket = s;
- rz.addr = new_addr.clone();
- addr = new_addr;
- }
- last_dns_check = Instant::now();
- }
- } else if fails >= MAX_FAILS1 {
- Config::update_latency(&host, 0);
- old_latency = 0;
- }
- }
- rz.register_peer(Sink::Framed(&mut socket, &addr)).await?;
- last_register_sent = now;
- }
- }
- }
- }
- Ok(())
- }
- #[inline]
- async fn handle_resp(
- &mut self,
- msg: Option<rendezvous_message::Union>,
- sink: Sink<'_>,
- server: &ServerPtr,
- update_latency: &mut impl FnMut(),
- ) -> ResultType<()> {
- match msg {
- Some(rendezvous_message::Union::RegisterPeerResponse(rpr)) => {
- update_latency();
- if rpr.request_pk {
- log::info!("request_pk received from {}", self.host);
- self.register_pk(sink).await?;
- }
- }
- Some(rendezvous_message::Union::RegisterPkResponse(rpr)) => {
- update_latency();
- match rpr.result.enum_value() {
- Ok(register_pk_response::Result::OK) => {
- Config::set_key_confirmed(true);
- Config::set_host_key_confirmed(&self.host_prefix, true);
- *SOLVING_PK_MISMATCH.lock().await = "".to_owned();
- }
- Ok(register_pk_response::Result::UUID_MISMATCH) => {
- self.handle_uuid_mismatch(sink).await?;
- }
- _ => {
- log::error!("unknown RegisterPkResponse");
- }
- }
- if rpr.keep_alive > 0 {
- self.keep_alive = rpr.keep_alive * 1000;
- log::info!("keep_alive: {}ms", self.keep_alive);
- }
- }
- Some(rendezvous_message::Union::PunchHole(ph)) => {
- let rz = self.clone();
- let server = server.clone();
- tokio::spawn(async move {
- allow_err!(rz.handle_punch_hole(ph, server).await);
- });
- }
- Some(rendezvous_message::Union::RequestRelay(rr)) => {
- let rz = self.clone();
- let server = server.clone();
- tokio::spawn(async move {
- allow_err!(rz.handle_request_relay(rr, server).await);
- });
- }
- Some(rendezvous_message::Union::FetchLocalAddr(fla)) => {
- let rz = self.clone();
- let server = server.clone();
- tokio::spawn(async move {
- allow_err!(rz.handle_intranet(fla, server).await);
- });
- }
- Some(rendezvous_message::Union::ConfigureUpdate(cu)) => {
- let v0 = Config::get_rendezvous_servers();
- Config::set_option(
- "rendezvous-servers".to_owned(),
- cu.rendezvous_servers.join(","),
- );
- Config::set_serial(cu.serial);
- if v0 != Config::get_rendezvous_servers() {
- Self::restart();
- }
- }
- _ => {}
- }
- Ok(())
- }
- pub async fn start_tcp(server: ServerPtr, host: String) -> ResultType<()> {
- let host = check_port(&host, RENDEZVOUS_PORT);
- let mut conn = connect_tcp(host.clone(), CONNECT_TIMEOUT).await?;
- let key = crate::get_key(true).await;
- crate::secure_tcp(&mut conn, &key).await?;
- let mut rz = Self {
- addr: conn.local_addr().into_target_addr()?,
- host: host.clone(),
- host_prefix: Self::get_host_prefix(&host),
- keep_alive: crate::DEFAULT_KEEP_ALIVE,
- };
- let mut timer = crate::rustdesk_interval(interval(crate::TIMER_OUT));
- let mut last_register_sent: Option<Instant> = None;
- let mut last_recv_msg = Instant::now();
- // we won't support connecting to multiple rendzvous servers any more, so we can use a global variable here.
- Config::set_host_key_confirmed(&host, false);
- loop {
- let mut update_latency = || {
- let latency = last_register_sent
- .map(|x| x.elapsed().as_micros() as i64)
- .unwrap_or(0);
- Config::update_latency(&host, latency);
- log::debug!("Latency of {}: {}ms", host, latency as f64 / 1000.);
- };
- select! {
- res = conn.next() => {
- last_recv_msg = Instant::now();
- let bytes = res.ok_or_else(|| anyhow::anyhow!("Rendezvous connection is reset by the peer"))??;
- if bytes.is_empty() {
- conn.send_bytes(bytes::Bytes::new()).await?;
- continue; // heartbeat
- }
- let msg = Message::parse_from_bytes(&bytes)?;
- rz.handle_resp(msg.union, Sink::Stream(&mut conn), &server, &mut update_latency).await?
- }
- _ = timer.tick() => {
- if SHOULD_EXIT.load(Ordering::SeqCst) {
- break;
- }
- // https://www.emqx.com/en/blog/mqtt-keep-alive
- if last_recv_msg.elapsed().as_millis() as u64 > rz.keep_alive as u64 * 3 / 2 {
- bail!("Rendezvous connection is timeout");
- }
- if (!Config::get_key_confirmed() ||
- !Config::get_host_key_confirmed(&host)) &&
- last_register_sent.map(|x| x.elapsed().as_millis() as i64).unwrap_or(REG_INTERVAL) >= REG_INTERVAL {
- rz.register_pk(Sink::Stream(&mut conn)).await?;
- last_register_sent = Some(Instant::now());
- }
- }
- }
- }
- Ok(())
- }
- pub async fn start(server: ServerPtr, host: String) -> ResultType<()> {
- log::info!("start rendezvous mediator of {}", host);
- //If the investment agent type is http or https, then tcp forwarding is enabled.
- let is_http_proxy = if let Some(conf) = Config::get_socks() {
- let proxy = Proxy::from_conf(&conf, None)?;
- proxy.is_http_or_https()
- } else {
- false
- };
- if (cfg!(debug_assertions) && option_env!("TEST_TCP").is_some())
- || is_http_proxy
- || get_builtin_option(config::keys::OPTION_DISABLE_UDP) == "Y"
- {
- Self::start_tcp(server, host).await
- } else {
- Self::start_udp(server, host).await
- }
- }
- async fn handle_request_relay(&self, rr: RequestRelay, server: ServerPtr) -> ResultType<()> {
- self.create_relay(
- rr.socket_addr.into(),
- rr.relay_server,
- rr.uuid,
- server,
- rr.secure,
- false,
- )
- .await
- }
- async fn create_relay(
- &self,
- socket_addr: Vec<u8>,
- relay_server: String,
- uuid: String,
- server: ServerPtr,
- secure: bool,
- initiate: bool,
- ) -> ResultType<()> {
- let peer_addr = AddrMangle::decode(&socket_addr);
- log::info!(
- "create_relay requested from {:?}, relay_server: {}, uuid: {}, secure: {}",
- peer_addr,
- relay_server,
- uuid,
- secure,
- );
- let mut socket = connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
- let mut msg_out = Message::new();
- let mut rr = RelayResponse {
- socket_addr: socket_addr.into(),
- version: crate::VERSION.to_owned(),
- ..Default::default()
- };
- if initiate {
- rr.uuid = uuid.clone();
- rr.relay_server = relay_server.clone();
- rr.set_id(Config::get_id());
- }
- msg_out.set_relay_response(rr);
- socket.send(&msg_out).await?;
- crate::create_relay_connection(
- server,
- relay_server,
- uuid,
- peer_addr,
- secure,
- is_ipv4(&self.addr),
- )
- .await;
- Ok(())
- }
- async fn handle_intranet(&self, fla: FetchLocalAddr, server: ServerPtr) -> ResultType<()> {
- let relay_server = self.get_relay_server(fla.relay_server.clone());
- // nat64, go relay directly, because current hbbs will crash if demangle ipv6 address
- if is_ipv4(&self.addr) && !config::is_disable_tcp_listen() && !Config::is_proxy() {
- if let Err(err) = self
- .handle_intranet_(fla.clone(), server.clone(), relay_server.clone())
- .await
- {
- log::debug!("Failed to handle intranet: {:?}, will try relay", err);
- } else {
- return Ok(());
- }
- }
- let uuid = Uuid::new_v4().to_string();
- self.create_relay(
- fla.socket_addr.into(),
- relay_server,
- uuid,
- server,
- true,
- true,
- )
- .await
- }
- async fn handle_intranet_(
- &self,
- fla: FetchLocalAddr,
- server: ServerPtr,
- relay_server: String,
- ) -> ResultType<()> {
- let peer_addr = AddrMangle::decode(&fla.socket_addr);
- log::debug!("Handle intranet from {:?}", peer_addr);
- let mut socket = connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
- let local_addr = socket.local_addr();
- // we saw invalid local_addr while using proxy, local_addr.ip() == "::1"
- let local_addr: SocketAddr =
- format!("{}:{}", local_addr.ip(), local_addr.port()).parse()?;
- let mut msg_out = Message::new();
- msg_out.set_local_addr(LocalAddr {
- id: Config::get_id(),
- socket_addr: AddrMangle::encode(peer_addr).into(),
- local_addr: AddrMangle::encode(local_addr).into(),
- relay_server,
- version: crate::VERSION.to_owned(),
- ..Default::default()
- });
- let bytes = msg_out.write_to_bytes()?;
- socket.send_raw(bytes).await?;
- crate::accept_connection(server.clone(), socket, peer_addr, true).await;
- Ok(())
- }
- async fn handle_punch_hole(&self, ph: PunchHole, server: ServerPtr) -> ResultType<()> {
- let relay_server = self.get_relay_server(ph.relay_server);
- if ph.nat_type.enum_value() == Ok(NatType::SYMMETRIC)
- || Config::get_nat_type() == NatType::SYMMETRIC as i32
- || config::is_disable_tcp_listen()
- {
- let uuid = Uuid::new_v4().to_string();
- return self
- .create_relay(
- ph.socket_addr.into(),
- relay_server,
- uuid,
- server,
- true,
- true,
- )
- .await;
- }
- let peer_addr = AddrMangle::decode(&ph.socket_addr);
- log::debug!("Punch hole to {:?}", peer_addr);
- let mut socket = {
- let socket = connect_tcp(&*self.host, CONNECT_TIMEOUT).await?;
- let local_addr = socket.local_addr();
- // key important here for punch hole to tell my gateway incoming peer is safe.
- // it can not be async here, because local_addr can not be reused, we must close the connection before use it again.
- allow_err!(socket_client::connect_tcp_local(peer_addr, Some(local_addr), 30).await);
- socket
- };
- let mut msg_out = Message::new();
- use hbb_common::protobuf::Enum;
- let nat_type = NatType::from_i32(Config::get_nat_type()).unwrap_or(NatType::UNKNOWN_NAT);
- msg_out.set_punch_hole_sent(PunchHoleSent {
- socket_addr: ph.socket_addr,
- id: Config::get_id(),
- relay_server,
- nat_type: nat_type.into(),
- version: crate::VERSION.to_owned(),
- ..Default::default()
- });
- let bytes = msg_out.write_to_bytes()?;
- socket.send_raw(bytes).await?;
- crate::accept_connection(server.clone(), socket, peer_addr, true).await;
- Ok(())
- }
- async fn register_pk(&mut self, socket: Sink<'_>) -> ResultType<()> {
- let mut msg_out = Message::new();
- let pk = Config::get_key_pair().1;
- let uuid = hbb_common::get_uuid();
- let id = Config::get_id();
- msg_out.set_register_pk(RegisterPk {
- id,
- uuid: uuid.into(),
- pk: pk.into(),
- ..Default::default()
- });
- socket.send(&msg_out).await?;
- Ok(())
- }
- async fn handle_uuid_mismatch(&mut self, socket: Sink<'_>) -> ResultType<()> {
- {
- let mut solving = SOLVING_PK_MISMATCH.lock().await;
- if solving.is_empty() || *solving == self.host {
- log::info!("UUID_MISMATCH received from {}", self.host);
- Config::set_key_confirmed(false);
- Config::update_id();
- *solving = self.host.clone();
- } else {
- return Ok(());
- }
- }
- self.register_pk(socket).await
- }
- async fn register_peer(&mut self, socket: Sink<'_>) -> ResultType<()> {
- let solving = SOLVING_PK_MISMATCH.lock().await;
- if !(solving.is_empty() || *solving == self.host) {
- return Ok(());
- }
- drop(solving);
- if !Config::get_key_confirmed() || !Config::get_host_key_confirmed(&self.host_prefix) {
- log::info!(
- "register_pk of {} due to key not confirmed",
- self.host_prefix
- );
- return self.register_pk(socket).await;
- }
- let id = Config::get_id();
- log::trace!(
- "Register my id {:?} to rendezvous server {:?}",
- id,
- self.addr,
- );
- let mut msg_out = Message::new();
- let serial = Config::get_serial();
- msg_out.set_register_peer(RegisterPeer {
- id,
- serial,
- ..Default::default()
- });
- socket.send(&msg_out).await?;
- Ok(())
- }
- fn get_relay_server(&self, provided_by_rendezvous_server: String) -> String {
- let mut relay_server = Config::get_option("relay-server");
- if relay_server.is_empty() {
- relay_server = provided_by_rendezvous_server;
- }
- if relay_server.is_empty() {
- relay_server = crate::increase_port(&self.host, 1);
- }
- relay_server
- }
- }
- fn get_direct_port() -> i32 {
- let mut port = Config::get_option("direct-access-port")
- .parse::<i32>()
- .unwrap_or(0);
- if port <= 0 {
- port = RENDEZVOUS_PORT + 2;
- }
- port
- }
- async fn direct_server(server: ServerPtr) {
- let mut listener = None;
- let mut port = 0;
- loop {
- let disabled = !option2bool(
- OPTION_DIRECT_SERVER,
- &Config::get_option(OPTION_DIRECT_SERVER),
- ) || option2bool("stop-service", &Config::get_option("stop-service"));
- if !disabled && listener.is_none() {
- port = get_direct_port();
- match hbb_common::tcp::listen_any(port as _).await {
- Ok(l) => {
- listener = Some(l);
- log::info!(
- "Direct server listening on: {:?}",
- listener.as_ref().map(|l| l.local_addr())
- );
- }
- Err(err) => {
- // to-do: pass to ui
- log::error!(
- "Failed to start direct server on port: {}, error: {}",
- port,
- err
- );
- loop {
- if port != get_direct_port() {
- break;
- }
- sleep(1.).await;
- }
- }
- }
- }
- if let Some(l) = listener.as_mut() {
- if disabled || port != get_direct_port() {
- log::info!("Exit direct access listen");
- listener = None;
- continue;
- }
- if let Ok(Ok((stream, addr))) = hbb_common::timeout(1000, l.accept()).await {
- stream.set_nodelay(true).ok();
- log::info!("direct access from {}", addr);
- let local_addr = stream
- .local_addr()
- .unwrap_or(Config::get_any_listen_addr(true));
- let server = server.clone();
- tokio::spawn(async move {
- allow_err!(
- crate::server::create_tcp_connection(
- server,
- hbb_common::Stream::from(stream, local_addr),
- addr,
- false,
- )
- .await
- );
- });
- } else {
- sleep(0.1).await;
- }
- } else {
- sleep(1.).await;
- }
- }
- }
- enum Sink<'a> {
- Framed(&'a mut FramedSocket, &'a TargetAddr<'a>),
- Stream(&'a mut FramedStream),
- }
- impl Sink<'_> {
- async fn send(self, msg: &Message) -> ResultType<()> {
- match self {
- Sink::Framed(socket, addr) => socket.send(msg, addr.to_owned()).await,
- Sink::Stream(stream) => stream.send(msg).await,
- }
- }
- }
|