Skip to content
Snippets Groups Projects
Commit 0af4e219 authored by Covertness's avatar Covertness Committed by GitHub
Browse files

Merge pull request #12 from Covertness/worker-process

support thread pool
parents 4be8812b 414d1cf7
No related branches found
No related tags found
No related merge requests found
...@@ -17,6 +17,7 @@ url = "0.2.36" ...@@ -17,6 +17,7 @@ url = "0.2.36"
num = "0.1" num = "0.1"
rand = "0.3" rand = "0.3"
log = "0.3" log = "0.3"
threadpool = "1.3"
[dev-dependencies] [dev-dependencies]
quickcheck = "0.2.27" quickcheck = "0.2.27"
...@@ -73,6 +73,7 @@ extern crate mio; ...@@ -73,6 +73,7 @@ extern crate mio;
extern crate url; extern crate url;
extern crate num; extern crate num;
extern crate rand; extern crate rand;
extern crate threadpool;
#[cfg(test)] #[cfg(test)]
extern crate quickcheck; extern crate quickcheck;
......
...@@ -8,6 +8,9 @@ use mio::udp::UdpSocket; ...@@ -8,6 +8,9 @@ use mio::udp::UdpSocket;
use message::packet::Packet; use message::packet::Packet;
use message::request::CoAPRequest; use message::request::CoAPRequest;
use message::response::CoAPResponse; use message::response::CoAPResponse;
use threadpool::ThreadPool;
const DEFAULT_WORKER_NUM: usize = 4;
type TxQueue = mpsc::Sender<QueuedResponse>; type TxQueue = mpsc::Sender<QueuedResponse>;
type RxQueue = mpsc::Receiver<QueuedResponse>; type RxQueue = mpsc::Receiver<QueuedResponse>;
...@@ -25,6 +28,19 @@ struct QueuedResponse { ...@@ -25,6 +28,19 @@ struct QueuedResponse {
pub response: CoAPResponse, pub response: CoAPResponse,
} }
#[derive(Debug)]
enum EventLoopNotify {
NewResponse,
Shutdown,
}
#[derive(Debug)]
enum ResponseError {
SocketUnwritable,
SocketError,
PacketInvalid,
}
pub trait CoAPHandler: Sync + Send + Copy { pub trait CoAPHandler: Sync + Send + Copy {
fn handle(&self, CoAPRequest) -> Option<CoAPResponse>; fn handle(&self, CoAPRequest) -> Option<CoAPResponse>;
} }
...@@ -42,6 +58,7 @@ struct UdpHandler<H: CoAPHandler + 'static> { ...@@ -42,6 +58,7 @@ struct UdpHandler<H: CoAPHandler + 'static> {
socket: UdpSocket, socket: UdpSocket,
tx_sender: TxQueue, tx_sender: TxQueue,
rx_recv: RxQueue, rx_recv: RxQueue,
worker_pool: ThreadPool,
coap_handler: H, coap_handler: H,
} }
...@@ -49,52 +66,70 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> { ...@@ -49,52 +66,70 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> {
fn new(socket: UdpSocket, fn new(socket: UdpSocket,
tx_sender: TxQueue, tx_sender: TxQueue,
rx_recv: RxQueue, rx_recv: RxQueue,
worker_num: usize,
coap_handler: H) coap_handler: H)
-> UdpHandler<H> { -> UdpHandler<H> {
UdpHandler { UdpHandler {
socket: socket, socket: socket,
tx_sender: tx_sender, tx_sender: tx_sender,
rx_recv: rx_recv, rx_recv: rx_recv,
worker_pool: ThreadPool::new(worker_num),
coap_handler: coap_handler, coap_handler: coap_handler,
} }
} }
fn request_handler(&self) -> Option<QueuedResponse> { fn request_handler(&self, event_loop: &mut EventLoop<UdpHandler<H>>) {
match self.requset_recv() { match self.requset_recv() {
Some(rqst) => { Some(rqst) => {
let src = rqst.source.unwrap(); let src = rqst.source.unwrap();
match self.coap_handler.handle(rqst) { let coap_handler = self.coap_handler;
let response_q = self.tx_sender.clone();
let event_sender = event_loop.channel();
self.worker_pool.execute(move || {
match coap_handler.handle(rqst) {
Some(response) => { Some(response) => {
debug!("Response: {:?}", response); debug!("Response: {:?}", response);
return Some(QueuedResponse {
response_q.send(QueuedResponse {
address: src, address: src,
response: response, response: response,
}); }).unwrap();
match event_sender.send(EventLoopNotify::NewResponse) {
Ok(()) => {}
Err(error) => {
warn!("Notify NewResponse failed, {:?}", error);
}
}
} }
None => { None => {
debug!("No response"); debug!("No response");
return None;
}
} }
} }
None => { });
return None;
} }
None => {}
} }
} }
fn response_handler(&self) { fn response_handler(&self) -> bool {
loop { loop {
match self.rx_recv.try_recv() { match self.rx_recv.try_recv() {
Ok(q_res) => { Ok(q_res) => {
self.response_send(q_res); match self.response_send(&q_res) {
Ok(()) => {}
Err(ResponseError::SocketUnwritable) => {
self.tx_sender.send(q_res).unwrap();
return false;
}
Err(_) => {}
}
} }
Err(mpsc::TryRecvError::Empty) => { Err(mpsc::TryRecvError::Empty) => {
break; return true;
} }
Err(mpsc::TryRecvError::Disconnected) => { Err(mpsc::TryRecvError::Disconnected) => {
error!("The RxQueue become disconnected"); panic!("The RxQueue become disconnected");
break;
} }
} }
} }
...@@ -124,22 +159,22 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> { ...@@ -124,22 +159,22 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> {
} }
} }
fn response_send(&self, q_res: QueuedResponse) { fn response_send(&self, q_res: & QueuedResponse) -> Result<(), ResponseError> {
match q_res.response.message.to_bytes() { match q_res.response.message.to_bytes() {
Ok(bytes) => { Ok(bytes) => {
match self.socket.send_to(&bytes[..], &q_res.address) { match self.socket.send_to(&bytes[..], &q_res.address) {
Ok(None) => {
// Look at https://github.com/carllerche/mio/issues/411 in detail // Look at https://github.com/carllerche/mio/issues/411 in detail
error!("Failed to complete the response"); Ok(None) => return Err(ResponseError::SocketUnwritable),
Ok(_) => Ok(()),
Err(error) => {
error!("Failed to send response, {:?}", error);
return Err(ResponseError::SocketError);
} }
Ok(_) => {}
Err(_) => {
error!("Failed to send response");
} }
} }
} Err(error) => {
Err(_) => { error!("Failed to decode response, {:?}", error);
error!("Failed to decode response"); return Err(ResponseError::PacketInvalid);
} }
} }
} }
...@@ -147,38 +182,45 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> { ...@@ -147,38 +182,45 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> {
impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> { impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> {
type Timeout = usize; type Timeout = usize;
type Message = (); type Message = EventLoopNotify;
fn ready(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) { fn ready(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) {
if events.is_writable() { if events.is_writable() {
// handle the response // handle the response
self.response_handler(); match self.response_handler() {
event_loop.reregister(&self.socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap(); true => {
return; event_loop.reregister(&self.socket, Token(0), EventSet::readable(), PollOpt::level()).unwrap();
} else if events.is_readable() { }
// handle the request false => {
match self.request_handler() {
Some(response) => {
self.tx_sender.send(response).unwrap();
event_loop.reregister(&self.socket, Token(0), EventSet::writable(), PollOpt::edge()).unwrap(); event_loop.reregister(&self.socket, Token(0), EventSet::writable(), PollOpt::edge()).unwrap();
} }
None => {}
} }
} else if events.is_readable() {
// handle the request
self.request_handler(event_loop);
} else { } else {
warn!("Unreadable Event, {:?}", events); warn!("Unknown Event, {:?}", events);
} }
} }
fn notify(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: ()) { fn notify(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, msg: EventLoopNotify) {
match msg {
EventLoopNotify::NewResponse => {
event_loop.reregister(&self.socket, Token(0), EventSet::writable(), PollOpt::edge()).unwrap();
}
EventLoopNotify::Shutdown => {
info!("Shutting down request handler"); info!("Shutting down request handler");
event_loop.shutdown(); event_loop.shutdown();
} }
} }
}
}
pub struct CoAPServer { pub struct CoAPServer {
socket: UdpSocket, socket: UdpSocket,
event_sender: Option<Sender<()>>, event_sender: Option<Sender<EventLoopNotify>>,
event_thread: Option<thread::JoinHandle<()>>, event_thread: Option<thread::JoinHandle<()>>,
worker_num: usize,
} }
impl CoAPServer { impl CoAPServer {
...@@ -192,6 +234,7 @@ impl CoAPServer { ...@@ -192,6 +234,7 @@ impl CoAPServer {
socket: s, socket: s,
event_sender: None, event_sender: None,
event_thread: None, event_thread: None,
worker_num: DEFAULT_WORKER_NUM,
}) })
}) })
} }
...@@ -220,16 +263,17 @@ impl CoAPServer { ...@@ -220,16 +263,17 @@ impl CoAPServer {
// Create resources // Create resources
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let (tx_send, tx_recv): (TxQueue, RxQueue) = mpsc::channel(); let (tx_send, tx_recv): (TxQueue, RxQueue) = mpsc::channel();
let worker_num = self.worker_num;
// Setup and spawn event loop thread, which will spawn // Setup and spawn event loop thread, which will spawn
// children threads which handle incomining requests // children threads which handle incomining requests
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
let mut event_loop = EventLoop::new().unwrap(); let mut event_loop = EventLoop::new().unwrap();
event_loop.register(&socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap(); event_loop.register(&socket, Token(0), EventSet::readable(), PollOpt::level()).unwrap();
tx.send(event_loop.channel()).unwrap(); tx.send(event_loop.channel()).unwrap();
event_loop.run(&mut UdpHandler::new(socket, tx_send, tx_recv, handler)).unwrap(); event_loop.run(&mut UdpHandler::new(socket, tx_send, tx_recv, worker_num, handler)).unwrap();
}); });
// Ensure threads started successfully // Ensure threads started successfully
...@@ -248,12 +292,17 @@ impl CoAPServer { ...@@ -248,12 +292,17 @@ impl CoAPServer {
let event_sender = self.event_sender.take(); let event_sender = self.event_sender.take();
match event_sender { match event_sender {
Some(ref sender) => { Some(ref sender) => {
sender.send(()).unwrap(); sender.send(EventLoopNotify::Shutdown).unwrap();
self.event_thread.take().map(|g| g.join()); self.event_thread.take().map(|g| g.join());
} }
_ => {} _ => {}
} }
} }
/// Set the number of threads for handling requests
pub fn set_worker_num(&mut self, worker_num: usize) {
self.worker_num = worker_num;
}
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment