diff --git a/Cargo.toml b/Cargo.toml index af4294590b2861d8ff1c94266ab65b225fa7f35b..3dacbc31757fb8347ce480907358aec2f7309515 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,8 @@ keywords = ["CoAP"] [dependencies] bincode = "0.3.0" rustc-serialize = "0.3" -mio = "0.3.7" +bytes = "0.2.11" +mio = "0.4" threadpool = "0.1" url = "0.2.36" num = "0.1" diff --git a/src/lib.rs b/src/lib.rs index 2eaa9c7163a092c6b8105a7fcf51b2f293cf31a1..3dff10dc63843e82e6daf118d7fc2d745d020e2a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,6 +66,7 @@ extern crate bincode; extern crate rustc_serialize; +extern crate bytes; extern crate mio; extern crate threadpool; extern crate url; diff --git a/src/server.rs b/src/server.rs index 4e914e186da287d0b3183e0ca3ce01797683fd45..a47b7a9067ff768b2ba88d2b5ee37734cd3c7422 100644 --- a/src/server.rs +++ b/src/server.rs @@ -7,6 +7,7 @@ use mio::udp::UdpSocket; use packet::Packet; use client::CoAPClient; use threadpool::ThreadPool; +use bytes::RingBuf; const DEFAULT_WORKER_NUM: usize = 4; @@ -27,29 +28,45 @@ impl<F> CoAPHandler for F where F: Fn(Packet, CoAPClient), F: Sync + Send + Copy } } -struct UdpHandler<H: CoAPHandler + 'static>(UdpSocket, ThreadPool, H); +struct UdpHandler<H: CoAPHandler + 'static> { + socket: UdpSocket, + thread_pool: ThreadPool, + coap_handler: H +} + +impl<H: CoAPHandler + 'static> UdpHandler<H> { + fn new(socket: UdpSocket, thread_pool: ThreadPool, coap_handler: H) -> UdpHandler<H> { + UdpHandler { + socket: socket, + thread_pool: thread_pool, + coap_handler: coap_handler + } + } +} impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> { - type Timeout = (); + type Timeout = usize; type Message = (); - fn readable(&mut self, _: &mut EventLoop<UdpHandler<H>>, _: Token, _: ReadHint) { - let UdpHandler(ref mut socket, ref thread_pool, coap_handler) = *self; - let mut buf = [0; 1500]; - - match socket.recv_from(&mut buf) { - Ok((nread, src)) => { - thread_pool.execute(move || { - match Packet::from_bytes(&buf[..nread]) { - Ok(packet) => { - let client = CoAPClient::new(src).unwrap(); - coap_handler.handle(packet, client); - }, - Err(_) => return - }; - }); - }, - Err(_) => panic!("unexpected error"), + fn ready(&mut self, _: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) { + if events.is_readable() { + let coap_handler = self.coap_handler; + let mut buf = RingBuf::new(1500); + + match self.socket.recv_from(&mut buf) { + Ok(Some(src)) => { + self.thread_pool.execute(move || { + match Packet::from_bytes(buf.bytes()) { + Ok(packet) => { + let client = CoAPClient::new(src).unwrap(); + coap_handler.handle(packet, client); + }, + Err(_) => return + }; + }); + }, + _ => panic!("unexpected error"), + } } } @@ -68,12 +85,19 @@ pub struct CoAPServer { impl CoAPServer { /// Creates a CoAP server listening on the given address. pub fn new<A: ToSocketAddrs>(addr: A) -> std::io::Result<CoAPServer> { - UdpSocket::bind(addr).and_then(|s| Ok(CoAPServer { - socket: s, - event_sender: None, - event_thread: None, - worker_num: DEFAULT_WORKER_NUM, - })) + addr.to_socket_addrs().and_then(|mut iter| { + match iter.next() { + Some(ad) => { + UdpSocket::bound(&ad).and_then(|s| Ok(CoAPServer { + socket: s, + event_sender: None, + event_thread: None, + worker_num: DEFAULT_WORKER_NUM, + })) + }, + None => Err(std::io::Error::new(std::io::ErrorKind::Other, "no address")) + } + }) } /// Starts handling requests with the handler. @@ -92,7 +116,7 @@ impl CoAPServer { tx.send(event_loop.channel()).unwrap(); - event_loop.run(&mut UdpHandler(socket, thread_pool, handler)).unwrap(); + event_loop.run(&mut UdpHandler::new(socket, thread_pool, handler)).unwrap(); }); match rx.recv() {