Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 0 additions & 33 deletions .cirrus.yml

This file was deleted.

43 changes: 43 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,46 @@ jobs:
token: ${{ secrets.CODECOV_TOKEN }}
verbose: true
file: lcov.info

test-rxe:
runs-on: ubuntu-latest
steps:
- name: Checkout repository code
uses: actions/checkout@v4
- name: Install tools required
uses: taiki-e/install-action@v2
with:
tool: just,cargo-nextest,cargo-llvm-cov
- name: Clone rxe kmod
run: |
uname -a
git clone https://github.com/pizhenwei/rxe.git
make -C rxe
sudo insmod ./rxe/rdma_rxe.ko
ip add
sudo rdma link add rxe_eth0 type rxe netdev eth0
- name: Build rdma-core
run: |
sudo apt update
sudo apt install -y make pkg-config cmake libnl-3-dev libnl-route-3-dev libnl-genl-3-dev
git clone https://github.com/linux-rdma/rdma-core.git
./rdma-core/build.sh
- name: Test with RXE
run: |
export LD_LIBRARY_PATH=./rdma-core/build/lib
cargo llvm-cov --no-report run --example ibv_devinfo
if modinfo mlx5_ib >/dev/null 2>&1 && lsmod | grep -q '^mlx5_ib'; then
sudo rmmod mlx5_ib
fi
just test-basic-with-cov
just test-rc-pingpong-with-cov
just test-rc-pingpong-with-events-with-cov
just test-cmtime-with-cov
just generate-cov
- name: Upload coverage information
uses: codecov/codecov-action@v5
with:
fail_ci_if_error: false
token: ${{ secrets.CODECOV_TOKEN }}
verbose: true
file: lcov.info
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ description = "A better wrapper for using RDMA programming APIs in Rust flavor"
license= "MPL-2.0"
repository = "https://github.com/RDMA-Rust/sideway"
readme = "README.md"
keywords = ["RDMA", "verbs", "cm", "libibverbs", "librdmacm"]
keywords = ["RDMA", "verbs", "cm", "libibverbs", "librdmacm", "ibverbs", "rdmacm"]
authors = [
"Luke Yue <lukedyue@gmail.com>",
"FujiZ <i@fujiz.me>",
Expand Down
5 changes: 5 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ test-rc-pingpong-with-cov:
sleep 2
cargo llvm-cov --no-report run --example rc_pingpong_split -- -d {{rdma_dev}} -g 1 127.0.0.1

test-rc-pingpong-with-events-with-cov:
cargo llvm-cov --no-report run --example rc_pingpong_split -- -d {{rdma_dev}} -g 1 -e &
sleep 2
cargo llvm-cov --no-report run --example rc_pingpong_split -- -d {{rdma_dev}} -g 1 -e 127.0.0.1

test-cmtime-with-cov:
cargo llvm-cov --no-report run --example cmtime -- -b {{ip}} &
sleep 2
Expand Down
210 changes: 126 additions & 84 deletions examples/rc_pingpong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use postcard::{from_bytes, to_allocvec};
use serde::{Deserialize, Serialize};
use sideway::ibverbs::address::{AddressHandleAttribute, Gid};
use sideway::ibverbs::completion::{
CreateCompletionQueueWorkCompletionFlags, GenericCompletionQueue, WorkCompletionStatus,
CompletionChannel, CreateCompletionQueueWorkCompletionFlags, GenericCompletionQueue, PollCompletionQueueError,
WorkCompletionStatus,
};
use sideway::ibverbs::device::{DeviceInfo, DeviceList};
use sideway::ibverbs::device_context::Mtu;
Expand Down Expand Up @@ -69,6 +70,9 @@ pub struct Args {
/// Get CQE with timestamp
#[arg(long, short = 't', default_value_t = false)]
ts: bool,
/// Use CQ events instead of busy polling
#[arg(long, short = 'e', default_value_t = false)]
use_events: bool,
/// If no value provided, start a server and wait for connection, otherwise, connect to server at [host]
#[arg(name = "host")]
server_ip: Option<String>,
Expand Down Expand Up @@ -156,6 +160,13 @@ fn main() -> anyhow::Result<()> {
}
}

// Create completion channel if using events
let comp_channel = if args.use_events {
Some(CompletionChannel::new(&context).expect("Couldn't create completion channel"))
} else {
None
};

let pd = context.alloc_pd().unwrap_or_else(|_| panic!("Couldn't allocate PD"));
let send_data: Vec<u8> = vec![0; args.size as _];
let send_mr = unsafe {
Expand Down Expand Up @@ -189,16 +200,28 @@ fn main() -> anyhow::Result<()> {
);
}

// Associate completion channel with CQ if using events
if let Some(ref channel) = comp_channel {
cq_builder.setup_comp_channel(channel, 0);
}

let cq = cq_builder.setup_cqe(rx_depth + 1).build_ex().unwrap();

let cq_handle = GenericCompletionQueue::from(Arc::clone(&cq));

// Request initial notification if using events
if args.use_events {
cq_handle
.req_notify_cq(false)
.expect("Couldn't request CQ notification");
}

let mut builder = pd.create_qp_builder();

let mut qp = builder
.setup_max_inline_data(128)
.setup_send_cq(cq_handle.clone())
.setup_recv_cq(cq_handle)
.setup_recv_cq(cq_handle.clone())
.setup_max_send_wr(1)
.setup_max_recv_wr(rx_depth)
.build_ex()
Expand Down Expand Up @@ -324,100 +347,119 @@ fn main() -> anyhow::Result<()> {
outstanding_send = true;
}
// poll for the completion
{
loop {
match cq.start_poll() {
Ok(mut poller) => {
while let Some(wc) = poller.next() {
if wc.status() != WorkCompletionStatus::Success as u32 {
panic!(
"Failed status {:#?} ({}) for wr_id {}",
Into::<WorkCompletionStatus>::into(wc.status()),
wc.status(),
wc.wr_id()
);
}
match wc.wr_id() {
SEND_WR_ID => {
scnt += 1;
outstanding_send = false;
},
RECV_WR_ID => {
rcnt += 1;
rout -= 1;

// Post more receives if the receive side credit is low
if rout <= rx_depth / 2 {
let to_post = rx_depth - rout;
for _ in 0..to_post {
let mut guard = qp.start_post_recv();
let recv_handle = guard.construct_wr(RECV_WR_ID);
unsafe {
recv_handle.setup_sge(
recv_mr.lkey(),
recv_data.as_mut_ptr() as _,
args.size,
);
};
guard.post().unwrap();
}
rout += to_post;
let mut num_cq_events: u32 = 0;
loop {
// If using events, wait for CQ event before polling
if args.use_events {
if let Some(ref channel) = comp_channel {
// Get the CQ event (this blocks until an event arrives)
let _event_cq = channel.get_cq_event().expect("Failed to get CQ event");
num_cq_events += 1;

// Re-arm the notification BEFORE polling to avoid missing events
cq_handle
.req_notify_cq(false)
.expect("Couldn't request CQ notification");
}
}

// Poll for completions
match cq.start_poll() {
Ok(mut poller) => {
while let Some(wc) = poller.next() {
if wc.status() != WorkCompletionStatus::Success as u32 {
panic!(
"Failed status {:#?} ({}) for wr_id {}",
Into::<WorkCompletionStatus>::into(wc.status()),
wc.status(),
wc.wr_id()
);
}
match wc.wr_id() {
SEND_WR_ID => {
scnt += 1;
outstanding_send = false;
},
RECV_WR_ID => {
rcnt += 1;
rout -= 1;

// Post more receives if the receive side credit is low
if rout <= rx_depth / 2 {
let to_post = rx_depth - rout;
for _ in 0..to_post {
let mut guard = qp.start_post_recv();
let recv_handle = guard.construct_wr(RECV_WR_ID);
unsafe {
recv_handle.setup_sge(recv_mr.lkey(), recv_data.as_mut_ptr() as _, args.size);
};
guard.post().unwrap();
}
rout += to_post;
}

if args.ts {
let timestamp = wc.completion_timestamp();
if ts_param.last_completion_with_timestamp != 0 {
let delta: u64 = if timestamp >= ts_param.completion_recv_prev_time {
timestamp - ts_param.completion_recv_prev_time
} else {
completion_timestamp_mask - ts_param.completion_recv_prev_time
+ timestamp
+ 1
};

ts_param.completion_recv_max_time_delta =
ts_param.completion_recv_max_time_delta.max(delta);
ts_param.completion_recv_min_time_delta =
ts_param.completion_recv_min_time_delta.min(delta);
ts_param.completion_recv_total_time_delta += delta;
ts_param.completion_with_time_iters += 1;
}

ts_param.completion_recv_prev_time = timestamp;
ts_param.last_completion_with_timestamp = 1;
} else {
ts_param.last_completion_with_timestamp = 0;
if args.ts {
let timestamp = wc.completion_timestamp();
if ts_param.last_completion_with_timestamp != 0 {
let delta: u64 = if timestamp >= ts_param.completion_recv_prev_time {
timestamp - ts_param.completion_recv_prev_time
} else {
completion_timestamp_mask - ts_param.completion_recv_prev_time + timestamp + 1
};

ts_param.completion_recv_max_time_delta =
ts_param.completion_recv_max_time_delta.max(delta);
ts_param.completion_recv_min_time_delta =
ts_param.completion_recv_min_time_delta.min(delta);
ts_param.completion_recv_total_time_delta += delta;
ts_param.completion_with_time_iters += 1;
}
},
_ => {
panic!("Unknown error!");
},
}

if scnt < args.iter && !outstanding_send {
// Post another send if we haven't reached the iteration limit
let mut guard = qp.start_post_send();
let send_handle = guard.construct_wr(SEND_WR_ID, WorkRequestFlags::Signaled).setup_send();
unsafe {
send_handle.setup_sge(send_mr.lkey(), send_data.as_ptr() as _, args.size);
ts_param.completion_recv_prev_time = timestamp;
ts_param.last_completion_with_timestamp = 1;
} else {
ts_param.last_completion_with_timestamp = 0;
}
guard.post()?;
outstanding_send = true;
},
_ => {
panic!("Unknown error!");
},
}

if scnt < args.iter && !outstanding_send {
// Post another send if we haven't reached the iteration limit
let mut guard = qp.start_post_send();
let send_handle = guard.construct_wr(SEND_WR_ID, WorkRequestFlags::Signaled).setup_send();
unsafe {
send_handle.setup_sge(send_mr.lkey(), send_data.as_ptr() as _, args.size);
}
guard.post()?;
outstanding_send = true;
}
},
Err(_) => {
}
},
Err(PollCompletionQueueError::CompletionQueueEmpty) => {
// CQ is empty - if not using events, continue busy polling
if !args.use_events {
continue;
},
}
}
},
Err(e) => {
panic!("Failed to poll CQ: {:?}", e);
},
}

// Check if we're done
if scnt >= args.iter && rcnt >= args.iter {
break;
}
// Check if we're done
if scnt >= args.iter && rcnt >= args.iter {
break;
}
}

// Acknowledge all CQ events before cleanup
if num_cq_events > 0 {
cq_handle.ack_events(num_cq_events);
}

let end_time = clock.now();
let time = end_time.duration_since(start_time);
let bytes = args.size as u64 * args.iter as u64 * 2;
Expand Down
Loading
Loading