diff --git a/.travis.yml b/.travis.yml
index 2e740cfad0ae837823fdf62021aea3a5e726e33f..99758624ca3bedf7a1bf9e351e290d2cff858366 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -6,7 +6,8 @@ matrix:
       rust: nightly
     - env: TARGET=thumbv6m-none-eabi
-      rust: nightly
+      # work around rust-lang/rust#45802
+      rust: nightly-2017-11-01
 
 addons:
   apt:
     sources:
diff --git a/Cargo.toml b/Cargo.toml
index 13b4ab795a7a857400a51d97e63d8c8e71b604e3..983d302e07b1b587a426c74961a3b0bdca904914 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,3 +17,6 @@ version = "0.2.0"
 
 [dependencies]
 untagged-option = "0.1.1"
+
+[dev-dependencies]
+scoped_threadpool = "0.1.8"
diff --git a/blacklist.txt b/blacklist.txt
new file mode 100644
index 0000000000000000000000000000000000000000..bf051e53d43c7b2770a383860ce0b20c0a5e929d
--- /dev/null
+++ b/blacklist.txt
@@ -0,0 +1,7 @@
+# false positives in thread::spawn (?)
+race:*dealloc
+race:*drop_slow*
+race:__call_tls_dtors
+
+# false positives in scoped_threadpool (?)
+race:*drop*
diff --git a/ci/install.sh b/ci/install.sh
index 4d5d56a45bb9563ed35d06665575322b4c392172..8724552da22f53a78bf4aed3b94a545f0c0ee0ad 100644
--- a/ci/install.sh
+++ b/ci/install.sh
@@ -11,7 +11,11 @@ main() {
             rustup component list | grep 'rust-src.*installed' || \
                 rustup component add rust-src
             ;;
+        x86_64-unknown-linux-gnu)
+            ;;
         *)
+            # unhandled case
+            exit 1
             ;;
     esac
 }
diff --git a/ci/script.sh b/ci/script.sh
index e9ff7e38b9f350574d31a079eeb684e88a3f6d6c..9137e63b6d09fb8427c4e30f657e69667805f096 100644
--- a/ci/script.sh
+++ b/ci/script.sh
@@ -5,8 +5,23 @@ main() {
         thumb*m-none-eabi)
             xargo check --target $TARGET
             ;;
-        *)
+        x86_64-unknown-linux-gnu)
             cargo check --target $TARGET
+
+            cargo test --target $TARGET
+            cargo test --target $TARGET --release
+
+            export TSAN_OPTIONS="suppressions=$(pwd)/blacklist.txt"
+            export RUSTFLAGS="-Z sanitizer=thread"
+
+            cargo test --test tsan --target $TARGET
+            cargo test --test tsan --target $TARGET --release
+            ;;
+        *)
+            # unhandled case
+            exit 1
             ;;
     esac
 }
+
+main
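The `tsan` test suite wired up above runs under ThreadSanitizer, which flags unsynchronized concurrent accesses to the same memory location; `blacklist.txt` then silences the reports it lists as suspected false positives in the runtime and in `scoped_threadpool`. For reference, a minimal sketch (hypothetical, not part of this change) of the kind of genuine race that `-Z sanitizer=thread` exists to catch:

```rust
use std::thread;

// Two unsynchronized writes to the same static: a textbook data race.
// Compiled with RUSTFLAGS="-Z sanitizer=thread", running this is expected
// to produce a "data race" report pointing at both increments.
static mut COUNTER: u32 = 0;

fn main() {
    let t = thread::spawn(|| unsafe { COUNTER += 1 }); // writer #1
    unsafe { COUNTER += 1 } // writer #2, racing with the spawned thread
    t.join().unwrap();
}
```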
diff --git a/src/cfail.rs b/src/cfail.rs
new file mode 100644
index 0000000000000000000000000000000000000000..50c3a920747f2bfd3a756decc7da33cbb42a0ce9
--- /dev/null
+++ b/src/cfail.rs
@@ -0,0 +1,80 @@
+//! Compile fail tests
+//!
+//! # `Send`-ness
+//!
+//! Collections of `Send`-able things are `Send`
+//!
+//! ```
+//! use heapless::{RingBuffer, Vec};
+//! use heapless::ring_buffer::{Consumer, Producer};
+//!
+//! struct IsSend;
+//!
+//! unsafe impl Send for IsSend {}
+//!
+//! fn is_send<T>() where T: Send {}
+//!
+//! is_send::<Consumer<IsSend, [IsSend; 4]>>();
+//! is_send::<Producer<IsSend, [IsSend; 4]>>();
+//! is_send::<RingBuffer<IsSend, [IsSend; 4]>>();
+//! is_send::<Vec<IsSend, [IsSend; 4]>>();
+//! ```
+//!
+//! Collections of non-`Send`-able things are *not* `Send`
+//!
+//! ``` compile_fail
+//! use std::marker::PhantomData;
+//! use heapless::ring_buffer::Consumer;
+//!
+//! type NotSend = PhantomData<*const ()>;
+//!
+//! fn is_send<T>() where T: Send {}
+//!
+//! is_send::<Consumer<NotSend, [NotSend; 4]>>();
+//! ```
+//!
+//! ``` compile_fail
+//! use std::marker::PhantomData;
+//! use heapless::ring_buffer::Producer;
+//!
+//! type NotSend = PhantomData<*const ()>;
+//!
+//! fn is_send<T>() where T: Send {}
+//!
+//! is_send::<Producer<NotSend, [NotSend; 4]>>();
+//! ```
+//!
+//! ``` compile_fail
+//! use std::marker::PhantomData;
+//! use heapless::RingBuffer;
+//!
+//! type NotSend = PhantomData<*const ()>;
+//!
+//! fn is_send<T>() where T: Send {}
+//!
+//! is_send::<RingBuffer<NotSend, [NotSend; 4]>>();
+//! ```
+//!
+//! ``` compile_fail
+//! use std::marker::PhantomData;
+//! use heapless::Vec;
+//!
+//! type NotSend = PhantomData<*const ()>;
+//!
+//! fn is_send<T>() where T: Send {}
+//!
+//! is_send::<Vec<NotSend, [NotSend; 4]>>();
+//! ```
+//!
+//! # Freeze
+//!
+//! Splitting a `RingBuffer` should mutably borrow it, making the original handle unusable.
+//!
+//! ``` compile_fail
+//! use heapless::RingBuffer;
+//!
+//! let mut rb: RingBuffer<u8, [u8; 4]> = RingBuffer::new();
+//!
+//! let (p, c) = rb.split();
+//! rb.enqueue(0).unwrap();
+//! ```
diff --git a/src/lib.rs b/src/lib.rs
index c6228e5357c2c0386b28c573bd0c9c0e79c4a328..f36ef00d20d5b6e5c5c10280dae62fab1e5afb00 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -36,8 +36,6 @@
 //!
 //! ### Single producer single consumer mode
 //!
-//! For use in *single core* systems like microcontrollers
-//!
 //! ```
 //! use heapless::RingBuffer;
 //!
@@ -77,75 +75,11 @@
 //!     // ..
 //! }
 //! ```
-//!
-//! # `Send`-ness
-//!
-//! Collections of `Send`-able things are `Send`
-//!
-//! ```
-//! use heapless::{RingBuffer, Vec};
-//! use heapless::ring_buffer::{Consumer, Producer};
-//!
-//! struct IsSend;
-//!
-//! unsafe impl Send for IsSend {}
-//!
-//! fn is_send<T>() where T: Send {}
-//!
-//! is_send::<Consumer<IsSend, [IsSend; 4]>>();
-//! is_send::<Producer<IsSend, [IsSend; 4]>>();
-//! is_send::<RingBuffer<IsSend, [IsSend; 4]>>();
-//! is_send::<Vec<IsSend, [IsSend; 4]>>();
-//! ```
-//!
-//! Collections of not `Send`-able things are *not* `Send`
-//!
-//! ``` compile_fail
-//! use std::marker::PhantomData;
-//! use heapless::ring_buffer::Consumer;
-//!
-//! type NotSend = PhantomData<*const ()>;
-//!
-//! fn is_send<T>() where T: Send {}
-//!
-//! is_send::<Consumer<NotSend, [NotSend; 4]>>();
-//! ```
-//!
-//! ``` compile_fail
-//! use std::marker::PhantomData;
-//! use heapless::ring_buffer::Producer;
-//!
-//! type NotSend = PhantomData<*const ()>;
-//!
-//! fn is_send<T>() where T: Send {}
-//!
-//! is_send::<Producer<NotSend, [NotSend; 4]>>();
-//! ```
-//!
-//! ``` compile_fail
-//! use std::marker::PhantomData;
-//! use heapless::RingBuffer;
-//!
-//! type NotSend = PhantomData<*const ()>;
-//!
-//! fn is_send<T>() where T: Send {}
-//!
-//! is_send::<RingBuffer<NotSend, [NotSend; 4]>>();
-//! ```
-//!
-//! ``` compile_fail
-//! use std::marker::PhantomData;
-//! use heapless::Vec;
-//!
-//! type NotSend = PhantomData<*const ()>;
-//!
-//! fn is_send<T>() where T: Send {}
-//!
-//! is_send::<Vec<NotSend, [NotSend; 4]>>();
-//! ```
 
 #![deny(missing_docs)]
 #![feature(const_fn)]
+#![feature(const_unsafe_cell_new)]
+#![feature(core_intrinsics)]
 #![feature(shared)]
 #![feature(unsize)]
 #![no_std]
@@ -155,8 +89,9 @@ extern crate untagged_option;
 pub use vec::Vec;
 pub use ring_buffer::RingBuffer;
 
-pub mod ring_buffer;
+mod cfail;
 mod vec;
+pub mod ring_buffer;
 
 /// Error raised when the buffer is full
 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
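The doctests collected in `cfail.rs` lean on auto trait propagation: raw pointers are not `Send`, so the zero-sized `PhantomData<*const ()>` alias strips `Send` from any collection containing it, while `IsSend` opts back in via an `unsafe impl`. A standalone sketch of that marker pattern (the `Handle` type is illustrative, not part of the crate):

```rust
use std::marker::PhantomData;

// Raw pointers are `!Send`, and auto traits propagate through fields, so
// this zero-sized field opts the whole struct out of `Send` for free.
type NotSend = PhantomData<*const ()>;

#[allow(dead_code)]
struct Handle {
    _not_send: NotSend,
}

fn is_send<T>()
where
    T: Send,
{
}

fn main() {
    is_send::<u32>(); // fine: `u32` is `Send`
    // is_send::<Handle>(); // uncommenting this line fails to compile
}
```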
diff --git a/src/ring_buffer/mod.rs b/src/ring_buffer/mod.rs
index 2f9cb62193c5f31ac412f5dec6e04978debbc783..3f53646d2ad1173f93c1d8cf236cdb1bbcca0a83 100644
--- a/src/ring_buffer/mod.rs
+++ b/src/ring_buffer/mod.rs
@@ -1,7 +1,8 @@
 //! Ring buffer
 
+use core::cell::UnsafeCell;
 use core::marker::{PhantomData, Unsize};
-use core::ptr;
+use core::{intrinsics, ptr};
 
 use untagged_option::UntaggedOption;
 
@@ -11,6 +12,32 @@
 pub use self::spsc::{Consumer, Producer};
 
 mod spsc;
 
+// AtomicUsize with no CAS operations that works on targets that have "no atomic support" according
+// to their specification
+struct AtomicUsize {
+    v: UnsafeCell<usize>,
+}
+
+impl AtomicUsize {
+    pub const fn new(v: usize) -> AtomicUsize {
+        AtomicUsize {
+            v: UnsafeCell::new(v),
+        }
+    }
+
+    pub fn get_mut(&mut self) -> &mut usize {
+        unsafe { &mut *self.v.get() }
+    }
+
+    pub fn load_relaxed(&self) -> usize {
+        unsafe { intrinsics::atomic_load_relaxed(self.v.get()) }
+    }
+
+    pub fn store_release(&self, val: usize) {
+        unsafe { intrinsics::atomic_store_rel(self.v.get(), val) }
+    }
+}
+
 /// A statically allocated ring buffer backed by an array `A`
 pub struct RingBuffer<T, A>
@@ -18,11 +45,14 @@ where
     A: Unsize<[T]>,
 {
     _marker: PhantomData<[T]>,
-    buffer: UntaggedOption<A>,
+
     // this is where we dequeue items from
-    head: usize,
+    head: AtomicUsize,
+
     // this is where we enqueue new items
-    tail: usize,
+    tail: AtomicUsize,
+
+    buffer: UntaggedOption<A>,
 }
 
 impl<T, A> RingBuffer<T, A>
@@ -35,8 +65,8 @@ where
         RingBuffer {
             _marker: PhantomData,
             buffer: UntaggedOption::none(),
-            head: 0,
-            tail: 0,
+            head: AtomicUsize::new(0),
+            tail: AtomicUsize::new(0),
         }
     }
 
@@ -49,11 +79,15 @@ where
     /// Returns the item in the front of the queue, or `None` if the queue is empty
     pub fn dequeue(&mut self) -> Option<T> {
         let n = self.capacity() + 1;
+
+        let head = self.head.get_mut();
+        let tail = self.tail.get_mut();
+
         let buffer: &[T] = unsafe { self.buffer.as_ref() };
 
-        if self.head != self.tail {
-            let item = unsafe { ptr::read(buffer.get_unchecked(self.head)) };
-            self.head = (self.head + 1) % n;
+        if *head != *tail {
+            let item = unsafe { ptr::read(buffer.get_unchecked(*head)) };
+            *head = (*head + 1) % n;
             Some(item)
         } else {
             None
@@ -65,14 +99,18 @@ where
     /// Returns `BufferFullError` if the queue is full
     pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> {
         let n = self.capacity() + 1;
+
+        let head = self.head.get_mut();
+        let tail = self.tail.get_mut();
+
         let buffer: &mut [T] = unsafe { self.buffer.as_mut() };
 
-        let next_tail = (self.tail + 1) % n;
-        if next_tail != self.head {
+        let next_tail = (*tail + 1) % n;
+        if next_tail != *head {
             // NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We
             // use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory
-            unsafe { ptr::write(buffer.get_unchecked_mut(self.tail), item) }
-            self.tail = next_tail;
+            unsafe { ptr::write(buffer.get_unchecked_mut(*tail), item) }
+            *tail = next_tail;
             Ok(())
         } else {
             Err(BufferFullError)
@@ -81,10 +119,13 @@ where
 
     /// Returns the number of elements in the queue
     pub fn len(&self) -> usize {
-        if self.head > self.tail {
-            self.head - self.tail
+        let head = self.head.load_relaxed();
+        let tail = self.tail.load_relaxed();
+
+        if head > tail {
+            head - tail
         } else {
-            self.tail - self.head
+            tail - head
         }
     }
 
@@ -176,9 +217,11 @@ where
 
     fn next(&mut self) -> Option<&'a T> {
         if self.index < self.len {
+            let head = self.rb.head.load_relaxed();
+
             let buffer: &[T] = unsafe { self.rb.buffer.as_ref() };
             let ptr = buffer.as_ptr();
-            let i = (self.rb.head + self.index) % (self.rb.capacity() + 1);
+            let i = (head + self.index) % (self.rb.capacity() + 1);
             self.index += 1;
             Some(unsafe { &*ptr.offset(i as isize) })
         } else {
@@ -196,10 +239,12 @@ where
 
     fn next(&mut self) -> Option<&'a mut T> {
         if self.index < self.len {
+            let head = self.rb.head.load_relaxed();
+
             let capacity = self.rb.capacity() + 1;
             let buffer: &mut [T] = unsafe { self.rb.buffer.as_mut() };
             let ptr: *mut T = buffer.as_mut_ptr();
-            let i = (self.rb.head + self.index) % capacity;
+            let i = (head + self.index) % capacity;
             self.index += 1;
             Some(unsafe { &mut *ptr.offset(i as isize) })
         } else {
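The private `AtomicUsize` above reaches for `core::intrinsics` because, as its comment notes, targets like `thumbv6m-none-eabi` declare "no atomic support" in their specification, which compiles out `core::sync::atomic` even though single-word atomic loads and stores are fine there. On a fully supported target, a roughly equivalent wrapper can be written against the stable API; a sketch (the `Index` name is illustrative, not from the crate):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Stand-in for the intrinsics-based wrapper: the same three operations,
// expressed with the stable atomic API and explicit memory orderings.
struct Index(AtomicUsize);

impl Index {
    const fn new(v: usize) -> Index {
        Index(AtomicUsize::new(v))
    }

    // `&mut self` proves exclusive access, so no atomic instruction is
    // needed; the owning `RingBuffer`'s methods rely on exactly this
    fn get_mut(&mut self) -> &mut usize {
        self.0.get_mut()
    }

    fn load_relaxed(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }

    fn store_release(&self, val: usize) {
        self.0.store(val, Ordering::Release)
    }
}
```

In the diff, the release store is what is meant to publish a freshly written slot to the other endpoint before the index change becomes visible.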
diff --git a/src/ring_buffer/spsc.rs b/src/ring_buffer/spsc.rs
index 5c6fa0ed0ca103695b64ac9275f317fb98d4c37f..488c07a87a91a8d34f90a0bf0f55536c4dc2dd5d 100644
--- a/src/ring_buffer/spsc.rs
+++ b/src/ring_buffer/spsc.rs
@@ -1,5 +1,5 @@
 use core::ptr::{self, Shared};
-use core::marker::Unsize;
+use core::marker::{PhantomData, Unsize};
 
 use BufferFullError;
 use ring_buffer::RingBuffer;
@@ -9,16 +9,15 @@ where
     A: Unsize<[T]>,
 {
     /// Splits a statically allocated ring buffer into producer and consumer end points
-    ///
-    /// *Warning* the current implementation only supports single core processors. It's also fine to
-    /// use both end points on the same core of a multi-core processor.
-    pub fn split(&'static mut self) -> (Producer<T, A>, Consumer<T, A>) {
+    pub fn split(&mut self) -> (Producer<T, A>, Consumer<T, A>) {
         (
             Producer {
                 rb: unsafe { Shared::new_unchecked(self) },
+                _marker: PhantomData,
             },
             Consumer {
                 rb: unsafe { Shared::new_unchecked(self) },
+                _marker: PhantomData,
             },
         )
     }
@@ -26,29 +25,31 @@ where
 
 /// A ring buffer "consumer"; it can dequeue items from the ring buffer
 // NOTE the consumer semantically owns the `head` pointer of the ring buffer
-pub struct Consumer<T, A>
+pub struct Consumer<'a, T, A>
 where
     A: Unsize<[T]>,
 {
     // XXX do we need to use `Shared` (for soundness) here?
     rb: Shared<RingBuffer<T, A>>,
+    _marker: PhantomData<&'a ()>,
 }
 
-impl<T, A> Consumer<T, A>
+impl<'a, T, A> Consumer<'a, T, A>
 where
     A: Unsize<[T]>,
 {
     /// Returns the item in the front of the queue, or `None` if the queue is empty
     pub fn dequeue(&mut self) -> Option<T> {
-        let rb = unsafe { self.rb.as_mut() };
+        let rb = unsafe { self.rb.as_ref() };
+
         let n = rb.capacity() + 1;
         let buffer: &[T] = unsafe { rb.buffer.as_ref() };
 
-        // NOTE(volatile) the value of `tail` can change at any time in the execution context of the
-        // consumer so we inform this to the compiler using a volatile load
-        if rb.head != unsafe { ptr::read_volatile(&rb.tail) } {
-            let item = unsafe { ptr::read(buffer.get_unchecked(rb.head)) };
-            rb.head = (rb.head + 1) % n;
+        let tail = rb.tail.load_relaxed();
+        let head = rb.head.load_relaxed();
+        if head != tail {
+            let item = unsafe { ptr::read(buffer.get_unchecked(head)) };
+            rb.head.store_release((head + 1) % n);
             Some(item)
         } else {
             None
@@ -56,7 +57,7 @@ where
     }
 }
 
-unsafe impl<T, A> Send for Consumer<T, A>
+unsafe impl<'a, T, A> Send for Consumer<'a, T, A>
 where
     A: Unsize<[T]>,
     T: Send,
@@ -65,15 +66,16 @@ where
 
 /// A ring buffer "producer"; it can enqueue items into the ring buffer
 // NOTE the producer semantically owns the `tail` pointer of the ring buffer
-pub struct Producer<T, A>
+pub struct Producer<'a, T, A>
 where
     A: Unsize<[T]>,
 {
     // XXX do we need to use `Shared` (for soundness) here?
     rb: Shared<RingBuffer<T, A>>,
+    _marker: PhantomData<&'a ()>,
 }
 
-impl<T, A> Producer<T, A>
+impl<'a, T, A> Producer<'a, T, A>
 where
     A: Unsize<[T]>,
 {
@@ -82,17 +84,18 @@ where
     /// Returns `BufferFullError` if the queue is full
     pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> {
         let rb = unsafe { self.rb.as_mut() };
+
         let n = rb.capacity() + 1;
         let buffer: &mut [T] = unsafe { rb.buffer.as_mut() };
 
-        let next_tail = (rb.tail + 1) % n;
-        // NOTE(volatile) the value of `head` can change at any time in the execution context of the
-        // producer so we inform this to the compiler using a volatile load
-        if next_tail != unsafe { ptr::read_volatile(&rb.head) } {
+        let head = rb.head.load_relaxed();
+        let tail = rb.tail.load_relaxed();
+        let next_tail = (tail + 1) % n;
+        if next_tail != head {
             // NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We
             // use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory
-            unsafe { ptr::write(buffer.get_unchecked_mut(rb.tail), item) }
-            rb.tail = next_tail;
+            unsafe { ptr::write(buffer.get_unchecked_mut(tail), item) }
+            rb.tail.store_release(next_tail);
             Ok(())
         } else {
             Err(BufferFullError)
@@ -100,7 +103,7 @@ where
     }
 }
 
-unsafe impl<T, A> Send for Producer<T, A>
+unsafe impl<'a, T, A> Send for Producer<'a, T, A>
 where
     A: Unsize<[T]>,
     T: Send,
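Dropping the `&'static mut self` requirement and threading a lifetime through `Producer` and `Consumer` ties both endpoints to a mutable borrow of the `RingBuffer`: a stack-allocated buffer can now be split for a scope and used directly again once the endpoints are gone, which is exactly what the `scoped` test below exercises. A minimal sketch:

```rust
extern crate heapless;

use heapless::RingBuffer;

fn main() {
    let mut rb: RingBuffer<u8, [u8; 4]> = RingBuffer::new();

    {
        // `split` now takes `&mut self`, so the endpoints borrow `rb`
        let (mut p, mut c) = rb.split();

        p.enqueue(42).unwrap();
        assert_eq!(c.dequeue(), Some(42));
    } // both endpoints dropped here, releasing the borrow

    rb.enqueue(1).unwrap(); // the owner is directly usable again
}
```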
diff --git a/tests/tsan.rs b/tests/tsan.rs
new file mode 100644
index 0000000000000000000000000000000000000000..6680cd248500fb6a910f18e3c992541e434f58fc
--- /dev/null
+++ b/tests/tsan.rs
@@ -0,0 +1,75 @@
+#![deny(warnings)]
+
+extern crate heapless;
+extern crate scoped_threadpool;
+
+use std::thread;
+
+use heapless::RingBuffer;
+use scoped_threadpool::Pool;
+
+#[test]
+fn once() {
+    static mut RB: RingBuffer<i32, [i32; 4]> = RingBuffer::new();
+
+    let rb = unsafe { &mut RB };
+
+    rb.enqueue(0).unwrap();
+
+    let (mut p, mut c) = rb.split();
+
+    p.enqueue(1).unwrap();
+
+    thread::spawn(move || {
+        p.enqueue(1).unwrap();
+    });
+
+    thread::spawn(move || {
+        c.dequeue().unwrap();
+    });
+}
+
+#[test]
+fn twice() {
+    static mut RB: RingBuffer<i32, [i32; 8]> = RingBuffer::new();
+
+    let rb = unsafe { &mut RB };
+
+    rb.enqueue(0).unwrap();
+    rb.enqueue(1).unwrap();
+
+    let (mut p, mut c) = rb.split();
+
+    thread::spawn(move || {
+        p.enqueue(2).unwrap();
+        p.enqueue(3).unwrap();
+    });
+
+    thread::spawn(move || {
+        c.dequeue().unwrap();
+        c.dequeue().unwrap();
+    });
+}
+
+#[test]
+fn scoped() {
+    let mut rb: RingBuffer<i32, [i32; 4]> = RingBuffer::new();
+
+    rb.enqueue(0).unwrap();
+
+    {
+        let (mut p, mut c) = rb.split();
+
+        Pool::new(2).scoped(move |scope| {
+            scope.execute(move || {
+                p.enqueue(1).unwrap();
+            });
+
+            scope.execute(move || {
+                c.dequeue().unwrap();
+            });
+        });
+    }
+
+    rb.dequeue().unwrap();
+}
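The three tests above each move only a couple of items. A natural extension, sketched here (hypothetical, not part of this commit), is a sustained test that checks FIFO order end to end while producer and consumer run concurrently:

```rust
extern crate heapless;

use std::thread;

use heapless::RingBuffer;

fn main() {
    static mut RB: RingBuffer<u32, [u32; 5]> = RingBuffer::new();

    let (mut p, mut c) = unsafe { &mut RB }.split();

    let t = thread::spawn(move || {
        for i in 0..1_000 {
            // spin until the consumer frees up a slot
            while p.enqueue(i).is_err() {}
        }
    });

    // items must come out in exactly the order they went in
    let mut expected = 0;
    while expected < 1_000 {
        if let Some(i) = c.dequeue() {
            assert_eq!(i, expected);
            expected += 1;
        }
    }

    t.join().unwrap();
}
```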