From 73cec69f9e22a00cc8f70dc48d62932b334a9165 Mon Sep 17 00:00:00 2001 From: Luke Yue Date: Mon, 29 Jun 2026 10:21:29 +0800 Subject: [PATCH 1/2] feat(rdmacm): add rdma_reject support --- src/rdmacm/communication_manager.rs | 96 +++++++++++++++++++++++------ 1 file changed, 78 insertions(+), 18 deletions(-) diff --git a/src/rdmacm/communication_manager.rs b/src/rdmacm/communication_manager.rs index 6f76fc2..22a9271 100644 --- a/src/rdmacm/communication_manager.rs +++ b/src/rdmacm/communication_manager.rs @@ -169,7 +169,7 @@ use rdma_mummy_sys::{ ibv_qp_attr, ibv_qp_type, rdma_accept, rdma_ack_cm_event, rdma_bind_addr, rdma_cm_event, rdma_cm_event_type, rdma_cm_id, rdma_conn_param, rdma_connect, rdma_create_event_channel, rdma_create_id, rdma_destroy_event_channel, rdma_destroy_id, rdma_disconnect, rdma_establish, rdma_event_channel, rdma_get_cm_event, rdma_init_qp_attr, - rdma_listen, rdma_port_space, rdma_resolve_addr, rdma_resolve_route, + rdma_listen, rdma_port_space, rdma_reject, rdma_resolve_addr, rdma_resolve_route, }; use crate::ibverbs::device_context::DeviceContext; @@ -423,6 +423,20 @@ pub enum AcceptErrorKind { Rdmacm(#[from] io::Error), } +/// Error returned by [`Identifier::reject`] for rejecting a connection request. +#[derive(Debug, thiserror::Error)] +#[error("failed to reject")] +#[non_exhaustive] +pub struct RejectError(#[from] pub RejectErrorKind); + +/// The enum type for [`RejectError`]. +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +#[non_exhaustive] +pub enum RejectErrorKind { + Rdmacm(#[from] io::Error), +} + /// Error returned by [`Identifier::establish`] for establishing connection setup. #[derive(Debug, thiserror::Error)] #[error("failed to establish")] @@ -986,6 +1000,35 @@ impl Identifier { Ok(()) } + /// Called from the listening side to reject an incoming connection on the [`Identifier`]. + /// + /// # Note + /// + /// This method is only useful for [`EventType::ConnectRequest`] events. 
A new [`Identifier`] + /// is automatically created to handle the incoming connection request. This is distinct from + /// the listener [`Identifier`]. The new [`Identifier`] could be obtained by [`Event::cm_id`]. + /// + /// Use [`ConnectionParameter::setup_private_data`] to attach optional rejection private data. + /// + /// [`Event::cm_id`]: crate::rdmacm::communication_manager::Event::cm_id + /// + pub fn reject(&self, conn_param: ConnectionParameter) -> Result<(), RejectError> { + let cm_id = self.cm_id; + let ret = unsafe { + rdma_reject( + cm_id.as_ptr(), + conn_param.conn_param.private_data, + conn_param.conn_param.private_data_len, + ) + }; + + if ret < 0 { + return Err(RejectErrorKind::Rdmacm(io::Error::last_os_error()).into()); + } + + Ok(()) + } + /// Acknowledge an incoming connection response event and complete the connection establishment /// on the [`Identifier`]. /// @@ -1080,18 +1123,17 @@ impl ConnectionParameter { self } - /// Setup the private data to be sent with connect or accept. + /// Setup the private data to be sent with connect, accept, or reject. /// /// # Private data size /// /// This method copies the provided slice into the [`ConnectionParameter`] /// and stores that owned buffer's pointer and length in the raw RDMA CM /// parameter. It does not cap the length to any specific RDMA CM operation. - /// Check the operation limit - /// before calling [`Identifier::connect`] or [`Identifier::accept`], or when - /// using the lower-level [`rdma_reject(3)`] API. + /// Check the operation limit before calling [`Identifier::connect`], + /// [`Identifier::accept`], or [`Identifier::reject`]. 
/// - /// | Port space | Service type | [`connect`] | [`accept`] | [`rdma_reject(3)`] | + /// | Port space | Service type | [`connect`] | [`accept`] | [`reject`] | /// | --- | --- | ---: | ---: | ---: | /// | [`PortSpace::Tcp`] | connected | 56 | 196 | 148 | /// | [`PortSpace::Udp`] | datagram | 180 | 136 | 136 | @@ -1099,8 +1141,8 @@ impl ConnectionParameter { /// | [`PortSpace::InfiniBand`] | datagram | 216 | 136 | 136 | /// /// [`PortSpace::Tcp`] and [`PortSpace::Udp`] values are the user-visible - /// payload sizes documented by the [`rdma_connect(3)`] and - /// [`rdma_accept(3)`] man pages, plus the [`rdma_reject(3)`] sizes implied + /// payload sizes documented by the [`rdma_connect`] and + /// [`rdma_accept`] man pages, plus the [`rdma_reject`] sizes implied /// by Linux's IB CM message constants and RDMA CM routing. /// [`PortSpace::InfiniBand`] is derived from Linux CMA's /// `id->qp_type == IB_QPT_UD` branch: connected QPs use IB CM REQ/REP/REJ @@ -1114,9 +1156,10 @@ impl ConnectionParameter { /// /// [`connect`]: Identifier::connect /// [`accept`]: Identifier::accept - /// [`rdma_connect(3)`]: https://man7.org/linux/man-pages/man3/rdma_connect.3.html - /// [`rdma_accept(3)`]: https://man7.org/linux/man-pages/man3/rdma_accept.3.html - /// [`rdma_reject(3)`]: https://man7.org/linux/man-pages/man3/rdma_reject.3.html + /// [`reject`]: Identifier::reject + /// [`rdma_connect`]: https://man7.org/linux/man-pages/man3/rdma_connect.3.html + /// [`rdma_accept`]: https://man7.org/linux/man-pages/man3/rdma_accept.3.html + /// [`rdma_reject`]: https://man7.org/linux/man-pages/man3/rdma_reject.3.html /// /// [`setup_private_data`]: ConnectionParameter::setup_private_data /// @@ -1358,7 +1401,11 @@ mod tests { assert_eq!(ev.key, key); let event = channel.get_cm_event().unwrap(); - assert_eq!(event.event_type(), EventType::AddressResolved); + assert!( + matches!(event.event_type(), EventType::AddressResolved | EventType::AddressError), + "unexpected RDMA CM event: 
{:?}", + event.event_type() + ); assert_eq!(Arc::strong_count(&channel), 2); event.ack().unwrap(); @@ -1582,16 +1629,29 @@ mod tests { fn test_get_device_context_caches_correctly() -> Result<(), Box> { match EventChannel::new() { Ok(channel) => { + let Some(cm_addr) = first_ib_or_roce_v2_gid_addr() else { + eprintln!( + "skipping RDMA CM device-context test: no usable IB GID or non-link-local RoCEv2 GID found" + ); + return Ok(()); + }; + let id = channel.create_id(PortSpace::Tcp)?; - let _ = id.resolve_addr( - None, - SocketAddr::from((IpAddr::from_str("127.0.0.1")?, 0)), - Duration::new(0, 200000000), - ); + if let Err(err) = id.resolve_addr(None, cm_addr.socket_addr(0), Duration::new(0, 200000000)) { + eprintln!("skipping RDMA CM device-context test: resolve_addr failed synchronously: {err}"); + return Ok(()); + } let event = channel.get_cm_event()?; - assert_eq!(event.event_type(), EventType::AddressResolved); + if event.event_type() != EventType::AddressResolved { + eprintln!( + "skipping RDMA CM device-context test: resolve_addr completed with {:?}", + event.event_type() + ); + event.ack()?; + return Ok(()); + } let ctx1 = id.get_device_context(); let ctx2 = id.get_device_context(); From d10e5f005c4399a06df82790905402aa8570895e Mon Sep 17 00:00:00 2001 From: Luke Yue Date: Tue, 30 Jun 2026 09:26:27 +0800 Subject: [PATCH 2/2] feat(rdmacm): add device and address getters --- src/rdmacm/communication_manager.rs | 230 ++++++++++++++++++++++++++-- 1 file changed, 214 insertions(+), 16 deletions(-) diff --git a/src/rdmacm/communication_manager.rs b/src/rdmacm/communication_manager.rs index 22a9271..1a61954 100644 --- a/src/rdmacm/communication_manager.rs +++ b/src/rdmacm/communication_manager.rs @@ -166,9 +166,10 @@ use std::{io, mem::MaybeUninit, net::SocketAddr, ptr::NonNull, sync::Arc}; use os_socketaddr::OsSocketAddr; use rdma_mummy_sys::{ - ibv_qp_attr, ibv_qp_type, rdma_accept, rdma_ack_cm_event, rdma_bind_addr, rdma_cm_event, rdma_cm_event_type, - 
rdma_cm_id, rdma_conn_param, rdma_connect, rdma_create_event_channel, rdma_create_id, rdma_destroy_event_channel, - rdma_destroy_id, rdma_disconnect, rdma_establish, rdma_event_channel, rdma_get_cm_event, rdma_init_qp_attr, + ibv_context, ibv_qp_attr, ibv_qp_type, rdma_accept, rdma_ack_cm_event, rdma_bind_addr, rdma_cm_event, + rdma_cm_event_type, rdma_cm_id, rdma_conn_param, rdma_connect, rdma_create_event_channel, rdma_create_id, + rdma_destroy_event_channel, rdma_destroy_id, rdma_disconnect, rdma_establish, rdma_event_channel, + rdma_free_devices, rdma_get_cm_event, rdma_get_devices, rdma_get_local_addr, rdma_get_peer_addr, rdma_init_qp_attr, rdma_listen, rdma_port_space, rdma_reject, rdma_resolve_addr, rdma_resolve_route, }; @@ -480,6 +481,24 @@ pub enum GetQueuePairAttributeErrorKind { Rdmacm(#[from] io::Error), } +/// Error returned by [`get_devices`] for getting RDMA devices opened by RDMA CM. +#[derive(Debug, thiserror::Error)] +#[error("failed to get rdma devices")] +#[non_exhaustive] +pub struct GetDevicesError(#[from] pub GetDevicesErrorKind); + +/// The enum type for [`GetDevicesError`]. 
+#[derive(Debug, thiserror::Error)] +#[error(transparent)] +#[non_exhaustive] +pub enum GetDevicesErrorKind { + Rdmacm(#[from] io::Error), + #[error("rdma_get_devices returned invalid device count: {0}")] + InvalidDeviceCount(i32), + #[error("rdma_get_devices returned null context at index {0}")] + NullDeviceContext(usize), +} + impl Drop for EventChannel { fn drop(&mut self) { unsafe { @@ -637,6 +656,101 @@ impl Drop for Event { } } +struct RdmaDeviceList { + devices: NonNull<*mut ibv_context>, +} + +#[repr(C)] +struct SockAddrIb { + sib_family: libc::sa_family_t, + sib_pkey: u16, + sib_flowinfo: u32, + sib_addr: [u8; 16], + sib_sid: u64, + sib_sid_mask: u64, + sib_scope_id: u64, +} + +impl Drop for RdmaDeviceList { + fn drop(&mut self) { + unsafe { rdma_free_devices(self.devices.as_ptr()) }; + } +} + +fn cached_device_context(context: NonNull) -> Arc { + let mut guard = DEVICE_LISTS.lock().unwrap(); + guard + .entry(context.as_ptr() as usize) + .or_insert_with(|| Arc::new(DeviceContext { context })) + .clone() +} + +fn socket_addr_from_raw(addr: &libc::sockaddr) -> Option { + let len = match addr.sa_family as i32 { + libc::AF_INET => std::mem::size_of::(), + libc::AF_INET6 => std::mem::size_of::(), + _ => return None, + }; + + unsafe { OsSocketAddr::copy_from_raw(addr, len as libc::socklen_t).into_addr() } +} + +fn network_port_from_raw_addr(addr: &libc::sockaddr) -> u16 { + match addr.sa_family as i32 { + libc::AF_INET => { + let addr = unsafe { &*(std::ptr::from_ref(addr).cast::()) }; + addr.sin_port + }, + libc::AF_INET6 => { + let addr = unsafe { &*(std::ptr::from_ref(addr).cast::()) }; + addr.sin6_port + }, + libc::AF_IB => { + let addr = unsafe { &*(std::ptr::from_ref(addr).cast::()) }; + u16::to_be(u64::from_be(addr.sib_sid) as u16) + }, + _ => 0, + } +} + +/// Get a list of RDMA devices currently available. +/// +/// This wraps [`rdma_get_devices`], which returns a temporary array of +/// RDMA-CM-opened device contexts. 
The temporary array is released with
+/// `rdma_free_devices`, while the returned [`DeviceContext`] handles are reused
+/// through this module's global context cache.
+///
+/// The cache is intentionally insertion-only. RDMA CM owns these opened
+/// contexts, and dropping a [`DeviceContext`] closes its raw context. Keeping a
+/// cached [`Arc`] alive prevents Rust from closing a context that librdmacm may
+/// still manage and reuse internally. This mirrors [`Identifier::get_device_context`]
+/// and does not attempt hot-unplug invalidation.
+///
+/// [`rdma_get_devices`]: https://man7.org/linux/man-pages/man3/rdma_get_devices.3.html
+pub fn get_devices() -> Result<Vec<Arc<DeviceContext>>, GetDevicesError> {
+    let mut num_devices = 0;
+    let devices = unsafe { rdma_get_devices(&mut num_devices) };
+
+    // NULL means rdma_get_devices failed and set errno; report it instead of an empty list.
+    let devices = NonNull::new(devices).ok_or_else(|| GetDevicesErrorKind::Rdmacm(io::Error::last_os_error()))?;
+    // Take ownership before any early return so rdma_free_devices always runs, even for 0 devices.
+    let devices = RdmaDeviceList { devices };
+
+    // The C API reports the count as a signed int; refuse a negative value rather than wrap.
+    let num_devices = usize::try_from(num_devices).map_err(|_| GetDevicesErrorKind::InvalidDeviceCount(num_devices))?;
+    // SAFETY: rdma_get_devices(3) documents the array as holding `num_devices` entries; `devices` keeps it alive.
+    let contexts = unsafe { std::slice::from_raw_parts(devices.devices.as_ptr(), num_devices) };
+    contexts
+        .iter()
+        .enumerate()
+        .map(|(index, &context)| {
+            NonNull::new(context)
+                .map(cached_device_context)
+                .ok_or_else(|| GetDevicesErrorKind::NullDeviceContext(index).into())
+        })
+        .collect()
+}
+
 fn new_cm_id_for_raw(event_channel: Arc<EventChannel>, raw: *mut rdma_cm_id) -> Arc<Identifier> {
     let cm = unsafe {
         Arc::new(Identifier {
@@ -812,6 +926,36 @@ impl Identifier {
         unsafe { cm_id.as_ref().port_num }
     }
 
+    /// Get the local port number of a bound [`Identifier`] in network byte order. If the
+    /// [`Identifier`] is not bound to a port, the returned value is 0.
+    pub fn get_src_port(&self) -> u16 {
+        unsafe { network_port_from_raw_addr(rdma_get_local_addr(self.cm_id.as_ref())) }
+    }
+
+    /// Get the remote port number of a bound [`Identifier`] in network byte order.
If the + /// [`Identifier`] is not connected, the returned value is 0. + pub fn get_dst_port(&self) -> u16 { + unsafe { network_port_from_raw_addr(rdma_get_peer_addr(self.cm_id.as_ref())) } + } + + /// Get the local IP socket address of a bound [`Identifier`]. + /// + /// Returns [`None`] when the RDMA CM address is not representable as + /// [`SocketAddr`], for example when the underlying address family is + /// `AF_IB`, or when the [`Identifier`] is not bound to an address. + pub fn get_local_addr(&self) -> Option { + unsafe { socket_addr_from_raw(rdma_get_local_addr(self.cm_id.as_ref())) } + } + + /// Get the remote IP socket address of the [`Identifier`]. + /// + /// Returns [`None`] when the RDMA CM address is not representable as + /// [`SocketAddr`], for example when the underlying address family is + /// `AF_IB`, or when the [`Identifier`] is not connected. + pub fn get_peer_addr(&self) -> Option { + unsafe { socket_addr_from_raw(rdma_get_peer_addr(self.cm_id.as_ref())) } + } + /// Bind the [`Identifier`] to a specific address. Note that users shouldn't bind to a loopback /// address like `127.0.0.1`, or the connection would fail. /// @@ -927,19 +1071,8 @@ impl Identifier { let cm_id = self.cm_id; unsafe { - if (*cm_id.as_ptr()).verbs.is_null() { - return None; - } - - let mut guard = DEVICE_LISTS.lock().unwrap(); - let device_ctx = guard.entry((*cm_id.as_ptr()).verbs as usize).or_insert_with(|| { - Arc::new(DeviceContext { - // Safe due to the is_null() check above. 
- context: NonNull::new((*cm_id.as_ptr()).verbs).unwrap(), - }) - }); - - Some(device_ctx.clone()) + let context = NonNull::new((*cm_id.as_ptr()).verbs)?; + Some(cached_device_context(context)) } } @@ -1488,6 +1621,71 @@ mod tests { } } + #[test] + fn test_socket_addr_from_raw() { + let ipv4 = SocketAddr::from((std::net::Ipv4Addr::new(192, 0, 2, 1), 18515)); + let raw_ipv4 = OsSocketAddr::from(ipv4); + assert_eq!( + unsafe { socket_addr_from_raw(raw_ipv4.as_ptr().as_ref().unwrap()) }, + Some(ipv4) + ); + assert_eq!( + unsafe { network_port_from_raw_addr(raw_ipv4.as_ptr().as_ref().unwrap()) }, + 18515_u16.to_be() + ); + + let ipv6 = SocketAddr::from((std::net::Ipv6Addr::LOCALHOST, 18516)); + let raw_ipv6 = OsSocketAddr::from(ipv6); + assert_eq!( + unsafe { socket_addr_from_raw(raw_ipv6.as_ptr().as_ref().unwrap()) }, + Some(ipv6) + ); + assert_eq!( + unsafe { network_port_from_raw_addr(raw_ipv6.as_ptr().as_ref().unwrap()) }, + 18516_u16.to_be() + ); + + let ib = SockAddrIb { + sib_family: libc::AF_IB as _, + sib_pkey: 0, + sib_flowinfo: 0, + sib_addr: [0; 16], + sib_sid: u64::to_be(18517), + sib_sid_mask: 0, + sib_scope_id: 0, + }; + let ib_addr = unsafe { &*(std::ptr::from_ref(&ib).cast::()) }; + assert_eq!(socket_addr_from_raw(ib_addr), None); + assert_eq!(network_port_from_raw_addr(ib_addr), 18517_u16.to_be()); + + let unsupported = libc::sockaddr { + sa_family: libc::AF_UNIX as _, + sa_data: [0; 14], + }; + assert_eq!(socket_addr_from_raw(&unsupported), None); + assert_eq!(network_port_from_raw_addr(&unsupported), 0); + } + + #[test] + fn test_get_devices_smoke_and_cache() { + let devices = match get_devices() { + Ok(devices) => devices, + Err(err) => { + eprintln!("skipping RDMA CM get_devices smoke test: {err}"); + return; + }, + }; + + let second = get_devices().expect("second rdma_get_devices call should be consistent after first success"); + assert_eq!(devices.len(), second.len()); + for (first, second) in devices.iter().zip(second.iter()) { + assert!( + 
Arc::ptr_eq(first, second), + "rdma_get_devices contexts should reuse the global DeviceContext cache" + ); + } + } + #[test] fn test_connect_request_and_response_private_data() -> Result<(), Box> { match EventChannel::new() {