Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .claude/rules/broadcast.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Every notification is wrapped in a versioned envelope: `{ v, seq, type, createdA
|---|---|---|---|
| `ChatMessage { message, room_preview_text, sender }` | all room members | new message (`sender: RoomMember`) | no |
| `RoomChangeEvent { message, room_preview_text }` | all room members | join/leave/invite | no |
| `NewRoom { room, created_by }` | invited user | room creation / invite | no |
| `NewRoom { room, created_by, first_message }` | invited user | room creation / invite (`first_message`: optional, embedded on creation) | no |
| `LeaveRoom { room_id }` | leaving user | user leaves room | no |
| `FriendRequestReceived { from_user }` | target user | friend request sent | no |
| `FriendRequestAccepted { from_user }` | requester | request accepted | no |
Expand Down
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ BroadcastChannel::get().unsubscribe(user_id).await;
|---|---|---|
| `ChatMessage { message, room_preview_text, sender }` | all room members | new message (`sender: RoomMember` so clients render a first-time sender without a lookup) |
| `RoomChangeEvent { message, room_preview_text }` | all room members | join/leave/invite |
| `NewRoom { room, created_by }` | invited user | room creation / invite |
| `NewRoom { room, created_by, first_message }` | invited user | room creation / invite (`first_message`: optional first message, embedded on creation) |
| `LeaveRoom { room_id }` | leaving user | user leaves room |
| `FriendRequestReceived { from_user }` | target user | friend request sent |
| `FriendRequestAccepted { from_user }` | requester | request accepted |
Expand Down
11 changes: 9 additions & 2 deletions src/broadcast/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,17 @@ pub enum NotificationEvent {
SystemMessage { message: serde_json::Value },

/**
* Sending this event to a newly invited user
* Sending this event to a newly invited user. `first_message` carries the optional
* first message the room was created with (authored by `created_by`), so the client
* can render it immediately without a separate timeline fetch. `None` for invites
* and rooms created without a first message.
*/
#[serde(rename_all = "camelCase")]
NewRoom { room: ChatRoomDto, created_by: User },
NewRoom {
room: ChatRoomDto,
created_by: User,
first_message: Option<MessageDto>,
},

/**
* Sending this event to a user who has left a room
Expand Down
20 changes: 20 additions & 0 deletions src/messaging/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,26 @@ impl Validate for NewMessageBody {
}
}

/// Body of the optional first message that can be sent together with a new room.
/// A brand-new room has no prior messages, so a `Reply` is impossible here — only
/// `Text` and `Media` (a link to a post) are valid. `chat_room_id` is intentionally
/// absent: the room id does not exist until the room has been created.
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(untagged)]
pub enum FirstMessageBody {
Text(TextBody),
Media(MediaBody),
}

impl Validate for FirstMessageBody {
fn validate(&self) -> Result<(), validator::ValidationErrors> {
match self {
FirstMessageBody::Text(body) => body.validate(),
FirstMessageBody::Media(body) => body.validate(),
}
}
}

#[derive(Deserialize, Serialize, Debug, Clone, Validate)]
#[serde(rename_all = "camelCase")]
pub struct NewReplyBody {
Expand Down
21 changes: 21 additions & 0 deletions src/rooms/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::rooms::room::{
};
use crate::rooms::room_member::RoomMember;
use crate::rooms::room_service::RoomService;
use crate::rooms::share_service::{ShareService, ShareTarget, ShareTargetCursor};
use crate::rooms::timeline_service::TimelineService;
use crate::users::user_service::UserService;
use crate::utils::check_user_in_room;
Expand All @@ -21,6 +22,7 @@ use serde::Deserialize;
use std::collections::HashSet;
use std::sync::Arc;
use uuid::Uuid;
use validator::Validate;

#[derive(Deserialize, Debug)]
pub struct RoomSearchQueryParam {
Expand Down Expand Up @@ -76,6 +78,21 @@ pub async fn handle_get_joined_rooms(
Ok(Json(rooms))
}

pub async fn handle_get_share_targets(
State(state): State<Arc<AppState>>,
Extension(token): Extension<KeycloakToken<String>>,
Query(params): Query<RoomListQueryParams>,
) -> Result<Json<CursorResults<ShareTarget>>, AppError> {
let cursor: ShareTargetCursor = decode_cursor(params.cursor)
.map_err(|_| AppError::Validation("Invalid Cursor-Parameters.".to_string()))?;
let page_size = clamp_page_size(params.limit);

let targets =
ShareService::get_share_targets(state, token.subject, params.name, cursor, page_size)
.await?;
Ok(Json(targets))
}

pub async fn handle_get_room_with_details(
State(state): State<Arc<AppState>>,
Extension(token): Extension<KeycloakToken<String>>,
Expand Down Expand Up @@ -105,6 +122,10 @@ pub async fn handle_create_room(
));
}

if let Some(first_message) = &payload.first_message {
first_message.validate().map_err(AppError::from)?;
}

//filter out all users that have an ignore-relationship with the sender
let ignored =
UserService::get_blocked_users(state.clone(), &token.subject, &payload.invited_users)
Expand Down
1 change: 1 addition & 0 deletions src/rooms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ pub mod room_member;
pub mod room_repository;
pub mod room_service;
pub mod routes;
pub mod share_service;
mod timeline_service;
6 changes: 6 additions & 0 deletions src/rooms/room.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::messaging::model::FirstMessageBody;
use crate::rooms::room_member::RoomMember;
use crate::utils::truncate_and_serialize;
use chrono::prelude::*;
Expand Down Expand Up @@ -99,6 +100,11 @@ pub struct NewRoom {
pub room_type: RoomType,
pub room_name: Option<String>,
pub invited_users: Vec<Uuid>,
/// Optional first message sent together with the room. Only `Text` or `Media`
/// (a link to a post) — never a `Reply`, since the room starts empty. Embedded
/// into the `NewRoom` broadcast event so recipients render it without a lookup.
#[serde(default)]
pub first_message: Option<FirstMessageBody>,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
Expand Down
152 changes: 143 additions & 9 deletions src/rooms/room_repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::rooms::room::{
ChatRoomEntity, LastMessagePreviewText, NewRoom, RoomPaginationCursor, RoomType,
};
use crate::rooms::room_member::RoomMember;
use crate::rooms::share_service::{ActiveShareRow, InactiveShareRow};
use chrono::{DateTime, Utc};
use sqlx::types::Json;
use sqlx::{Error, PgConnection, Pool, Postgres, QueryBuilder, Transaction};
Expand Down Expand Up @@ -117,6 +118,136 @@ impl RoomRepository {
Ok(rooms)
}

/// *Active* section of the share-target list: group rooms the client is in, plus
/// friends with whom an 1-1 room already exists, merged and ordered by recent
/// activity (`active_at DESC`, `room_id` tie-breaker). Friends without a 1-1 room
/// are excluded here — they belong to the inactive section (`inactive_share_targets`),
/// so the two halves of the friend set never overlap.
///
/// Optional case-insensitive name filter (friend `raw_name`, group `room_name`).
/// Keyset over `(active_at, room_id)`; callers pass `limit = page_size + 1`.
///
/// Runtime query (not the `query_as!` macro) because of the optional cursor/name
/// binds — consistent with `get_joined_rooms` and the `UserRepository` queries.
pub async fn active_share_targets(
&self,
client_id: &Uuid,
name_filter: Option<&str>,
cursor_active_at: Option<DateTime<Utc>>,
cursor_id: Option<Uuid>,
limit: i64,
) -> Result<Vec<ActiveShareRow>, sqlx::Error> {
let rows = sqlx::query_as::<_, ActiveShareRow>(
r#"
SELECT name, room_id, image_url, active_at, is_group, user_id
FROM (
-- Friends with an existing 1-1 room (the share target is that room).
SELECT
u.display_name AS name,
sr.room_id AS room_id,
u.profile_picture AS image_url,
sr.active_at AS active_at,
false AS is_group,
u.id AS user_id
FROM app_user u
JOIN user_relationship rl
ON u.id = CASE
WHEN rl.user_a_id = $1 THEN rl.user_b_id
WHEN rl.user_b_id = $1 THEN rl.user_a_id
END
AND rl.state = 'FRIEND'
JOIN LATERAL (
SELECT r.id AS room_id,
COALESCE(r.latest_message, r.created_at) AS active_at
FROM chat_room r
JOIN chat_room_participant p1 ON p1.room_id = r.id AND p1.user_id = $1
JOIN chat_room_participant p2 ON p2.room_id = r.id AND p2.user_id = u.id
WHERE r.room_type = 'Single'
LIMIT 1
) sr ON true
WHERE ($2::text IS NULL OR u.raw_name LIKE lower(concat('%', $2, '%')))

UNION ALL

-- Group rooms the client is a member of.
SELECT
r.room_name AS name,
r.id AS room_id,
r.room_image_url AS image_url,
COALESCE(r.latest_message, r.created_at) AS active_at,
true AS is_group,
NULL::uuid AS user_id
FROM chat_room r
JOIN chat_room_participant p ON p.room_id = r.id AND p.user_id = $1
WHERE r.room_type = 'Group'
AND ($2::text IS NULL OR r.room_name ILIKE concat('%', $2, '%'))
) AS merged
WHERE (
$3::timestamptz IS NULL
OR active_at < $3
OR (active_at = $3 AND room_id < $4)
)
ORDER BY active_at DESC, room_id DESC
LIMIT $5
"#,
)
.bind(client_id)
.bind(name_filter)
.bind(cursor_active_at)
.bind(cursor_id)
.bind(limit)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}

/// *Inactive* section of the share-target list: friends the client has no 1-1 room
/// with yet (sharing requires creating the room first). Ordered alphabetically
/// (`display_name ASC`, `id` tie-breaker), keyset over `(display_name, id)`.
///
/// The `NOT EXISTS` is the exact complement of the 1-1-room join in
/// `active_share_targets`, so every friend appears in exactly one of the two sections.
pub async fn inactive_share_targets(
&self,
client_id: &Uuid,
name_filter: Option<&str>,
cursor_name: Option<String>,
cursor_id: Option<Uuid>,
limit: i64,
) -> Result<Vec<InactiveShareRow>, sqlx::Error> {
let rows = sqlx::query_as::<_, InactiveShareRow>(
r#"
SELECT u.display_name AS name, u.id AS user_id, u.profile_picture AS image_url
FROM app_user u
JOIN user_relationship rl
ON u.id = CASE
WHEN rl.user_a_id = $1 THEN rl.user_b_id
WHEN rl.user_b_id = $1 THEN rl.user_a_id
END
AND rl.state = 'FRIEND'
WHERE ($2::text IS NULL OR u.raw_name LIKE lower(concat('%', $2, '%')))
AND NOT EXISTS (
SELECT 1
FROM chat_room r
JOIN chat_room_participant p1 ON p1.room_id = r.id AND p1.user_id = $1
JOIN chat_room_participant p2 ON p2.room_id = r.id AND p2.user_id = u.id
WHERE r.room_type = 'Single'
)
AND ($3::text IS NULL OR (u.display_name, u.id) > ($3, $4))
ORDER BY u.display_name ASC, u.id ASC
LIMIT $5
"#,
)
.bind(client_id)
.bind(name_filter)
.bind(cursor_name)
.bind(cursor_id)
.bind(limit)
.fetch_all(&self.pool)
.await?;
Ok(rows)
}

pub async fn delete_room(
&self,
conn: &mut PgConnection,
Expand Down Expand Up @@ -178,21 +309,25 @@ impl RoomRepository {
Ok(room)
}

pub async fn insert_room(&self, new_room: NewRoom) -> Result<ChatRoomEntity, sqlx::Error> {
/// Inserts the room row and its participants on the given connection. The caller
/// owns the transaction so room creation can be made atomic together with an
/// optional first message (see `RoomService::create_room`).
pub async fn insert_room(
&self,
conn: &mut PgConnection,
new_room: &NewRoom,
) -> Result<ChatRoomEntity, sqlx::Error> {
let room_entity = ChatRoomEntity {
id: Uuid::new_v4(),
room_type: new_room.room_type,
room_name: new_room.room_name,
room_type: new_room.room_type.clone(),
room_name: new_room.room_name.clone(),
room_image_url: None,
created_at: Utc::now(),
latest_message: Some(Utc::now()),
latest_message_preview_text: Some(Json(LastMessagePreviewText::New)),
unread: None,
};

//https://docs.rs/sqlx/latest/sqlx/struct.Transaction.html
let mut tx = self.pool.begin().await?;

let room = sqlx::query_as!(
ChatRoomEntity,
r#"
Expand All @@ -206,7 +341,7 @@ impl RoomRepository {
room_entity.created_at,
room_entity.latest_message,
room_entity.latest_message_preview_text as Option<Json<LastMessagePreviewText>>
).fetch_one(&mut *tx).await?;
).fetch_one(&mut *conn).await?;

//https://docs.rs/sqlx-core/0.5.13/sqlx_core/query_builder/struct.QueryBuilder.html#method.push_values
let mut builder: QueryBuilder<Postgres> =
Expand All @@ -216,10 +351,9 @@ impl RoomRepository {
db.push_bind(user).push_bind(&room.id).push_bind(Utc::now());
})
.build()
.fetch_all(&mut *tx)
.execute(&mut *conn)
.await?;

tx.commit().await?;
Ok(room)
}

Expand Down
Loading