From 414d1cf7fb0ccb8b41905feea09b9cdabd99d4b2 Mon Sep 17 00:00:00 2001
From: Covertness <wuyingfengsui@gmail.com>
Date: Sat, 27 Aug 2016 18:07:15 +0800
Subject: [PATCH] support thread pool

---
 Cargo.toml    |   1 +
 src/lib.rs    |   1 +
 src/server.rs | 147 +++++++++++++++++++++++++++++++++-----------------
 3 files changed, 100 insertions(+), 49 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index fcdf1e6..32cacc6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,6 +17,7 @@ url = "0.2.36"
 num = "0.1"
 rand = "0.3"
 log = "0.3"
+threadpool = "1.3"
 
 [dev-dependencies]
 quickcheck = "0.2.27"
diff --git a/src/lib.rs b/src/lib.rs
index 69b30ea..3e04e3e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -73,6 +73,7 @@ extern crate mio;
 extern crate url;
 extern crate num;
 extern crate rand;
+extern crate threadpool;
 #[cfg(test)]
 extern crate quickcheck;
 
diff --git a/src/server.rs b/src/server.rs
index 377a775..67f722e 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -8,6 +8,9 @@ use mio::udp::UdpSocket;
 use message::packet::Packet;
 use message::request::CoAPRequest;
 use message::response::CoAPResponse;
+use threadpool::ThreadPool;
+
+const DEFAULT_WORKER_NUM: usize = 4;
 
 type TxQueue = mpsc::Sender<QueuedResponse>;
 type RxQueue = mpsc::Receiver<QueuedResponse>;
@@ -25,6 +28,19 @@ struct QueuedResponse {
     pub response: CoAPResponse,
 }
 
+#[derive(Debug)]
+enum EventLoopNotify {
+    NewResponse,
+    Shutdown,
+}
+
+#[derive(Debug)]
+enum ResponseError {
+    SocketUnwritable,
+    SocketError,
+    PacketInvalid,
+}
+
 pub trait CoAPHandler: Sync + Send + Copy {
     fn handle(&self, CoAPRequest) -> Option<CoAPResponse>;
 }
@@ -42,6 +58,7 @@ struct UdpHandler<H: CoAPHandler + 'static> {
     socket: UdpSocket,
     tx_sender: TxQueue,
     rx_recv: RxQueue,
+    worker_pool: ThreadPool,
     coap_handler: H,
 }
 
@@ -49,52 +66,70 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> {
     fn new(socket: UdpSocket,
            tx_sender: TxQueue,
            rx_recv: RxQueue,
+           worker_num: usize,
            coap_handler: H)
            -> UdpHandler<H> {
         UdpHandler {
             socket: socket,
             tx_sender: tx_sender,
             rx_recv: rx_recv,
+            worker_pool: ThreadPool::new(worker_num),
             coap_handler: coap_handler,
         }
     }
 
-    fn request_handler(&self) -> Option<QueuedResponse> {
+    fn request_handler(&self, event_loop: &mut EventLoop<UdpHandler<H>>) {
         match self.requset_recv() {
             Some(rqst) => {
                 let src = rqst.source.unwrap();
-                match self.coap_handler.handle(rqst) {
-                    Some(response) => {
-                        debug!("Response: {:?}", response);
-                        return Some(QueuedResponse {
-                            address: src,
-                            response: response,
-                        });
+                let coap_handler = self.coap_handler;
+                let response_q = self.tx_sender.clone();
+                let event_sender = event_loop.channel();
+
+                self.worker_pool.execute(move || {
+                    match coap_handler.handle(rqst) {
+                        Some(response) => {
+                            debug!("Response: {:?}", response);
+
+                            response_q.send(QueuedResponse {
+                                address: src,
+                                response: response,
+                            }).unwrap();
+                            match event_sender.send(EventLoopNotify::NewResponse) {
+                                Ok(()) => {}
+                                Err(error) => {
+                                    warn!("Notify NewResponse failed, {:?}", error);
+                                }
+                            }
+                        }
+                        None => {
+                            debug!("No response");
+                        }
                     }
-                    None => {
-                        debug!("No response");
-                        return None;
-                    }
-                }
-            }
-            None => {
-                return None;
+                });
             }
+            None => {}
         }
     }
 
-    fn response_handler(&self) {
+    fn response_handler(&self) -> bool {
         loop {
             match self.rx_recv.try_recv() {
                 Ok(q_res) => {
-                    self.response_send(q_res);
+                    match self.response_send(&q_res) {
+                        Ok(()) => {}
+                        Err(ResponseError::SocketUnwritable) => {
+                            self.tx_sender.send(q_res).unwrap();
+                            return false;
+                        }
+                        Err(_) => {}
+                    }
                 }
                 Err(mpsc::TryRecvError::Empty) => {
-                    break;
+                    return true;
                 }
                 Err(mpsc::TryRecvError::Disconnected) => {
-                    error!("The RxQueue become disconnected");
-                    break;
+                    panic!("The RxQueue become disconnected");
                 }
             }
         }
@@ -124,22 +159,22 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> {
         }
     }
 
-    fn response_send(&self, q_res: QueuedResponse) {
+    fn response_send(&self, q_res: & QueuedResponse) -> Result<(), ResponseError> {
         match q_res.response.message.to_bytes() {
             Ok(bytes) => {
                 match self.socket.send_to(&bytes[..], &q_res.address) {
-                    Ok(None) => {
-                        // Look at https://github.com/carllerche/mio/issues/411 in detail
-                        error!("Failed to complete the response");
-                    }
-                    Ok(_) => {}
-                    Err(_) => {
-                        error!("Failed to send response");
+                    // Look at https://github.com/carllerche/mio/issues/411 in detail
+                    Ok(None) => return Err(ResponseError::SocketUnwritable),
+                    Ok(_) => Ok(()),
+                    Err(error) => {
+                        error!("Failed to send response, {:?}", error);
+                        return Err(ResponseError::SocketError);
                     }
                 }
             }
-            Err(_) => {
-                error!("Failed to decode response");
+            Err(error) => {
+                error!("Failed to decode response, {:?}", error);
+                return Err(ResponseError::PacketInvalid);
             }
         }
     }
@@ -147,38 +182,45 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> {
 
 impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> {
     type Timeout = usize;
-    type Message = ();
+    type Message = EventLoopNotify;
 
     fn ready(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) {
         if events.is_writable() {
             // handle the response
-            self.response_handler();
-            event_loop.reregister(&self.socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap();
-            return;
-        } else if events.is_readable() {
-            // handle the request
-            match self.request_handler() {
-                Some(response) => {
-                    self.tx_sender.send(response).unwrap();
+            match self.response_handler() {
+                true => {
+                    event_loop.reregister(&self.socket, Token(0), EventSet::readable(), PollOpt::level()).unwrap();
+                }
+                false => {
                     event_loop.reregister(&self.socket, Token(0), EventSet::writable(), PollOpt::edge()).unwrap();
                 }
-                None => {}
             }
+        } else if events.is_readable() {
+            // handle the request
+            self.request_handler(event_loop);
         } else {
-            warn!("Unreadable Event, {:?}", events);
+            warn!("Unknown Event, {:?}", events);
         }
     }
 
-    fn notify(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: ()) {
-        info!("Shutting down request handler");
-        event_loop.shutdown();
+    fn notify(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, msg: EventLoopNotify) {
+        match msg {
+            EventLoopNotify::NewResponse => {
+                event_loop.reregister(&self.socket, Token(0), EventSet::writable(), PollOpt::edge()).unwrap();
+            }
+            EventLoopNotify::Shutdown => {
+                info!("Shutting down request handler");
+                event_loop.shutdown();
+            }
+        }
     }
 }
 
 pub struct CoAPServer {
     socket: UdpSocket,
-    event_sender: Option<Sender<()>>,
+    event_sender: Option<Sender<EventLoopNotify>>,
     event_thread: Option<thread::JoinHandle<()>>,
+    worker_num: usize,
 }
 
 impl CoAPServer {
@@ -192,6 +234,7 @@ impl CoAPServer {
                             socket: s,
                             event_sender: None,
                             event_thread: None,
+                            worker_num: DEFAULT_WORKER_NUM,
                         })
                     })
                 }
@@ -220,16 +263,17 @@ impl CoAPServer {
         // Create resources
         let (tx, rx) = mpsc::channel();
         let (tx_send, tx_recv): (TxQueue, RxQueue) = mpsc::channel();
+        let worker_num = self.worker_num;
 
         // Setup and spawn event loop thread, which will spawn
         //   children threads which handle incomining requests
         let thread = thread::spawn(move || {
             let mut event_loop = EventLoop::new().unwrap();
-            event_loop.register(&socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap();
+            event_loop.register(&socket, Token(0), EventSet::readable(), PollOpt::level()).unwrap();
 
             tx.send(event_loop.channel()).unwrap();
 
-            event_loop.run(&mut UdpHandler::new(socket, tx_send, tx_recv, handler)).unwrap();
+            event_loop.run(&mut UdpHandler::new(socket, tx_send, tx_recv, worker_num, handler)).unwrap();
         });
 
         // Ensure threads started successfully
@@ -248,12 +292,17 @@ impl CoAPServer {
         let event_sender = self.event_sender.take();
         match event_sender {
             Some(ref sender) => {
-                sender.send(()).unwrap();
+                sender.send(EventLoopNotify::Shutdown).unwrap();
                 self.event_thread.take().map(|g| g.join());
             }
             _ => {}
         }
     }
+
+    /// Set the number of threads for handling requests
+    pub fn set_worker_num(&mut self, worker_num: usize) {
+        self.worker_num = worker_num;
+    }
 }
 
 
-- 
GitLab