Skip to content
Snippets Groups Projects
Commit b38d4d01 authored by Covertness's avatar Covertness
Browse files

handle request with thread pool

parent 0a8d8224
No related branches found
No related tags found
No related merge requests found
...@@ -13,3 +13,4 @@ keywords = ["CoAP"] ...@@ -13,3 +13,4 @@ keywords = ["CoAP"]
bincode = "0.3.0" bincode = "0.3.0"
rustc-serialize = "0.3" rustc-serialize = "0.3"
mio = "0.3.7" mio = "0.3.7"
threadpool = "0.1"
\ No newline at end of file
...@@ -78,6 +78,7 @@ ...@@ -78,6 +78,7 @@
extern crate bincode; extern crate bincode;
extern crate rustc_serialize; extern crate rustc_serialize;
extern crate mio; extern crate mio;
extern crate threadpool;
pub use server::CoAPServer; pub use server::CoAPServer;
pub use client::CoAPClient; pub use client::CoAPClient;
......
...@@ -6,6 +6,9 @@ use mio::*; ...@@ -6,6 +6,9 @@ use mio::*;
use mio::udp::UdpSocket; use mio::udp::UdpSocket;
use packet::Packet; use packet::Packet;
use client::CoAPClient; use client::CoAPClient;
use threadpool::ThreadPool;
const DEFAULT_WORKER_NUM: usize = 4;
#[derive(Debug)] #[derive(Debug)]
pub enum CoAPServerError { pub enum CoAPServerError {
...@@ -24,19 +27,19 @@ impl<F> CoAPHandler for F where F: Fn(Packet, CoAPClient), F: Sync + Send + Copy ...@@ -24,19 +27,19 @@ impl<F> CoAPHandler for F where F: Fn(Packet, CoAPClient), F: Sync + Send + Copy
} }
} }
struct UdpHandler<H: CoAPHandler + 'static>(UdpSocket, H); struct UdpHandler<H: CoAPHandler + 'static>(UdpSocket, ThreadPool, H);
impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> { impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> {
type Timeout = (); type Timeout = ();
type Message = (); type Message = ();
fn readable(&mut self, _: &mut EventLoop<UdpHandler<H>>, _: Token, _: ReadHint) { fn readable(&mut self, _: &mut EventLoop<UdpHandler<H>>, _: Token, _: ReadHint) {
let UdpHandler(ref mut socket, coap_handler) = *self; let UdpHandler(ref mut socket, ref thread_pool, coap_handler) = *self;
let mut buf = [0; 1500]; let mut buf = [0; 1500];
match socket.recv_from(&mut buf) { match socket.recv_from(&mut buf) {
Ok((nread, src)) => { Ok((nread, src)) => {
thread::spawn(move || { thread_pool.execute(move || {
match Packet::from_bytes(&buf[..nread]) { match Packet::from_bytes(&buf[..nread]) {
Ok(packet) => { Ok(packet) => {
let client = CoAPClient::new(src).unwrap(); let client = CoAPClient::new(src).unwrap();
...@@ -59,6 +62,7 @@ pub struct CoAPServer { ...@@ -59,6 +62,7 @@ pub struct CoAPServer {
socket: UdpSocket, socket: UdpSocket,
event_sender: Option<Sender<()>>, event_sender: Option<Sender<()>>,
event_thread: Option<thread::JoinHandle<()>>, event_thread: Option<thread::JoinHandle<()>>,
worker_num: usize,
} }
impl CoAPServer { impl CoAPServer {
...@@ -68,6 +72,7 @@ impl CoAPServer { ...@@ -68,6 +72,7 @@ impl CoAPServer {
socket: s, socket: s,
event_sender: None, event_sender: None,
event_thread: None, event_thread: None,
worker_num: DEFAULT_WORKER_NUM,
})) }))
} }
...@@ -75,17 +80,19 @@ impl CoAPServer { ...@@ -75,17 +80,19 @@ impl CoAPServer {
pub fn handle<H: CoAPHandler + 'static>(&mut self, handler: H) -> Result<(), CoAPServerError> { pub fn handle<H: CoAPHandler + 'static>(&mut self, handler: H) -> Result<(), CoAPServerError> {
match self.event_sender { match self.event_sender {
None => { None => {
let worker_num = self.worker_num;
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let socket = self.socket.try_clone(); let socket = self.socket.try_clone();
match socket { match socket {
Ok(socket) => { Ok(socket) => {
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
let thread_pool = ThreadPool::new(worker_num);
let mut event_loop = EventLoop::new().unwrap(); let mut event_loop = EventLoop::new().unwrap();
event_loop.register(&socket, Token(0)).unwrap(); event_loop.register(&socket, Token(0)).unwrap();
tx.send(event_loop.channel()).unwrap(); tx.send(event_loop.channel()).unwrap();
event_loop.run(&mut UdpHandler(socket, handler)).unwrap(); event_loop.run(&mut UdpHandler(socket, thread_pool, handler)).unwrap();
}); });
match rx.recv() { match rx.recv() {
...@@ -115,6 +122,10 @@ impl CoAPServer { ...@@ -115,6 +122,10 @@ impl CoAPServer {
_ => {}, _ => {},
} }
} }
pub fn set_worker_num(&mut self, worker_num: usize) {
self.worker_num = worker_num;
}
} }
impl Drop for CoAPServer { impl Drop for CoAPServer {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment