Skip to content
Snippets Groups Projects
Commit 1d2450b9 authored by James Munns's avatar James Munns
Browse files

Implementation of separate thread managing outgoing queue

parent e8905188
No related branches found
No related tags found
No related merge requests found
...@@ -3,9 +3,16 @@ extern crate coap; ...@@ -3,9 +3,16 @@ extern crate coap;
use coap::packet::*; use coap::packet::*;
use coap::{CoAPServer, CoAPClient}; use coap::{CoAPServer, CoAPClient};
fn request_handler(req: Packet, resp: CoAPClient) { fn request_handler(req: Packet) -> Option<Packet> {
let uri_path = req.get_option(OptionType::UriPath).unwrap(); let uri_path = req.get_option(OptionType::UriPath).unwrap();
resp.reply(&req, uri_path.front().unwrap().clone()).unwrap();
return match coap::packet::auto_response(req) {
Ok(mut response) => {
response.set_payload(uri_path.front().unwrap().clone());
Some(response)
},
Err(_) => None
};
} }
fn main() { fn main() {
......
...@@ -2,11 +2,16 @@ extern crate coap; ...@@ -2,11 +2,16 @@ extern crate coap;
use std::io; use std::io;
use coap::packet::*; use coap::packet::*;
use coap::{CoAPServer, CoAPClient}; use coap::CoAPServer;
fn request_handler(req: Packet, resp: CoAPClient) { fn request_handler(req: Packet) -> Option<Packet> {
println!("Receive request: {:?}", req); return match coap::packet::auto_response(req) {
resp.reply(&req, b"OK".to_vec()).unwrap(); Ok(mut response) => {
response.set_payload(b"OK".to_vec());
Some(response)
},
Err(_) => None
};
} }
fn main() { fn main() {
......
...@@ -101,24 +101,6 @@ impl CoAPClient { ...@@ -101,24 +101,6 @@ impl CoAPClient {
Self::request_with_timeout(url, Some(Duration::new(DEFAULT_RECEIVE_TIMEOUT, 0))) Self::request_with_timeout(url, Some(Duration::new(DEFAULT_RECEIVE_TIMEOUT, 0)))
} }
/// Response the client with the specifc payload.
pub fn reply(&self, request_packet: &Packet, payload: Vec<u8>) -> Result<()> {
let mut packet = Packet::new();
packet.header.set_version(1);
let response_type = match request_packet.header.get_type() {
PacketType::Confirmable => PacketType::Acknowledgement,
PacketType::NonConfirmable => PacketType::NonConfirmable,
_ => return Err(Error::new(ErrorKind::InvalidInput, "request type error"))
};
packet.header.set_type(response_type);
packet.header.set_code("2.05");
packet.header.set_message_id(request_packet.header.get_message_id());
packet.set_token(request_packet.get_token().clone());
packet.payload = payload;
self.send(&packet)
}
/// Execute a request. /// Execute a request.
pub fn send(&self, packet: &Packet) -> Result<()> { pub fn send(&self, packet: &Packet) -> Result<()> {
match packet.to_bytes() { match packet.to_bytes() {
...@@ -164,7 +146,7 @@ mod test { ...@@ -164,7 +146,7 @@ mod test {
use super::*; use super::*;
use std::time::Duration; use std::time::Duration;
use std::io::ErrorKind; use std::io::ErrorKind;
use packet::{Packet, PacketType}; use packet::Packet;
use server::CoAPServer; use server::CoAPServer;
#[test] #[test]
...@@ -174,15 +156,8 @@ mod test { ...@@ -174,15 +156,8 @@ mod test {
assert!(CoAPClient::request("127.0.0.1").is_err()); assert!(CoAPClient::request("127.0.0.1").is_err());
} }
#[test] fn request_handler(_: Packet) -> Option<Packet> {
fn test_reply_error() { None
let client = CoAPClient::new("127.0.0.1:5683").unwrap();
let mut packet = Packet::new();
packet.header.set_type(PacketType::Acknowledgement);
assert!(client.reply(&packet, b"Test".to_vec()).is_err());
}
fn request_handler(_: Packet, _: CoAPClient) {
} }
#[test] #[test]
......
...@@ -29,8 +29,9 @@ ...@@ -29,8 +29,9 @@
//! use coap::packet::*; //! use coap::packet::*;
//! use coap::{CoAPServer, CoAPClient}; //! use coap::{CoAPServer, CoAPClient};
//! fn request_handler(req: Packet, _resp: CoAPClient) { //! fn request_handler(req: Packet) -> Option<Packet> {
//! println!("Receive request: {:?}", req); //! println!("Receive request: {:?}", req);
//! None
//! } //! }
//! fn main() { //! fn main() {
......
use bincode; use bincode;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::collections::LinkedList; use std::collections::LinkedList;
use std;
macro_rules! u8_to_unsigned_be { macro_rules! u8_to_unsigned_be {
($src:ident, $start:expr, $end:expr, $t:ty) => ({ ($src:ident, $start:expr, $end:expr, $t:ty) => ({
...@@ -173,6 +174,10 @@ impl Packet { ...@@ -173,6 +174,10 @@ impl Packet {
self.options.insert(num, value); self.options.insert(num, value);
} }
pub fn set_payload(&mut self, payload: Vec<u8>){
self.payload = payload;
}
pub fn add_option(&mut self, tp: OptionType, value: Vec<u8>) { pub fn add_option(&mut self, tp: OptionType, value: Vec<u8>) {
let num = Self::get_option_number(tp); let num = Self::get_option_number(tp);
match self.options.get_mut(&num) { match self.options.get_mut(&num) {
...@@ -188,9 +193,12 @@ impl Packet { ...@@ -188,9 +193,12 @@ impl Packet {
self.options.insert(num, list); self.options.insert(num, list);
} }
pub fn get_option(&self, tp: OptionType) -> Option<&LinkedList<Vec<u8>>> { pub fn get_option(&self, tp: OptionType) -> Option<LinkedList<Vec<u8>>> {
let num = Self::get_option_number(tp); let num = Self::get_option_number(tp);
return self.options.get(&num); match self.options.get(&num) {
Some(options) => Some(options.clone()),
None => None
}
} }
/// Decodes a byte slice and construct the equivalent Packet. /// Decodes a byte slice and construct the equivalent Packet.
...@@ -226,40 +234,52 @@ impl Packet { ...@@ -226,40 +234,52 @@ impl Packet {
idx += 1; idx += 1;
if delta == 13 { // Check for special delta characters
match delta {
13 => {
if idx >= buf.len() { if idx >= buf.len() {
return Err(ParseError::InvalidOptionLength); return Err(ParseError::InvalidOptionLength);
} }
delta = buf[idx] as usize + 13; delta = buf[idx] as usize + 13;
idx += 1; idx += 1;
} else if delta == 14 { },
14 => {
if idx + 1 >= buf.len() { if idx + 1 >= buf.len() {
return Err(ParseError::InvalidOptionLength); return Err(ParseError::InvalidOptionLength);
} }
delta = (u16::from_be(u8_to_unsigned_be!(buf, idx, idx + 1, u16)) + 269) as usize; delta = (u16::from_be(u8_to_unsigned_be!(buf, idx, idx + 1, u16)) + 269) as usize;
idx += 2; idx += 2;
} else if delta == 15 { },
15 => {
return Err(ParseError::InvalidOptionDelta); return Err(ParseError::InvalidOptionDelta);
} }
_ => {}
};
if length == 13 { // Check for special length characters
match length {
13 => {
if idx >= buf.len() { if idx >= buf.len() {
return Err(ParseError::InvalidOptionLength); return Err(ParseError::InvalidOptionLength);
} }
length = buf[idx] as usize + 13; length = buf[idx] as usize + 13;
idx += 1; idx += 1;
} else if length == 14 { },
14 => {
if idx + 1 >= buf.len() { if idx + 1 >= buf.len() {
return Err(ParseError::InvalidOptionLength); return Err(ParseError::InvalidOptionLength);
} }
length = (u16::from_be(u8_to_unsigned_be!(buf, idx, idx + 1, u16)) + 269) as usize; length = (u16::from_be(u8_to_unsigned_be!(buf, idx, idx + 1, u16)) + 269) as usize;
idx += 2; idx += 2;
} else if length == 15 { },
15 => {
return Err(ParseError::InvalidOptionLength); return Err(ParseError::InvalidOptionLength);
} },
_ => {}
};
options_number += delta; options_number += delta;
...@@ -416,6 +436,26 @@ impl Packet { ...@@ -416,6 +436,26 @@ impl Packet {
} }
} }
/// Convert a request to a response
pub fn auto_response(request_packet: Packet) -> std::io::Result<Packet> {
let mut packet = Packet::new();
packet.header.set_version(1);
let response_type = match request_packet.header.get_type() {
PacketType::Confirmable => PacketType::Acknowledgement,
PacketType::NonConfirmable => PacketType::NonConfirmable,
_ => return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "request type error"))
};
packet.header.set_type(response_type);
packet.header.set_code("2.05");
packet.header.set_message_id(request_packet.header.get_message_id());
packet.set_token(request_packet.get_token().clone());
packet.payload = request_packet.payload;
Ok(packet)
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
...@@ -441,14 +481,14 @@ mod test { ...@@ -441,14 +481,14 @@ mod test {
let mut expected_uri_path = LinkedList::new(); let mut expected_uri_path = LinkedList::new();
expected_uri_path.push_back("Hi".as_bytes().to_vec()); expected_uri_path.push_back("Hi".as_bytes().to_vec());
expected_uri_path.push_back("Test".as_bytes().to_vec()); expected_uri_path.push_back("Test".as_bytes().to_vec());
assert_eq!(*uri_path, expected_uri_path); assert_eq!(uri_path, expected_uri_path);
let uri_query = packet.get_option(OptionType::UriQuery); let uri_query = packet.get_option(OptionType::UriQuery);
assert!(uri_query.is_some()); assert!(uri_query.is_some());
let uri_query = uri_query.unwrap(); let uri_query = uri_query.unwrap();
let mut expected_uri_query = LinkedList::new(); let mut expected_uri_query = LinkedList::new();
expected_uri_query.push_back("a=1".as_bytes().to_vec()); expected_uri_query.push_back("a=1".as_bytes().to_vec());
assert_eq!(*uri_query, expected_uri_query); assert_eq!(uri_query, expected_uri_query);
} }
#[test] #[test]
......
use std; use std;
use std::thread; use std::thread;
use std::net::ToSocketAddrs; use std::net::{ToSocketAddrs, SocketAddr};
use std::sync::mpsc; use std::sync::mpsc;
use mio::*; use mio::*;
use mio::udp::UdpSocket; use mio::udp::UdpSocket;
use packet::Packet; use packet::Packet;
use client::CoAPClient;
use threadpool::ThreadPool; use threadpool::ThreadPool;
const DEFAULT_WORKER_NUM: usize = 4; const DEFAULT_WORKER_NUM: usize = 4;
pub type TxQueue = mpsc::Sender<CoAPResponse>;
pub type RxQueue = mpsc::Receiver<CoAPResponse>;
#[derive(Debug)] #[derive(Debug)]
pub enum CoAPServerError { pub enum CoAPServerError {
...@@ -17,27 +18,35 @@ pub enum CoAPServerError { ...@@ -17,27 +18,35 @@ pub enum CoAPServerError {
AnotherHandlerIsRunning, AnotherHandlerIsRunning,
} }
#[derive(Debug)]
pub struct CoAPResponse {
pub address: SocketAddr,
pub response: Packet
}
pub trait CoAPHandler: Sync + Send + Copy { pub trait CoAPHandler: Sync + Send + Copy {
fn handle(&self, Packet, CoAPClient); fn handle(&self, Packet) -> Option<Packet>;
} }
impl<F> CoAPHandler for F where F: Fn(Packet, CoAPClient), F: Sync + Send + Copy { impl<F> CoAPHandler for F where F: Fn(Packet) -> Option<Packet>, F: Sync + Send + Copy {
fn handle(&self, request: Packet, response: CoAPClient) { fn handle(&self, request: Packet) -> Option<Packet> {
self(request, response); return self(request);
} }
} }
struct UdpHandler<H: CoAPHandler + 'static> { struct UdpHandler<H: CoAPHandler + 'static> {
socket: UdpSocket, socket: UdpSocket,
thread_pool: ThreadPool, thread_pool: ThreadPool,
tx_sender: TxQueue,
coap_handler: H coap_handler: H
} }
impl<H: CoAPHandler + 'static> UdpHandler<H> { impl<H: CoAPHandler + 'static> UdpHandler<H> {
fn new(socket: UdpSocket, thread_pool: ThreadPool, coap_handler: H) -> UdpHandler<H> { fn new(socket: UdpSocket, thread_pool: ThreadPool, tx_sender: TxQueue, coap_handler: H) -> UdpHandler<H> {
UdpHandler { UdpHandler {
socket: socket, socket: socket,
thread_pool: thread_pool, thread_pool: thread_pool,
tx_sender: tx_sender,
coap_handler: coap_handler coap_handler: coap_handler
} }
} }
...@@ -54,17 +63,27 @@ impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> { ...@@ -54,17 +63,27 @@ impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> {
match self.socket.recv_from(&mut buf) { match self.socket.recv_from(&mut buf) {
Ok(Some((nread, src))) => { Ok(Some((nread, src))) => {
let response_q = self.tx_sender.clone();
self.thread_pool.execute(move || { self.thread_pool.execute(move || {
match Packet::from_bytes(&buf[..nread]) { match Packet::from_bytes(&buf[..nread]) {
Ok(packet) => { Ok(packet) => {
let client = CoAPClient::new(src).unwrap(); // Dispatch user handler, if there is a response packet
coap_handler.handle(packet, client); // send the reply via the TX thread
match coap_handler.handle(packet) {
Some(response) => {
response_q.send(CoAPResponse{
address: src,
response: response
}).unwrap();
},
None => {}
};
}, },
Err(_) => return Err(_) => return
}; };
}); });
}, },
_ => panic!("unexpected error"), _ => {}, //panic!("unexpected error"),
} }
} }
} }
...@@ -78,6 +97,7 @@ pub struct CoAPServer { ...@@ -78,6 +97,7 @@ pub struct CoAPServer {
socket: UdpSocket, socket: UdpSocket,
event_sender: Option<Sender<()>>, event_sender: Option<Sender<()>>,
event_thread: Option<thread::JoinHandle<()>>, event_thread: Option<thread::JoinHandle<()>>,
tx_thread: Option<thread::JoinHandle<()>>,
worker_num: usize, worker_num: usize,
} }
...@@ -91,6 +111,7 @@ impl CoAPServer { ...@@ -91,6 +111,7 @@ impl CoAPServer {
socket: s, socket: s,
event_sender: None, event_sender: None,
event_thread: None, event_thread: None,
tx_thread: None,
worker_num: DEFAULT_WORKER_NUM, worker_num: DEFAULT_WORKER_NUM,
})) }))
}, },
...@@ -99,15 +120,36 @@ impl CoAPServer { ...@@ -99,15 +120,36 @@ impl CoAPServer {
}) })
} }
/// Starts handling requests with the handler. /// Starts handling requests with the handler
pub fn handle<H: CoAPHandler + 'static>(&mut self, handler: H) -> Result<(), CoAPServerError> { pub fn handle<H: CoAPHandler + 'static>(&mut self, handler: H) -> Result<(), CoAPServerError> {
match self.event_sender { let socket;
None => {
// Early return error checking
if let Some(_) = self.event_sender {
return Err(CoAPServerError::AnotherHandlerIsRunning);
}
match self.socket.try_clone() {
Ok(good_socket) => {
socket = good_socket
},
Err(_) => {
return Err(CoAPServerError::NetworkError);
},
}
// Create resources
let worker_num = self.worker_num; let worker_num = self.worker_num;
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let socket = self.socket.try_clone(); let (tx_send, tx_recv) : (TxQueue, RxQueue) = mpsc::channel();
match socket { let tx_only = self.socket.try_clone().unwrap();
Ok(socket) => {
// Setup and spawn single TX thread
let tx_thread = thread::spawn(move || {
transmit_handler(tx_recv, tx_only);
});
// Setup and spawn event loop thread, which will spawn
// children threads which handle incomining requests
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
let thread_pool = ThreadPool::new(worker_num); let thread_pool = ThreadPool::new(worker_num);
let mut event_loop = EventLoop::new().unwrap(); let mut event_loop = EventLoop::new().unwrap();
...@@ -115,23 +157,19 @@ impl CoAPServer { ...@@ -115,23 +157,19 @@ impl CoAPServer {
tx.send(event_loop.channel()).unwrap(); tx.send(event_loop.channel()).unwrap();
event_loop.run(&mut UdpHandler::new(socket, thread_pool, handler)).unwrap(); event_loop.run(&mut UdpHandler::new(socket, thread_pool, tx_send, handler)).unwrap();
}); });
// Ensure threads started successfully
match rx.recv() { match rx.recv() {
Ok(event_sender) => { Ok(event_sender) => {
self.event_sender = Some(event_sender); self.event_sender = Some(event_sender);
self.event_thread = Some(thread); self.event_thread = Some(thread);
self.tx_thread = Some(tx_thread);
Ok(()) Ok(())
}, },
Err(_) => Err(CoAPServerError::EventLoopError) Err(_) => Err(CoAPServerError::EventLoopError)
} }
},
Err(_) => Err(CoAPServerError::NetworkError),
}
},
Some(_) => Err(CoAPServerError::AnotherHandlerIsRunning),
}
} }
/// Stop the server. /// Stop the server.
...@@ -152,6 +190,28 @@ impl CoAPServer { ...@@ -152,6 +190,28 @@ impl CoAPServer {
} }
} }
fn transmit_handler(tx_recv: RxQueue, tx_only: UdpSocket) {
// Note! We should only transmit with this UDP Socket
// TODO: Add better support for failure detection or logging
loop {
match tx_recv.recv() {
Ok(q_res) => {
match q_res.response.to_bytes() {
Ok(bytes) => {
let _ = tx_only.send_to(&bytes[..], &q_res.address);
},
Err(_) => {}
}
},
// recv error occurs when all transmitters are terminited
// (when all UDP Handlers are closed)
Err(_) => {
break;
}
}
}
}
impl Drop for CoAPServer { impl Drop for CoAPServer {
fn drop(&mut self) { fn drop(&mut self) {
self.stop(); self.stop();
...@@ -162,15 +222,20 @@ impl Drop for CoAPServer { ...@@ -162,15 +222,20 @@ impl Drop for CoAPServer {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
use packet::{Packet, PacketType, OptionType}; use packet::{Packet, PacketType, OptionType, auto_response};
use client::CoAPClient; use client::CoAPClient;
fn request_handler(req: Packet, resp: CoAPClient) { fn request_handler(req: Packet) -> Option<Packet> {
let uri_path = req.get_option(OptionType::UriPath); let uri_path_list = req.get_option(OptionType::UriPath).unwrap();
assert!(uri_path.is_some()); assert!(uri_path_list.len() == 1);
let uri_path = uri_path.unwrap();
resp.reply(&req, uri_path.front().unwrap().clone()).unwrap(); return match auto_response(req) {
Ok(mut response) => {
response.set_payload(uri_path_list.front().unwrap().clone());
Some(response)
},
Err(_) => None
};
} }
#[test] #[test]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment