diff --git a/ci/script.sh b/ci/script.sh index 9137e63b6d09fb8427c4e30f657e69667805f096..07260f43a8b567eb7dd2d032b5b8ed4157119142 100644 --- a/ci/script.sh +++ b/ci/script.sh @@ -11,8 +11,9 @@ main() { cargo test --target $TARGET cargo test --target $TARGET --release - export TSAN_OPTIONS="suppressions=$(pwd)/blacklist.txt" export RUSTFLAGS="-Z sanitizer=thread" + export RUST_TEST_THREADS=1 + export TSAN_OPTIONS="suppressions=$(pwd)/blacklist.txt" cargo test --test tsan --target $TARGET cargo test --test tsan --target $TARGET --release diff --git a/src/ring_buffer/mod.rs b/src/ring_buffer/mod.rs index 3f53646d2ad1173f93c1d8cf236cdb1bbcca0a83..0ae68a7c909c67b0f5f044e8336d7a1da33196f9 100644 --- a/src/ring_buffer/mod.rs +++ b/src/ring_buffer/mod.rs @@ -29,6 +29,10 @@ impl AtomicUsize { unsafe { &mut *self.v.get() } } + pub fn load_acquire(&self) -> usize { + unsafe { intrinsics::atomic_load_acq(self.v.get()) } + } + pub fn load_relaxed(&self) -> usize { unsafe { intrinsics::atomic_load_relaxed(self.v.get()) } } @@ -129,6 +133,11 @@ where } } + /// Returns `true` if the ring buffer has a length of 0 + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + /// Iterates from the front of the queue to the back pub fn iter(&self) -> Iter<T, A> { Iter { diff --git a/src/ring_buffer/spsc.rs b/src/ring_buffer/spsc.rs index 488c07a87a91a8d34f90a0bf0f55536c4dc2dd5d..082aab71441c92796e2a9ee0c1c4a0e1beb5aadb 100644 --- a/src/ring_buffer/spsc.rs +++ b/src/ring_buffer/spsc.rs @@ -45,7 +45,7 @@ where let n = rb.capacity() + 1; let buffer: &[T] = unsafe { rb.buffer.as_ref() }; - let tail = rb.tail.load_relaxed(); + let tail = rb.tail.load_acquire(); let head = rb.head.load_relaxed(); if head != tail { let item = unsafe { ptr::read(buffer.get_unchecked(head)) }; diff --git a/tests/tsan.rs b/tests/tsan.rs index 6680cd248500fb6a910f18e3c992541e434f58fc..9fbcb8d59b553bd6c513f63872db9595566244b1 100644 --- a/tests/tsan.rs +++ b/tests/tsan.rs @@ -73,3 +73,48 @@ fn scoped() { rb.dequeue().unwrap(); } + +#[test] +fn contention() { + const N: usize = 1024; + + let mut rb: RingBuffer<u8, [u8; N]> = RingBuffer::new(); + + { + let (mut p, mut c) = rb.split(); + + Pool::new(2).scoped(move |scope| { + scope.execute(move || { + let mut sum: u32 = 0; + + for i in 0..N { + let i = i as u8; + sum = sum.wrapping_add(i as u32); + while let Err(_) = p.enqueue(i) {} + } + + println!("producer: {}", sum); + }); + + scope.execute(move || { + let mut sum: u32 = 0; + + for _ in 0..N { + loop { + match c.dequeue() { + Some(v) => { + sum = sum.wrapping_add(v as u32); + break; + } + _ => {} + } + } + } + + println!("consumer: {}", sum); + }); + }); + } + + assert!(rb.is_empty()); +}