Skip to content
Snippets Groups Projects
Commit e4c85478 authored by James Munns's avatar James Munns
Browse files

Run rust format. No functional change.

parent 45773bd2
Branches
Tags
No related merge requests found
...@@ -20,25 +20,27 @@ impl CoAPClient { ...@@ -20,25 +20,27 @@ impl CoAPClient {
match iter.next() { match iter.next() {
Some(SocketAddr::V4(a)) => { Some(SocketAddr::V4(a)) => {
UdpSocket::bind("0.0.0.0:0").and_then(|s| { UdpSocket::bind("0.0.0.0:0").and_then(|s| {
s.set_read_timeout(Some(Duration::new(DEFAULT_RECEIVE_TIMEOUT, 0))).and_then(|_| { s.set_read_timeout(Some(Duration::new(DEFAULT_RECEIVE_TIMEOUT, 0)))
.and_then(|_| {
Ok(CoAPClient { Ok(CoAPClient {
socket: s, socket: s,
peer_addr: SocketAddr::V4(a), peer_addr: SocketAddr::V4(a),
}) })
}) })
}) })
}, }
Some(SocketAddr::V6(a)) => { Some(SocketAddr::V6(a)) => {
UdpSocket::bind(":::0").and_then(|s| { UdpSocket::bind(":::0").and_then(|s| {
s.set_read_timeout(Some(Duration::new(DEFAULT_RECEIVE_TIMEOUT, 0))).and_then(|_| { s.set_read_timeout(Some(Duration::new(DEFAULT_RECEIVE_TIMEOUT, 0)))
.and_then(|_| {
Ok(CoAPClient { Ok(CoAPClient {
socket: s, socket: s,
peer_addr: SocketAddr::V6(a), peer_addr: SocketAddr::V6(a),
}) })
}) })
}) })
}, }
None => Err(Error::new(ErrorKind::Other, "no address")) None => Err(Error::new(ErrorKind::Other, "no address")),
} }
}) })
} }
...@@ -58,7 +60,7 @@ impl CoAPClient { ...@@ -58,7 +60,7 @@ impl CoAPClient {
let message_id = thread_rng().gen_range(0, num::pow(2u32, 16)) as u16; let message_id = thread_rng().gen_range(0, num::pow(2u32, 16)) as u16;
packet.header.set_message_id(message_id); packet.header.set_message_id(message_id);
let mut token: Vec<u8> = vec!(1, 1, 1, 1); let mut token: Vec<u8> = vec![1, 1, 1, 1];
for x in token.iter_mut() { for x in token.iter_mut() {
*x = random() *x = random()
} }
...@@ -66,7 +68,7 @@ impl CoAPClient { ...@@ -66,7 +68,7 @@ impl CoAPClient {
let domain = match url_params.domain() { let domain = match url_params.domain() {
Some(d) => d, Some(d) => d,
None => return Err(Error::new(ErrorKind::InvalidInput, "domain error")) None => return Err(Error::new(ErrorKind::InvalidInput, "domain error")),
}; };
let port = url_params.port_or_default().unwrap(); let port = url_params.port_or_default().unwrap();
...@@ -82,17 +84,17 @@ impl CoAPClient { ...@@ -82,17 +84,17 @@ impl CoAPClient {
try!(client.set_receive_timeout(timeout)); try!(client.set_receive_timeout(timeout));
match client.receive() { match client.receive() {
Ok(receive_packet) => { Ok(receive_packet) => {
if receive_packet.header.get_message_id() == message_id if receive_packet.header.get_message_id() == message_id &&
&& *receive_packet.get_token() == token { *receive_packet.get_token() == token {
return Ok(receive_packet) return Ok(receive_packet);
} else { } else {
return Err(Error::new(ErrorKind::Other, "receive invalid data")) return Err(Error::new(ErrorKind::Other, "receive invalid data"));
}
} }
}, Err(e) => Err(e),
Err(e) => Err(e)
} }
}, }
Err(_) => Err(Error::new(ErrorKind::InvalidInput, "url error")) Err(_) => Err(Error::new(ErrorKind::InvalidInput, "url error")),
} }
} }
...@@ -111,8 +113,8 @@ impl CoAPClient { ...@@ -111,8 +113,8 @@ impl CoAPClient {
} else { } else {
Err(Error::new(ErrorKind::Other, "send length error")) Err(Error::new(ErrorKind::Other, "send length error"))
} }
}, }
Err(_) => Err(Error::new(ErrorKind::InvalidInput, "packet error")) Err(_) => Err(Error::new(ErrorKind::InvalidInput, "packet error")),
} }
} }
...@@ -123,7 +125,7 @@ impl CoAPClient { ...@@ -123,7 +125,7 @@ impl CoAPClient {
let (nread, _src) = try!(self.socket.recv_from(&mut buf)); let (nread, _src) = try!(self.socket.recv_from(&mut buf));
match Packet::from_bytes(&buf[..nread]) { match Packet::from_bytes(&buf[..nread]) {
Ok(packet) => Ok(packet), Ok(packet) => Ok(packet),
Err(_) => Err(Error::new(ErrorKind::InvalidInput, "packet error")) Err(_) => Err(Error::new(ErrorKind::InvalidInput, "packet error")),
} }
} }
...@@ -165,7 +167,9 @@ mod test { ...@@ -165,7 +167,9 @@ mod test {
let mut server = CoAPServer::new("127.0.0.1:5684").unwrap(); let mut server = CoAPServer::new("127.0.0.1:5684").unwrap();
server.handle(request_handler).unwrap(); server.handle(request_handler).unwrap();
let error = CoAPClient::request_with_timeout("coap://127.0.0.1:5684/Rust", Some(Duration::new(1, 0))).unwrap_err(); let error = CoAPClient::request_with_timeout("coap://127.0.0.1:5684/Rust",
Some(Duration::new(1, 0)))
.unwrap_err();
if cfg!(windows) { if cfg!(windows) {
assert_eq!(error.kind(), ErrorKind::TimedOut); assert_eq!(error.kind(), ErrorKind::TimedOut);
} else { } else {
......
//! Implementation of the [CoAP Protocol][spec]. //! Implementation of the [CoAP Protocol][spec].
//! //!
//! This library provides both a client interface (`CoAPClient`) and a server interface (`CoAPServer`). //! This library provides both a client interface (`CoAPClient`)
//! and a server interface (`CoAPServer`).
//! //!
//! [spec]: https://tools.ietf.org/html/rfc7252 //! [spec]: https://tools.ietf.org/html/rfc7252
//! //!
...@@ -72,7 +73,8 @@ extern crate threadpool; ...@@ -72,7 +73,8 @@ extern crate threadpool;
extern crate url; extern crate url;
extern crate num; extern crate num;
extern crate rand; extern crate rand;
#[cfg(test)] extern crate quickcheck; #[cfg(test)]
extern crate quickcheck;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
......
...@@ -21,14 +21,14 @@ pub enum PacketType { ...@@ -21,14 +21,14 @@ pub enum PacketType {
pub struct PacketHeaderRaw { pub struct PacketHeaderRaw {
ver_type_tkl: u8, ver_type_tkl: u8,
code: u8, code: u8,
message_id: u16 message_id: u16,
} }
#[derive(Debug)] #[derive(Debug)]
pub struct PacketHeader { pub struct PacketHeader {
ver_type_tkl: u8, ver_type_tkl: u8,
pub code: PacketClass, pub code: PacketClass,
message_id: u16 message_id: u16,
} }
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
...@@ -36,7 +36,7 @@ pub enum PacketClass { ...@@ -36,7 +36,7 @@ pub enum PacketClass {
Empty, Empty,
Request(Requests), Request(Requests),
Response(Responses), Response(Responses),
Reserved Reserved,
} }
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
...@@ -44,7 +44,7 @@ pub enum Requests { ...@@ -44,7 +44,7 @@ pub enum Requests {
Get, Get,
Post, Post,
Put, Put,
Delete Delete,
} }
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
...@@ -74,7 +74,7 @@ pub enum Responses { ...@@ -74,7 +74,7 @@ pub enum Responses {
BadGateway, BadGateway,
ServiceUnavailable, ServiceUnavailable,
GatewayTimeout, GatewayTimeout,
ProxyingNotSupported ProxyingNotSupported,
} }
pub fn class_to_code(class: &PacketClass) -> u8 { pub fn class_to_code(class: &PacketClass) -> u8 {
...@@ -111,7 +111,7 @@ pub fn class_to_code(class: &PacketClass) -> u8 { ...@@ -111,7 +111,7 @@ pub fn class_to_code(class: &PacketClass) -> u8 {
PacketClass::Response(Responses::ProxyingNotSupported) => 0x95, PacketClass::Response(Responses::ProxyingNotSupported) => 0x95,
_ => 0xFF, _ => 0xFF,
} as u8 } as u8;
} }
pub fn code_to_class(code: &u8) -> PacketClass { pub fn code_to_class(code: &u8) -> PacketClass {
...@@ -163,7 +163,6 @@ pub fn class_to_str(class: &PacketClass) -> String { ...@@ -163,7 +163,6 @@ pub fn class_to_str(class: &PacketClass) -> String {
} }
impl PacketHeader { impl PacketHeader {
pub fn new() -> PacketHeader { pub fn new() -> PacketHeader {
return PacketHeader::from_raw(&PacketHeaderRaw::default()); return PacketHeader::from_raw(&PacketHeaderRaw::default());
} }
...@@ -173,7 +172,7 @@ impl PacketHeader { ...@@ -173,7 +172,7 @@ impl PacketHeader {
ver_type_tkl: raw.ver_type_tkl, ver_type_tkl: raw.ver_type_tkl,
code: code_to_class(&raw.code), code: code_to_class(&raw.code),
message_id: raw.message_id, message_id: raw.message_id,
} };
} }
pub fn to_raw(&self) -> PacketHeaderRaw { pub fn to_raw(&self) -> PacketHeaderRaw {
...@@ -181,7 +180,7 @@ impl PacketHeader { ...@@ -181,7 +180,7 @@ impl PacketHeader {
ver_type_tkl: self.ver_type_tkl, ver_type_tkl: self.ver_type_tkl,
code: class_to_code(&self.code), code: class_to_code(&self.code),
message_id: self.message_id, message_id: self.message_id,
} };
} }
#[inline] #[inline]
...@@ -294,7 +293,7 @@ pub enum OptionType { ...@@ -294,7 +293,7 @@ pub enum OptionType {
Block1, Block1,
ProxyUri, ProxyUri,
ProxyScheme, ProxyScheme,
Size1 Size1,
} }
#[derive(Debug)] #[derive(Debug)]
...@@ -339,7 +338,7 @@ impl Packet { ...@@ -339,7 +338,7 @@ impl Packet {
Some(list) => { Some(list) => {
list.push_back(value); list.push_back(value);
return; return;
}, }
None => (), None => (),
}; };
...@@ -352,7 +351,7 @@ impl Packet { ...@@ -352,7 +351,7 @@ impl Packet {
let num = Self::get_option_number(tp); let num = Self::get_option_number(tp);
match self.options.get(&num) { match self.options.get(&num) {
Some(options) => Some(options.clone()), Some(options) => Some(options.clone()),
None => None None => None,
} }
} }
...@@ -398,15 +397,16 @@ impl Packet { ...@@ -398,15 +397,16 @@ impl Packet {
} }
delta = buf[idx] as usize + 13; delta = buf[idx] as usize + 13;
idx += 1; idx += 1;
}, }
14 => { 14 => {
if idx + 1 >= buf.len() { if idx + 1 >= buf.len() {
return Err(ParseError::InvalidOptionLength); return Err(ParseError::InvalidOptionLength);
} }
delta = (u16::from_be(u8_to_unsigned_be!(buf, idx, idx + 1, u16)) + 269) as usize; delta = (u16::from_be(u8_to_unsigned_be!(buf, idx, idx + 1, u16)) +
269) as usize;
idx += 2; idx += 2;
}, }
15 => { 15 => {
return Err(ParseError::InvalidOptionDelta); return Err(ParseError::InvalidOptionDelta);
} }
...@@ -422,18 +422,19 @@ impl Packet { ...@@ -422,18 +422,19 @@ impl Packet {
length = buf[idx] as usize + 13; length = buf[idx] as usize + 13;
idx += 1; idx += 1;
}, }
14 => { 14 => {
if idx + 1 >= buf.len() { if idx + 1 >= buf.len() {
return Err(ParseError::InvalidOptionLength); return Err(ParseError::InvalidOptionLength);
} }
length = (u16::from_be(u8_to_unsigned_be!(buf, idx, idx + 1, u16)) + 269) as usize; length = (u16::from_be(u8_to_unsigned_be!(buf, idx, idx + 1, u16)) +
269) as usize;
idx += 2; idx += 2;
}, }
15 => { 15 => {
return Err(ParseError::InvalidOptionLength); return Err(ParseError::InvalidOptionLength);
}, }
_ => {} _ => {}
}; };
...@@ -469,7 +470,7 @@ impl Packet { ...@@ -469,7 +470,7 @@ impl Packet {
options: options, options: options,
payload: payload, payload: payload,
}) })
}, }
Err(_) => Err(ParseError::InvalidHeader), Err(_) => Err(ParseError::InvalidHeader),
} }
} }
...@@ -522,8 +523,12 @@ impl Packet { ...@@ -522,8 +523,12 @@ impl Packet {
unsafe { unsafe {
use std::ptr; use std::ptr;
let buf_len = options_bytes.len(); let buf_len = options_bytes.len();
ptr::copy(header.as_ptr(), options_bytes.as_mut_ptr().offset(buf_len as isize), header.len()); ptr::copy(header.as_ptr(),
ptr::copy(value.as_ptr(), options_bytes.as_mut_ptr().offset((buf_len + header.len()) as isize), value.len()); options_bytes.as_mut_ptr().offset(buf_len as isize),
header.len());
ptr::copy(value.as_ptr(),
options_bytes.as_mut_ptr().offset((buf_len + header.len()) as isize),
value.len());
options_bytes.set_len(buf_len + header.len() + value.len()); options_bytes.set_len(buf_len + header.len() + value.len());
} }
} }
...@@ -540,15 +545,22 @@ impl Packet { ...@@ -540,15 +545,22 @@ impl Packet {
} }
let mut buf: Vec<u8> = Vec::with_capacity(buf_length); let mut buf: Vec<u8> = Vec::with_capacity(buf_length);
let header_result: bincode::EncodingResult<()> = bincode::encode_into(&self.header.to_raw(), &mut buf, bincode::SizeLimit::Infinite); let header_result: bincode::EncodingResult<()> =
bincode::encode_into(&self.header.to_raw(),
&mut buf,
bincode::SizeLimit::Infinite);
match header_result { match header_result {
Ok(_) => { Ok(_) => {
buf.reserve(self.token.len() + options_bytes.len()); buf.reserve(self.token.len() + options_bytes.len());
unsafe { unsafe {
use std::ptr; use std::ptr;
let buf_len = buf.len(); let buf_len = buf.len();
ptr::copy(self.token.as_ptr(), buf.as_mut_ptr().offset(buf_len as isize), self.token.len()); ptr::copy(self.token.as_ptr(),
ptr::copy(options_bytes.as_ptr(), buf.as_mut_ptr().offset((buf_len + self.token.len()) as isize), options_bytes.len()); buf.as_mut_ptr().offset(buf_len as isize),
self.token.len());
ptr::copy(options_bytes.as_ptr(),
buf.as_mut_ptr().offset((buf_len + self.token.len()) as isize),
options_bytes.len());
buf.set_len(buf_len + self.token.len() + options_bytes.len()); buf.set_len(buf_len + self.token.len() + options_bytes.len());
} }
...@@ -558,12 +570,14 @@ impl Packet { ...@@ -558,12 +570,14 @@ impl Packet {
unsafe { unsafe {
use std::ptr; use std::ptr;
let buf_len = buf.len(); let buf_len = buf.len();
ptr::copy(self.payload.as_ptr(), buf.as_mut_ptr().offset(buf.len() as isize), self.payload.len()); ptr::copy(self.payload.as_ptr(),
buf.as_mut_ptr().offset(buf.len() as isize),
self.payload.len());
buf.set_len(buf_len + self.payload.len()); buf.set_len(buf_len + self.payload.len());
} }
} }
Ok(buf) Ok(buf)
}, }
Err(_) => Err(PackageError::InvalidHeader), Err(_) => Err(PackageError::InvalidHeader),
} }
} }
...@@ -600,7 +614,7 @@ pub fn auto_response(request_packet: &Packet) -> Option<Packet> { ...@@ -600,7 +614,7 @@ pub fn auto_response(request_packet: &Packet) -> Option<Packet> {
let response_type = match request_packet.header.get_type() { let response_type = match request_packet.header.get_type() {
PacketType::Confirmable => PacketType::Acknowledgement, PacketType::Confirmable => PacketType::Acknowledgement,
PacketType::NonConfirmable => PacketType::NonConfirmable, PacketType::NonConfirmable => PacketType::NonConfirmable,
_ => return None _ => return None,
}; };
packet.header.set_type(response_type); packet.header.set_type(response_type);
packet.header.code = PacketClass::Response(Responses::Content); packet.header.code = PacketClass::Response(Responses::Content);
...@@ -635,7 +649,8 @@ mod test { ...@@ -635,7 +649,8 @@ mod test {
#[test] #[test]
fn test_decode_packet_with_options() { fn test_decode_packet_with_options() {
let buf = [0x44, 0x01, 0x84, 0x9e, 0x51, 0x55, 0x77, 0xe8, 0xb2, 0x48, 0x69, 0x04, 0x54, 0x65, 0x73, 0x74, 0x43, 0x61, 0x3d, 0x31]; let buf = [0x44, 0x01, 0x84, 0x9e, 0x51, 0x55, 0x77, 0xe8, 0xb2, 0x48, 0x69, 0x04, 0x54,
0x65, 0x73, 0x74, 0x43, 0x61, 0x3d, 0x31];
let packet = Packet::from_bytes(&buf); let packet = Packet::from_bytes(&buf);
assert!(packet.is_ok()); assert!(packet.is_ok());
let packet = packet.unwrap(); let packet = packet.unwrap();
...@@ -644,7 +659,7 @@ mod test { ...@@ -644,7 +659,7 @@ mod test {
assert_eq!(packet.header.get_token_length(), 4); assert_eq!(packet.header.get_token_length(), 4);
assert_eq!(packet.header.code, PacketClass::Request(Requests::Get)); assert_eq!(packet.header.code, PacketClass::Request(Requests::Get));
assert_eq!(packet.header.get_message_id(), 33950); assert_eq!(packet.header.get_message_id(), 33950);
assert_eq!(*packet.get_token(), vec!(0x51, 0x55, 0x77, 0xE8)); assert_eq!(*packet.get_token(), vec![0x51, 0x55, 0x77, 0xE8]);
assert_eq!(packet.options.len(), 2); assert_eq!(packet.options.len(), 2);
let uri_path = packet.get_option(OptionType::UriPath); let uri_path = packet.get_option(OptionType::UriPath);
...@@ -665,16 +680,18 @@ mod test { ...@@ -665,16 +680,18 @@ mod test {
#[test] #[test]
fn test_decode_packet_with_payload() { fn test_decode_packet_with_payload() {
let buf = [0x64, 0x45, 0x13, 0xFD, 0xD0, 0xE2, 0x4D, 0xAC, 0xFF, 0x48, 0x65, 0x6C, 0x6C, 0x6F]; let buf = [0x64, 0x45, 0x13, 0xFD, 0xD0, 0xE2, 0x4D, 0xAC, 0xFF, 0x48, 0x65, 0x6C, 0x6C,
0x6F];
let packet = Packet::from_bytes(&buf); let packet = Packet::from_bytes(&buf);
assert!(packet.is_ok()); assert!(packet.is_ok());
let packet = packet.unwrap(); let packet = packet.unwrap();
assert_eq!(packet.header.get_version(), 1); assert_eq!(packet.header.get_version(), 1);
assert_eq!(packet.header.get_type(), PacketType::Acknowledgement); assert_eq!(packet.header.get_type(), PacketType::Acknowledgement);
assert_eq!(packet.header.get_token_length(), 4); assert_eq!(packet.header.get_token_length(), 4);
assert_eq!(packet.header.code, PacketClass::Response(Responses::Content)); assert_eq!(packet.header.code,
PacketClass::Response(Responses::Content));
assert_eq!(packet.header.get_message_id(), 5117); assert_eq!(packet.header.get_message_id(), 5117);
assert_eq!(*packet.get_token(), vec!(0xD0, 0xE2, 0x4D, 0xAC)); assert_eq!(*packet.get_token(), vec![0xD0, 0xE2, 0x4D, 0xAC]);
assert_eq!(packet.payload, "Hello".as_bytes().to_vec()); assert_eq!(packet.payload, "Hello".as_bytes().to_vec());
} }
...@@ -685,11 +702,13 @@ mod test { ...@@ -685,11 +702,13 @@ mod test {
packet.header.set_type(PacketType::Confirmable); packet.header.set_type(PacketType::Confirmable);
packet.header.code = PacketClass::Request(Requests::Get); packet.header.code = PacketClass::Request(Requests::Get);
packet.header.set_message_id(33950); packet.header.set_message_id(33950);
packet.set_token(vec!(0x51, 0x55, 0x77, 0xE8)); packet.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
packet.add_option(OptionType::UriPath, b"Hi".to_vec()); packet.add_option(OptionType::UriPath, b"Hi".to_vec());
packet.add_option(OptionType::UriPath, b"Test".to_vec()); packet.add_option(OptionType::UriPath, b"Test".to_vec());
packet.add_option(OptionType::UriQuery, b"a=1".to_vec()); packet.add_option(OptionType::UriQuery, b"a=1".to_vec());
assert_eq!(packet.to_bytes().unwrap(), vec!(0x44, 0x01, 0x84, 0x9e, 0x51, 0x55, 0x77, 0xe8, 0xb2, 0x48, 0x69, 0x04, 0x54, 0x65, 0x73, 0x74, 0x43, 0x61, 0x3d, 0x31)); assert_eq!(packet.to_bytes().unwrap(),
vec![0x44, 0x01, 0x84, 0x9e, 0x51, 0x55, 0x77, 0xe8, 0xb2, 0x48, 0x69, 0x04,
0x54, 0x65, 0x73, 0x74, 0x43, 0x61, 0x3d, 0x31]);
} }
#[test] #[test]
...@@ -699,9 +718,11 @@ mod test { ...@@ -699,9 +718,11 @@ mod test {
packet.header.set_type(PacketType::Acknowledgement); packet.header.set_type(PacketType::Acknowledgement);
packet.header.code = PacketClass::Response(Responses::Content); packet.header.code = PacketClass::Response(Responses::Content);
packet.header.set_message_id(5117); packet.header.set_message_id(5117);
packet.set_token(vec!(0xD0, 0xE2, 0x4D, 0xAC)); packet.set_token(vec![0xD0, 0xE2, 0x4D, 0xAC]);
packet.payload = "Hello".as_bytes().to_vec(); packet.payload = "Hello".as_bytes().to_vec();
assert_eq!(packet.to_bytes().unwrap(), vec!(0x64, 0x45, 0x13, 0xFD, 0xD0, 0xE2, 0x4D, 0xAC, 0xFF, 0x48, 0x65, 0x6C, 0x6C, 0x6F)); assert_eq!(packet.to_bytes().unwrap(),
vec![0x64, 0x45, 0x13, 0xFD, 0xD0, 0xE2, 0x4D, 0xAC, 0xFF, 0x48, 0x65, 0x6C,
0x6C, 0x6F]);
} }
#[test] #[test]
...@@ -712,11 +733,15 @@ mod test { ...@@ -712,11 +733,15 @@ mod test {
fn run(x: Vec<u8>) -> TestResult { fn run(x: Vec<u8>) -> TestResult {
match Packet::from_bytes(&x[..]) { match Packet::from_bytes(&x[..]) {
Ok(packet) => { Ok(packet) => {
TestResult::from_bool(packet.get_token().len() == packet.header.get_token_length() as usize) TestResult::from_bool(packet.get_token().len() ==
}, packet.header.get_token_length() as usize)
Err(_) => TestResult::passed() }
Err(_) => TestResult::passed(),
} }
} }
QuickCheck::new().tests(10000).gen(StdGen::new(rand::thread_rng(), 1500)).quickcheck(run as fn(Vec<u8>) -> TestResult) QuickCheck::new()
.tests(10000)
.gen(StdGen::new(rand::thread_rng(), 1500))
.quickcheck(run as fn(Vec<u8>) -> TestResult)
} }
} }
...@@ -22,14 +22,17 @@ pub enum CoAPServerError { ...@@ -22,14 +22,17 @@ pub enum CoAPServerError {
#[derive(Debug)] #[derive(Debug)]
pub struct CoAPResponse { pub struct CoAPResponse {
pub address: SocketAddr, pub address: SocketAddr,
pub response: Packet pub response: Packet,
} }
pub trait CoAPHandler: Sync + Send + Copy { pub trait CoAPHandler: Sync + Send + Copy {
fn handle(&self, Packet, Option<Packet>) -> Option<Packet>; fn handle(&self, Packet, Option<Packet>) -> Option<Packet>;
} }
impl<F> CoAPHandler for F where F: Fn(Packet, Option<Packet>) -> Option<Packet>, F: Sync + Send + Copy { impl<F> CoAPHandler for F
where F: Fn(Packet, Option<Packet>) -> Option<Packet>,
F: Sync + Send + Copy
{
fn handle(&self, request: Packet, response: Option<Packet>) -> Option<Packet> { fn handle(&self, request: Packet, response: Option<Packet>) -> Option<Packet> {
return self(request, response); return self(request, response);
} }
...@@ -39,16 +42,20 @@ struct UdpHandler<H: CoAPHandler + 'static> { ...@@ -39,16 +42,20 @@ struct UdpHandler<H: CoAPHandler + 'static> {
socket: UdpSocket, socket: UdpSocket,
thread_pool: ThreadPool, thread_pool: ThreadPool,
tx_sender: TxQueue, tx_sender: TxQueue,
coap_handler: H coap_handler: H,
} }
impl<H: CoAPHandler + 'static> UdpHandler<H> { impl<H: CoAPHandler + 'static> UdpHandler<H> {
fn new(socket: UdpSocket, thread_pool: ThreadPool, tx_sender: TxQueue, coap_handler: H) -> UdpHandler<H> { fn new(socket: UdpSocket,
thread_pool: ThreadPool,
tx_sender: TxQueue,
coap_handler: H)
-> UdpHandler<H> {
UdpHandler { UdpHandler {
socket: socket, socket: socket,
thread_pool: thread_pool, thread_pool: thread_pool,
tx_sender: tx_sender, tx_sender: tx_sender,
coap_handler: coap_handler coap_handler: coap_handler,
} }
} }
} }
...@@ -82,25 +89,26 @@ impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> { ...@@ -82,25 +89,26 @@ impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> {
debug!("Response: {:?}", response); debug!("Response: {:?}", response);
response_q.send(CoAPResponse { response_q.send(CoAPResponse {
address: src, address: src,
response: response response: response,
}).unwrap(); })
}, .unwrap();
}
None => { None => {
debug!("No response"); debug!("No response");
} }
}; };
}, }
Err(_) => { Err(_) => {
error!("Failed to parse request"); error!("Failed to parse request");
return; return;
} }
}; };
}); });
}, }
_ => { _ => {
error!("Failed to read from socket"); error!("Failed to read from socket");
panic!("unexpected error"); panic!("unexpected error");
}, }
} }
} }
...@@ -125,15 +133,17 @@ impl CoAPServer { ...@@ -125,15 +133,17 @@ impl CoAPServer {
addr.to_socket_addrs().and_then(|mut iter| { addr.to_socket_addrs().and_then(|mut iter| {
match iter.next() { match iter.next() {
Some(ad) => { Some(ad) => {
UdpSocket::bound(&ad).and_then(|s| Ok(CoAPServer { UdpSocket::bound(&ad).and_then(|s| {
Ok(CoAPServer {
socket: s, socket: s,
event_sender: None, event_sender: None,
event_thread: None, event_thread: None,
tx_thread: None, tx_thread: None,
worker_num: DEFAULT_WORKER_NUM, worker_num: DEFAULT_WORKER_NUM,
})) })
}, })
None => Err(Error::new(ErrorKind::Other, "no address")) }
None => Err(Error::new(ErrorKind::Other, "no address")),
} }
}) })
} }
...@@ -148,13 +158,11 @@ impl CoAPServer { ...@@ -148,13 +158,11 @@ impl CoAPServer {
return Err(CoAPServerError::AnotherHandlerIsRunning); return Err(CoAPServerError::AnotherHandlerIsRunning);
} }
match self.socket.try_clone() { match self.socket.try_clone() {
Ok(good_socket) => { Ok(good_socket) => socket = good_socket,
socket = good_socket
},
Err(_) => { Err(_) => {
error!("Network Error!"); error!("Network Error!");
return Err(CoAPServerError::NetworkError); return Err(CoAPServerError::NetworkError);
}, }
} }
// Create resources // Create resources
...@@ -187,8 +195,8 @@ impl CoAPServer { ...@@ -187,8 +195,8 @@ impl CoAPServer {
self.event_thread = Some(thread); self.event_thread = Some(thread);
self.tx_thread = Some(tx_thread); self.tx_thread = Some(tx_thread);
Ok(()) Ok(())
}, }
Err(_) => Err(CoAPServerError::EventLoopError) Err(_) => Err(CoAPServerError::EventLoopError),
} }
} }
...@@ -199,8 +207,8 @@ impl CoAPServer { ...@@ -199,8 +207,8 @@ impl CoAPServer {
Some(ref sender) => { Some(ref sender) => {
sender.send(()).unwrap(); sender.send(()).unwrap();
self.event_thread.take().map(|g| g.join()); self.event_thread.take().map(|g| g.join());
}, }
_ => {}, _ => {}
} }
} }
...@@ -219,12 +227,12 @@ fn transmit_handler(tx_recv: RxQueue, tx_only: UdpSocket) { ...@@ -219,12 +227,12 @@ fn transmit_handler(tx_recv: RxQueue, tx_only: UdpSocket) {
match q_res.response.to_bytes() { match q_res.response.to_bytes() {
Ok(bytes) => { Ok(bytes) => {
let _ = tx_only.send_to(&bytes[..], &q_res.address); let _ = tx_only.send_to(&bytes[..], &q_res.address);
}, }
Err(_) => { Err(_) => {
error!("Failed to decode response"); error!("Failed to decode response");
} }
} }
}, }
// recv error occurs when all transmitters are terminited // recv error occurs when all transmitters are terminited
// (when all UDP Handlers are closed) // (when all UDP Handlers are closed)
Err(_) => { Err(_) => {
...@@ -256,8 +264,8 @@ mod test { ...@@ -256,8 +264,8 @@ mod test {
Some(mut packet) => { Some(mut packet) => {
packet.set_payload(uri_path_list.front().unwrap().clone()); packet.set_payload(uri_path_list.front().unwrap().clone());
Some(packet) Some(packet)
}, }
_ => None _ => None,
} }
} }
...@@ -272,7 +280,7 @@ mod test { ...@@ -272,7 +280,7 @@ mod test {
packet.header.set_type(PacketType::Confirmable); packet.header.set_type(PacketType::Confirmable);
packet.header.set_code("0.01"); packet.header.set_code("0.01");
packet.header.set_message_id(1); packet.header.set_message_id(1);
packet.set_token(vec!(0x51, 0x55, 0x77, 0xE8)); packet.set_token(vec![0x51, 0x55, 0x77, 0xE8]);
packet.add_option(OptionType::UriPath, b"test-echo".to_vec()); packet.add_option(OptionType::UriPath, b"test-echo".to_vec());
client.send(&packet).unwrap(); client.send(&packet).unwrap();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment