# Steps to replicate

* It never fails on macOS.
* It fails very reliably, though not 100% of the time, on Linux in Docker running on macOS, and in a GitHub CI pipeline.
* This is either a `bytes::BytesMut` or a `mio::net::TcpStream` bug, or it could also be my own code, but I am not able to work out where the bug is.

# On Linux

* Cargo.toml

```toml
[package]
name = "experimentation"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bytes = "1.4"
mio = {version = "0.8", features = ["os-poll" , "net"]}
rand = "0.8"
```

## Overall code structure

The file has two `#[test]`s, so you can run each one independently:

* Test #1, `test_emulated()`, takes a fixed-size frame of random bytes and writes it in two parts, split at an arbitrary point (its "half"). It then tries to reconstruct the frame by checking whether enough bytes are available in the `bytes::BytesMut` buffer. It seems never to fail on either Linux or macOS.
* Test #2, `test_socket()`, uses two threads.
  * A newly spawned thread runs the server socket. It reads bytes, reassembles them into frames and counts the frames.
  * The main thread runs a client socket and writes N frames. N has to be set quite high to get a failure.

Note: in this test, both `read_frame` and `write_frame` go through a helper struct that indicates whether the I/O operation completed or the socket was not ready. This is because the sockets are intentionally non-blocking.
* experimentation/examples/bytes_bug.rs ```rust use bytes::{Bytes, BytesMut}; use mio::net::TcpStream as TcpStreamMio; use std::{ error::Error, io::{Read, Write}, mem::MaybeUninit, net::{TcpListener, TcpStream}, ops::Range, thread::sleep, time::{Duration, Instant}, }; pub fn random_bytes(size: usize) -> &'static [u8] { let data = (0..size).map(|_| rand::random::<u8>()).collect::<Vec<_>>(); // leaking because this is just for the test and i want to refernece the address in multiple threads let leaked_ref: &'static [u8] = Box::leak(data.into_boxed_slice()); leaked_ref } pub fn find_available_port(range: Range<u16>) -> &'static str { for port in range.clone() { let addr = format!("0.0.0.0:{}", port); if TcpListener::bind(&addr).is_ok() { // leaking because this is just for the test and i want to refernece the address in multiple threads return Box::leak(addr.into_boxed_str()); } } panic!("Unable to find an available port in range: {:?}", range); } const TEST_SEND_FRAME_SIZE: usize = 128; const WRITE_N_TIMES: usize = 30_000_000; pub trait Framer { fn get_frame(bytes: &mut BytesMut) -> Option<Bytes>; } #[derive(Debug)] pub struct MsgFramer; impl Framer for MsgFramer { fn get_frame(bytes: &mut BytesMut) -> Option<Bytes> { if bytes.len() < TEST_SEND_FRAME_SIZE { return None; } else { let frame = bytes.split_to(TEST_SEND_FRAME_SIZE); // println!("frame.len(): {}, bytes.len(): {}", frame.len(), bytes.len()); return Some(frame.freeze()); } } } const EOF: usize = 0; #[derive(Debug)] pub enum ReadStatus<T> { Completed(Option<T>), NotReady, } #[derive(Debug)] pub struct FrameReader<F: Framer, const MAX_MESSAGE_SIZE: usize> { reader: TcpStreamMio, buffer: BytesMut, phantom: std::marker::PhantomData<F>, } impl<F: Framer, const MAX_MESSAGE_SIZE: usize> FrameReader<F, MAX_MESSAGE_SIZE> { pub fn new(reader: TcpStreamMio) -> FrameReader<F, MAX_MESSAGE_SIZE> { Self { reader, buffer: BytesMut::with_capacity(MAX_MESSAGE_SIZE), phantom: std::marker::PhantomData, } } #[inline] pub fn 
read_frame(&mut self) -> Result<ReadStatus<Bytes>, Box<dyn Error>> { let mut buf: [u8; MAX_MESSAGE_SIZE] = unsafe { MaybeUninit::uninit().assume_init() }; // read data from socket match self.reader.read(&mut buf) { Ok(EOF) => { // return error if EOF arrive but there is still data in the buffer if self.buffer.is_empty() { Ok(ReadStatus::Completed(None)) } else { let msg = format!("connection reset by peer, buf: \n{:X?}", &self.buffer[..]); Err(msg.into()) } } Ok(len) => { // if successfully read N bytes append to the buffer and see if a complete frame is ready self.buffer.extend_from_slice(&buf[..len]); if let Some(bytes) = F::get_frame(&mut self.buffer) { // return frame Ok(ReadStatus::Completed(Some(bytes))) } else { // resunt not ready Ok(ReadStatus::NotReady) } } // return not ready since this is not a blocking stream Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(ReadStatus::NotReady), // return io error Err(e) => Err(format!("read error: {}", e).into()), } } } pub enum WriteStatus { Completed, NotReady, } #[derive(Debug)] pub struct FrameWriter { writer: TcpStreamMio, } impl FrameWriter { pub fn new(stream: TcpStreamMio) -> Self { Self { writer: stream } } #[inline] pub fn write_frame(&mut self, bytes: &[u8]) -> Result<WriteStatus, Box<dyn Error>> { match self.writer.write_all(bytes) { Ok(()) => { self.writer.flush()?; Ok(WriteStatus::Completed) } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(WriteStatus::NotReady), Err(e) => Err(format!("write error: {}", e).into()), } } } fn main() { run_emulated(); run_socket(); } #[test] fn test_socket() { run_socket(); } #[test] fn test_emulated() { run_emulated(); } fn run_emulated() { use rand::Rng; let mut rng = rand::thread_rng(); // create a random fixed size frame of bytes let send_bytes = random_bytes(TEST_SEND_FRAME_SIZE); println!("WRITE_N_TIMES: {}", WRITE_N_TIMES); println!("TEST_SEND_FRAME_SIZE: {}", TEST_SEND_FRAME_SIZE); println!("send_bytes: {:X?}", send_bytes); let mut buffer = 
BytesMut::with_capacity(TEST_SEND_FRAME_SIZE); let start = Instant::now(); // emulate arrival of bytes from socket incomplete in two parts let emulated_frame_split = rng.gen_range(0..TEST_SEND_FRAME_SIZE); println!("emulated_frame_split: {}", emulated_frame_split); for i in 0..WRITE_N_TIMES { // split off part one let s = &send_bytes[..emulated_frame_split]; // append part one buffer.extend_from_slice(s); // println!("iteration: {}, split: {}, buffer.len(): {}, s.len(): {}", i, split, buffer.len(), s.len()); loop { // check if there is enought bytes to make up one frame, note frame size is fixed set as TEST_SEND_FRAME_SIZE let frame = MsgFramer::get_frame(&mut buffer); match frame { Some(frame) => { // if we got some a complete frame assert that it is in fact matching a frame we sent let recv_bytes = &frame[..]; // CRITICAL this line fails when running on linux but not on mac // TODO validate assert_eq!(recv_bytes, send_bytes, "iteration: {}", i); drop(frame); break; } None => { // if we did not get a complete frame, append part two and complete the frame in the buffer let s = &send_bytes[emulated_frame_split..]; buffer.extend_from_slice(s); // println!("iteration: {}, split: {}, buffer.len(): {}, s.len(): {}", i, split, buffer.len(), s.len()); } } } } // this is just to see in the output that everything finished clean. 
let elapsed = start.elapsed(); println!( "elapsed per write: {:?}, elapsed total: {:?}", elapsed / WRITE_N_TIMES as u32, elapsed, ); } fn run_socket() { // create a random fixed size frame of bytes let send_frame = random_bytes(TEST_SEND_FRAME_SIZE); println!("WRITE_N_TIMES: {}", WRITE_N_TIMES); println!("TEST_SEND_FRAME_SIZE: {}", TEST_SEND_FRAME_SIZE); println!("send_bytes: {:X?}", send_frame); let addr = find_available_port(8000..9000); // CONFIGURE svc let svc = std::thread::Builder::new() .name("Thread-Svc".to_owned()) .spawn({ move || { let listener = TcpListener::bind(addr).unwrap(); let (stream, _) = listener.accept().unwrap(); stream.set_nonblocking(true).unwrap(); // make sure to set non blocking mode let mut reader = FrameReader::<MsgFramer, TEST_SEND_FRAME_SIZE>::new( TcpStreamMio::from_std(stream), ); println!("svc: reader: {:?}", reader); let mut frame_recv_count = 0_usize; // for each iteration try to extract a frame untill we ether get EOF/Complete(None) or exception // when Complete(Some(frame)) - assert bytes recv = bytes sent and count number of frames loop { let res = reader.read_frame(); match res { Ok(ReadStatus::Completed(None)) => { println!("svc: read_frame is None, client closed connection"); break; } Ok(ReadStatus::Completed(Some(recv_frame))) => { frame_recv_count += 1; let recv_frame = &recv_frame[..]; assert_eq!( send_frame, recv_frame, "\nframe_recv_count: {} \nsend_frame: \n{:X?} \nrecv_frame: \n{:X?} \n", frame_recv_count, send_frame, recv_frame, ); } Ok(ReadStatus::NotReady) => { continue; // try reading again everything is fine just not enought data in the buffer to make up a frame } Err(e) => { println!("Svc read_rame error: {}", e.to_string()); break; } } } frame_recv_count } }) .unwrap(); sleep(Duration::from_millis(100)); // allow the spawned to bind // CONFIGUR clt let stream = TcpStream::connect(addr).unwrap(); stream.set_nonblocking(true).unwrap(); // make sure to set non blocking mode let mut writer = 
FrameWriter::new(TcpStreamMio::from_std(stream)); println!("clt: {:?}", writer); let mut frame_send_count = 0_usize; let start = Instant::now(); for _ in 0..WRITE_N_TIMES { loop { let res = writer.write_frame(send_frame); match res { Ok(WriteStatus::Completed) => { frame_send_count += 1; break; // break out of the write loop and for into the for loop to write next frame } Ok(WriteStatus::NotReady) => { continue; // try writing again everything is fine but socket is not ready } Err(e) => { panic!("clt write_frame error: {}", e.to_string()); } } } } let elapsed = start.elapsed(); drop(writer); // this will allow svc.join to complete let frame_recv_count = svc.join().unwrap(); println!( "per send elapsed: {:?}, total elapsed: {:?} ", elapsed / WRITE_N_TIMES as u32, elapsed ); println!( "frame_send_count: {}, frame_recv_count: {}", frame_send_count, frame_recv_count ); assert_eq!(frame_send_count, frame_recv_count); assert_eq!(frame_send_count, WRITE_N_TIMES); } ``` * start a pod wth rust installed * run commands in the pod ```sh docker run -it --rm --user "$(id -u)":"$(id -g)" -v "$PWD":/usr/src/myapp -w /usr/src/myapp/links rust:1.72.0 ``` ```shell rustup self update rustup update rustup default stable cargo clean cargo test --example bytes_bug -- --nocapture --test socket # fails intermitently, usually after a few million frames recieved cargo test --example bytes_bug -- --nocapture --test emulated # never fails ``` ```sh I have no name!@7b66d28214d9:/usr/src/myapp/links/experimentation$ rustup self update rustup update rustup default stable cargo clean cargo test --example bytes_bug -- --nocapture --test socket cargo test --example bytes_bug -- --nocapture --test emulated info: checking for self-update rustup unchanged - 1.26.0 info: syncing channel updates for 'stable-aarch64-unknown-linux-gnu' info: syncing channel updates for 'nightly-aarch64-unknown-linux-gnu' info: checking for self-update stable-aarch64-unknown-linux-gnu unchanged - rustc 1.72.0 (5680fa18f 
2023-08-23) nightly-aarch64-unknown-linux-gnu unchanged - rustc 1.74.0-nightly (2f5df8a94 2023-08-31) info: cleaning up downloads & tmp directories info: using existing install for 'stable-aarch64-unknown-linux-gnu' info: default toolchain set to 'stable-aarch64-unknown-linux-gnu' stable-aarch64-unknown-linux-gnu unchanged - rustc 1.72.0 (5680fa18f 2023-08-23) Compiling libc v0.2.147 Compiling cfg-if v1.0.0 Compiling ppv-lite86 v0.2.17 Compiling log v0.4.20 Compiling bytes v1.4.0 Compiling getrandom v0.2.10 Compiling mio v0.8.8 Compiling rand_core v0.6.4 Compiling rand_chacha v0.3.1 Compiling rand v0.8.5 Compiling experimentation v0.1.0 (/usr/src/myapp/links/experimentation) Finished test [unoptimized + debuginfo] target(s) in 2.39s Running unittests examples/bytes_bug.rs (/usr/src/myapp/links/target/debug/examples/bytes_bug-b655b4dabc0bc051) running 1 test WRITE_N_TIMES: 30000000 TEST_SEND_FRAME_SIZE: 128 send_bytes: [6B, B7, 8D, D5, D4, 83, 4A, 16, E4, 24, 20, 97, E7, E5, 96, 26, 75, F7, 2C, 5F, 3E, D4, C9, D0, 56, 45, 90, BE, 2C, B3, FC, D1, B0, E1, 49, 9D, 8C, 4A, 6B, 43, 56, 7C, 7B, D6, 4F, C, FC, 6D, EB, 7E, B3, 65, 77, A6, A, 2A, 8B, 8B, E3, A8, 93, 98, BB, 29, 2A, C3, FA, B4, 17, 87, A6, 85, 86, 5F, E3, 5F, 9B, B8, D, 62, E6, 86, E7, 69, AE, 1B, 5F, A8, 75, A3, C4, AC, FB, CE, C1, D3, 50, 61, 93, AD, 2, 5C, A3, 2B, CC, 74, 2E, A5, 9F, 2A, 18, 81, 7C, EA, 89, FB, AF, EE, 9C, D, FD, D9, 25, CA, 8, B9, 65, 12] clt: FrameWriter { writer: TcpStream { addr: 127.0.0.1:56796, peer: 127.0.0.1:8000, fd: 5 } } svc: reader: FrameReader { reader: TcpStream { addr: 127.0.0.1:8000, peer: 127.0.0.1:56796, fd: 4 }, buffer: b"", phantom: PhantomData<bytes_bug::MsgFramer> } thread 'Thread-Svc' panicked at 'assertion failed: `(left == right)` left: `[107, 183, 141, 213, 212, 131, 74, 22, 228, 36, 32, 151, 231, 229, 150, 38, 117, 247, 44, 95, 62, 212, 201, 208, 86, 69, 144, 190, 44, 179, 252, 209, 176, 225, 73, 157, 140, 74, 107, 67, 86, 124, 123, 214, 79, 12, 252, 109, 235, 
126, 179, 101, 119, 166, 10, 42, 139, 139, 227, 168, 147, 152, 187, 41, 42, 195, 250, 180, 23, 135, 166, 133, 134, 95, 227, 95, 155, 184, 13, 98, 230, 134, 231, 105, 174, 27, 95, 168, 117, 163, 196, 172, 251, 206, 193, 211, 80, 97, 147, 173, 2, 92, 163, 43, 204, 116, 46, 165, 159, 42, 24, 129, 124, 234, 137, 251, 175, 238, 156, 13, 253, 217, 37, 202, 8, 185, 101, 18]`, right: `[107, 183, 141, 213, 212, 131, 74, 22, 228, 36, 32, 151, 231, 229, 150, 38, 117, 247, 44, 95, 62, 212, 201, 208, 86, 69, 144, 190, 44, 179, 252, 209, 176, 225, 73, 157, 140, 74, 107, 67, 86, 124, 123, 214, 79, 12, 252, 109, 235, 126, 179, 101, 119, 166, 10, 42, 139, 139, 227, 168, 147, 152, 187, 41, 42, 195, 250, 180, 23, 135, 166, 133, 134, 95, 227, 95, 155, 184, 13, 98, 230, 134, 231, 105, 174, 27, 95, 168, 117, 163, 196, 172, 251, 206, 193, 211, 80, 97, 147, 173, 2, 92, 107, 183, 141, 213, 212, 131, 74, 22, 228, 36, 32, 151, 231, 229, 150, 38, 117, 247, 44, 95, 62, 212, 201, 208, 86, 69]`: frame_recv_count: 16851669 send_frame: [6B, B7, 8D, D5, D4, 83, 4A, 16, E4, 24, 20, 97, E7, E5, 96, 26, 75, F7, 2C, 5F, 3E, D4, C9, D0, 56, 45, 90, BE, 2C, B3, FC, D1, B0, E1, 49, 9D, 8C, 4A, 6B, 43, 56, 7C, 7B, D6, 4F, C, FC, 6D, EB, 7E, B3, 65, 77, A6, A, 2A, 8B, 8B, E3, A8, 93, 98, BB, 29, 2A, C3, FA, B4, 17, 87, A6, 85, 86, 5F, E3, 5F, 9B, B8, D, 62, E6, 86, E7, 69, AE, 1B, 5F, A8, 75, A3, C4, AC, FB, CE, C1, D3, 50, 61, 93, AD, 2, 5C, A3, 2B, CC, 74, 2E, A5, 9F, 2A, 18, 81, 7C, EA, 89, FB, AF, EE, 9C, D, FD, D9, 25, CA, 8, B9, 65, 12] recv_frame: [6B, B7, 8D, D5, D4, 83, 4A, 16, E4, 24, 20, 97, E7, E5, 96, 26, 75, F7, 2C, 5F, 3E, D4, C9, D0, 56, 45, 90, BE, 2C, B3, FC, D1, B0, E1, 49, 9D, 8C, 4A, 6B, 43, 56, 7C, 7B, D6, 4F, C, FC, 6D, EB, 7E, B3, 65, 77, A6, A, 2A, 8B, 8B, E3, A8, 93, 98, BB, 29, 2A, C3, FA, B4, 17, 87, A6, 85, 86, 5F, E3, 5F, 9B, B8, D, 62, E6, 86, E7, 69, AE, 1B, 5F, A8, 75, A3, C4, AC, FB, CE, C1, D3, 50, 61, 93, AD, 2, 5C, 6B, B7, 8D, D5, D4, 83, 4A, 16, E4, 24, 20, 97, E7, E5, 
96, 26, 75, F7, 2C, 5F, 3E, D4, C9, D0, 56, 45] ', experimentation/examples/bytes_bug.rs:230:29 note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace thread 'test_socket' panicked at 'clt write_frame error: write error: Connection reset by peer (os error 104)', experimentation/examples/bytes_bug.rs:271:21 test test_socket ... FAILED failures: failures: test_socket test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 1 filtered out; finished in 11.45s error: test failed, to rerun pass `--example bytes_bug` ```