Skip to content
Snippets Groups Projects
Commit 414d1cf7 authored by Covertness's avatar Covertness
Browse files

support thread pool

parent 4be8812b
No related branches found
No related tags found
No related merge requests found
...@@ -17,6 +17,7 @@ url = "0.2.36" ...@@ -17,6 +17,7 @@ url = "0.2.36"
num = "0.1" num = "0.1"
rand = "0.3" rand = "0.3"
log = "0.3" log = "0.3"
threadpool = "1.3"
[dev-dependencies] [dev-dependencies]
quickcheck = "0.2.27" quickcheck = "0.2.27"
...@@ -73,6 +73,7 @@ extern crate mio; ...@@ -73,6 +73,7 @@ extern crate mio;
extern crate url; extern crate url;
extern crate num; extern crate num;
extern crate rand; extern crate rand;
extern crate threadpool;
#[cfg(test)] #[cfg(test)]
extern crate quickcheck; extern crate quickcheck;
......
...@@ -8,6 +8,9 @@ use mio::udp::UdpSocket; ...@@ -8,6 +8,9 @@ use mio::udp::UdpSocket;
use message::packet::Packet; use message::packet::Packet;
use message::request::CoAPRequest; use message::request::CoAPRequest;
use message::response::CoAPResponse; use message::response::CoAPResponse;
use threadpool::ThreadPool;
const DEFAULT_WORKER_NUM: usize = 4;
type TxQueue = mpsc::Sender<QueuedResponse>; type TxQueue = mpsc::Sender<QueuedResponse>;
type RxQueue = mpsc::Receiver<QueuedResponse>; type RxQueue = mpsc::Receiver<QueuedResponse>;
...@@ -25,6 +28,19 @@ struct QueuedResponse { ...@@ -25,6 +28,19 @@ struct QueuedResponse {
pub response: CoAPResponse, pub response: CoAPResponse,
} }
#[derive(Debug)]
enum EventLoopNotify {
NewResponse,
Shutdown,
}
#[derive(Debug)]
enum ResponseError {
SocketUnwritable,
SocketError,
PacketInvalid,
}
pub trait CoAPHandler: Sync + Send + Copy { pub trait CoAPHandler: Sync + Send + Copy {
fn handle(&self, CoAPRequest) -> Option<CoAPResponse>; fn handle(&self, CoAPRequest) -> Option<CoAPResponse>;
} }
...@@ -42,6 +58,7 @@ struct UdpHandler<H: CoAPHandler + 'static> { ...@@ -42,6 +58,7 @@ struct UdpHandler<H: CoAPHandler + 'static> {
socket: UdpSocket, socket: UdpSocket,
tx_sender: TxQueue, tx_sender: TxQueue,
rx_recv: RxQueue, rx_recv: RxQueue,
worker_pool: ThreadPool,
coap_handler: H, coap_handler: H,
} }
...@@ -49,52 +66,70 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> { ...@@ -49,52 +66,70 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> {
fn new(socket: UdpSocket, fn new(socket: UdpSocket,
tx_sender: TxQueue, tx_sender: TxQueue,
rx_recv: RxQueue, rx_recv: RxQueue,
worker_num: usize,
coap_handler: H) coap_handler: H)
-> UdpHandler<H> { -> UdpHandler<H> {
UdpHandler { UdpHandler {
socket: socket, socket: socket,
tx_sender: tx_sender, tx_sender: tx_sender,
rx_recv: rx_recv, rx_recv: rx_recv,
worker_pool: ThreadPool::new(worker_num),
coap_handler: coap_handler, coap_handler: coap_handler,
} }
} }
fn request_handler(&self) -> Option<QueuedResponse> { fn request_handler(&self, event_loop: &mut EventLoop<UdpHandler<H>>) {
match self.requset_recv() { match self.requset_recv() {
Some(rqst) => { Some(rqst) => {
let src = rqst.source.unwrap(); let src = rqst.source.unwrap();
match self.coap_handler.handle(rqst) { let coap_handler = self.coap_handler;
let response_q = self.tx_sender.clone();
let event_sender = event_loop.channel();
self.worker_pool.execute(move || {
match coap_handler.handle(rqst) {
Some(response) => { Some(response) => {
debug!("Response: {:?}", response); debug!("Response: {:?}", response);
return Some(QueuedResponse {
response_q.send(QueuedResponse {
address: src, address: src,
response: response, response: response,
}); }).unwrap();
match event_sender.send(EventLoopNotify::NewResponse) {
Ok(()) => {}
Err(error) => {
warn!("Notify NewResponse failed, {:?}", error);
}
}
} }
None => { None => {
debug!("No response"); debug!("No response");
return None;
}
} }
} }
None => { });
return None;
} }
None => {}
} }
} }
fn response_handler(&self) { fn response_handler(&self) -> bool {
loop { loop {
match self.rx_recv.try_recv() { match self.rx_recv.try_recv() {
Ok(q_res) => { Ok(q_res) => {
self.response_send(q_res); match self.response_send(&q_res) {
Ok(()) => {}
Err(ResponseError::SocketUnwritable) => {
self.tx_sender.send(q_res).unwrap();
return false;
}
Err(_) => {}
}
} }
Err(mpsc::TryRecvError::Empty) => { Err(mpsc::TryRecvError::Empty) => {
break; return true;
} }
Err(mpsc::TryRecvError::Disconnected) => { Err(mpsc::TryRecvError::Disconnected) => {
error!("The RxQueue become disconnected"); panic!("The RxQueue become disconnected");
break;
} }
} }
} }
...@@ -124,22 +159,22 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> { ...@@ -124,22 +159,22 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> {
} }
} }
fn response_send(&self, q_res: QueuedResponse) { fn response_send(&self, q_res: & QueuedResponse) -> Result<(), ResponseError> {
match q_res.response.message.to_bytes() { match q_res.response.message.to_bytes() {
Ok(bytes) => { Ok(bytes) => {
match self.socket.send_to(&bytes[..], &q_res.address) { match self.socket.send_to(&bytes[..], &q_res.address) {
Ok(None) => {
// Look at https://github.com/carllerche/mio/issues/411 in detail // Look at https://github.com/carllerche/mio/issues/411 in detail
error!("Failed to complete the response"); Ok(None) => return Err(ResponseError::SocketUnwritable),
Ok(_) => Ok(()),
Err(error) => {
error!("Failed to send response, {:?}", error);
return Err(ResponseError::SocketError);
} }
Ok(_) => {}
Err(_) => {
error!("Failed to send response");
} }
} }
} Err(error) => {
Err(_) => { error!("Failed to decode response, {:?}", error);
error!("Failed to decode response"); return Err(ResponseError::PacketInvalid);
} }
} }
} }
...@@ -147,38 +182,45 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> { ...@@ -147,38 +182,45 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> {
impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> { impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> {
type Timeout = usize; type Timeout = usize;
type Message = (); type Message = EventLoopNotify;
fn ready(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) { fn ready(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) {
if events.is_writable() { if events.is_writable() {
// handle the response // handle the response
self.response_handler(); match self.response_handler() {
event_loop.reregister(&self.socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap(); true => {
return; event_loop.reregister(&self.socket, Token(0), EventSet::readable(), PollOpt::level()).unwrap();
} else if events.is_readable() { }
// handle the request false => {
match self.request_handler() {
Some(response) => {
self.tx_sender.send(response).unwrap();
event_loop.reregister(&self.socket, Token(0), EventSet::writable(), PollOpt::edge()).unwrap(); event_loop.reregister(&self.socket, Token(0), EventSet::writable(), PollOpt::edge()).unwrap();
} }
None => {}
} }
} else if events.is_readable() {
// handle the request
self.request_handler(event_loop);
} else { } else {
warn!("Unreadable Event, {:?}", events); warn!("Unknown Event, {:?}", events);
} }
} }
fn notify(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: ()) { fn notify(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, msg: EventLoopNotify) {
match msg {
EventLoopNotify::NewResponse => {
event_loop.reregister(&self.socket, Token(0), EventSet::writable(), PollOpt::edge()).unwrap();
}
EventLoopNotify::Shutdown => {
info!("Shutting down request handler"); info!("Shutting down request handler");
event_loop.shutdown(); event_loop.shutdown();
} }
} }
}
}
pub struct CoAPServer { pub struct CoAPServer {
socket: UdpSocket, socket: UdpSocket,
event_sender: Option<Sender<()>>, event_sender: Option<Sender<EventLoopNotify>>,
event_thread: Option<thread::JoinHandle<()>>, event_thread: Option<thread::JoinHandle<()>>,
worker_num: usize,
} }
impl CoAPServer { impl CoAPServer {
...@@ -192,6 +234,7 @@ impl CoAPServer { ...@@ -192,6 +234,7 @@ impl CoAPServer {
socket: s, socket: s,
event_sender: None, event_sender: None,
event_thread: None, event_thread: None,
worker_num: DEFAULT_WORKER_NUM,
}) })
}) })
} }
...@@ -220,16 +263,17 @@ impl CoAPServer { ...@@ -220,16 +263,17 @@ impl CoAPServer {
// Create resources // Create resources
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let (tx_send, tx_recv): (TxQueue, RxQueue) = mpsc::channel(); let (tx_send, tx_recv): (TxQueue, RxQueue) = mpsc::channel();
let worker_num = self.worker_num;
// Setup and spawn event loop thread, which will spawn // Setup and spawn event loop thread, which will spawn
// children threads which handle incomining requests // children threads which handle incomining requests
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
let mut event_loop = EventLoop::new().unwrap(); let mut event_loop = EventLoop::new().unwrap();
event_loop.register(&socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap(); event_loop.register(&socket, Token(0), EventSet::readable(), PollOpt::level()).unwrap();
tx.send(event_loop.channel()).unwrap(); tx.send(event_loop.channel()).unwrap();
event_loop.run(&mut UdpHandler::new(socket, tx_send, tx_recv, handler)).unwrap(); event_loop.run(&mut UdpHandler::new(socket, tx_send, tx_recv, worker_num, handler)).unwrap();
}); });
// Ensure threads started successfully // Ensure threads started successfully
...@@ -248,12 +292,17 @@ impl CoAPServer { ...@@ -248,12 +292,17 @@ impl CoAPServer {
let event_sender = self.event_sender.take(); let event_sender = self.event_sender.take();
match event_sender { match event_sender {
Some(ref sender) => { Some(ref sender) => {
sender.send(()).unwrap(); sender.send(EventLoopNotify::Shutdown).unwrap();
self.event_thread.take().map(|g| g.join()); self.event_thread.take().map(|g| g.join());
} }
_ => {} _ => {}
} }
} }
/// Set the number of threads for handling requests
pub fn set_worker_num(&mut self, worker_num: usize) {
self.worker_num = worker_num;
}
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment