diff --git a/Cargo.toml b/Cargo.toml index a4ddda15eaa2ede027e56f1dd99ca24715ecebac..914fa7ee1c2d5705f5a7b5c8e2d6ed73ec7141c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,6 @@ keywords = ["CoAP"] bincode = "0.3.0" rustc-serialize = "0.3" mio = "0.5" -threadpool = "0.1" url = "0.2.36" num = "0.1" rand = "0.3" diff --git a/README.md b/README.md index 92c24343f91abf68348927276c89c49c237c0a93..53477d4b440133a043f4bab79e6ade95bd25ef84 100644 --- a/README.md +++ b/README.md @@ -72,10 +72,7 @@ fn main() { ``` ## Benchmark -### Using one thread - - -### Using eight thread - - -Tests were performed using [basho_bench](https://github.com/basho/basho_bench). +```bash +$ cargo run --example server +$ cargo bench +``` diff --git a/benches/client.rs b/benches/client.rs index 6ba830ba2cebde2709ce78daf55411aaa3ef5819..351ce8a0656def9c7e7203bebce53ec80180960d 100644 --- a/benches/client.rs +++ b/benches/client.rs @@ -4,23 +4,24 @@ extern crate test; extern crate coap; use test::Bencher; -use coap::packet::*; -use coap::CoAPClient; +use coap::{CoAPClient, CoAPRequest, IsMessage, MessageType, CoAPOption}; #[bench] fn bench_client_request(b: &mut Bencher) { let addr = "127.0.0.1:5683"; - let request = "test"; - let mut packet = Packet::new(); - packet.header.set_version(1); - packet.header.set_type(MessageType::Confirmable); - packet.header.set_code("0.01"); - packet.header.set_message_id(1); - packet.set_token(vec!(0x51, 0x55, 0x77, 0xE8)); - packet.add_option(CoAPOption::UriPath, request.to_string().into_bytes()); + let endpoint = "test"; + + let mut request = CoAPRequest::new(); + request.set_version(1); + request.set_type(MessageType::Confirmable); + request.set_code("0.01"); + request.set_message_id(1); + request.set_token(vec!(0x51, 0x55, 0x77, 0xE8)); + request.add_option(CoAPOption::UriPath, endpoint.to_string().into_bytes()); b.iter(|| { let client = CoAPClient::new(addr).unwrap(); - client.send(&packet).unwrap(); + client.send(&request).unwrap(); + client.receive().unwrap(); }); } \ No newline at end of file diff --git a/benches/eight_thread_summary.png b/benches/eight_thread_summary.png deleted file mode 100644 index 2ebdcf387c37af8aff57f1e8ce562c7524ea506a..0000000000000000000000000000000000000000 Binary files a/benches/eight_thread_summary.png and /dev/null differ diff --git a/benches/one_thread_summary.png b/benches/one_thread_summary.png deleted file mode 100644 index cae59dd5c88514ee114dbf76d5c0788c82f8a0e9..0000000000000000000000000000000000000000 Binary files a/benches/one_thread_summary.png and /dev/null differ diff --git a/src/lib.rs b/src/lib.rs index b3e8969851a2fad6eb4341e3ab3be9acd7744d6b..69b30ea14527c911b97308ee4ab7a7bbad9c7856 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -70,7 +70,6 @@ extern crate bincode; extern crate rustc_serialize; extern crate mio; -extern crate threadpool; extern crate url; extern crate num; extern crate rand; diff --git a/src/server.rs b/src/server.rs index 6d7dba024b0d8c4841035aaa487718824c205ffd..377a775ca7da1cd42fea8dba9e99679bca4e2514 100644 --- a/src/server.rs +++ b/src/server.rs @@ -8,9 +8,7 @@ use mio::udp::UdpSocket; use message::packet::Packet; use message::request::CoAPRequest; use message::response::CoAPResponse; -use threadpool::ThreadPool; -const DEFAULT_WORKER_NUM: usize = 4; type TxQueue = mpsc::Sender<QueuedResponse>; type RxQueue = mpsc::Receiver<QueuedResponse>; @@ -24,7 +22,7 @@ pub enum CoAPServerError { #[derive(Debug)] struct QueuedResponse { pub address: SocketAddr, - pub response: Option<CoAPResponse>, + pub response: CoAPResponse, } pub trait CoAPHandler: Sync + Send + Copy { @@ -42,7 +40,6 @@ impl<F> CoAPHandler for F struct UdpHandler<H: CoAPHandler + 'static> { socket: UdpSocket, - thread_pool: ThreadPool, tx_sender: TxQueue, rx_recv: RxQueue, coap_handler: H, @@ -50,91 +47,126 @@ struct UdpHandler<H: CoAPHandler + 'static> { impl<H: CoAPHandler + 'static> UdpHandler<H> { fn new(socket: UdpSocket, - thread_pool: ThreadPool, tx_sender: TxQueue, rx_recv: RxQueue, coap_handler: H) -> UdpHandler<H> { UdpHandler { socket: socket, - thread_pool: thread_pool, tx_sender: tx_sender, rx_recv: rx_recv, coap_handler: coap_handler, } } -} - -impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> { - type Timeout = usize; - type Message = (); - fn ready(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) { - // handle the response - if events.is_writable() { - response_handler(&self.rx_recv, &self.socket); - event_loop.reregister(&self.socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap(); - return; + fn request_handler(&self) -> Option<QueuedResponse> { + match self.requset_recv() { + Some(rqst) => { + let src = rqst.source.unwrap(); + match self.coap_handler.handle(rqst) { + Some(response) => { + debug!("Response: {:?}", response); + return Some(QueuedResponse { + address: src, + response: response, + }); + } + None => { + debug!("No response"); + return None; + } + } + } + None => { + return None; + } } + } - if !events.is_readable() { - warn!("Unreadable Event, {:?}", events); - return; + fn response_handler(&self) { + loop { + match self.rx_recv.try_recv() { + Ok(q_res) => { + self.response_send(q_res); + } + Err(mpsc::TryRecvError::Empty) => { + break; + } + Err(mpsc::TryRecvError::Disconnected) => { + error!("The RxQueue become disconnected"); + break; + } + } } + } - // handle the request - let coap_handler = self.coap_handler; + fn requset_recv(&self) -> Option<CoAPRequest> { let mut buf = [0; 1500]; match self.socket.recv_from(&mut buf) { Ok(Some((nread, src))) => { debug!("Handling request from {}", src); - let response_q = self.tx_sender.clone(); - - event_loop.reregister(&self.socket, Token(0), EventSet::writable(), PollOpt::edge()).unwrap(); - - self.thread_pool.execute(move || { - match Packet::from_bytes(&buf[..nread]) { - Ok(packet) => { - // Dispatch user handler, if there is a response packet - // send the reply via the writable event - let rqst = CoAPRequest::from_packet(packet, &src); - match coap_handler.handle(rqst) { - Some(response) => { - debug!("Response: {:?}", response); - response_q.send(QueuedResponse { - address: src, - response: Some(response), - }) - .unwrap(); - } - None => { - response_q.send(QueuedResponse { - address: src, - response: None, - }) - .unwrap(); - } - }; - } - Err(_) => { - error!("Failed to parse request"); - response_q.send(QueuedResponse { - address: src, - response: None, - }) - .unwrap(); - return; - } - }; - }); + + match Packet::from_bytes(&buf[..nread]) { + Ok(packet) => { + return Some(CoAPRequest::from_packet(packet, &src)); + } + Err(_) => { + error!("Failed to parse request"); + return None; + } + } } _ => { error!("Failed to read from socket"); panic!("unexpected error"); } } + } + fn response_send(&self, q_res: QueuedResponse) { + match q_res.response.message.to_bytes() { + Ok(bytes) => { + match self.socket.send_to(&bytes[..], &q_res.address) { + Ok(None) => { + // Look at https://github.com/carllerche/mio/issues/411 in detail + error!("Failed to complete the response"); + } + Ok(_) => {} + Err(_) => { + error!("Failed to send response"); + } + } + } + Err(_) => { + error!("Failed to decode response"); + } + } + } +} + +impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> { + type Timeout = usize; + type Message = (); + + fn ready(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) { + if events.is_writable() { + // handle the response + self.response_handler(); + event_loop.reregister(&self.socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap(); + return; + } else if events.is_readable() { + // handle the request + match self.request_handler() { + Some(response) => { + self.tx_sender.send(response).unwrap(); + event_loop.reregister(&self.socket, Token(0), EventSet::writable(), PollOpt::edge()).unwrap(); + } + None => {} + } + } else { + warn!("Unreadable Event, {:?}", events); + } } fn notify(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: ()) { @@ -147,7 +179,6 @@ pub struct CoAPServer { socket: UdpSocket, event_sender: Option<Sender<()>>, event_thread: Option<thread::JoinHandle<()>>, - worker_num: usize, } impl CoAPServer { @@ -161,7 +192,6 @@ impl CoAPServer { socket: s, event_sender: None, event_thread: None, - worker_num: DEFAULT_WORKER_NUM, }) }) } @@ -188,20 +218,18 @@ impl CoAPServer { } // Create resources - let worker_num = self.worker_num; let (tx, rx) = mpsc::channel(); let (tx_send, tx_recv): (TxQueue, RxQueue) = mpsc::channel(); // Setup and spawn event loop thread, which will spawn // children threads which handle incomining requests let thread = thread::spawn(move || { - let thread_pool = ThreadPool::new(worker_num); let mut event_loop = EventLoop::new().unwrap(); event_loop.register(&socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap(); tx.send(event_loop.channel()).unwrap(); - event_loop.run(&mut UdpHandler::new(socket, thread_pool, tx_send, tx_recv, handler)).unwrap(); + event_loop.run(&mut UdpHandler::new(socket, tx_send, tx_recv, handler)).unwrap(); }); // Ensure threads started successfully @@ -226,47 +254,8 @@ impl CoAPServer { _ => {} } } - - /// Set the number of threads for handling requests - pub fn set_worker_num(&mut self, worker_num: usize) { - self.worker_num = worker_num; - } } -fn response_handler(tx_recv: &RxQueue, tx_only: &UdpSocket) { - // TODO: Add better support for failure detection or logging - match tx_recv.recv() { - Ok(q_res) => { - match q_res.response { - Some(resp) => { - match resp.message.to_bytes() { - Ok(bytes) => { - match tx_only.send_to(&bytes[..], &q_res.address) { - Ok(None) => { - // Look at https://github.com/carllerche/mio/issues/411 in detail - error!("Failed to complete the response"); - } - Ok(_) => {} - Err(_) => { - error!("Failed to send response"); - } - } - } - Err(_) => { - error!("Failed to decode response"); - } - } - } - None => { - debug!("No response"); - } - } - } - Err(_) => { - error!("Failed to get response"); - } - } -} impl Drop for CoAPServer { fn drop(&mut self) {