use std; use std::thread; use std::net::ToSocketAddrs; use std::sync::mpsc; use mio::*; use mio::udp::UdpSocket; use packet::Packet; use client::CoAPClient; use threadpool::ThreadPool; const DEFAULT_WORKER_NUM: usize = 4; #[derive(Debug)] pub enum CoAPServerError { NetworkError, EventLoopError, AnotherHandlerIsRunning, } pub trait CoAPHandler: Sync + Send + Copy { fn handle(&self, Packet, CoAPClient); } impl<F> CoAPHandler for F where F: Fn(Packet, CoAPClient), F: Sync + Send + Copy { fn handle(&self, request: Packet, response: CoAPClient) { self(request, response); } } struct UdpHandler<H: CoAPHandler + 'static> { socket: UdpSocket, thread_pool: ThreadPool, coap_handler: H } impl<H: CoAPHandler + 'static> UdpHandler<H> { fn new(socket: UdpSocket, thread_pool: ThreadPool, coap_handler: H) -> UdpHandler<H> { UdpHandler { socket: socket, thread_pool: thread_pool, coap_handler: coap_handler } } } impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> { type Timeout = usize; type Message = (); fn ready(&mut self, _: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) { if events.is_readable() { let coap_handler = self.coap_handler; let mut buf = [0; 1500]; match self.socket.recv_from(&mut buf) { Ok(Some((nread, src))) => { self.thread_pool.execute(move || { match Packet::from_bytes(&buf[..nread]) { Ok(packet) => { let client = CoAPClient::new(src).unwrap(); coap_handler.handle(packet, client); }, Err(_) => return }; }); }, _ => panic!("unexpected error"), } } } fn notify(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: ()) { event_loop.shutdown(); } } pub struct CoAPServer { socket: UdpSocket, event_sender: Option<Sender<()>>, event_thread: Option<thread::JoinHandle<()>>, worker_num: usize, } impl CoAPServer { /// Creates a CoAP server listening on the given address. pub fn new<A: ToSocketAddrs>(addr: A) -> std::io::Result<CoAPServer> { addr.to_socket_addrs().and_then(|mut iter| { match iter.next() { Some(ad) => { UdpSocket::bound(&ad).and_then(|s| Ok(CoAPServer { socket: s, event_sender: None, event_thread: None, worker_num: DEFAULT_WORKER_NUM, })) }, None => Err(std::io::Error::new(std::io::ErrorKind::Other, "no address")) } }) } /// Starts handling requests with the handler. pub fn handle<H: CoAPHandler + 'static>(&mut self, handler: H) -> Result<(), CoAPServerError> { match self.event_sender { None => { let worker_num = self.worker_num; let (tx, rx) = mpsc::channel(); let socket = self.socket.try_clone(); match socket { Ok(socket) => { let thread = thread::spawn(move || { let thread_pool = ThreadPool::new(worker_num); let mut event_loop = EventLoop::new().unwrap(); event_loop.register(&socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap(); tx.send(event_loop.channel()).unwrap(); event_loop.run(&mut UdpHandler::new(socket, thread_pool, handler)).unwrap(); }); match rx.recv() { Ok(event_sender) => { self.event_sender = Some(event_sender); self.event_thread = Some(thread); Ok(()) }, Err(_) => Err(CoAPServerError::EventLoopError) } }, Err(_) => Err(CoAPServerError::NetworkError), } }, Some(_) => Err(CoAPServerError::AnotherHandlerIsRunning), } } /// Stop the server. pub fn stop(&mut self) { let event_sender = self.event_sender.take(); match event_sender { Some(ref sender) => { sender.send(()).unwrap(); self.event_thread.take().map(|g| g.join()); }, _ => {}, } } /// Set the number of threads for handling requests pub fn set_worker_num(&mut self, worker_num: usize) { self.worker_num = worker_num; } } impl Drop for CoAPServer { fn drop(&mut self) { self.stop(); } } #[cfg(test)] mod test { use super::*; use packet::{Packet, PacketType, OptionType}; use client::CoAPClient; fn request_handler(req: Packet, resp: CoAPClient) { let uri_path = req.get_option(OptionType::UriPath); assert!(uri_path.is_some()); let uri_path = uri_path.unwrap(); resp.reply(&req, uri_path.front().unwrap().clone()).unwrap(); } #[test] fn test_echo_server() { let mut server = CoAPServer::new("127.0.0.1:5683").unwrap(); server.handle(request_handler).unwrap(); let client = CoAPClient::new("127.0.0.1:5683").unwrap(); let mut packet = Packet::new(); packet.header.set_version(1); packet.header.set_type(PacketType::Confirmable); packet.header.set_code("0.01"); packet.header.set_message_id(1); packet.set_token(vec!(0x51, 0x55, 0x77, 0xE8)); packet.add_option(OptionType::UriPath, b"test-echo".to_vec()); client.send(&packet).unwrap(); let recv_packet = client.receive().unwrap(); assert_eq!(recv_packet.payload, b"test-echo".to_vec()); } #[test] fn test_echo_server_no_token() { let mut server = CoAPServer::new("127.0.0.1:5683").unwrap(); server.handle(request_handler).unwrap(); let client = CoAPClient::new("127.0.0.1:5683").unwrap(); let mut packet = Packet::new(); packet.header.set_version(1); packet.header.set_type(PacketType::Confirmable); packet.header.set_code("0.01"); packet.header.set_message_id(1); packet.add_option(OptionType::UriPath, b"test-echo".to_vec()); client.send(&packet).unwrap(); let recv_packet = client.receive().unwrap(); assert_eq!(recv_packet.payload, b"test-echo".to_vec()); } }