This commit is contained in:
open-trade
2020-10-31 14:33:25 +08:00
parent 75dcbe47b7
commit f1646522d1
5 changed files with 686 additions and 655 deletions

View File

@@ -117,15 +117,17 @@ impl PeerMap {
}
const REG_TIMEOUT: i32 = 30_000;
pub const LICENSE_KEY: &'static str = "";
type Sink = SplitSink<Framed<TcpStream, BytesCodec>, Bytes>;
type Sender = mpsc::UnboundedSender<(RendezvousMessage, SocketAddr)>;
static mut ROTATION_RELAY_SERVER: usize = 0;
#[derive(Clone)]
pub struct RendezvousServer {
tcp_punch: Arc<Mutex<HashMap<SocketAddr, Sink>>>,
pm: PeerMap,
tx: Sender,
relay_server: String,
relay_servers: Vec<String>,
serial: i32,
rendezvous_servers: Vec<String>,
version: String,
@@ -136,7 +138,7 @@ impl RendezvousServer {
pub async fn start(
addr: &str,
addr2: &str,
relay_server: String,
relay_servers: Vec<String>,
serial: i32,
rendezvous_servers: Vec<String>,
software_url: String,
@@ -151,7 +153,7 @@ impl RendezvousServer {
tcp_punch: Arc::new(Mutex::new(HashMap::new())),
pm: PeerMap::new()?,
tx: tx.clone(),
relay_server,
relay_servers,
serial,
rendezvous_servers,
version,
@@ -241,7 +243,7 @@ impl RendezvousServer {
let mut res = TestNatResponse {
port: addr.port() as _,
..Default::default()
}
};
if rs.serial > tar.serial {
let mut cu = ConfigUpdate::new();
cu.serial = rs.serial;
@@ -352,7 +354,10 @@ impl RendezvousServer {
self.rendezvous_servers = cu
.rendezvous_servers
.drain(..)
.filter(|x| test_if_valid_server(x).is_ok())
.filter(|x| {
!x.is_empty()
&& test_if_valid_server(x, "rendezvous-server").is_ok()
})
.collect();
log::info!(
"configure updated: serial={} rendezvous-servers={:?}",
@@ -505,6 +510,14 @@ impl RendezvousServer {
addr: SocketAddr,
ph: PunchHoleRequest,
) -> ResultType<(RendezvousMessage, Option<SocketAddr>)> {
if ph.licence_key != LICENSE_KEY {
let mut msg_out = RendezvousMessage::new();
msg_out.set_punch_hole_response(PunchHoleResponse {
failure: punch_hole_response::Failure::LICENCE_MISMATCH.into(),
..Default::default()
});
return Ok((msg_out, None));
}
let id = ph.id;
// punch hole request from A, relay to B,
// check if in same intranet first,
@@ -539,9 +552,13 @@ impl RendezvousServer {
&peer.socket_addr,
&addr
);
let i = unsafe {
ROTATION_RELAY_SERVER += 1;
ROTATION_RELAY_SERVER % self.relay_servers.len()
};
msg_out.set_fetch_local_addr(FetchLocalAddr {
socket_addr,
relay_server: self.relay_server.clone(),
relay_server: self.relay_servers[i].clone(),
..Default::default()
});
} else {
@@ -551,10 +568,14 @@ impl RendezvousServer {
&peer.socket_addr,
&addr
);
let i = unsafe {
ROTATION_RELAY_SERVER += 1;
ROTATION_RELAY_SERVER % self.relay_servers.len()
};
msg_out.set_punch_hole(PunchHole {
socket_addr,
nat_type: ph.nat_type,
relay_server: self.relay_server.clone(),
relay_server: self.relay_servers[i].clone(),
..Default::default()
});
}
@@ -629,10 +650,14 @@ impl RendezvousServer {
}
}
pub fn test_if_valid_server(host: &str) -> ResultType<SocketAddr> {
if host.contains(":") {
pub fn test_if_valid_server(host: &str, name: &str) -> ResultType<SocketAddr> {
let res = if host.contains(":") {
hbb_common::to_socket_addr(host)
} else {
hbb_common::to_socket_addr(&format!("{}:{}", host, 0))
};
if res.is_err() {
log::error!("Invalid {} {}: {:?}", name, host, res);
}
res
}