Skip to content
Snippets Groups Projects
Commit 29442194 authored by Covertness's avatar Covertness
Browse files

remove thread_pool

parent 84a3faf4
No related branches found
No related tags found
No related merge requests found
...@@ -13,7 +13,6 @@ keywords = ["CoAP"] ...@@ -13,7 +13,6 @@ keywords = ["CoAP"]
bincode = "0.3.0" bincode = "0.3.0"
rustc-serialize = "0.3" rustc-serialize = "0.3"
mio = "0.5" mio = "0.5"
threadpool = "0.1"
url = "0.2.36" url = "0.2.36"
num = "0.1" num = "0.1"
rand = "0.3" rand = "0.3"
......
...@@ -72,10 +72,7 @@ fn main() { ...@@ -72,10 +72,7 @@ fn main() {
``` ```
## Benchmark ## Benchmark
### Using one thread ```bash
![image](benches/one_thread_summary.png) $ cargo run --example server
$ cargo bench
### Using eight thread ```
![image](benches/eight_thread_summary.png)
Tests were performed using [basho_bench](https://github.com/basho/basho_bench).
...@@ -4,23 +4,24 @@ extern crate test; ...@@ -4,23 +4,24 @@ extern crate test;
extern crate coap; extern crate coap;
use test::Bencher; use test::Bencher;
use coap::packet::*; use coap::{CoAPClient, CoAPRequest, IsMessage, MessageType, CoAPOption};
use coap::CoAPClient;
#[bench] #[bench]
fn bench_client_request(b: &mut Bencher) { fn bench_client_request(b: &mut Bencher) {
let addr = "127.0.0.1:5683"; let addr = "127.0.0.1:5683";
let request = "test"; let endpoint = "test";
let mut packet = Packet::new();
packet.header.set_version(1); let mut request = CoAPRequest::new();
packet.header.set_type(MessageType::Confirmable); request.set_version(1);
packet.header.set_code("0.01"); request.set_type(MessageType::Confirmable);
packet.header.set_message_id(1); request.set_code("0.01");
packet.set_token(vec!(0x51, 0x55, 0x77, 0xE8)); request.set_message_id(1);
packet.add_option(CoAPOption::UriPath, request.to_string().into_bytes()); request.set_token(vec!(0x51, 0x55, 0x77, 0xE8));
request.add_option(CoAPOption::UriPath, endpoint.to_string().into_bytes());
b.iter(|| { b.iter(|| {
let client = CoAPClient::new(addr).unwrap(); let client = CoAPClient::new(addr).unwrap();
client.send(&packet).unwrap(); client.send(&request).unwrap();
client.receive().unwrap();
}); });
} }
\ No newline at end of file
benches/eight_thread_summary.png

105 KiB

benches/one_thread_summary.png

94.2 KiB

...@@ -70,7 +70,6 @@ ...@@ -70,7 +70,6 @@
extern crate bincode; extern crate bincode;
extern crate rustc_serialize; extern crate rustc_serialize;
extern crate mio; extern crate mio;
extern crate threadpool;
extern crate url; extern crate url;
extern crate num; extern crate num;
extern crate rand; extern crate rand;
......
...@@ -8,9 +8,7 @@ use mio::udp::UdpSocket; ...@@ -8,9 +8,7 @@ use mio::udp::UdpSocket;
use message::packet::Packet; use message::packet::Packet;
use message::request::CoAPRequest; use message::request::CoAPRequest;
use message::response::CoAPResponse; use message::response::CoAPResponse;
use threadpool::ThreadPool;
const DEFAULT_WORKER_NUM: usize = 4;
type TxQueue = mpsc::Sender<QueuedResponse>; type TxQueue = mpsc::Sender<QueuedResponse>;
type RxQueue = mpsc::Receiver<QueuedResponse>; type RxQueue = mpsc::Receiver<QueuedResponse>;
...@@ -24,7 +22,7 @@ pub enum CoAPServerError { ...@@ -24,7 +22,7 @@ pub enum CoAPServerError {
#[derive(Debug)] #[derive(Debug)]
struct QueuedResponse { struct QueuedResponse {
pub address: SocketAddr, pub address: SocketAddr,
pub response: Option<CoAPResponse>, pub response: CoAPResponse,
} }
pub trait CoAPHandler: Sync + Send + Copy { pub trait CoAPHandler: Sync + Send + Copy {
...@@ -42,7 +40,6 @@ impl<F> CoAPHandler for F ...@@ -42,7 +40,6 @@ impl<F> CoAPHandler for F
struct UdpHandler<H: CoAPHandler + 'static> { struct UdpHandler<H: CoAPHandler + 'static> {
socket: UdpSocket, socket: UdpSocket,
thread_pool: ThreadPool,
tx_sender: TxQueue, tx_sender: TxQueue,
rx_recv: RxQueue, rx_recv: RxQueue,
coap_handler: H, coap_handler: H,
...@@ -50,91 +47,126 @@ struct UdpHandler<H: CoAPHandler + 'static> { ...@@ -50,91 +47,126 @@ struct UdpHandler<H: CoAPHandler + 'static> {
impl<H: CoAPHandler + 'static> UdpHandler<H> { impl<H: CoAPHandler + 'static> UdpHandler<H> {
fn new(socket: UdpSocket, fn new(socket: UdpSocket,
thread_pool: ThreadPool,
tx_sender: TxQueue, tx_sender: TxQueue,
rx_recv: RxQueue, rx_recv: RxQueue,
coap_handler: H) coap_handler: H)
-> UdpHandler<H> { -> UdpHandler<H> {
UdpHandler { UdpHandler {
socket: socket, socket: socket,
thread_pool: thread_pool,
tx_sender: tx_sender, tx_sender: tx_sender,
rx_recv: rx_recv, rx_recv: rx_recv,
coap_handler: coap_handler, coap_handler: coap_handler,
} }
} }
}
impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> { fn request_handler(&self) -> Option<QueuedResponse> {
type Timeout = usize; match self.requset_recv() {
type Message = (); Some(rqst) => {
let src = rqst.source.unwrap();
fn ready(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) { match self.coap_handler.handle(rqst) {
// handle the response Some(response) => {
if events.is_writable() { debug!("Response: {:?}", response);
response_handler(&self.rx_recv, &self.socket); return Some(QueuedResponse {
event_loop.reregister(&self.socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap(); address: src,
return; response: response,
});
}
None => {
debug!("No response");
return None;
}
}
}
None => {
return None;
}
}
} }
if !events.is_readable() { fn response_handler(&self) {
warn!("Unreadable Event, {:?}", events); loop {
return; match self.rx_recv.try_recv() {
Ok(q_res) => {
self.response_send(q_res);
}
Err(mpsc::TryRecvError::Empty) => {
break;
}
Err(mpsc::TryRecvError::Disconnected) => {
error!("The RxQueue become disconnected");
break;
}
}
}
} }
// handle the request fn requset_recv(&self) -> Option<CoAPRequest> {
let coap_handler = self.coap_handler;
let mut buf = [0; 1500]; let mut buf = [0; 1500];
match self.socket.recv_from(&mut buf) { match self.socket.recv_from(&mut buf) {
Ok(Some((nread, src))) => { Ok(Some((nread, src))) => {
debug!("Handling request from {}", src); debug!("Handling request from {}", src);
let response_q = self.tx_sender.clone();
event_loop.reregister(&self.socket, Token(0), EventSet::writable(), PollOpt::edge()).unwrap();
self.thread_pool.execute(move || {
match Packet::from_bytes(&buf[..nread]) { match Packet::from_bytes(&buf[..nread]) {
Ok(packet) => { Ok(packet) => {
// Dispatch user handler, if there is a response packet return Some(CoAPRequest::from_packet(packet, &src));
// send the reply via the writable event
let rqst = CoAPRequest::from_packet(packet, &src);
match coap_handler.handle(rqst) {
Some(response) => {
debug!("Response: {:?}", response);
response_q.send(QueuedResponse {
address: src,
response: Some(response),
})
.unwrap();
}
None => {
response_q.send(QueuedResponse {
address: src,
response: None,
})
.unwrap();
}
};
} }
Err(_) => { Err(_) => {
error!("Failed to parse request"); error!("Failed to parse request");
response_q.send(QueuedResponse { return None;
address: src, }
response: None,
})
.unwrap();
return;
} }
};
});
} }
_ => { _ => {
error!("Failed to read from socket"); error!("Failed to read from socket");
panic!("unexpected error"); panic!("unexpected error");
} }
} }
}
fn response_send(&self, q_res: QueuedResponse) {
match q_res.response.message.to_bytes() {
Ok(bytes) => {
match self.socket.send_to(&bytes[..], &q_res.address) {
Ok(None) => {
// Look at https://github.com/carllerche/mio/issues/411 in detail
error!("Failed to complete the response");
}
Ok(_) => {}
Err(_) => {
error!("Failed to send response");
}
}
}
Err(_) => {
error!("Failed to decode response");
}
}
}
}
impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> {
type Timeout = usize;
type Message = ();
fn ready(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) {
if events.is_writable() {
// handle the response
self.response_handler();
event_loop.reregister(&self.socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap();
return;
} else if events.is_readable() {
// handle the request
match self.request_handler() {
Some(response) => {
self.tx_sender.send(response).unwrap();
event_loop.reregister(&self.socket, Token(0), EventSet::writable(), PollOpt::edge()).unwrap();
}
None => {}
}
} else {
warn!("Unreadable Event, {:?}", events);
}
} }
fn notify(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: ()) { fn notify(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: ()) {
...@@ -147,7 +179,6 @@ pub struct CoAPServer { ...@@ -147,7 +179,6 @@ pub struct CoAPServer {
socket: UdpSocket, socket: UdpSocket,
event_sender: Option<Sender<()>>, event_sender: Option<Sender<()>>,
event_thread: Option<thread::JoinHandle<()>>, event_thread: Option<thread::JoinHandle<()>>,
worker_num: usize,
} }
impl CoAPServer { impl CoAPServer {
...@@ -161,7 +192,6 @@ impl CoAPServer { ...@@ -161,7 +192,6 @@ impl CoAPServer {
socket: s, socket: s,
event_sender: None, event_sender: None,
event_thread: None, event_thread: None,
worker_num: DEFAULT_WORKER_NUM,
}) })
}) })
} }
...@@ -188,20 +218,18 @@ impl CoAPServer { ...@@ -188,20 +218,18 @@ impl CoAPServer {
} }
// Create resources // Create resources
let worker_num = self.worker_num;
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let (tx_send, tx_recv): (TxQueue, RxQueue) = mpsc::channel(); let (tx_send, tx_recv): (TxQueue, RxQueue) = mpsc::channel();
// Setup and spawn event loop thread, which will spawn // Setup and spawn event loop thread, which will spawn
// children threads which handle incomining requests // children threads which handle incomining requests
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
let thread_pool = ThreadPool::new(worker_num);
let mut event_loop = EventLoop::new().unwrap(); let mut event_loop = EventLoop::new().unwrap();
event_loop.register(&socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap(); event_loop.register(&socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap();
tx.send(event_loop.channel()).unwrap(); tx.send(event_loop.channel()).unwrap();
event_loop.run(&mut UdpHandler::new(socket, thread_pool, tx_send, tx_recv, handler)).unwrap(); event_loop.run(&mut UdpHandler::new(socket, tx_send, tx_recv, handler)).unwrap();
}); });
// Ensure threads started successfully // Ensure threads started successfully
...@@ -226,47 +254,8 @@ impl CoAPServer { ...@@ -226,47 +254,8 @@ impl CoAPServer {
_ => {} _ => {}
} }
} }
/// Set the number of threads for handling requests
pub fn set_worker_num(&mut self, worker_num: usize) {
self.worker_num = worker_num;
}
} }
fn response_handler(tx_recv: &RxQueue, tx_only: &UdpSocket) {
// TODO: Add better support for failure detection or logging
match tx_recv.recv() {
Ok(q_res) => {
match q_res.response {
Some(resp) => {
match resp.message.to_bytes() {
Ok(bytes) => {
match tx_only.send_to(&bytes[..], &q_res.address) {
Ok(None) => {
// Look at https://github.com/carllerche/mio/issues/411 in detail
error!("Failed to complete the response");
}
Ok(_) => {}
Err(_) => {
error!("Failed to send response");
}
}
}
Err(_) => {
error!("Failed to decode response");
}
}
}
None => {
debug!("No response");
}
}
}
Err(_) => {
error!("Failed to get response");
}
}
}
impl Drop for CoAPServer { impl Drop for CoAPServer {
fn drop(&mut self) { fn drop(&mut self) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment