mirror of
https://github.com/rustdesk/rustdesk-server.git
synced 2026-03-07 04:23:11 +08:00
protbuf 3.1 with_bytes
This commit is contained in:
@@ -89,7 +89,12 @@ enum LoopFailure {
|
||||
|
||||
impl RendezvousServer {
|
||||
#[tokio::main(flavor = "multi_thread")]
|
||||
pub async fn start(port: i32, serial: i32, key: &str, rmem: usize) -> ResultType<()> {
|
||||
pub async fn start(
|
||||
port: i32,
|
||||
serial: i32,
|
||||
key: &str,
|
||||
rmem: usize,
|
||||
) -> ResultType<()> {
|
||||
let (key, sk) = Self::get_server_sk(key);
|
||||
let addr = format!("0.0.0.0:{}", port);
|
||||
let addr2 = format!("0.0.0.0:{}", port - 1);
|
||||
@@ -138,7 +143,6 @@ impl RendezvousServer {
|
||||
log::info!("local-ip: {:?}", rs.inner.local_ip);
|
||||
std::env::set_var("PORT_FOR_API", port.to_string());
|
||||
rs.parse_relay_servers(&get_arg("relay-servers"));
|
||||
let pm = rs.pm.clone();
|
||||
let mut listener = new_listener(&addr, false).await?;
|
||||
let mut listener2 = new_listener(&addr2, false).await?;
|
||||
let mut listener3 = new_listener(&addr3, false).await?;
|
||||
@@ -299,7 +303,7 @@ impl RendezvousServer {
|
||||
) -> ResultType<()> {
|
||||
if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
|
||||
match msg_in.union {
|
||||
Some(rendezvous_message::Union::register_peer(rp)) => {
|
||||
Some(rendezvous_message::Union::RegisterPeer(rp)) => {
|
||||
// B registered
|
||||
if rp.id.len() > 0 {
|
||||
log::trace!("New peer registered: {:?} {:?}", &rp.id, &addr);
|
||||
@@ -315,7 +319,7 @@ impl RendezvousServer {
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(rendezvous_message::Union::register_pk(rk)) => {
|
||||
Some(rendezvous_message::Union::RegisterPk(rk)) => {
|
||||
if rk.uuid.is_empty() || rk.pk.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
@@ -402,7 +406,7 @@ impl RendezvousServer {
|
||||
});
|
||||
socket.send(&msg_out, addr).await?
|
||||
}
|
||||
Some(rendezvous_message::Union::punch_hole_request(ph)) => {
|
||||
Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
|
||||
if self.pm.is_in_memory(&ph.id).await {
|
||||
self.handle_udp_punch_hole_request(addr, ph, key).await?;
|
||||
} else {
|
||||
@@ -414,13 +418,13 @@ impl RendezvousServer {
|
||||
});
|
||||
}
|
||||
}
|
||||
Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
|
||||
Some(rendezvous_message::Union::PunchHoleSent(phs)) => {
|
||||
self.handle_hole_sent(phs, addr, Some(socket)).await?;
|
||||
}
|
||||
Some(rendezvous_message::Union::local_addr(la)) => {
|
||||
Some(rendezvous_message::Union::LocalAddr(la)) => {
|
||||
self.handle_local_addr(la, addr, Some(socket)).await?;
|
||||
}
|
||||
Some(rendezvous_message::Union::configure_update(mut cu)) => {
|
||||
Some(rendezvous_message::Union::ConfigureUpdate(mut cu)) => {
|
||||
if addr.ip() == ADDR_127 && cu.serial > self.inner.serial {
|
||||
let mut inner: Inner = (*self.inner).clone();
|
||||
inner.serial = cu.serial;
|
||||
@@ -441,7 +445,7 @@ impl RendezvousServer {
|
||||
);
|
||||
}
|
||||
}
|
||||
Some(rendezvous_message::Union::software_update(su)) => {
|
||||
Some(rendezvous_message::Union::SoftwareUpdate(su)) => {
|
||||
if !self.inner.version.is_empty() && su.url != self.inner.version {
|
||||
let mut msg_out = RendezvousMessage::new();
|
||||
msg_out.set_software_update(SoftwareUpdate {
|
||||
@@ -468,7 +472,7 @@ impl RendezvousServer {
|
||||
) -> bool {
|
||||
if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
|
||||
match msg_in.union {
|
||||
Some(rendezvous_message::Union::punch_hole_request(ph)) => {
|
||||
Some(rendezvous_message::Union::PunchHoleRequest(ph)) => {
|
||||
// there maybe several attempt, so sink can be none
|
||||
if let Some(sink) = sink.take() {
|
||||
self.tcp_punch.lock().await.insert(addr, sink);
|
||||
@@ -476,24 +480,24 @@ impl RendezvousServer {
|
||||
allow_err!(self.handle_tcp_punch_hole_request(addr, ph, &key, ws).await);
|
||||
return true;
|
||||
}
|
||||
Some(rendezvous_message::Union::request_relay(mut rf)) => {
|
||||
Some(rendezvous_message::Union::RequestRelay(mut rf)) => {
|
||||
// there maybe several attempt, so sink can be none
|
||||
if let Some(sink) = sink.take() {
|
||||
self.tcp_punch.lock().await.insert(addr, sink);
|
||||
}
|
||||
if let Some(peer) = self.pm.get_in_memory(&rf.id).await {
|
||||
let mut msg_out = RendezvousMessage::new();
|
||||
rf.socket_addr = AddrMangle::encode(addr);
|
||||
rf.socket_addr = AddrMangle::encode(addr).into();
|
||||
msg_out.set_request_relay(rf);
|
||||
let peer_addr = peer.read().await.socket_addr;
|
||||
self.tx.send(Data::Msg(msg_out, peer_addr)).ok();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
Some(rendezvous_message::Union::relay_response(mut rr)) => {
|
||||
Some(rendezvous_message::Union::RelayResponse(mut rr)) => {
|
||||
let addr_b = AddrMangle::decode(&rr.socket_addr);
|
||||
rr.socket_addr = Default::default();
|
||||
let id = rr.get_id();
|
||||
let id = rr.id();
|
||||
if !id.is_empty() {
|
||||
let pk = self.get_pk(&rr.version, id.to_owned()).await;
|
||||
rr.set_pk(pk);
|
||||
@@ -510,13 +514,13 @@ impl RendezvousServer {
|
||||
msg_out.set_relay_response(rr);
|
||||
allow_err!(self.send_to_tcp_sync(msg_out, addr_b).await);
|
||||
}
|
||||
Some(rendezvous_message::Union::punch_hole_sent(phs)) => {
|
||||
Some(rendezvous_message::Union::PunchHoleSent(phs)) => {
|
||||
allow_err!(self.handle_hole_sent(phs, addr, None).await);
|
||||
}
|
||||
Some(rendezvous_message::Union::local_addr(la)) => {
|
||||
Some(rendezvous_message::Union::LocalAddr(la)) => {
|
||||
allow_err!(self.handle_local_addr(la, addr, None).await);
|
||||
}
|
||||
Some(rendezvous_message::Union::test_nat_request(tar)) => {
|
||||
Some(rendezvous_message::Union::TestNatRequest(tar)) => {
|
||||
let mut msg_out = RendezvousMessage::new();
|
||||
let mut res = TestNatResponse {
|
||||
port: addr.port() as _,
|
||||
@@ -531,7 +535,7 @@ impl RendezvousServer {
|
||||
msg_out.set_test_nat_response(res);
|
||||
Self::send_to_sink(sink, msg_out).await;
|
||||
}
|
||||
Some(rendezvous_message::Union::register_pk(_rk)) => {
|
||||
Some(rendezvous_message::Union::RegisterPk(_)) => {
|
||||
let res = register_pk_response::Result::NOT_SUPPORT;
|
||||
let mut msg_out = RendezvousMessage::new();
|
||||
msg_out.set_register_pk_response(RegisterPkResponse {
|
||||
@@ -607,7 +611,7 @@ impl RendezvousServer {
|
||||
);
|
||||
let mut msg_out = RendezvousMessage::new();
|
||||
let mut p = PunchHoleResponse {
|
||||
socket_addr: AddrMangle::encode(addr),
|
||||
socket_addr: AddrMangle::encode(addr).into(),
|
||||
pk: self.get_pk(&phs.version, phs.id).await,
|
||||
relay_server: phs.relay_server.clone(),
|
||||
..Default::default()
|
||||
@@ -714,7 +718,7 @@ impl RendezvousServer {
|
||||
_ => false,
|
||||
},
|
||||
};
|
||||
let socket_addr = AddrMangle::encode(addr);
|
||||
let socket_addr = AddrMangle::encode(addr).into();
|
||||
if same_intranet {
|
||||
log::debug!(
|
||||
"Fetch local addr {:?} {:?} request from {:?}",
|
||||
@@ -858,7 +862,7 @@ impl RendezvousServer {
|
||||
self.relay_servers = self.relay_servers0.clone();
|
||||
}
|
||||
|
||||
fn get_relay_server(&self, pa: IpAddr, pb: IpAddr) -> String {
|
||||
fn get_relay_server(&self, _pa: IpAddr, _pb: IpAddr) -> String {
|
||||
if self.relay_servers.is_empty() {
|
||||
return "".to_owned();
|
||||
} else if self.relay_servers.len() == 1 {
|
||||
@@ -1029,7 +1033,7 @@ impl RendezvousServer {
|
||||
let mut stream = stream;
|
||||
if let Some(Ok(bytes)) = stream.next_timeout(30_000).await {
|
||||
if let Ok(msg_in) = RendezvousMessage::parse_from_bytes(&bytes) {
|
||||
if let Some(rendezvous_message::Union::test_nat_request(_)) = msg_in.union {
|
||||
if let Some(rendezvous_message::Union::TestNatRequest(_)) = msg_in.union {
|
||||
let mut msg_out = RendezvousMessage::new();
|
||||
msg_out.set_test_nat_response(TestNatResponse {
|
||||
port: addr.port() as _,
|
||||
@@ -1042,12 +1046,21 @@ impl RendezvousServer {
|
||||
});
|
||||
}
|
||||
|
||||
async fn handle_listener(&self, stream: TcpStream, addr: SocketAddr, key: &str, ws: bool) {
|
||||
async fn handle_listener(
|
||||
&self,
|
||||
stream: TcpStream,
|
||||
addr: SocketAddr,
|
||||
key: &str,
|
||||
ws: bool,
|
||||
) {
|
||||
log::debug!("Tcp connection from {:?}, ws: {}", addr, ws);
|
||||
let mut rs = self.clone();
|
||||
let key = key.to_owned();
|
||||
tokio::spawn(async move {
|
||||
allow_err!(rs.handle_listener_inner(stream, addr, &key, ws).await);
|
||||
allow_err!(
|
||||
rs.handle_listener_inner(stream, addr, &key, ws)
|
||||
.await
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1067,7 +1080,10 @@ impl RendezvousServer {
|
||||
while let Ok(Some(Ok(msg))) = timeout(30_000, b.next()).await {
|
||||
match msg {
|
||||
tungstenite::Message::Binary(bytes) => {
|
||||
if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
|
||||
if !self
|
||||
.handle_tcp(&bytes, &mut sink, addr, key, ws)
|
||||
.await
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -1078,7 +1094,10 @@ impl RendezvousServer {
|
||||
let (a, mut b) = Framed::new(stream, BytesCodec::new()).split();
|
||||
sink = Some(Sink::TcpStream(a));
|
||||
while let Ok(Some(Ok(bytes))) = timeout(30_000, b.next()).await {
|
||||
if !self.handle_tcp(&bytes, &mut sink, addr, key, ws).await {
|
||||
if !self
|
||||
.handle_tcp(&bytes, &mut sink, addr, key, ws)
|
||||
.await
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -1091,13 +1110,13 @@ impl RendezvousServer {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn get_pk(&mut self, version: &str, id: String) -> Vec<u8> {
|
||||
async fn get_pk(&mut self, version: &str, id: String) -> Bytes {
|
||||
if version.is_empty() || self.inner.sk.is_none() {
|
||||
Vec::new()
|
||||
Bytes::new()
|
||||
} else {
|
||||
match self.pm.get(&id).await {
|
||||
Some(peer) => {
|
||||
let pk = peer.read().await.pk.clone();
|
||||
let pk = peer.read().await.pk.clone().into();
|
||||
sign::sign(
|
||||
&hbb_common::message_proto::IdPk {
|
||||
id,
|
||||
@@ -1108,8 +1127,9 @@ impl RendezvousServer {
|
||||
.unwrap_or_default(),
|
||||
&self.inner.sk.as_ref().unwrap(),
|
||||
)
|
||||
.into()
|
||||
}
|
||||
_ => Vec::new(),
|
||||
_ => Bytes::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1213,13 +1233,6 @@ async fn test_hbbs(addr: SocketAddr) -> ResultType<()> {
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn distance(a: &(i32, i32), b: &(i32, i32)) -> i32 {
|
||||
let dx = a.0 - b.0;
|
||||
let dy = a.1 - b.1;
|
||||
dx * dx + dy * dy
|
||||
}
|
||||
|
||||
#[inline]
|
||||
async fn send_rk_res(
|
||||
socket: &mut FramedSocket,
|
||||
|
||||
Reference in New Issue
Block a user