Skip to content
Snippets Groups Projects
Commit a8fa0e63 authored by Covertness's avatar Covertness Committed by GitHub
Browse files

Merge pull request #2 from jamesmunns/copper-support

Respond from the CoAP port to support Copper
parents e8905188 16256961
No related branches found
No related tags found
No related merge requests found
[package]
name = "coap"
version = "0.3.1"
version = "0.4.0"
description = "A CoAP library"
readme = "README.md"
documentation = "http://covertness.github.io/coap-rs/coap/index.html"
......@@ -17,6 +17,7 @@ threadpool = "0.1"
url = "0.2.36"
num = "0.1"
rand = "0.3"
log = "0.3"
[dev-dependencies]
quickcheck = "*"
Copyright (c) 2014 Sean McArthur
Copyright (c) 2014-2016 Yang Zhang
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
......
......@@ -17,7 +17,7 @@ First add this to your `Cargo.toml`:
```toml
[dependencies]
coap = "0.3"
coap = "0.4"
```
Then, add this to your crate root:
......@@ -36,8 +36,9 @@ use std::io;
use coap::packet::*;
use coap::{CoAPServer, CoAPClient};
fn request_handler(req: Packet, _resp: CoAPClient) {
fn request_handler(req: Packet, response: Option<Packet>) -> Option<Packet> {
println!("Receive request: {:?}", req);
response
}
fn main() {
......
......@@ -3,9 +3,16 @@ extern crate coap;
use coap::packet::*;
use coap::{CoAPServer, CoAPClient};
fn request_handler(req: Packet, resp: CoAPClient) {
fn request_handler(req: Packet, response: Option<Packet>) -> Option<Packet> {
let uri_path = req.get_option(OptionType::UriPath).unwrap();
resp.reply(&req, uri_path.front().unwrap().clone()).unwrap();
return match response {
Some(mut packet) => {
packet.set_payload(uri_path.front().unwrap().clone());
Some(packet)
},
_ => None
};
}
fn main() {
......
......@@ -2,11 +2,16 @@ extern crate coap;
use std::io;
use coap::packet::*;
use coap::{CoAPServer, CoAPClient};
use coap::CoAPServer;
fn request_handler(req: Packet, resp: CoAPClient) {
println!("Receive request: {:?}", req);
resp.reply(&req, b"OK".to_vec()).unwrap();
fn request_handler(_: Packet, response: Option<Packet>) -> Option<Packet> {
return match response {
Some(mut packet) => {
packet.set_payload(b"OK".to_vec());
Some(packet)
},
_ => None
};
}
fn main() {
......
......@@ -101,24 +101,6 @@ impl CoAPClient {
Self::request_with_timeout(url, Some(Duration::new(DEFAULT_RECEIVE_TIMEOUT, 0)))
}
/// Response the client with the specifc payload.
pub fn reply(&self, request_packet: &Packet, payload: Vec<u8>) -> Result<()> {
let mut packet = Packet::new();
packet.header.set_version(1);
let response_type = match request_packet.header.get_type() {
PacketType::Confirmable => PacketType::Acknowledgement,
PacketType::NonConfirmable => PacketType::NonConfirmable,
_ => return Err(Error::new(ErrorKind::InvalidInput, "request type error"))
};
packet.header.set_type(response_type);
packet.header.set_code("2.05");
packet.header.set_message_id(request_packet.header.get_message_id());
packet.set_token(request_packet.get_token().clone());
packet.payload = payload;
self.send(&packet)
}
/// Execute a request.
pub fn send(&self, packet: &Packet) -> Result<()> {
match packet.to_bytes() {
......@@ -164,7 +146,7 @@ mod test {
use super::*;
use std::time::Duration;
use std::io::ErrorKind;
use packet::{Packet, PacketType};
use packet::Packet;
use server::CoAPServer;
#[test]
......@@ -174,15 +156,8 @@ mod test {
assert!(CoAPClient::request("127.0.0.1").is_err());
}
#[test]
fn test_reply_error() {
let client = CoAPClient::new("127.0.0.1:5683").unwrap();
let mut packet = Packet::new();
packet.header.set_type(PacketType::Acknowledgement);
assert!(client.reply(&packet, b"Test".to_vec()).is_err());
}
fn request_handler(_: Packet, _: CoAPClient) {
fn request_handler(_: Packet, _:Option<Packet>) -> Option<Packet> {
None
}
#[test]
......
......@@ -29,8 +29,9 @@
//! use coap::packet::*;
//! use coap::{CoAPServer, CoAPClient};
//! fn request_handler(req: Packet, _resp: CoAPClient) {
//! fn request_handler(req: Packet, resp: Option<Packet>) -> Option<Packet> {
//! println!("Receive request: {:?}", req);
//! None
//! }
//! fn main() {
......@@ -73,6 +74,9 @@ extern crate num;
extern crate rand;
#[cfg(test)] extern crate quickcheck;
#[macro_use]
extern crate log;
pub use server::CoAPServer;
pub use client::CoAPClient;
......
......@@ -173,6 +173,10 @@ impl Packet {
self.options.insert(num, value);
}
pub fn set_payload(&mut self, payload: Vec<u8>){
self.payload = payload;
}
pub fn add_option(&mut self, tp: OptionType, value: Vec<u8>) {
let num = Self::get_option_number(tp);
match self.options.get_mut(&num) {
......@@ -188,9 +192,12 @@ impl Packet {
self.options.insert(num, list);
}
pub fn get_option(&self, tp: OptionType) -> Option<&LinkedList<Vec<u8>>> {
pub fn get_option(&self, tp: OptionType) -> Option<LinkedList<Vec<u8>>> {
let num = Self::get_option_number(tp);
return self.options.get(&num);
match self.options.get(&num) {
Some(options) => Some(options.clone()),
None => None
}
}
/// Decodes a byte slice and construct the equivalent Packet.
......@@ -226,40 +233,52 @@ impl Packet {
idx += 1;
if delta == 13 {
// Check for special delta characters
match delta {
13 => {
if idx >= buf.len() {
return Err(ParseError::InvalidOptionLength);
}
delta = buf[idx] as usize + 13;
idx += 1;
} else if delta == 14 {
},
14 => {
if idx + 1 >= buf.len() {
return Err(ParseError::InvalidOptionLength);
}
delta = (u16::from_be(u8_to_unsigned_be!(buf, idx, idx + 1, u16)) + 269) as usize;
idx += 2;
} else if delta == 15 {
},
15 => {
return Err(ParseError::InvalidOptionDelta);
}
_ => {}
};
if length == 13 {
// Check for special length characters
match length {
13 => {
if idx >= buf.len() {
return Err(ParseError::InvalidOptionLength);
}
length = buf[idx] as usize + 13;
idx += 1;
} else if length == 14 {
},
14 => {
if idx + 1 >= buf.len() {
return Err(ParseError::InvalidOptionLength);
}
length = (u16::from_be(u8_to_unsigned_be!(buf, idx, idx + 1, u16)) + 269) as usize;
idx += 2;
} else if length == 15 {
},
15 => {
return Err(ParseError::InvalidOptionLength);
}
},
_ => {}
};
options_number += delta;
......@@ -416,6 +435,26 @@ impl Packet {
}
}
/// Convert a request to a response
pub fn auto_response(request_packet: &Packet) -> Option<Packet> {
let mut packet = Packet::new();
packet.header.set_version(1);
let response_type = match request_packet.header.get_type() {
PacketType::Confirmable => PacketType::Acknowledgement,
PacketType::NonConfirmable => PacketType::NonConfirmable,
_ => return None
};
packet.header.set_type(response_type);
packet.header.set_code("2.05");
packet.header.set_message_id(request_packet.header.get_message_id());
packet.set_token(request_packet.get_token().clone());
packet.payload = request_packet.payload.clone();
Some(packet)
}
#[cfg(test)]
mod test {
use super::*;
......@@ -441,14 +480,14 @@ mod test {
let mut expected_uri_path = LinkedList::new();
expected_uri_path.push_back("Hi".as_bytes().to_vec());
expected_uri_path.push_back("Test".as_bytes().to_vec());
assert_eq!(*uri_path, expected_uri_path);
assert_eq!(uri_path, expected_uri_path);
let uri_query = packet.get_option(OptionType::UriQuery);
assert!(uri_query.is_some());
let uri_query = uri_query.unwrap();
let mut expected_uri_query = LinkedList::new();
expected_uri_query.push_back("a=1".as_bytes().to_vec());
assert_eq!(*uri_query, expected_uri_query);
assert_eq!(uri_query, expected_uri_query);
}
#[test]
......
use std;
use std::io::{Error, ErrorKind};
use std::thread;
use std::net::ToSocketAddrs;
use std::net::{ToSocketAddrs, SocketAddr};
use std::sync::mpsc;
use mio::*;
use mio::{EventLoop, PollOpt, EventSet, Handler, Sender, Token};
use mio::udp::UdpSocket;
use packet::Packet;
use client::CoAPClient;
use packet::{Packet, auto_response};
use threadpool::ThreadPool;
const DEFAULT_WORKER_NUM: usize = 4;
pub type TxQueue = mpsc::Sender<CoAPResponse>;
pub type RxQueue = mpsc::Receiver<CoAPResponse>;
#[derive(Debug)]
pub enum CoAPServerError {
......@@ -17,27 +19,35 @@ pub enum CoAPServerError {
AnotherHandlerIsRunning,
}
#[derive(Debug)]
pub struct CoAPResponse {
pub address: SocketAddr,
pub response: Packet
}
pub trait CoAPHandler: Sync + Send + Copy {
fn handle(&self, Packet, CoAPClient);
fn handle(&self, Packet, Option<Packet>) -> Option<Packet>;
}
impl<F> CoAPHandler for F where F: Fn(Packet, CoAPClient), F: Sync + Send + Copy {
fn handle(&self, request: Packet, response: CoAPClient) {
self(request, response);
impl<F> CoAPHandler for F where F: Fn(Packet, Option<Packet>) -> Option<Packet>, F: Sync + Send + Copy {
fn handle(&self, request: Packet, response: Option<Packet>) -> Option<Packet> {
return self(request, response);
}
}
struct UdpHandler<H: CoAPHandler + 'static> {
socket: UdpSocket,
thread_pool: ThreadPool,
tx_sender: TxQueue,
coap_handler: H
}
impl<H: CoAPHandler + 'static> UdpHandler<H> {
fn new(socket: UdpSocket, thread_pool: ThreadPool, coap_handler: H) -> UdpHandler<H> {
fn new(socket: UdpSocket, thread_pool: ThreadPool, tx_sender: TxQueue, coap_handler: H) -> UdpHandler<H> {
UdpHandler {
socket: socket,
thread_pool: thread_pool,
tx_sender: tx_sender,
coap_handler: coap_handler
}
}
......@@ -48,28 +58,55 @@ impl<H: CoAPHandler + 'static> Handler for UdpHandler<H> {
type Message = ();
fn ready(&mut self, _: &mut EventLoop<UdpHandler<H>>, _: Token, events: EventSet) {
if events.is_readable() {
if !events.is_readable() {
warn!("Unreadable Event");
return;
}
let coap_handler = self.coap_handler;
let mut buf = [0; 1500];
match self.socket.recv_from(&mut buf) {
Ok(Some((nread, src))) => {
debug!("Handling request from {}", src);
let response_q = self.tx_sender.clone();
self.thread_pool.execute(move || {
match Packet::from_bytes(&buf[..nread]) {
Ok(packet) => {
let client = CoAPClient::new(src).unwrap();
coap_handler.handle(packet, client);
// Pre-generate a response
let auto_resp = auto_response(&packet);
// Dispatch user handler, if there is a response packet
// send the reply via the TX thread
match coap_handler.handle(packet, auto_resp) {
Some(response) => {
debug!("Response: {:?}", response);
response_q.send(CoAPResponse{
address: src,
response: response
}).unwrap();
},
Err(_) => return
None => {
debug!("No response");
}
};
});
},
_ => panic!("unexpected error"),
Err(_) => {
error!("Failed to parse request");
return;
}
};
});
},
_ => {
error!("Failed to read from socket");
panic!("unexpected error");
},
}
}
fn notify(&mut self, event_loop: &mut EventLoop<UdpHandler<H>>, _: ()) {
info!("Shutting down request handler");
event_loop.shutdown();
}
}
......@@ -78,6 +115,7 @@ pub struct CoAPServer {
socket: UdpSocket,
event_sender: Option<Sender<()>>,
event_thread: Option<thread::JoinHandle<()>>,
tx_thread: Option<thread::JoinHandle<()>>,
worker_num: usize,
}
......@@ -91,23 +129,47 @@ impl CoAPServer {
socket: s,
event_sender: None,
event_thread: None,
tx_thread: None,
worker_num: DEFAULT_WORKER_NUM,
}))
},
None => Err(std::io::Error::new(std::io::ErrorKind::Other, "no address"))
None => Err(Error::new(ErrorKind::Other, "no address"))
}
})
}
/// Starts handling requests with the handler.
/// Starts handling requests with the handler
pub fn handle<H: CoAPHandler + 'static>(&mut self, handler: H) -> Result<(), CoAPServerError> {
match self.event_sender {
None => {
let socket;
// Early return error checking
if let Some(_) = self.event_sender {
error!("Handler already running!");
return Err(CoAPServerError::AnotherHandlerIsRunning);
}
match self.socket.try_clone() {
Ok(good_socket) => {
socket = good_socket
},
Err(_) => {
error!("Network Error!");
return Err(CoAPServerError::NetworkError);
},
}
// Create resources
let worker_num = self.worker_num;
let (tx, rx) = mpsc::channel();
let socket = self.socket.try_clone();
match socket {
Ok(socket) => {
let (tx_send, tx_recv) : (TxQueue, RxQueue) = mpsc::channel();
let tx_only = self.socket.try_clone().unwrap();
// Setup and spawn single TX thread
let tx_thread = thread::spawn(move || {
transmit_handler(tx_recv, tx_only);
});
// Setup and spawn event loop thread, which will spawn
// children threads which handle incomining requests
let thread = thread::spawn(move || {
let thread_pool = ThreadPool::new(worker_num);
let mut event_loop = EventLoop::new().unwrap();
......@@ -115,23 +177,19 @@ impl CoAPServer {
tx.send(event_loop.channel()).unwrap();
event_loop.run(&mut UdpHandler::new(socket, thread_pool, handler)).unwrap();
event_loop.run(&mut UdpHandler::new(socket, thread_pool, tx_send, handler)).unwrap();
});
// Ensure threads started successfully
match rx.recv() {
Ok(event_sender) => {
self.event_sender = Some(event_sender);
self.event_thread = Some(thread);
self.tx_thread = Some(tx_thread);
Ok(())
},
Err(_) => Err(CoAPServerError::EventLoopError)
}
},
Err(_) => Err(CoAPServerError::NetworkError),
}
},
Some(_) => Err(CoAPServerError::AnotherHandlerIsRunning),
}
}
/// Stop the server.
......@@ -152,6 +210,31 @@ impl CoAPServer {
}
}
fn transmit_handler(tx_recv: RxQueue, tx_only: UdpSocket) {
// Note! We should only transmit with this UDP Socket
// TODO: Add better support for failure detection or logging
loop {
match tx_recv.recv() {
Ok(q_res) => {
match q_res.response.to_bytes() {
Ok(bytes) => {
let _ = tx_only.send_to(&bytes[..], &q_res.address);
},
Err(_) => {
error!("Failed to decode response");
}
}
},
// recv error occurs when all transmitters are terminited
// (when all UDP Handlers are closed)
Err(_) => {
info!("Shutting down Transmit Handler");
break;
}
}
}
}
impl Drop for CoAPServer {
fn drop(&mut self) {
self.stop();
......@@ -165,12 +248,17 @@ mod test {
use packet::{Packet, PacketType, OptionType};
use client::CoAPClient;
fn request_handler(req: Packet, resp: CoAPClient) {
let uri_path = req.get_option(OptionType::UriPath);
assert!(uri_path.is_some());
let uri_path = uri_path.unwrap();
fn request_handler(req: Packet, response: Option<Packet>) -> Option<Packet> {
let uri_path_list = req.get_option(OptionType::UriPath).unwrap();
assert!(uri_path_list.len() == 1);
resp.reply(&req, uri_path.front().unwrap().clone()).unwrap();
match response {
Some(mut packet) => {
packet.set_payload(uri_path_list.front().unwrap().clone());
Some(packet)
},
_ => None
}
}
#[test]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment