# Steps to replicate
* It never fails on macOS
* It very reliably fails but not 100% of the time on Linux docker running on macOS or on a github CI pipeline
* This is either a bytes::BytesMut or mio::net::TcpStream bug, or this could also be me but i am not able to understand where the bug is.
# On Linux
* Cargo.toml
```sh
[package]
name = "experimentation"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
bytes = "1.4"
mio = {version = "0.8", features = ["os-poll" , "net"]}
rand = "0.8"
```
## Overall code structure
The file has two #[test]'s so you can run each one independently
* test #1
called test_emulated() and it writes a fixed size, random bytes frame but arbitrarily splitting it in "half" and then tries to reconstruct the frame but checking if enough bytes are available in the bytes::BytesMut buffer. It seems to never fail on either linux or Mac
* test #2
called test_socket() and this test has two thread
a new thread which runs server socket, it will read bytes and try to reassemble and count frames.
the main thread has a client socket and will write N frames, N has to be set pretty high, to get a failure.
Note: in this test, both read_frame & write_frame take place via a helper struct which will indicate whether io operation completed or socket was not ready. This is due to socket being intentionally non-blocking.
* experimentation/examples/bytes_bug.rs
```rust
use bytes::{Bytes, BytesMut};
use mio::net::TcpStream as TcpStreamMio;
use std::{
error::Error,
io::{Read, Write},
mem::MaybeUninit,
net::{TcpListener, TcpStream},
ops::Range,
thread::sleep,
time::{Duration, Instant},
};
pub fn random_bytes(size: usize) -> &'static [u8] {
let data = (0..size).map(|_| rand::random::<u8>()).collect::<Vec<_>>();
// leaking because this is just for the test and i want to refernece the address in multiple threads
let leaked_ref: &'static [u8] = Box::leak(data.into_boxed_slice());
leaked_ref
}
pub fn find_available_port(range: Range<u16>) -> &'static str {
for port in range.clone() {
let addr = format!("0.0.0.0:{}", port);
if TcpListener::bind(&addr).is_ok() {
// leaking because this is just for the test and i want to refernece the address in multiple threads
return Box::leak(addr.into_boxed_str());
}
}
panic!("Unable to find an available port in range: {:?}", range);
}
const TEST_SEND_FRAME_SIZE: usize = 128;
const WRITE_N_TIMES: usize = 30_000_000;
pub trait Framer {
fn get_frame(bytes: &mut BytesMut) -> Option<Bytes>;
}
#[derive(Debug)]
pub struct MsgFramer;
impl Framer for MsgFramer {
fn get_frame(bytes: &mut BytesMut) -> Option<Bytes> {
if bytes.len() < TEST_SEND_FRAME_SIZE {
return None;
} else {
let frame = bytes.split_to(TEST_SEND_FRAME_SIZE);
// println!("frame.len(): {}, bytes.len(): {}", frame.len(), bytes.len());
return Some(frame.freeze());
}
}
}
const EOF: usize = 0;
#[derive(Debug)]
pub enum ReadStatus<T> {
Completed(Option<T>),
NotReady,
}
#[derive(Debug)]
pub struct FrameReader<F: Framer, const MAX_MESSAGE_SIZE: usize> {
reader: TcpStreamMio,
buffer: BytesMut,
phantom: std::marker::PhantomData<F>,
}
impl<F: Framer, const MAX_MESSAGE_SIZE: usize> FrameReader<F, MAX_MESSAGE_SIZE> {
pub fn new(reader: TcpStreamMio) -> FrameReader<F, MAX_MESSAGE_SIZE> {
Self {
reader,
buffer: BytesMut::with_capacity(MAX_MESSAGE_SIZE),
phantom: std::marker::PhantomData,
}
}
#[inline]
pub fn read_frame(&mut self) -> Result<ReadStatus<Bytes>, Box<dyn Error>> {
let mut buf: [u8; MAX_MESSAGE_SIZE] = unsafe { MaybeUninit::uninit().assume_init() };
// read data from socket
match self.reader.read(&mut buf) {
Ok(EOF) => {
// return error if EOF arrive but there is still data in the buffer
if self.buffer.is_empty() {
Ok(ReadStatus::Completed(None))
} else {
let msg = format!("connection reset by peer, buf: \n{:X?}", &self.buffer[..]);
Err(msg.into())
}
}
Ok(len) => {
// if successfully read N bytes append to the buffer and see if a complete frame is ready
self.buffer.extend_from_slice(&buf[..len]);
if let Some(bytes) = F::get_frame(&mut self.buffer) {
// return frame
Ok(ReadStatus::Completed(Some(bytes)))
} else {
// resunt not ready
Ok(ReadStatus::NotReady)
}
}
// return not ready since this is not a blocking stream
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(ReadStatus::NotReady),
// return io error
Err(e) => Err(format!("read error: {}", e).into()),
}
}
}
pub enum WriteStatus {
Completed,
NotReady,
}
#[derive(Debug)]
pub struct FrameWriter {
writer: TcpStreamMio,
}
impl FrameWriter {
pub fn new(stream: TcpStreamMio) -> Self {
Self { writer: stream }
}
#[inline]
pub fn write_frame(&mut self, bytes: &[u8]) -> Result<WriteStatus, Box<dyn Error>> {
match self.writer.write_all(bytes) {
Ok(()) => {
self.writer.flush()?;
Ok(WriteStatus::Completed)
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(WriteStatus::NotReady),
Err(e) => Err(format!("write error: {}", e).into()),
}
}
}
fn main() {
run_emulated();
run_socket();
}
#[test]
fn test_socket() {
run_socket();
}
#[test]
fn test_emulated() {
run_emulated();
}
fn run_emulated() {
use rand::Rng;
let mut rng = rand::thread_rng();
// create a random fixed size frame of bytes
let send_bytes = random_bytes(TEST_SEND_FRAME_SIZE);
println!("WRITE_N_TIMES: {}", WRITE_N_TIMES);
println!("TEST_SEND_FRAME_SIZE: {}", TEST_SEND_FRAME_SIZE);
println!("send_bytes: {:X?}", send_bytes);
let mut buffer = BytesMut::with_capacity(TEST_SEND_FRAME_SIZE);
let start = Instant::now();
// emulate arrival of bytes from socket incomplete in two parts
let emulated_frame_split = rng.gen_range(0..TEST_SEND_FRAME_SIZE);
println!("emulated_frame_split: {}", emulated_frame_split);
for i in 0..WRITE_N_TIMES {
// split off part one
let s = &send_bytes[..emulated_frame_split];
// append part one
buffer.extend_from_slice(s);
// println!("iteration: {}, split: {}, buffer.len(): {}, s.len(): {}", i, split, buffer.len(), s.len());
loop {
// check if there is enought bytes to make up one frame, note frame size is fixed set as TEST_SEND_FRAME_SIZE
let frame = MsgFramer::get_frame(&mut buffer);
match frame {
Some(frame) => {
// if we got some a complete frame assert that it is in fact matching a frame we sent
let recv_bytes = &frame[..];
// CRITICAL this line fails when running on linux but not on mac // TODO validate
assert_eq!(recv_bytes, send_bytes, "iteration: {}", i);
drop(frame);
break;
}
None => {
// if we did not get a complete frame, append part two and complete the frame in the buffer
let s = &send_bytes[emulated_frame_split..];
buffer.extend_from_slice(s);
// println!("iteration: {}, split: {}, buffer.len(): {}, s.len(): {}", i, split, buffer.len(), s.len());
}
}
}
}
// this is just to see in the output that everything finished clean.
let elapsed = start.elapsed();
println!(
"elapsed per write: {:?}, elapsed total: {:?}",
elapsed / WRITE_N_TIMES as u32,
elapsed,
);
}
fn run_socket() {
// create a random fixed size frame of bytes
let send_frame = random_bytes(TEST_SEND_FRAME_SIZE);
println!("WRITE_N_TIMES: {}", WRITE_N_TIMES);
println!("TEST_SEND_FRAME_SIZE: {}", TEST_SEND_FRAME_SIZE);
println!("send_bytes: {:X?}", send_frame);
let addr = find_available_port(8000..9000);
// CONFIGURE svc
let svc = std::thread::Builder::new()
.name("Thread-Svc".to_owned())
.spawn({
move || {
let listener = TcpListener::bind(addr).unwrap();
let (stream, _) = listener.accept().unwrap();
stream.set_nonblocking(true).unwrap(); // make sure to set non blocking mode
let mut reader = FrameReader::<MsgFramer, TEST_SEND_FRAME_SIZE>::new(
TcpStreamMio::from_std(stream),
);
println!("svc: reader: {:?}", reader);
let mut frame_recv_count = 0_usize;
// for each iteration try to extract a frame untill we ether get EOF/Complete(None) or exception
// when Complete(Some(frame)) - assert bytes recv = bytes sent and count number of frames
loop {
let res = reader.read_frame();
match res {
Ok(ReadStatus::Completed(None)) => {
println!("svc: read_frame is None, client closed connection");
break;
}
Ok(ReadStatus::Completed(Some(recv_frame))) => {
frame_recv_count += 1;
let recv_frame = &recv_frame[..];
assert_eq!(
send_frame, recv_frame,
"\nframe_recv_count: {} \nsend_frame: \n{:X?} \nrecv_frame: \n{:X?} \n",
frame_recv_count, send_frame, recv_frame,
);
}
Ok(ReadStatus::NotReady) => {
continue; // try reading again everything is fine just not enought data in the buffer to make up a frame
}
Err(e) => {
println!("Svc read_rame error: {}", e.to_string());
break;
}
}
}
frame_recv_count
}
})
.unwrap();
sleep(Duration::from_millis(100)); // allow the spawned to bind
// CONFIGUR clt
let stream = TcpStream::connect(addr).unwrap();
stream.set_nonblocking(true).unwrap(); // make sure to set non blocking mode
let mut writer = FrameWriter::new(TcpStreamMio::from_std(stream));
println!("clt: {:?}", writer);
let mut frame_send_count = 0_usize;
let start = Instant::now();
for _ in 0..WRITE_N_TIMES {
loop {
let res = writer.write_frame(send_frame);
match res {
Ok(WriteStatus::Completed) => {
frame_send_count += 1;
break; // break out of the write loop and for into the for loop to write next frame
}
Ok(WriteStatus::NotReady) => {
continue; // try writing again everything is fine but socket is not ready
}
Err(e) => {
panic!("clt write_frame error: {}", e.to_string());
}
}
}
}
let elapsed = start.elapsed();
drop(writer); // this will allow svc.join to complete
let frame_recv_count = svc.join().unwrap();
println!(
"per send elapsed: {:?}, total elapsed: {:?} ",
elapsed / WRITE_N_TIMES as u32,
elapsed
);
println!(
"frame_send_count: {}, frame_recv_count: {}",
frame_send_count, frame_recv_count
);
assert_eq!(frame_send_count, frame_recv_count);
assert_eq!(frame_send_count, WRITE_N_TIMES);
}
```
* start a pod wth rust installed
* run commands in the pod
```sh
docker run -it --rm --user "$(id -u)":"$(id -g)" -v "$PWD":/usr/src/myapp -w /usr/src/myapp/links rust:1.72.0
```
```shell
rustup self update
rustup update
rustup default stable
cargo clean
cargo test --example bytes_bug -- --nocapture --test socket # fails intermitently, usually after a few million frames recieved
cargo test --example bytes_bug -- --nocapture --test emulated # never fails
```
```sh
I have no name!@7b66d28214d9:/usr/src/myapp/links/experimentation$ rustup self update
rustup update
rustup default stable
cargo clean
cargo test --example bytes_bug -- --nocapture --test socket
cargo test --example bytes_bug -- --nocapture --test emulated
info: checking for self-update
rustup unchanged - 1.26.0
info: syncing channel updates for 'stable-aarch64-unknown-linux-gnu'
info: syncing channel updates for 'nightly-aarch64-unknown-linux-gnu'
info: checking for self-update
stable-aarch64-unknown-linux-gnu unchanged - rustc 1.72.0 (5680fa18f 2023-08-23)
nightly-aarch64-unknown-linux-gnu unchanged - rustc 1.74.0-nightly (2f5df8a94 2023-08-31)
info: cleaning up downloads & tmp directories
info: using existing install for 'stable-aarch64-unknown-linux-gnu'
info: default toolchain set to 'stable-aarch64-unknown-linux-gnu'
stable-aarch64-unknown-linux-gnu unchanged - rustc 1.72.0 (5680fa18f 2023-08-23)
Compiling libc v0.2.147
Compiling cfg-if v1.0.0
Compiling ppv-lite86 v0.2.17
Compiling log v0.4.20
Compiling bytes v1.4.0
Compiling getrandom v0.2.10
Compiling mio v0.8.8
Compiling rand_core v0.6.4
Compiling rand_chacha v0.3.1
Compiling rand v0.8.5
Compiling experimentation v0.1.0 (/usr/src/myapp/links/experimentation)
Finished test [unoptimized + debuginfo] target(s) in 2.39s
Running unittests examples/bytes_bug.rs (/usr/src/myapp/links/target/debug/examples/bytes_bug-b655b4dabc0bc051)
running 1 test
WRITE_N_TIMES: 30000000
TEST_SEND_FRAME_SIZE: 128
send_bytes: [6B, B7, 8D, D5, D4, 83, 4A, 16, E4, 24, 20, 97, E7, E5, 96, 26, 75, F7, 2C, 5F, 3E, D4, C9, D0, 56, 45, 90, BE, 2C, B3, FC, D1, B0, E1, 49, 9D, 8C, 4A, 6B, 43, 56, 7C, 7B, D6, 4F, C, FC, 6D, EB, 7E, B3, 65, 77, A6, A, 2A, 8B, 8B, E3, A8, 93, 98, BB, 29, 2A, C3, FA, B4, 17, 87, A6, 85, 86, 5F, E3, 5F, 9B, B8, D, 62, E6, 86, E7, 69, AE, 1B, 5F, A8, 75, A3, C4, AC, FB, CE, C1, D3, 50, 61, 93, AD, 2, 5C, A3, 2B, CC, 74, 2E, A5, 9F, 2A, 18, 81, 7C, EA, 89, FB, AF, EE, 9C, D, FD, D9, 25, CA, 8, B9, 65, 12]
clt: FrameWriter { writer: TcpStream { addr: 127.0.0.1:56796, peer: 127.0.0.1:8000, fd: 5 } }
svc: reader: FrameReader { reader: TcpStream { addr: 127.0.0.1:8000, peer: 127.0.0.1:56796, fd: 4 }, buffer: b"", phantom: PhantomData<bytes_bug::MsgFramer> }
thread 'Thread-Svc' panicked at 'assertion failed: `(left == right)`
left: `[107, 183, 141, 213, 212, 131, 74, 22, 228, 36, 32, 151, 231, 229, 150, 38, 117, 247, 44, 95, 62, 212, 201, 208, 86, 69, 144, 190, 44, 179, 252, 209, 176, 225, 73, 157, 140, 74, 107, 67, 86, 124, 123, 214, 79, 12, 252, 109, 235, 126, 179, 101, 119, 166, 10, 42, 139, 139, 227, 168, 147, 152, 187, 41, 42, 195, 250, 180, 23, 135, 166, 133, 134, 95, 227, 95, 155, 184, 13, 98, 230, 134, 231, 105, 174, 27, 95, 168, 117, 163, 196, 172, 251, 206, 193, 211, 80, 97, 147, 173, 2, 92, 163, 43, 204, 116, 46, 165, 159, 42, 24, 129, 124, 234, 137, 251, 175, 238, 156, 13, 253, 217, 37, 202, 8, 185, 101, 18]`,
right: `[107, 183, 141, 213, 212, 131, 74, 22, 228, 36, 32, 151, 231, 229, 150, 38, 117, 247, 44, 95, 62, 212, 201, 208, 86, 69, 144, 190, 44, 179, 252, 209, 176, 225, 73, 157, 140, 74, 107, 67, 86, 124, 123, 214, 79, 12, 252, 109, 235, 126, 179, 101, 119, 166, 10, 42, 139, 139, 227, 168, 147, 152, 187, 41, 42, 195, 250, 180, 23, 135, 166, 133, 134, 95, 227, 95, 155, 184, 13, 98, 230, 134, 231, 105, 174, 27, 95, 168, 117, 163, 196, 172, 251, 206, 193, 211, 80, 97, 147, 173, 2, 92, 107, 183, 141, 213, 212, 131, 74, 22, 228, 36, 32, 151, 231, 229, 150, 38, 117, 247, 44, 95, 62, 212, 201, 208, 86, 69]`:
frame_recv_count: 16851669
send_frame:
[6B, B7, 8D, D5, D4, 83, 4A, 16, E4, 24, 20, 97, E7, E5, 96, 26, 75, F7, 2C, 5F, 3E, D4, C9, D0, 56, 45, 90, BE, 2C, B3, FC, D1, B0, E1, 49, 9D, 8C, 4A, 6B, 43, 56, 7C, 7B, D6, 4F, C, FC, 6D, EB, 7E, B3, 65, 77, A6, A, 2A, 8B, 8B, E3, A8, 93, 98, BB, 29, 2A, C3, FA, B4, 17, 87, A6, 85, 86, 5F, E3, 5F, 9B, B8, D, 62, E6, 86, E7, 69, AE, 1B, 5F, A8, 75, A3, C4, AC, FB, CE, C1, D3, 50, 61, 93, AD, 2, 5C, A3, 2B, CC, 74, 2E, A5, 9F, 2A, 18, 81, 7C, EA, 89, FB, AF, EE, 9C, D, FD, D9, 25, CA, 8, B9, 65, 12]
recv_frame:
[6B, B7, 8D, D5, D4, 83, 4A, 16, E4, 24, 20, 97, E7, E5, 96, 26, 75, F7, 2C, 5F, 3E, D4, C9, D0, 56, 45, 90, BE, 2C, B3, FC, D1, B0, E1, 49, 9D, 8C, 4A, 6B, 43, 56, 7C, 7B, D6, 4F, C, FC, 6D, EB, 7E, B3, 65, 77, A6, A, 2A, 8B, 8B, E3, A8, 93, 98, BB, 29, 2A, C3, FA, B4, 17, 87, A6, 85, 86, 5F, E3, 5F, 9B, B8, D, 62, E6, 86, E7, 69, AE, 1B, 5F, A8, 75, A3, C4, AC, FB, CE, C1, D3, 50, 61, 93, AD, 2, 5C, 6B, B7, 8D, D5, D4, 83, 4A, 16, E4, 24, 20, 97, E7, E5, 96, 26, 75, F7, 2C, 5F, 3E, D4, C9, D0, 56, 45]
', experimentation/examples/bytes_bug.rs:230:29
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'test_socket' panicked at 'clt write_frame error: write error: Connection reset by peer (os error 104)', experimentation/examples/bytes_bug.rs:271:21
test test_socket ... FAILED
failures:
failures:
test_socket
test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 1 filtered out; finished in 11.45s
error: test failed, to rerun pass `--example bytes_bug`
```