Skip to content
Snippets Groups Projects
Commit 9af3d71e authored by Covertness's avatar Covertness
Browse files

send response by the writable event

parent b5469213
No related branches found
No related tags found
No related merge requests found
...@@ -24,7 +24,7 @@ pub enum CoAPServerError { ...@@ -24,7 +24,7 @@ pub enum CoAPServerError {
#[derive(Debug)] #[derive(Debug)]
struct QueuedResponse { struct QueuedResponse {
pub address: SocketAddr, pub address: SocketAddr,
pub response: CoAPResponse, pub response: Option<CoAPResponse>,
} }
pub trait CoAPHandler: Sync + Send + Copy { pub trait CoAPHandler: Sync + Send + Copy {
...@@ -44,6 +44,7 @@ struct UdpHandler<H: CoAPHandler + 'static> { ...@@ -44,6 +44,7 @@ struct UdpHandler<H: CoAPHandler + 'static> {
socket: UdpSocket, socket: UdpSocket,
thread_pool: ThreadPool, thread_pool: ThreadPool,
tx_sender: TxQueue, tx_sender: TxQueue,
rx_recv: RxQueue,
coap_handler: H, coap_handler: H,
} }
...@@ -51,12 +52,14 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> { ...@@ -51,12 +52,14 @@ impl<H: CoAPHandler + 'static> UdpHandler<H> {
fn new(socket: UdpSocket, fn new(socket: UdpSocket,
thread_pool: ThreadPool, thread_pool: ThreadPool,
tx_sender: TxQueue, tx_sender: TxQueue,
rx_recv: RxQueue,
coap_handler: H) coap_handler: H)
-> UdpHandler<H> { -> UdpHandler<H> {
UdpHandler { UdpHandler {
socket: socket, socket: socket,
thread_pool: thread_pool, thread_pool: thread_pool,
tx_sender: tx_sender, tx_sender: tx_sender,
rx_recv: rx_recv,
coap_handler: coap_handler, coap_handler: coap_handler,
} }
} }
...@@ -66,12 +69,20 @@ impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> { ...@@ -66,12 +69,20 @@ impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> {
type Timeout = usize; type Timeout = usize;
type Message = (); type Message = ();
fn ready(&mut self, _: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) { fn ready(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) {
// handle the response
if events.is_writable() {
response_handler(&self.rx_recv, &self.socket);
event_loop.register(&self.socket, Token(0), EventSet::readable(), PollOpt::edge()).unwrap();
return;
}
if !events.is_readable() { if !events.is_readable() {
warn!("Unreadable Event"); warn!("Unreadable Event, {:?}", events);
return; return;
} }
// handle the request
let coap_handler = self.coap_handler; let coap_handler = self.coap_handler;
let mut buf = [0; 1500]; let mut buf = [0; 1500];
...@@ -80,28 +91,39 @@ impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> { ...@@ -80,28 +91,39 @@ impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> {
debug!("Handling request from {}", src); debug!("Handling request from {}", src);
let response_q = self.tx_sender.clone(); let response_q = self.tx_sender.clone();
event_loop.register(&self.socket, Token(1), EventSet::writable(), PollOpt::edge()).unwrap();
self.thread_pool.execute(move || { self.thread_pool.execute(move || {
match Packet::from_bytes(&buf[..nread]) { match Packet::from_bytes(&buf[..nread]) {
Ok(packet) => { Ok(packet) => {
// Dispatch user handler, if there is a response packet // Dispatch user handler, if there is a response packet
// send the reply via the TX thread // send the reply via the writable event
let rqst = CoAPRequest::from_packet(packet, &src); let rqst = CoAPRequest::from_packet(packet, &src);
match coap_handler.handle(rqst) { match coap_handler.handle(rqst) {
Some(response) => { Some(response) => {
debug!("Response: {:?}", response); debug!("Response: {:?}", response);
response_q.send(QueuedResponse { response_q.send(QueuedResponse {
address: src, address: src,
response: response, response: Some(response),
}) })
.unwrap(); .unwrap();
} }
None => { None => {
debug!("No response"); response_q.send(QueuedResponse {
address: src,
response: None,
})
.unwrap();
} }
}; };
} }
Err(_) => { Err(_) => {
error!("Failed to parse request"); error!("Failed to parse request");
response_q.send(QueuedResponse {
address: src,
response: None,
})
.unwrap();
return; return;
} }
}; };
...@@ -125,7 +147,6 @@ pub struct CoAPServer { ...@@ -125,7 +147,6 @@ pub struct CoAPServer {
socket: UdpSocket, socket: UdpSocket,
event_sender: Option<Sender<()>>, event_sender: Option<Sender<()>>,
event_thread: Option<thread::JoinHandle<()>>, event_thread: Option<thread::JoinHandle<()>>,
tx_thread: Option<thread::JoinHandle<()>>,
worker_num: usize, worker_num: usize,
} }
...@@ -140,7 +161,6 @@ impl CoAPServer { ...@@ -140,7 +161,6 @@ impl CoAPServer {
socket: s, socket: s,
event_sender: None, event_sender: None,
event_thread: None, event_thread: None,
tx_thread: None,
worker_num: DEFAULT_WORKER_NUM, worker_num: DEFAULT_WORKER_NUM,
}) })
}) })
...@@ -171,13 +191,6 @@ impl CoAPServer { ...@@ -171,13 +191,6 @@ impl CoAPServer {
let worker_num = self.worker_num; let worker_num = self.worker_num;
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let (tx_send, tx_recv): (TxQueue, RxQueue) = mpsc::channel(); let (tx_send, tx_recv): (TxQueue, RxQueue) = mpsc::channel();
let tx_only = self.socket.try_clone().unwrap();
let tx_send2 = tx_send.clone();
// Setup and spawn single TX thread
let tx_thread = thread::spawn(move || {
transmit_handler(tx_send2, tx_recv, tx_only);
});
// Setup and spawn event loop thread, which will spawn // Setup and spawn event loop thread, which will spawn
// children threads which handle incomining requests // children threads which handle incomining requests
...@@ -188,7 +201,7 @@ impl CoAPServer { ...@@ -188,7 +201,7 @@ impl CoAPServer {
tx.send(event_loop.channel()).unwrap(); tx.send(event_loop.channel()).unwrap();
event_loop.run(&mut UdpHandler::new(socket, thread_pool, tx_send, handler)).unwrap(); event_loop.run(&mut UdpHandler::new(socket, thread_pool, tx_send, tx_recv, handler)).unwrap();
}); });
// Ensure threads started successfully // Ensure threads started successfully
...@@ -196,7 +209,6 @@ impl CoAPServer { ...@@ -196,7 +209,6 @@ impl CoAPServer {
Ok(event_sender) => { Ok(event_sender) => {
self.event_sender = Some(event_sender); self.event_sender = Some(event_sender);
self.event_thread = Some(thread); self.event_thread = Some(thread);
self.tx_thread = Some(tx_thread);
Ok(()) Ok(())
} }
Err(_) => Err(CoAPServerError::EventLoopError), Err(_) => Err(CoAPServerError::EventLoopError),
...@@ -221,22 +233,20 @@ impl CoAPServer { ...@@ -221,22 +233,20 @@ impl CoAPServer {
} }
} }
fn transmit_handler(tx_send: TxQueue, tx_recv: RxQueue, tx_only: UdpSocket) { fn response_handler(tx_recv: &RxQueue, tx_only: &UdpSocket) {
// Note! We should only transmit with this UDP Socket
// TODO: Add better support for failure detection or logging // TODO: Add better support for failure detection or logging
loop {
match tx_recv.recv() { match tx_recv.recv() {
Ok(q_res) => { Ok(q_res) => {
match q_res.response.message.to_bytes() { match q_res.response {
Some(resp) => {
match resp.message.to_bytes() {
Ok(bytes) => { Ok(bytes) => {
match tx_only.send_to(&bytes[..], &q_res.address) { match tx_only.send_to(&bytes[..], &q_res.address) {
Ok(None) => { Ok(None) => {
// try to send again, look at https://github.com/Covertness/coap-rs/issues/8 in detail // Look at https://github.com/carllerche/mio/issues/411 in detail
tx_send.send(q_res).unwrap() error!("Failed to complete the response");
}
Ok(_) => {
continue;
} }
Ok(_) => {}
Err(_) => { Err(_) => {
error!("Failed to send response"); error!("Failed to send response");
} }
...@@ -247,13 +257,14 @@ fn transmit_handler(tx_send: TxQueue, tx_recv: RxQueue, tx_only: UdpSocket) { ...@@ -247,13 +257,14 @@ fn transmit_handler(tx_send: TxQueue, tx_recv: RxQueue, tx_only: UdpSocket) {
} }
} }
} }
// recv error occurs when all transmitters are terminited None => {
// (when all UDP Handlers are closed) debug!("No response");
Err(_) => { }
info!("Shutting down Transmit Handler");
break;
} }
} }
Err(_) => {
error!("Failed to get response");
}
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment