diff --git a/src/lib.rs b/src/lib.rs index c6228e5357c2c0386b28c573bd0c9c0e79c4a328..d79b49949b8a5d45e1c698f23e558f687ae165a6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -144,7 +144,9 @@ //! is_send::<Vec<NotSend, [NotSend; 4]>>(); //! ``` +#![cfg_attr(target_has_atomic = "ptr", feature(const_atomic_usize_new))] #![deny(missing_docs)] +#![feature(cfg_target_has_atomic)] #![feature(const_fn)] #![feature(shared)] #![feature(unsize)] diff --git a/src/ring_buffer/mod.rs b/src/ring_buffer/mod.rs index 2f9cb62193c5f31ac412f5dec6e04978debbc783..8311a5554a115cb30a17742df50f3583292c6c4d 100644 --- a/src/ring_buffer/mod.rs +++ b/src/ring_buffer/mod.rs @@ -2,6 +2,8 @@ use core::marker::{PhantomData, Unsize}; use core::ptr; +#[cfg(target_has_atomic = "ptr")] +use core::sync::atomic::{AtomicUsize, Ordering}; use untagged_option::UntaggedOption; @@ -18,11 +20,15 @@ where A: Unsize<[T]>, { _marker: PhantomData<[T]>, - buffer: UntaggedOption<A>, + // this is from where we dequeue items - head: usize, + #[cfg(target_has_atomic = "ptr")] head: AtomicUsize, + #[cfg(not(target_has_atomic = "ptr"))] head: usize, + // this is where we enqueue new items - tail: usize, + #[cfg(target_has_atomic = "ptr")] tail: AtomicUsize, + #[cfg(not(target_has_atomic = "ptr"))] tail: usize, + buffer: UntaggedOption<A>, } impl<T, A> RingBuffer<T, A> @@ -35,7 +41,13 @@ where RingBuffer { _marker: PhantomData, buffer: UntaggedOption::none(), + #[cfg(target_has_atomic = "ptr")] + head: AtomicUsize::new(0), + #[cfg(not(target_has_atomic = "ptr"))] head: 0, + #[cfg(target_has_atomic = "ptr")] + tail: AtomicUsize::new(0), + #[cfg(not(target_has_atomic = "ptr"))] tail: 0, } } @@ -49,11 +61,22 @@ where /// Returns the item in the front of the queue, or `None` if the queue is empty pub fn dequeue(&mut self) -> Option<T> { let n = self.capacity() + 1; + + #[cfg(target_has_atomic = "ptr")] + let head = self.head.get_mut(); + #[cfg(not(target_has_atomic = "ptr"))] + let head = &mut self.head; + + #[cfg(target_has_atomic = "ptr")] + let tail = self.tail.get_mut(); + #[cfg(not(target_has_atomic = "ptr"))] + let tail = &mut self.tail; + let buffer: &[T] = unsafe { self.buffer.as_ref() }; - if self.head != self.tail { - let item = unsafe { ptr::read(buffer.get_unchecked(self.head)) }; - self.head = (self.head + 1) % n; + if *head != *tail { + let item = unsafe { ptr::read(buffer.get_unchecked(*head)) }; + *head = (*head + 1) % n; Some(item) } else { None @@ -65,14 +88,25 @@ where /// Returns `BufferFullError` if the queue is full pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> { let n = self.capacity() + 1; + + #[cfg(target_has_atomic = "ptr")] + let head = self.head.get_mut(); + #[cfg(not(target_has_atomic = "ptr"))] + let head = &mut self.head; + + #[cfg(target_has_atomic = "ptr")] + let tail = self.tail.get_mut(); + #[cfg(not(target_has_atomic = "ptr"))] + let tail = &mut self.tail; + let buffer: &mut [T] = unsafe { self.buffer.as_mut() }; - let next_tail = (self.tail + 1) % n; - if next_tail != self.head { + let next_tail = (*tail + 1) % n; + if next_tail != *head { // NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We // use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory - unsafe { ptr::write(buffer.get_unchecked_mut(self.tail), item) } - self.tail = next_tail; + unsafe { ptr::write(buffer.get_unchecked_mut(*tail), item) } + *tail = next_tail; Ok(()) } else { Err(BufferFullError) @@ -81,10 +115,20 @@ where /// Returns the number of elements in the queue pub fn len(&self) -> usize { - if self.head > self.tail { - self.head - self.tail + #[cfg(target_has_atomic = "ptr")] + let head = self.head.load(Ordering::Relaxed); + #[cfg(not(target_has_atomic = "ptr"))] + let head = self.head; + + #[cfg(target_has_atomic = "ptr")] + let tail = self.tail.load(Ordering::Relaxed); + #[cfg(not(target_has_atomic = "ptr"))] + let tail = self.tail; + + if head > tail { + head - tail } else { - self.tail - self.head + tail - head } } @@ -176,9 +220,14 @@ where fn next(&mut self) -> Option<&'a T> { if self.index < self.len { + #[cfg(not(target_has_atomic = "ptr"))] + let head = self.rb.head; + #[cfg(target_has_atomic = "ptr")] + let head = self.rb.head.load(Ordering::Relaxed); + let buffer: &[T] = unsafe { self.rb.buffer.as_ref() }; let ptr = buffer.as_ptr(); - let i = (self.rb.head + self.index) % (self.rb.capacity() + 1); + let i = (head + self.index) % (self.rb.capacity() + 1); self.index += 1; Some(unsafe { &*ptr.offset(i as isize) }) } else { @@ -196,10 +245,15 @@ where fn next(&mut self) -> Option<&'a mut T> { if self.index < self.len { + #[cfg(not(target_has_atomic = "ptr"))] + let head = self.rb.head; + #[cfg(target_has_atomic = "ptr")] + let head = self.rb.head.load(Ordering::Relaxed); + let capacity = self.rb.capacity() + 1; let buffer: &mut [T] = unsafe { self.rb.buffer.as_mut() }; let ptr: *mut T = buffer.as_mut_ptr(); - let i = (self.rb.head + self.index) % capacity; + let i = (head + self.index) % capacity; self.index += 1; Some(unsafe { &mut *ptr.offset(i as isize) }) } else { diff --git a/src/ring_buffer/spsc.rs b/src/ring_buffer/spsc.rs index 5c6fa0ed0ca103695b64ac9275f317fb98d4c37f..b31ef0fbdcade1a310c78de3cd0da30978f453c9 100644 --- a/src/ring_buffer/spsc.rs +++ b/src/ring_buffer/spsc.rs @@ -1,5 +1,7 @@ use core::ptr::{self, Shared}; use core::marker::Unsize; +#[cfg(target_has_atomic = "ptr")] +use core::sync::atomic::Ordering; use BufferFullError; use ring_buffer::RingBuffer; @@ -10,8 +12,11 @@ where { /// Splits a statically allocated ring buffer into producer and consumer end points /// - /// *Warning* the current implementation only supports single core processors. It's also fine to - /// use both end points on the same core of a multi-core processor. + /// **Warning** the current single producer single consumer implementation only supports + /// multi-core systems where `cfg(target_has_atomic = "ptr")` holds for all the cores. For + /// example, a dual core system where one core is Cortex-M0 core and the other is Cortex-M3 core + /// is not supported because Cortex-M0 (`thumbv6m-none-eabi`) doesn't satisfy + /// `cfg(target_has_atomic = "ptr")`. All single core systems are supported. pub fn split(&'static mut self) -> (Producer<T, A>, Consumer<T, A>) { ( Producer { @@ -39,8 +44,30 @@ where A: Unsize<[T]>, { /// Returns the item in the front of the queue, or `None` if the queue is empty + #[cfg(target_has_atomic = "ptr")] + pub fn dequeue(&mut self) -> Option<T> { + let rb = unsafe { self.rb.as_ref() }; + + let tail = rb.tail.load(Ordering::Relaxed); + let head = rb.head.load(Ordering::Acquire); + + let n = rb.capacity() + 1; + let buffer: &[T] = unsafe { rb.buffer.as_ref() }; + + if head != tail { + let item = unsafe { ptr::read(buffer.get_unchecked(head)) }; + rb.head.store((head + 1) % n, Ordering::Release); + Some(item) + } else { + None + } + } + + /// Returns the item in the front of the queue, or `None` if the queue is empty + #[cfg(not(target_has_atomic = "ptr"))] pub fn dequeue(&mut self) -> Option<T> { let rb = unsafe { self.rb.as_mut() }; + let n = rb.capacity() + 1; let buffer: &[T] = unsafe { rb.buffer.as_ref() }; @@ -80,8 +107,36 @@ where /// Adds an `item` to the end of the queue /// /// Returns `BufferFullError` if the queue is full + #[cfg(target_has_atomic = "ptr")] + pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> { + let rb = unsafe { self.rb.as_mut() }; + + let head = rb.head.load(Ordering::Relaxed); + let tail = rb.tail.load(Ordering::Acquire); + + let n = rb.capacity() + 1; + let next_tail = (tail + 1) % n; + + let buffer: &mut [T] = unsafe { rb.buffer.as_mut() }; + + if next_tail != head { + // NOTE(ptr::write) the memory slot that we are about to write to is uninitialized. We + // use `ptr::write` to avoid running `T`'s destructor on the uninitialized memory + unsafe { ptr::write(buffer.get_unchecked_mut(tail), item) } + rb.tail.store(next_tail, Ordering::Release); + Ok(()) + } else { + Err(BufferFullError) + } + } + + /// Adds an `item` to the end of the queue + /// + /// Returns `BufferFullError` if the queue is full + #[cfg(not(target_has_atomic = "ptr"))] pub fn enqueue(&mut self, item: T) -> Result<(), BufferFullError> { let rb = unsafe { self.rb.as_mut() }; + let n = rb.capacity() + 1; let buffer: &mut [T] = unsafe { rb.buffer.as_mut() };