From 1d2450b9ad3515abd02bc198de1ee8e1b6095912 Mon Sep 17 00:00:00 2001
From: James Munns <james.munns@gmail.com>
Date: Wed, 15 Jun 2016 02:08:31 +0200
Subject: [PATCH] Implementation of separate thread managing outgoing queue

---
 examples/client_and_server.rs |  15 +++-
 examples/server.rs            |  17 ++--
 src/client.rs                 |  35 ++------
 src/lib.rs                    |  19 ++--
 src/packet.rs                 | 112 +++++++++++++++--------
 src/server.rs                 | 163 ++++++++++++++++++++++++----------
 6 files changed, 227 insertions(+), 134 deletions(-)

diff --git a/examples/client_and_server.rs b/examples/client_and_server.rs
index b8550e3..39ba858 100644
--- a/examples/client_and_server.rs
+++ b/examples/client_and_server.rs
@@ -3,18 +3,25 @@ extern crate coap;
 use coap::packet::*;
 use coap::{CoAPServer, CoAPClient};
 
-fn request_handler(req: Packet, resp: CoAPClient) {
+fn request_handler(req: Packet) -> Option<Packet> {
 	let uri_path = req.get_option(OptionType::UriPath).unwrap();
-	resp.reply(&req, uri_path.front().unwrap().clone()).unwrap();
+
+	return match coap::packet::auto_response(req) {
+	Ok(mut response) => {
+		response.set_payload(uri_path.front().unwrap().clone());
+		Some(response)
+	},
+	Err(_) => None
+    };
 }
 
 fn main() {
 	let mut server = CoAPServer::new("127.0.0.1:5683").unwrap();
 	server.handle(request_handler).unwrap();
-		
+
 	let url = "coap://127.0.0.1:5683/Rust";
 	println!("Client request: {}", url);
 
 	let response: Packet = CoAPClient::request(url).unwrap();
 	println!("Server reply: {}", String::from_utf8(response.payload).unwrap());
-}
\ No newline at end of file
+}
diff --git a/examples/server.rs b/examples/server.rs
index b33505b..cafd2a3 100644
--- a/examples/server.rs
+++ b/examples/server.rs
@@ -2,11 +2,16 @@ extern crate coap;
 
 use std::io;
 use coap::packet::*;
-use coap::{CoAPServer, CoAPClient};
+use coap::CoAPServer;
 
-fn request_handler(req: Packet, resp: CoAPClient) {
-	println!("Receive request: {:?}", req);
-	resp.reply(&req, b"OK".to_vec()).unwrap();
+fn request_handler(req: Packet) -> Option<Packet> {
+    return match coap::packet::auto_response(req) {
+        Ok(mut response) => {
+		response.set_payload(b"OK".to_vec());
+		Some(response)
+	},
+        Err(_) => None
+    };
 }
 
 fn main() {
@@ -14,11 +19,11 @@ fn main() {
 
 	let mut server = CoAPServer::new(addr).unwrap();
 	server.handle(request_handler).unwrap();
-		
+
 	println!("Server up on {}", addr);
 	println!("Press any key to stop...");
 
 	io::stdin().read_line(&mut String::new()).unwrap();
 
 	println!("Server shutdown");
-}
\ No newline at end of file
+}
diff --git a/src/client.rs b/src/client.rs
index b2846ba..27407e5 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -82,7 +82,7 @@ impl CoAPClient {
 				try!(client.set_receive_timeout(timeout));
 				match client.receive() {
 				 	Ok(receive_packet) => {
-				 		if receive_packet.header.get_message_id() == message_id 
+				 		if receive_packet.header.get_message_id() == message_id
 				 			&& *receive_packet.get_token() == token {
 				 				return Ok(receive_packet)
 				 			} else {
@@ -101,24 +101,6 @@ impl CoAPClient {
 		Self::request_with_timeout(url, Some(Duration::new(DEFAULT_RECEIVE_TIMEOUT, 0)))
 	}
 
-	/// Response the client with the specifc payload.
-	pub fn reply(&self, request_packet: &Packet, payload: Vec<u8>) -> Result<()> {
-		let mut packet = Packet::new();
-
-		packet.header.set_version(1);
-		let response_type = match request_packet.header.get_type() {
-			PacketType::Confirmable => PacketType::Acknowledgement,
-			PacketType::NonConfirmable => PacketType::NonConfirmable,
-			_ => return Err(Error::new(ErrorKind::InvalidInput, "request type error"))
-		};
-		packet.header.set_type(response_type);
-		packet.header.set_code("2.05");
-		packet.header.set_message_id(request_packet.header.get_message_id());
-		packet.set_token(request_packet.get_token().clone());
-		packet.payload = payload;
-		self.send(&packet)
-	}
-
 	/// Execute a request.
 	pub fn send(&self, packet: &Packet) -> Result<()> {
 		match packet.to_bytes() {
@@ -164,7 +146,7 @@ mod test {
 	use super::*;
 	use std::time::Duration;
 	use std::io::ErrorKind;
-	use packet::{Packet, PacketType};
+	use packet::Packet;
 	use server::CoAPServer;
 
 	#[test]
@@ -174,15 +156,8 @@ mod test {
 		assert!(CoAPClient::request("127.0.0.1").is_err());
 	}
 
-	#[test]
-	fn test_reply_error() {
-		let client = CoAPClient::new("127.0.0.1:5683").unwrap();
-		let mut packet = Packet::new();
-		packet.header.set_type(PacketType::Acknowledgement);
-		assert!(client.reply(&packet, b"Test".to_vec()).is_err());
-	}
-
-	fn request_handler(_: Packet, _: CoAPClient) {
+	fn request_handler(_: Packet) -> Option<Packet> {
+		None
 	}
 
 	#[test]
@@ -197,4 +172,4 @@ mod test {
 			assert_eq!(error.kind(), ErrorKind::WouldBlock);
 		}
 	}
-}
\ No newline at end of file
+}
diff --git a/src/lib.rs b/src/lib.rs
index 14fc6b9..5f4c687 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,16 +3,16 @@
 //! This library provides both a client interface (`CoAPClient`) and a server interface (`CoAPServer`).
 //!
 //! [spec]: https://tools.ietf.org/html/rfc7252
-//! 
+//!
 //! # Installation
-//! 
+//!
 //! First add this to your `Cargo.toml`:
-//! 
+//!
 //! ```toml
 //! [dependencies]
 //! coap = "0.3"
 //! ```
-//! 
+//!
 //! Then, add this to your crate root:
 //!
 //! ```
@@ -29,21 +29,22 @@
 //! use coap::packet::*;
 //! use coap::{CoAPServer, CoAPClient};
 
-//! fn request_handler(req: Packet, _resp: CoAPClient) {
+//! fn request_handler(req: Packet) -> Option<Packet> {
 //! 	println!("Receive request: {:?}", req);
+//!     None
 //! }
 
 //! fn main() {
 //! 	let addr = "127.0.0.1:5683";
-//! 
+//!
 //! 	let mut server = CoAPServer::new(addr).unwrap();
 //! 	server.handle(request_handler).unwrap();
-//! 		
+//!
 //! 	println!("Server up on {}", addr);
 //!     println!("Press any key to stop...");
 //!
 //! 	io::stdin().read_line(&mut String::new()).unwrap();
-//! 
+//!
 //! 	println!("Server shutdown");
 //! }
 //! ```
@@ -78,4 +79,4 @@ pub use client::CoAPClient;
 
 pub mod packet;
 pub mod client;
-pub mod server;
\ No newline at end of file
+pub mod server;
diff --git a/src/packet.rs b/src/packet.rs
index c2a8b09..3fe80a2 100644
--- a/src/packet.rs
+++ b/src/packet.rs
@@ -1,6 +1,7 @@
 use bincode;
 use std::collections::BTreeMap;
 use std::collections::LinkedList;
+use std;
 
 macro_rules! u8_to_unsigned_be {
     ($src:ident, $start:expr, $end:expr, $t:ty) => ({
@@ -173,6 +174,10 @@ impl Packet {
 		self.options.insert(num, value);
 	}
 
+	pub fn set_payload(&mut self, payload: Vec<u8>){
+		self.payload = payload;
+	}
+
 	pub fn add_option(&mut self, tp: OptionType, value: Vec<u8>) {
 		let num = Self::get_option_number(tp);
 		match self.options.get_mut(&num) {
@@ -188,9 +193,12 @@ impl Packet {
 		self.options.insert(num, list);
 	}
 
-	pub fn get_option(&self, tp: OptionType) -> Option<&LinkedList<Vec<u8>>> {
+	pub fn get_option(&self, tp: OptionType) -> Option<LinkedList<Vec<u8>>> {
 		let num = Self::get_option_number(tp);
-		return self.options.get(&num);
+		match self.options.get(&num) {
+			Some(options) => Some(options.clone()),
+			None => None
+		}
 	}
 
 	/// Decodes a byte slice and construct the equivalent Packet.
@@ -226,40 +234,52 @@ impl Packet {
 
 					idx += 1;
 
-					if delta == 13 {
-						if idx >= buf.len() {
-							return Err(ParseError::InvalidOptionLength);
+					// Check for special delta characters
+					match delta {
+						13 => {
+							if idx >= buf.len() {
+								return Err(ParseError::InvalidOptionLength);
+							}
+							delta = buf[idx] as usize + 13;
+							idx += 1;
+						},
+						14 => {
+							if idx + 1 >= buf.len() {
+								return Err(ParseError::InvalidOptionLength);
+							}
+
+							delta = (u16::from_be(u8_to_unsigned_be!(buf, idx, idx + 1, u16)) + 269) as usize;
+							idx += 2;
+						},
+						15 => {
+							return Err(ParseError::InvalidOptionDelta);
 						}
-						delta = buf[idx] as usize + 13;
-						idx += 1;
-					} else if delta == 14 {
-						if idx + 1 >= buf.len() {
+						_ => {}
+					};
+
+					// Check for special length characters
+					match length {
+						13 => {
+							if idx >= buf.len() {
+								return Err(ParseError::InvalidOptionLength);
+							}
+
+							length = buf[idx] as usize + 13;
+							idx += 1;
+						},
+						14 => {
+							if idx + 1 >= buf.len() {
+								return Err(ParseError::InvalidOptionLength);
+							}
+
+							length = (u16::from_be(u8_to_unsigned_be!(buf, idx, idx + 1, u16)) + 269) as usize;
+							idx += 2;
+						},
+						15 => {
 							return Err(ParseError::InvalidOptionLength);
-						}
-
-						delta = (u16::from_be(u8_to_unsigned_be!(buf, idx, idx + 1, u16)) + 269) as usize;
-						idx += 2;
-					} else if delta == 15 {
-						return Err(ParseError::InvalidOptionDelta);
-					}
-
-					if length == 13 {
-						if idx >= buf.len() {
-							return Err(ParseError::InvalidOptionLength);
-						}
-
-						length = buf[idx] as usize + 13;
-						idx += 1;
-					} else if length == 14 {
-						if idx + 1 >= buf.len() {
-							return Err(ParseError::InvalidOptionLength);
-						}
-
-						length = (u16::from_be(u8_to_unsigned_be!(buf, idx, idx + 1, u16)) + 269) as usize;
-						idx += 2;
-					} else if length == 15 {
-						return Err(ParseError::InvalidOptionLength);
-					}
+						},
+						_ => {}
+					};
 
 					options_number += delta;
 
@@ -416,6 +436,26 @@ impl Packet {
 	}
 }
 
+/// Convert a request to a response
+pub fn auto_response(request_packet: Packet) -> std::io::Result<Packet> {
+		let mut packet = Packet::new();
+
+		packet.header.set_version(1);
+		let response_type = match request_packet.header.get_type() {
+			PacketType::Confirmable => PacketType::Acknowledgement,
+			PacketType::NonConfirmable => PacketType::NonConfirmable,
+			_ => return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "request type error"))
+		};
+		packet.header.set_type(response_type);
+		packet.header.set_code("2.05");
+		packet.header.set_message_id(request_packet.header.get_message_id());
+		packet.set_token(request_packet.get_token().clone());
+
+		packet.payload = request_packet.payload;
+
+		Ok(packet)
+	}
+
 #[cfg(test)]
 mod test {
 	use super::*;
@@ -441,14 +481,14 @@ mod test {
 		let mut expected_uri_path = LinkedList::new();
 		expected_uri_path.push_back("Hi".as_bytes().to_vec());
 		expected_uri_path.push_back("Test".as_bytes().to_vec());
-		assert_eq!(*uri_path, expected_uri_path);
+		assert_eq!(uri_path, expected_uri_path);
 
 		let uri_query = packet.get_option(OptionType::UriQuery);
 		assert!(uri_query.is_some());
 		let uri_query = uri_query.unwrap();
 		let mut expected_uri_query = LinkedList::new();
 		expected_uri_query.push_back("a=1".as_bytes().to_vec());
-		assert_eq!(*uri_query, expected_uri_query);
+		assert_eq!(uri_query, expected_uri_query);
 	}
 
 	#[test]
diff --git a/src/server.rs b/src/server.rs
index 54514a8..79924db 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,14 +1,15 @@
 use std;
 use std::thread;
-use std::net::ToSocketAddrs;
+use std::net::{ToSocketAddrs, SocketAddr};
 use std::sync::mpsc;
 use mio::*;
 use mio::udp::UdpSocket;
 use packet::Packet;
-use client::CoAPClient;
 use threadpool::ThreadPool;
 
 const DEFAULT_WORKER_NUM: usize = 4;
+pub type TxQueue = mpsc::Sender<CoAPResponse>;
+pub type RxQueue = mpsc::Receiver<CoAPResponse>;
 
 #[derive(Debug)]
 pub enum CoAPServerError {
@@ -17,27 +18,35 @@ pub enum CoAPServerError {
 	AnotherHandlerIsRunning,
 }
 
+#[derive(Debug)]
+pub struct CoAPResponse {
+	pub address: SocketAddr,
+	pub response: Packet
+}
+
 pub trait CoAPHandler: Sync + Send + Copy {
-	fn handle(&self, Packet, CoAPClient);
+	fn handle(&self, Packet) -> Option<Packet>;
 }
 
-impl<F> CoAPHandler for F where F: Fn(Packet, CoAPClient), F: Sync + Send + Copy {
-	fn handle(&self, request: Packet, response: CoAPClient) {
-		self(request, response);
+impl<F> CoAPHandler for F where F: Fn(Packet) -> Option<Packet>, F: Sync + Send + Copy {
+	fn handle(&self, request: Packet) -> Option<Packet> {
+		return self(request);
 	}
 }
 
 struct UdpHandler<H: CoAPHandler + 'static> {
 	socket: UdpSocket,
 	thread_pool: ThreadPool,
+	tx_sender: TxQueue,
 	coap_handler: H
 }
 
 impl<H: CoAPHandler + 'static> UdpHandler<H> {
-	fn new(socket: UdpSocket, thread_pool: ThreadPool, coap_handler: H) -> UdpHandler<H> {
+	fn new(socket: UdpSocket, thread_pool: ThreadPool, tx_sender: TxQueue, coap_handler: H) -> UdpHandler<H> {
 		UdpHandler {
 			socket: socket,
 			thread_pool: thread_pool,
+			tx_sender: tx_sender,
 			coap_handler: coap_handler
 		}
 	}
@@ -48,23 +57,33 @@ impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> {
 	type Message = ();
 
 	fn ready(&mut self, _: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) {
-        if events.is_readable() {
-        	let coap_handler = self.coap_handler;
-        	let mut buf = [0; 1500];
+		if events.is_readable() {
+			let coap_handler = self.coap_handler;
+			let mut buf = [0; 1500];
 
 			match self.socket.recv_from(&mut buf) {
 				Ok(Some((nread, src))) => {
+					let response_q = self.tx_sender.clone();
 					self.thread_pool.execute(move || {
 						match Packet::from_bytes(&buf[..nread]) {
 							Ok(packet) => {
-								let client = CoAPClient::new(src).unwrap();
-								coap_handler.handle(packet, client);
+								// Dispatch user handler, if there is a response packet
+								//   send the reply via the TX thread
+								match coap_handler.handle(packet) {
+									Some(response) => {
+										response_q.send(CoAPResponse{
+											address: src,
+											response: response
+										}).unwrap();
+									},
+									None => {}
+								};
 							},
 							Err(_) => return
 						};
 					});
 				},
-				_ => panic!("unexpected error"),
+				_ => {}, //panic!("unexpected error"),
 			}
 		}
 	}
@@ -78,6 +97,7 @@ pub struct CoAPServer {
     socket: UdpSocket,
     event_sender: Option<Sender<()>>,
     event_thread: Option<thread::JoinHandle<()>>,
+    tx_thread: Option<thread::JoinHandle<()>>,
     worker_num: usize,
 }
 
@@ -91,6 +111,7 @@ impl CoAPServer {
 						socket: s,
 						event_sender: None,
 						event_thread: None,
+						tx_thread: None,
 						worker_num: DEFAULT_WORKER_NUM,
 					}))
 				},
@@ -99,38 +120,55 @@ impl CoAPServer {
 		})
 	}
 
-	/// Starts handling requests with the handler.
+	/// Starts handling requests with the handler
 	pub fn handle<H: CoAPHandler + 'static>(&mut self, handler: H) -> Result<(), CoAPServerError> {
-		match self.event_sender {
-			None => {
-				let worker_num = self.worker_num;
-				let (tx, rx) = mpsc::channel();
-				let socket = self.socket.try_clone();
-				match socket {
-					Ok(socket) => {
-						let thread = thread::spawn(move || {
-							let thread_pool = ThreadPool::new(worker_num);
-							let mut event_loop = EventLoop::new().unwrap();
-							event_loop.register(&socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap();
-
-							tx.send(event_loop.channel()).unwrap();
-
-							event_loop.run(&mut UdpHandler::new(socket, thread_pool, handler)).unwrap();
-						});
-
-						match rx.recv() {
-							Ok(event_sender) => {
-								self.event_sender = Some(event_sender);
-								self.event_thread = Some(thread);
-								Ok(())
-							},
-							Err(_) => Err(CoAPServerError::EventLoopError)
-						}
-					},
-					Err(_) => Err(CoAPServerError::NetworkError),
-				}
+		let socket;
+
+		// Early return error checking
+		if let Some(_) = self.event_sender {
+			return Err(CoAPServerError::AnotherHandlerIsRunning);
+		}
+		match self.socket.try_clone() {
+			Ok(good_socket) => {
+				socket = good_socket
+			},
+			Err(_) => {
+				return Err(CoAPServerError::NetworkError);
+			},
+		}
+
+		// Create resources
+		let worker_num = self.worker_num;
+		let (tx, rx) = mpsc::channel();
+		let (tx_send, tx_recv) : (TxQueue, RxQueue) = mpsc::channel();
+		let tx_only = self.socket.try_clone().unwrap();
+
+		// Setup and spawn single TX thread
+		let tx_thread = thread::spawn(move || {
+			transmit_handler(tx_recv, tx_only);
+		});
+
+		// Setup and spawn event loop thread, which will spawn
+		//   children threads which handle incomining requests
+		let thread = thread::spawn(move || {
+			let thread_pool = ThreadPool::new(worker_num);
+			let mut event_loop = EventLoop::new().unwrap();
+			event_loop.register(&socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap();
+
+			tx.send(event_loop.channel()).unwrap();
+
+			event_loop.run(&mut UdpHandler::new(socket, thread_pool, tx_send, handler)).unwrap();
+		});
+
+		// Ensure threads started successfully
+		match rx.recv() {
+			Ok(event_sender) => {
+				self.event_sender = Some(event_sender);
+				self.event_thread = Some(thread);
+				self.tx_thread = Some(tx_thread);
+				Ok(())
 			},
-			Some(_) => Err(CoAPServerError::AnotherHandlerIsRunning),
+			Err(_) => Err(CoAPServerError::EventLoopError)
 		}
 	}
 
@@ -152,6 +190,28 @@ impl CoAPServer {
 	}
 }
 
+fn transmit_handler(tx_recv: RxQueue, tx_only: UdpSocket) {
+	// Note! We should only transmit with this UDP Socket
+	// TODO: Add better support for failure detection or logging
+	loop {
+		match tx_recv.recv() {
+			Ok(q_res) => {
+				match q_res.response.to_bytes() {
+					Ok(bytes) => {
+						let _ = tx_only.send_to(&bytes[..], &q_res.address);
+					},
+					Err(_) => {}
+				}
+			},
+			// recv error occurs when all transmitters are terminited
+			//   (when all UDP Handlers are closed)
+			Err(_) => {
+				break;
+			}
+		}
+	}
+}
+
 impl Drop for CoAPServer {
     fn drop(&mut self) {
         self.stop();
@@ -162,15 +222,20 @@ impl Drop for CoAPServer {
 #[cfg(test)]
 mod test {
 	use super::*;
-	use packet::{Packet, PacketType, OptionType};
+	use packet::{Packet, PacketType, OptionType, auto_response};
 	use client::CoAPClient;
 
-	fn request_handler(req: Packet, resp: CoAPClient) {
-		let uri_path = req.get_option(OptionType::UriPath);
-		assert!(uri_path.is_some());
-		let uri_path = uri_path.unwrap();
+	fn request_handler(req: Packet) -> Option<Packet> {
+		let uri_path_list = req.get_option(OptionType::UriPath).unwrap();
+		assert!(uri_path_list.len() == 1);
 
-		resp.reply(&req, uri_path.front().unwrap().clone()).unwrap();
+		return match auto_response(req) {
+			Ok(mut response) => {
+				response.set_payload(uri_path_list.front().unwrap().clone());
+				Some(response)
+			},
+			Err(_) => None
+		};
 	}
 
 	#[test]
-- 
GitLab