diff --git a/Cargo.toml b/Cargo.toml index 835a9a92758546f6e510528727bbf7792556effb..01ac77cb4f226988661bfbd0b9dccbffd19f073f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,4 +12,5 @@ keywords = ["CoAP"] [dependencies] bincode = "0.3.0" rustc-serialize = "0.3" -mio = "0.3.7" \ No newline at end of file +mio = "0.3.7" +threadpool = "0.1" \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 7ef6b1f2147667196be7fd3fe6311b11162b4ffc..f758ae73f83b9c2c3d513bd94064ce636b5ac492 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,6 +78,7 @@ extern crate bincode; extern crate rustc_serialize; extern crate mio; +extern crate threadpool; pub use server::CoAPServer; pub use client::CoAPClient; diff --git a/src/server.rs b/src/server.rs index 18f9bf06577add4cee03f3f1652113329d2b0f45..f32d3f2205187e49d364bbc982954b75550b4cbf 100644 --- a/src/server.rs +++ b/src/server.rs @@ -6,6 +6,9 @@ use mio::*; use mio::udp::UdpSocket; use packet::Packet; use client::CoAPClient; +use threadpool::ThreadPool; + +const DEFAULT_WORKER_NUM: usize = 4; #[derive(Debug)] pub enum CoAPServerError { @@ -24,19 +27,19 @@ impl<F> CoAPHandler for F where F: Fn(Packet, CoAPClient), F: Sync + Send + Copy } } -struct UdpHandler<H: CoAPHandler + 'static>(UdpSocket, H); +struct UdpHandler<H: CoAPHandler + 'static>(UdpSocket, ThreadPool, H); impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> { type Timeout = (); type Message = (); fn readable(&mut self, _: &mut EventLoop<UdpHandler<H>>, _: Token, _: ReadHint) { - let UdpHandler(ref mut socket, coap_handler) = *self; + let UdpHandler(ref mut socket, ref thread_pool, coap_handler) = *self; let mut buf = [0; 1500]; match socket.recv_from(&mut buf) { Ok((nread, src)) => { - thread::spawn(move || { + thread_pool.execute(move || { match Packet::from_bytes(&buf[..nread]) { Ok(packet) => { let client = CoAPClient::new(src).unwrap(); @@ -59,6 +62,7 @@ pub struct CoAPServer { socket: UdpSocket, event_sender: Option<Sender<()>>, event_thread: Option<thread::JoinHandle<()>>, + worker_num: usize, } impl CoAPServer { @@ -68,6 +72,7 @@ impl CoAPServer { socket: s, event_sender: None, event_thread: None, + worker_num: DEFAULT_WORKER_NUM, })) } @@ -75,17 +80,19 @@ impl CoAPServer { pub fn handle<H: CoAPHandler + 'static>(&mut self, handler: H) -> Result<(), CoAPServerError> { match self.event_sender { None => { + let worker_num = self.worker_num; let (tx, rx) = mpsc::channel(); let socket = self.socket.try_clone(); match socket { Ok(socket) => { let thread = thread::spawn(move || { + let thread_pool = ThreadPool::new(worker_num); let mut event_loop = EventLoop::new().unwrap(); event_loop.register(&socket, Token(0)).unwrap(); tx.send(event_loop.channel()).unwrap(); - event_loop.run(&mut UdpHandler(socket, handler)).unwrap(); + event_loop.run(&mut UdpHandler(socket, thread_pool, handler)).unwrap(); }); match rx.recv() { @@ -115,6 +122,10 @@ impl CoAPServer { _ => {}, } } + + pub fn set_worker_num(&mut self, worker_num: usize) { + self.worker_num = worker_num; + } } impl Drop for CoAPServer {