From 1d19007a466062de164c05a83c0705b96c864473 Mon Sep 17 00:00:00 2001 From: open-trade Date: Thu, 17 Sep 2020 10:02:20 +0800 Subject: [PATCH] prototyping forward request --- libs/hbb_common | 2 +- src/hbbf/main.rs | 5 +++-- src/main.rs | 4 ++-- src/rendezvous_server.rs | 34 +++++++++++++++++++++++++++++++--- 4 files changed, 37 insertions(+), 8 deletions(-) diff --git a/libs/hbb_common b/libs/hbb_common index 8209a1f..6ccb0d9 160000 --- a/libs/hbb_common +++ b/libs/hbb_common @@ -1 +1 @@ -Subproject commit 8209a1f2d1d11b7019e224fdc161f138415f6d53 +Subproject commit 6ccb0d96e7bb37ea43df5c53e07983d6c07804e5 diff --git a/src/hbbf/main.rs b/src/hbbf/main.rs index c351f73..7759a1a 100644 --- a/src/hbbf/main.rs +++ b/src/hbbf/main.rs @@ -1,5 +1,6 @@ use hbb_common::{ - env_logger, log, + env_logger::*, + log, protobuf::Message as _, rendezvous_proto::*, sleep, @@ -18,7 +19,7 @@ lazy_static::lazy_static! { #[tokio::main] async fn main() -> ResultType<()> { - env_logger::init(); + init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "info")); let addr = "0.0.0.0:21117"; log::info!("Listening on {}", addr); let mut listener = new_listener(addr, true).await?; diff --git a/src/main.rs b/src/main.rs index 14c443d..5576439 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,12 @@ // https://tools.ietf.org/rfc/rfc5128.txt // https://blog.csdn.net/bytxl/article/details/44344855 -use hbb_common::{env_logger, log, tokio, ResultType}; +use hbb_common::{env_logger::*, log, tokio, ResultType}; use hbbs::*; #[tokio::main] async fn main() -> ResultType<()> { - env_logger::init(); + init_from_env(Env::default().filter_or(DEFAULT_FILTER_ENV, "info")); let addr = "0.0.0.0:21116"; log::info!("Listening on {}", addr); RendezvousServer::start(&addr).await?; diff --git a/src/rendezvous_server.rs b/src/rendezvous_server.rs index b134992..1bf94ad 100644 --- a/src/rendezvous_server.rs +++ b/src/rendezvous_server.rs @@ -157,8 +157,27 @@ impl RendezvousServer { Some(rendezvous_message::Union::punch_hole_request(ph)) => { allow_err!(rs.handle_tcp_punch_hole_request(addr, ph.id).await); } + Some(rendezvous_message::Union::request_forward(mut rf)) => { + if !rs.pm.is_in_memory(&rf.id) { + break; + } + let mut msg_out = RendezvousMessage::new(); + rf.socket_addr = AddrMangle::encode(addr); + msg_out.set_request_forward(rf); + rs.tx.send((msg_out, addr)).ok(); + } + Some(rendezvous_message::Union::request_forward_response(rfr)) => { + let addr_b = AddrMangle::decode(&rfr.socket_addr); + let sender_b= rs.tcp_punch.lock().unwrap().remove(&addr_b); + if let Some(mut sender_b) = sender_b { + if let Ok(bytes) = rfr.write_to_bytes() { + allow_err!(sender_b.send(Bytes::from(bytes)).await); + } + } + break; + } Some(rendezvous_message::Union::punch_hole_sent(phs)) => { - allow_err!(rs.handle_hole_sent(&phs, addr, None).await); + allow_err!(rs.handle_hole_sent(phs, addr, None).await); break; } Some(rendezvous_message::Union::local_addr(la)) => { @@ -233,8 +252,15 @@ impl RendezvousServer { }); } } + Some(rendezvous_message::Union::request_forward(rf)) => { + if self.pm.is_in_memory(&rf.id) { + let mut msg_out = RendezvousMessage::new(); + msg_out.set_request_forward(rf); + socket.send(&msg_out, addr).await? + } + } Some(rendezvous_message::Union::punch_hole_sent(phs)) => { - self.handle_hole_sent(&phs, addr, Some(socket)).await?; + self.handle_hole_sent(phs, addr, Some(socket)).await?; } Some(rendezvous_message::Union::local_addr(la)) => { self.handle_local_addr(&la, addr, Some(socket)).await?; @@ -304,7 +330,7 @@ impl RendezvousServer { #[inline] async fn handle_hole_sent<'a>( &mut self, - phs: &PunchHoleSent, + phs: PunchHoleSent, addr: SocketAddr, socket: Option<&'a mut FramedSocket>, ) -> ResultType<()> { @@ -321,9 +347,11 @@ impl RendezvousServer { Some(peer) => peer.pk, _ => Vec::new(), }; + let forward_server = phs.forward_server; msg_out.set_punch_hole_response(PunchHoleResponse { socket_addr: AddrMangle::encode(addr), pk, + forward_server, ..Default::default() }); if let Some(socket) = socket {