From 9af3d71e753ae60d32af3e050699024e23e6d184 Mon Sep 17 00:00:00 2001 From: Covertness <wuyingfengsui@gmail.com> Date: Sun, 7 Aug 2016 17:04:41 +0800 Subject: [PATCH] send response by the writable event --- src/server.rs | 99 ++++++++++++++++++++++++++++----------------------- 1 file changed, 55 insertions(+), 44 deletions(-) diff --git a/src/server.rs b/src/server.rs index e56cc6b..2ef54ba 100644 --- a/src/server.rs +++ b/src/server.rs @@ -24,7 +24,7 @@ pub enum CoAPServerError { #[derive(Debug)] struct QueuedResponse { pub address: SocketAddr, - pub response: CoAPResponse, + pub response: Option<CoAPResponse>, } pub trait CoAPHandler: Sync + Send + Copy { @@ -44,6 +44,7 @@ struct UdpHandler<H: CoAPHandler + 'static> { socket: UdpSocket, thread_pool: ThreadPool, tx_sender: TxQueue, + rx_recv: RxQueue, coap_handler: H, } @@ -51,12 +52,14 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> { fn new(socket: UdpSocket, thread_pool: ThreadPool, tx_sender: TxQueue, + rx_recv: RxQueue, coap_handler: H) -> UdpHandler<H> { UdpHandler { socket: socket, thread_pool: thread_pool, tx_sender: tx_sender, + rx_recv: rx_recv, coap_handler: coap_handler, } } @@ -66,12 +69,20 @@ impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> { type Timeout = usize; type Message = (); - fn ready(&mut self, _: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) { + fn ready(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) { + // handle the response + if events.is_writable() { + response_handler(&self.rx_recv, &self.socket); + event_loop.register(&self.socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap(); + return; + } + if !events.is_readable() { - warn!("Unreadable Event"); + warn!("Unreadable Event, {:?}", events); return; } + // handle the request let coap_handler = self.coap_handler; let mut buf = [0; 1500]; @@ -80,28 +91,39 @@ impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> { debug!("Handling request from {}", src); let response_q = self.tx_sender.clone(); + event_loop.register(&self.socket, Token(1), EventSet::writable(), PollOpt::edge()).unwrap(); + self.thread_pool.execute(move || { match Packet::from_bytes(&buf[..nread]) { Ok(packet) => { // Dispatch user handler, if there is a response packet - // send the reply via the TX thread + // send the reply via the writable event let rqst = CoAPRequest::from_packet(packet, &src); match coap_handler.handle(rqst) { Some(response) => { debug!("Response: {:?}", response); response_q.send(QueuedResponse { address: src, - response: response, + response: Some(response), }) .unwrap(); } None => { - debug!("No response"); + response_q.send(QueuedResponse { + address: src, + response: None, + }) + .unwrap(); } }; } Err(_) => { error!("Failed to parse request"); + response_q.send(QueuedResponse { + address: src, + response: None, + }) + .unwrap(); return; } }; @@ -125,7 +147,6 @@ pub struct CoAPServer { socket: UdpSocket, event_sender: Option<Sender<()>>, event_thread: Option<thread::JoinHandle<()>>, - tx_thread: Option<thread::JoinHandle<()>>, worker_num: usize, } @@ -140,7 +161,6 @@ impl CoAPServer { socket: s, event_sender: None, event_thread: None, - tx_thread: None, worker_num: DEFAULT_WORKER_NUM, }) }) @@ -171,13 +191,6 @@ impl CoAPServer { let worker_num = self.worker_num; let (tx, rx) = mpsc::channel(); let (tx_send, tx_recv): (TxQueue, RxQueue) = mpsc::channel(); - let tx_only = self.socket.try_clone().unwrap(); - let tx_send2 = tx_send.clone(); - - // Setup and spawn single TX thread - let tx_thread = thread::spawn(move || { - transmit_handler(tx_send2, tx_recv, tx_only); - }); // Setup and spawn event loop thread, which will spawn // children threads which handle incomining requests @@ -188,7 +201,7 @@ impl CoAPServer { tx.send(event_loop.channel()).unwrap(); - event_loop.run(&mut UdpHandler::new(socket, thread_pool, tx_send, handler)).unwrap(); + event_loop.run(&mut UdpHandler::new(socket, thread_pool, tx_send, tx_recv, handler)).unwrap(); }); // Ensure threads started successfully @@ -196,7 +209,6 @@ impl CoAPServer { Ok(event_sender) => { self.event_sender = Some(event_sender); self.event_thread = Some(thread); - self.tx_thread = Some(tx_thread); Ok(()) } Err(_) => Err(CoAPServerError::EventLoopError), @@ -221,38 +233,37 @@ impl CoAPServer { } } -fn transmit_handler(tx_send: TxQueue, tx_recv: RxQueue, tx_only: UdpSocket) { - // Note! We should only transmit with this UDP Socket +fn response_handler(tx_recv: &RxQueue, tx_only: &UdpSocket) { // TODO: Add better support for failure detection or logging - loop { - match tx_recv.recv() { - Ok(q_res) => { - match q_res.response.message.to_bytes() { - Ok(bytes) => { - match tx_only.send_to(&bytes[..], &q_res.address) { - Ok(None) => { - // try to send again, look at https://github.com/Covertness/coap-rs/issues/8 in detail - tx_send.send(q_res).unwrap() - } - Ok(_) => { - continue; - } - Err(_) => { - error!("Failed to send response"); + match tx_recv.recv() { + Ok(q_res) => { + match q_res.response { + Some(resp) => { + match resp.message.to_bytes() { + Ok(bytes) => { + match tx_only.send_to(&bytes[..], &q_res.address) { + Ok(None) => { + // Look at https://github.com/carllerche/mio/issues/411 in detail + error!("Failed to complete the response"); + } + Ok(_) => {} + Err(_) => { + error!("Failed to send response"); + } } - } - } - Err(_) => { - error!("Failed to decode response"); + } + Err(_) => { + error!("Failed to decode response"); + } } } + None => { + debug!("No response"); + } } - // recv error occurs when all transmitters are terminited - // (when all UDP Handlers are closed) - Err(_) => { - info!("Shutting down Transmit Handler"); - break; - } + } + Err(_) => { + error!("Failed to get response"); } } } -- GitLab