Skip to content
Snippets Groups Projects
Select Git revision
  • f427faec15eff7a181deb318fe666b3804d3107e
  • master default
  • 0.5.1
  • 0.3.1
4 results

server.rs

Blame
  • server.rs 7.64 KiB
    use std;
    use std::io::{Error, ErrorKind};
    use std::thread;
    use std::net::{ToSocketAddrs, SocketAddr};
    use std::sync::mpsc;
    use mio::{EventLoop, PollOpt, EventSet, Handler, Sender, Token};
    use mio::udp::UdpSocket;
    use packet::Packet;
    use threadpool::ThreadPool;
    
    const DEFAULT_WORKER_NUM: usize = 4;
    pub type TxQueue = mpsc::Sender<CoAPResponse>;
    pub type RxQueue = mpsc::Receiver<CoAPResponse>;
    
    #[derive(Debug)]
    pub enum CoAPServerError {
    	NetworkError,
    	EventLoopError,
    	AnotherHandlerIsRunning,
    }
    
    #[derive(Debug)]
    pub struct CoAPResponse {
    	pub address: SocketAddr,
    	pub response: Packet
    }
    
    pub trait CoAPHandler: Sync + Send + Copy {
    	fn handle(&self, Packet) -> Option<Packet>;
    }
    
    impl<F> CoAPHandler for F where F: Fn(Packet) -> Option<Packet>, F: Sync + Send + Copy {
    	fn handle(&self, request: Packet) -> Option<Packet> {
    		return self(request);
    	}
    }
    
    struct UdpHandler<H: CoAPHandler + 'static> {
    	socket: UdpSocket,
    	thread_pool: ThreadPool,
    	tx_sender: TxQueue,
    	coap_handler: H
    }
    
    impl<H: CoAPHandler + 'static> UdpHandler<H> {
    	fn new(socket: UdpSocket, thread_pool: ThreadPool, tx_sender: TxQueue, coap_handler: H) -> UdpHandler<H> {
    		UdpHandler {
    			socket: socket,
    			thread_pool: thread_pool,
    			tx_sender: tx_sender,
    			coap_handler: coap_handler
    		}
    	}
    }
    
    impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> {
    	type Timeout = usize;
    	type Message = ();
    
    	fn ready(&mut self, _: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) {
    		if !events.is_readable() {
    			warn!("Unreadable Event");
    			return;
    		}
    
    		let coap_handler = self.coap_handler;
    		let mut buf = [0; 1500];
    
    		match self.socket.recv_from(&mut buf) {
    			Ok(Some((nread, src))) => {
    				debug!("Handling request from {}", src);
    				let response_q = self.tx_sender.clone();
    				self.thread_pool.execute(move || {
    
    					match Packet::from_bytes(&buf[..nread]) {
    						Ok(packet) => {
    							// Dispatch user handler, if there is a response packet
    							//   send the reply via the TX thread
    							match coap_handler.handle(packet) {
    								Some(response) => {
    									debug!("Response: {:?}", response);
    									response_q.send(CoAPResponse{
    										address: src,
    										response: response
    									}).unwrap();
    								},
    								None => {
    									debug!("No response");
    								}
    							};
    						},
    						Err(_) => {
    							error!("Failed to parse request");
    							return;
    						}
    					};
    
    
    				});
    			},
    			_ => {
    				error!("Failed to read from socket");
    				panic!("unexpected error");
    			},
    		}
    
    	}
    
    	fn notify(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: ()) {
    		info!("Shutting down request handler");
            event_loop.shutdown();
        }
    }
    
    pub struct CoAPServer {
        socket: UdpSocket,
        event_sender: Option<Sender<()>>,
        event_thread: Option<thread::JoinHandle<()>>,
        tx_thread: Option<thread::JoinHandle<()>>,
        worker_num: usize,
    }
    
    impl CoAPServer {
    	/// Creates a CoAP server listening on the given address.
    	pub fn new<A: ToSocketAddrs>(addr: A) -> std::io::Result<CoAPServer> {
    		addr.to_socket_addrs().and_then(|mut iter| {
    			match iter.next() {
    				Some(ad) => {
    					UdpSocket::bound(&ad).and_then(|s| Ok(CoAPServer {
    						socket: s,
    						event_sender: None,
    						event_thread: None,
    						tx_thread: None,
    						worker_num: DEFAULT_WORKER_NUM,
    					}))
    				},
    				None => Err(Error::new(ErrorKind::Other, "no address"))
    			}
    		})
    	}
    
    	/// Starts handling requests with the handler
    	pub fn handle<H: CoAPHandler + 'static>(&mut self, handler: H) -> Result<(), CoAPServerError> {
    		let socket;
    
    		// Early return error checking
    		if let Some(_) = self.event_sender {
    			error!("Handler already running!");
    			return Err(CoAPServerError::AnotherHandlerIsRunning);
    		}
    		match self.socket.try_clone() {
    			Ok(good_socket) => {
    				socket = good_socket
    			},
    			Err(_) => {
    				error!("Network Error!");
    				return Err(CoAPServerError::NetworkError);
    			},
    		}
    
    		// Create resources
    		let worker_num = self.worker_num;
    		let (tx, rx) = mpsc::channel();
    		let (tx_send, tx_recv) : (TxQueue, RxQueue) = mpsc::channel();
    		let tx_only = self.socket.try_clone().unwrap();
    
    		// Setup and spawn single TX thread
    		let tx_thread = thread::spawn(move || {
    			transmit_handler(tx_recv, tx_only);
    		});
    
    		// Setup and spawn event loop thread, which will spawn
    		//   children threads which handle incomining requests
    		let thread = thread::spawn(move || {
    			let thread_pool = ThreadPool::new(worker_num);
    			let mut event_loop = EventLoop::new().unwrap();
    			event_loop.register(&socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap();
    
    			tx.send(event_loop.channel()).unwrap();
    
    			event_loop.run(&mut UdpHandler::new(socket, thread_pool, tx_send, handler)).unwrap();
    		});
    
    		// Ensure threads started successfully
    		match rx.recv() {
    			Ok(event_sender) => {
    				self.event_sender = Some(event_sender);
    				self.event_thread = Some(thread);
    				self.tx_thread = Some(tx_thread);
    				Ok(())
    			},
    			Err(_) => Err(CoAPServerError::EventLoopError)
    		}
    	}
    
    	/// Stop the server.
    	pub fn stop(&mut self) {
    		let event_sender = self.event_sender.take();
    		match event_sender {
    			Some(ref sender) => {
    				sender.send(()).unwrap();
    				self.event_thread.take().map(|g| g.join());
    			},
    			_ => {},
    		}
    	}
    
    	/// Set the number of threads for handling requests
    	pub fn set_worker_num(&mut self, worker_num: usize) {
    		self.worker_num = worker_num;
    	}
    }
    
    fn transmit_handler(tx_recv: RxQueue, tx_only: UdpSocket) {
    	// Note! We should only transmit with this UDP Socket
    	// TODO: Add better support for failure detection or logging
    	loop {
    		match tx_recv.recv() {
    			Ok(q_res) => {
    				match q_res.response.to_bytes() {
    					Ok(bytes) => {
    						let _ = tx_only.send_to(&bytes[..], &q_res.address);
    					},
    					Err(_) => {
    						error!("Failed to decode response");
    					}
    				}
    			},
    			// recv error occurs when all transmitters are terminited
    			//   (when all UDP Handlers are closed)
    			Err(_) => {
    				info!("Shutting down Transmit Handler");
    				break;
    			}
    		}
    	}
    }
    
    impl Drop for CoAPServer {
        fn drop(&mut self) {
            self.stop();
        }
    }
    
    
    #[cfg(test)]
    mod test {
    	use super::*;
    	use packet::{Packet, PacketType, OptionType, auto_response};
    	use client::CoAPClient;
    
    	fn request_handler(req: Packet) -> Option<Packet> {
    		let uri_path_list = req.get_option(OptionType::UriPath).unwrap();
    		assert!(uri_path_list.len() == 1);
    
    		return match auto_response(req) {
    			Ok(mut response) => {
    				response.set_payload(uri_path_list.front().unwrap().clone());
    				Some(response)
    			},
    			Err(_) => None
    		};
    	}
    
    	#[test]
    	fn test_echo_server() {
    		let mut server = CoAPServer::new("127.0.0.1:5683").unwrap();
    		server.handle(request_handler).unwrap();
    
    		let client = CoAPClient::new("127.0.0.1:5683").unwrap();
    		let mut packet = Packet::new();
    		packet.header.set_version(1);
    		packet.header.set_type(PacketType::Confirmable);
    		packet.header.set_code("0.01");
    		packet.header.set_message_id(1);
    		packet.set_token(vec!(0x51, 0x55, 0x77, 0xE8));
    		packet.add_option(OptionType::UriPath, b"test-echo".to_vec());
    		client.send(&packet).unwrap();
    
    		let recv_packet = client.receive().unwrap();
    		assert_eq!(recv_packet.payload, b"test-echo".to_vec());
    	}
    
    	#[test]
    	fn test_echo_server_no_token() {
    		let mut server = CoAPServer::new("127.0.0.1:5683").unwrap();
    		server.handle(request_handler).unwrap();
    
    		let client = CoAPClient::new("127.0.0.1:5683").unwrap();
    		let mut packet = Packet::new();
    		packet.header.set_version(1);
    		packet.header.set_type(PacketType::Confirmable);
    		packet.header.set_code("0.01");
    		packet.header.set_message_id(1);
    		packet.add_option(OptionType::UriPath, b"test-echo".to_vec());
    		client.send(&packet).unwrap();
    
    		let recv_packet = client.receive().unwrap();
    		assert_eq!(recv_packet.payload, b"test-echo".to_vec());
    	}
    }