From 0faebc67859d9b8b154a0ac34f285791a00cb57b Mon Sep 17 00:00:00 2001 From: Davis Muro Date: Fri, 2 May 2025 20:49:32 -0700 Subject: [PATCH 1/6] feat: add `bincode` to handle message serialization --- Cargo.lock | 33 +++++++++++++++++++++++++++++++++ Cargo.toml | 1 + 2 files changed, 34 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index f5a6033..cc6416c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,6 +61,26 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "bincode" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740" +dependencies = [ + "bincode_derive", + "serde", + "unty", +] + +[[package]] +name = "bincode_derive" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09" +dependencies = [ + "virtue", +] + [[package]] name = "clap" version = "4.5.37" @@ -111,6 +131,7 @@ checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" name = "cryo" version = "0.1.0" dependencies = [ + "bincode", "clap", "env_logger", "log", @@ -388,12 +409,24 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "unty" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" + [[package]] name = "utf8parse" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "virtue" +version = "0.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index bb8aaac..a8c3ad3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ description = "A file-backed row database engine built on B-Trees" about = "A B-Tree storage engine." [dependencies] +bincode = { version = "2.0.1", features = ["derive"] } clap = { version = "4.5.37", features = ["derive"] } env_logger = "0.11.8" log = "0.4.27" From 8f65857dff9c5205605cf74287b3ca2da36ca7e4 Mon Sep 17 00:00:00 2001 From: Davis Muro Date: Fri, 2 May 2025 23:12:21 -0700 Subject: [PATCH 2/6] feat: add network protocol --- src/lib.rs | 1 + src/protocol/mod.rs | 55 +++++++++++++++ src/protocol/request.rs | 96 ++++++++++++++++++++++++++ src/protocol/response.rs | 25 +++++++ src/protocol/server.rs | 140 ++++++++++++++++++++++++++++++++++++++ src/protocol/thread.rs | 79 +++++++++++++++++++++ src/protocol/transport.rs | 85 +++++++++++++++++++++++ 7 files changed, 481 insertions(+) create mode 100644 src/protocol/mod.rs create mode 100644 src/protocol/request.rs create mode 100644 src/protocol/response.rs create mode 100644 src/protocol/server.rs create mode 100644 src/protocol/thread.rs create mode 100644 src/protocol/transport.rs diff --git a/src/lib.rs b/src/lib.rs index 6bf2c36..3e8998c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ pub mod command; +pub mod protocol; pub mod statement; pub mod storage; diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs new file mode 100644 index 0000000..569cc0e --- /dev/null +++ b/src/protocol/mod.rs @@ -0,0 +1,55 @@ +//! Client-server communication protocol. +//! +//! This module defines the communication protocol used between Cryo clients and servers, +//! including message formats, encoding strategies, and transport abstractions. It provides +//! the foundational types and logic required to serialize, deserialize, and interpret +//! requests and responses over the network. +//! +//! # Overview +//! +//! The protocol layer is responsible for defining how structured queries and control +//! commands are exchanged between the client and the Cryo server. It ensures compatibility, +//! extensibility, and robustness of communication across distributed components. +//! +//! Messages are encoded using a binary format optimized for low-latency parsing and minimal +//! overhead. This module includes both low-level message definitions and higher-level +//! abstractions for sending and receiving protocol messages over a network stream. +//! +//! # Key Components +//! +//! - [`Message`]: Structure representing a message from either a Client or Server. +//! - [`ProtocolTransport`]: Abstraction over a bidirectional transport (e.g., TCP, TLS) used to exchange messages. +//! +//! # Binary Format +//! +//! Protocol messages are serialized with a compact framing format: +//! +//! - Each message begins with a fixed-size header, containing the message type and payload length. +//! - The payload follows, encoded according to message-specific rules (e.g., row sets, error codes). +//! - All integers are little-endian by default. +//! +//! This format allows efficient streaming and multiplexing of messages over a single connection. +//! +//! # Extensibility +//! +//! The protocol is designed to be versioned and forward-compatible: +//! +//! - Message enums are tagged with discriminants for stable wire representation. +//! - Unknown or unsupported message types can be ignored or handled gracefully. +//! - Future versions can extend the format while preserving backward compatibility. +//! +//! # See Also +//! +//! - [`storage`](crate::storage): Data layer that ultimately executes protocol-level queries. +mod request; +mod response; +mod server; +mod thread; +mod transport; + +use thread::ThreadPool; + +pub use request::Request; +pub use response::Response; +pub use server::StorageServer; +pub use transport::ProtocolTransport; diff --git a/src/protocol/request.rs b/src/protocol/request.rs new file mode 100644 index 0000000..4b1271a --- /dev/null +++ b/src/protocol/request.rs @@ -0,0 +1,96 @@ +use bincode::{Decode, Encode}; + +use crate::{Command, storage::Row}; + +#[derive(Debug, Encode, Decode, PartialEq, Eq)] +pub enum QueryKind { + Select, + Insert, + Delete, + Update, +} + +#[derive(Debug, Encode, Decode, PartialEq, Eq)] +pub enum Request { + Query { kind: QueryKind, row: Vec }, + CloseConnection, + Populate(usize), + PrintStructure, + Ping, +} + +impl From for Request { + fn from(value: Command) -> Self { + match value { + Command::Statement(statement) => { + let kind = match statement { + crate::Statement::Insert { .. } => QueryKind::Insert, + crate::Statement::Update { .. } => QueryKind::Update, + crate::Statement::Select => QueryKind::Select, + crate::Statement::Delete { .. } => QueryKind::Delete, + }; + + let row: Row = statement.into(); + Request::Query { + kind, + row: row.as_bytes(), + } + } + Command::Exit => Request::CloseConnection, + Command::Populate(i) => Request::Populate(i), + Command::Structure(_) => Request::PrintStructure, + Command::Ping => Request::Ping, + } + } +} + +#[cfg(test)] +mod tests { + use crate::Statement; + + use super::*; + + #[test] + fn query_statement_command() { + let statement = Statement::Insert { + id: 0, + username: vec![], + email: vec![], + }; + let command = Command::Statement(statement.clone()); + let row: Row = statement.into(); + let request: Request = command.into(); + + assert_eq!( + request, + Request::Query { + kind: QueryKind::Insert, + row: row.as_bytes() + } + ) + } + + #[test] + fn request_exit_command() { + let command = Command::Exit; + let request: Request = command.into(); + + assert_eq!(request, Request::CloseConnection) + } + + #[test] + fn request_populate_command() { + let command = Command::Populate(1); + let request: Request = command.into(); + + assert_eq!(request, Request::Populate(1)) + } + + #[test] + fn request_structure_command() { + let command = Command::Structure(None); + let request: Request = command.into(); + + assert_eq!(request, Request::PrintStructure) + } +} diff --git a/src/protocol/response.rs b/src/protocol/response.rs new file mode 100644 index 0000000..fb63c21 --- /dev/null +++ b/src/protocol/response.rs @@ -0,0 +1,25 @@ +use bincode::{Decode, Encode}; + +#[derive(Debug, Encode, Decode, PartialEq, Eq)] +pub enum Response { + Ok, + Pong, + Query { + rows: Vec, + }, + Structure { + out: String, + }, + Err { + code: ResponseError, + description: String, + }, + ConnectionClosed, +} + +#[derive(Debug, Encode, Decode, PartialEq, Eq)] +pub enum ResponseError { + Query, + Read, + Command, +} diff --git a/src/protocol/server.rs b/src/protocol/server.rs new file mode 100644 index 0000000..b8c040f --- /dev/null +++ b/src/protocol/server.rs @@ -0,0 +1,140 @@ +use std::{ + error::Error, + net::{SocketAddr, TcpListener, TcpStream}, + path::PathBuf, + sync::{Arc, Mutex}, +}; + +use log::{info, warn}; + +use crate::{ + Command, + protocol::{ProtocolTransport, Request, Response, response::ResponseError}, + storage::{Row, StorageEngine, btree::BTree, pager::Pager}, +}; + +use super::{thread::ThreadPool, transport::TransportError}; + +#[derive(Debug)] +pub struct StorageServer { + address: SocketAddr, + pager: Arc>, + pool: ThreadPool, +} + +impl StorageServer { + pub fn new(address: SocketAddr, path: PathBuf) -> Result> { + Ok(Self { + pager: Arc::new(Mutex::new(Pager::open(path)?)), + address, + pool: ThreadPool::new(15), + }) + } + + pub fn listen(self) -> Result<(), TransportError> { + info!("listening at {}", self.address); + let listener = TcpListener::bind(self.address)?; + + for stream in listener.incoming() { + match stream { + Ok(stream) => { + let handle = Arc::clone(&self.pager); + self.pool.execute(move || { + handle_connection(stream, handle).expect("failed to handle connection") + }); + } + Err(e) => warn!("broken connection: {e:?}"), + } + } + Ok(()) + } +} + +fn handle_connection(stream: TcpStream, pager: Arc>) -> Result<(), TransportError> { + let mut transport = ProtocolTransport::new(stream); + + loop { + let req = transport.read_request()?; + info!("received request: {req:?}"); + + let resp = match req { + Request::CloseConnection => { + transport.write_response(Response::ConnectionClosed)?; + return Ok(()); + } + Request::PrintStructure => { + let mut pager = pager.lock().unwrap(); + let mut btree = BTree::new(&mut pager); + + match btree.structure() { + Ok(structure) => Response::Structure { out: structure }, + Err(e) => Response::Err { + code: ResponseError::Command, + description: e.to_string(), + }, + } + } + Request::Populate(size) => { + let mut pager = pager.lock().unwrap(); + let mut btree = BTree::new(&mut pager); + + match btree.execute(Command::Populate(size)) { + Ok(_) => Response::Ok, + Err(e) => Response::Err { + code: ResponseError::Command, + description: e.to_string(), + }, + } + } + Request::Ping => Response::Pong, + Request::Query { kind, row } => { + let res: Result = row.as_slice().try_into(); + let mut pager = pager.lock().unwrap(); + let mut btree = BTree::new(&mut pager); + + match res { + Ok(row) => match kind { + crate::protocol::request::QueryKind::Select => match btree.select() { + Ok(rows) => { + let rows = + rows.iter().flat_map(|r| r.as_bytes()).collect::>(); + Response::Query { rows } + } + Err(e) => Response::Err { + code: ResponseError::Query, + description: e.to_string(), + }, + }, + crate::protocol::request::QueryKind::Insert => match btree.insert(row) { + Ok(_) => Response::Ok, + Err(e) => Response::Err { + code: ResponseError::Query, + description: e.to_string(), + }, + }, + crate::protocol::request::QueryKind::Delete => match btree.delete(row) { + Ok(_) => Response::Ok, + Err(e) => Response::Err { + code: ResponseError::Query, + description: e.to_string(), + }, + }, + crate::protocol::request::QueryKind::Update => match btree.update(row) { + Ok(_) => Response::Ok, + Err(e) => Response::Err { + code: ResponseError::Query, + description: e.to_string(), + }, + }, + }, + Err(e) => Response::Err { + code: ResponseError::Read, + description: e.to_string(), + }, + } + } + }; + + transport.write_response(resp)?; + } +} diff --git a/src/protocol/thread.rs b/src/protocol/thread.rs new file mode 100644 index 0000000..97b0c54 --- /dev/null +++ b/src/protocol/thread.rs @@ -0,0 +1,79 @@ +use std::{ + sync::{Arc, Mutex, mpsc}, + thread, +}; + +use log::debug; + +pub type Job = Box; + +#[derive(Debug)] +pub struct ThreadPool { + workers: Vec, + sender: Option>, +} + +impl ThreadPool { + pub fn new(size: usize) -> Self { + assert!(size > 0); + + let mut workers = Vec::with_capacity(size); + let (sender, receiver) = mpsc::channel(); + + let receiver = Arc::new(Mutex::new(receiver)); + let sender = Some(sender); + + for i in 0..size { + workers.push(Worker::new(i, Arc::clone(&receiver))); + } + + Self { workers, sender } + } + + pub fn execute(&self, f: F) + where + F: FnOnce() + Send + 'static, + { + let job = Box::new(f); + self.sender.as_ref().unwrap().send(job).unwrap(); + } +} + +impl Drop for ThreadPool { + fn drop(&mut self) { + drop(self.sender.take()); + + for worker in self.workers.drain(..) { + println!("Shutting down worker {}", worker.id); + + worker.thread.join().unwrap(); + } + } +} + +#[derive(Debug)] +struct Worker { + id: usize, + thread: thread::JoinHandle<()>, +} + +impl Worker { + pub fn new(id: usize, receiver: Arc>>) -> Self { + let thread = thread::spawn(move || { + loop { + let msg = receiver.lock().unwrap().recv(); + match msg { + Ok(job) => { + debug!("worker {id} handling a connection!"); + job(); + } + Err(_) => { + debug!("worker {id} disconnected!"); + } + } + } + }); + + Self { id, thread } + } +} diff --git a/src/protocol/transport.rs b/src/protocol/transport.rs new file mode 100644 index 0000000..16bd8c2 --- /dev/null +++ b/src/protocol/transport.rs @@ -0,0 +1,85 @@ +use std::io::{self, Read, Write}; + +use bincode::{ + config::{BigEndian, Configuration, Fixint}, + decode_from_std_read, encode_into_std_write, +}; +use thiserror::Error; + +use crate::Command; + +use super::{Request, Response}; + +#[derive(Debug, Error)] +pub enum TransportError { + #[error("failed to encode message: {0}")] + Serialize(#[from] bincode::error::EncodeError), + #[error("failed to decode message: {0}")] + Deserialize(#[from] bincode::error::DecodeError), + #[error("Transport IO Error: {0}")] + Io(#[from] io::Error), +} + +pub struct ProtocolTransport { + stream: T, + config: Configuration, +} + +impl ProtocolTransport { + pub fn new(stream: T) -> Self { + let config = bincode::config::standard() + .with_big_endian() + .with_fixed_int_encoding(); + Self { stream, config } + } + + pub fn write_command(&mut self, command: Command) -> Result<(), TransportError> { + let req: Request = command.into(); + encode_into_std_write(req, &mut self.stream, self.config)?; + Ok(()) + } + + pub fn write_response(&mut self, resp: Response) -> Result<(), TransportError> { + encode_into_std_write(resp, &mut self.stream, self.config)?; + Ok(()) + } + + pub fn read_response(&mut self) -> Result { + let resp: Response = decode_from_std_read(&mut self.stream, self.config)?; + Ok(resp) + } + + pub fn read_request(&mut self) -> Result { + let req: Request = decode_from_std_read(&mut self.stream, self.config)?; + Ok(req) + } +} + +#[cfg(test)] +mod tests { + use std::io::{Cursor, Seek}; + + use super::*; + + #[test] + fn read_write_request() { + let stream = Cursor::new(Vec::new()); + let mut transport = ProtocolTransport::new(stream); + + transport.write_command(Command::Exit).unwrap(); + transport.stream.seek(std::io::SeekFrom::Start(0)).unwrap(); + let req = transport.read_request().unwrap(); + assert_eq!(req, Request::CloseConnection); + } + + #[test] + fn read_write_response() { + let stream = Cursor::new(Vec::new()); + let mut transport = ProtocolTransport::new(stream); + + transport.write_response(Response::Pong).unwrap(); + transport.stream.seek(std::io::SeekFrom::Start(0)).unwrap(); + let resp = transport.read_response().unwrap(); + assert_eq!(resp, Response::Pong); + } +} From 9141a1f3ac2c45c001ef1e0d0a2d1fcd4c0e377f Mon Sep 17 00:00:00 2001 From: Davis Muro Date: Fri, 2 May 2025 23:12:38 -0700 Subject: [PATCH 3/6] feat: add `Command::Ping` --- src/command.rs | 3 +++ src/storage/btree.rs | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/command.rs b/src/command.rs index df42928..c131af8 100644 --- a/src/command.rs +++ b/src/command.rs @@ -70,6 +70,8 @@ pub enum Command { /// Requests the storage engine to write out a representation /// of it's storage structure. Structure(Option), + /// Checks if the connect is still open. + Ping, } impl TryInto for &str { @@ -78,6 +80,7 @@ impl TryInto for &str { fn try_into(self) -> Result { match self.trim() { ".exit" => Ok(Command::Exit), + ".ping" => Ok(Command::Ping), s if s.starts_with(".structure") => { let parts = s.split(' ').collect::>(); diff --git a/src/storage/btree.rs b/src/storage/btree.rs index 34e0e41..fb9228c 100644 --- a/src/storage/btree.rs +++ b/src/storage/btree.rs @@ -105,7 +105,7 @@ impl StorageEngine for BTree<'_> { self.insert(row)?; } } - Command::Exit => {} + Command::Exit | Command::Ping => {} } Ok(()) From d4618b756babae90d6cde712fafab818904a2611 Mon Sep 17 00:00:00 2001 From: Davis Muro Date: Fri, 2 May 2025 23:12:51 -0700 Subject: [PATCH 4/6] refactor: switch to a client-server approach --- src/bin/cryo_cli.rs | 73 +++++++++++++++++++++++++++++++----------- src/bin/cryo_server.rs | 22 +++++++++++++ 2 files changed, 76 insertions(+), 19 deletions(-) create mode 100644 src/bin/cryo_server.rs diff --git a/src/bin/cryo_cli.rs b/src/bin/cryo_cli.rs index e0b078b..fc5a936 100644 --- a/src/bin/cryo_cli.rs +++ b/src/bin/cryo_cli.rs @@ -1,36 +1,40 @@ use clap::Parser; use std::{ error::Error, + fs::OpenOptions, io::{self, BufRead, Write}, + net::{SocketAddr, TcpStream}, path::PathBuf, + str::FromStr, }; use cryo::{ Command, - storage::{StorageEngine, btree::BTree, pager::Pager}, + protocol::{ProtocolTransport, Response}, + statement::print_row, + storage::Row, }; #[derive(Parser)] #[command(version, about, long_about = None)] struct Cli { - /// Path to storage directory - path: PathBuf, + /// Address of the Cryo server. + address: Option, } fn main() -> Result<(), Box> { // Initialize env_logger; For logging to STDOUT/STDERR env_logger::init(); - let cli = Cli::parse(); let mut stdio = io::stdin().lock(); let mut stdout = io::stdout().lock(); + let cli = Cli::parse(); - if !cli.path.is_dir() { - panic!("'{:?}' is not a directory", cli.path); - } - let store = cli.path.join("cryo.db"); - let mut pager = Pager::open(store)?; - let mut btree = BTree::new(&mut pager); + let address = cli + .address + .unwrap_or(SocketAddr::from_str("127.0.0.1:8000")?); + let stream = TcpStream::connect(address)?; + let mut transport = ProtocolTransport::new(stream); loop { let mut s = String::default(); @@ -41,18 +45,49 @@ fn main() -> Result<(), Box> { stdio.read_line(&mut s)?; match <&str as TryInto>::try_into(s.as_str()) { - Ok(cmd) => match cmd { - c if c == Command::Exit => { - btree.execute(c)?; - break; + Ok(cmd) => { + let mut path: Option = None; + if let Command::Structure(out) = cmd.clone() { + path = out; } - c => { - btree.execute(c)?; + if let Err(e) = transport.write_command(cmd) { + eprintln!("failed to write request: {e}"); + continue; } - }, + + match transport.read_response()? { + Response::Ok => {} + Response::Pong => println!("PONG"), + Response::Query { mut rows } => { + while !rows.is_empty() { + let row: Row = rows.as_slice().try_into()?; + rows.drain(0..row.as_bytes().len()); + + println!("{}", print_row(&row)) + } + } + Response::Structure { out } => { + if let Some(path) = path { + let mut f = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(path)?; + f.write_all(out.as_bytes())?; + } else { + println!("Structure:\n{out}"); + } + } + Response::Err { code, description } => { + eprintln!("error({code:?}): {description}") + } + Response::ConnectionClosed => { + println!("connection closed"); + return Ok(()); + } + } + } Err(e) => eprintln!("error: {e}"), } } - - Ok(()) } diff --git a/src/bin/cryo_server.rs b/src/bin/cryo_server.rs new file mode 100644 index 0000000..f040381 --- /dev/null +++ b/src/bin/cryo_server.rs @@ -0,0 +1,22 @@ +use std::{error::Error, net::SocketAddr, path::PathBuf}; + +use clap::Parser; +use cryo::protocol::StorageServer; + +#[derive(Debug, Parser)] +struct Cli { + /// Path to storage directory + path: PathBuf, + /// Listen for new connection at address + address: SocketAddr, +} + +fn main() -> Result<(), Box> { + env_logger::init(); + + let cli = Cli::parse(); + let server = StorageServer::new(cli.address, cli.path.join("cryo.db"))?; + + server.listen()?; + Ok(()) +} From dc43716fed8d34e09a3dc8683b76377c1d22d97f Mon Sep 17 00:00:00 2001 From: Davis Muro Date: Fri, 2 May 2025 23:13:43 -0700 Subject: [PATCH 5/6] chore: use import from super --- src/protocol/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/protocol/server.rs b/src/protocol/server.rs index b8c040f..87af67c 100644 --- a/src/protocol/server.rs +++ b/src/protocol/server.rs @@ -13,7 +13,7 @@ use crate::{ storage::{Row, StorageEngine, btree::BTree, pager::Pager}, }; -use super::{thread::ThreadPool, transport::TransportError}; +use super::{ThreadPool, transport::TransportError}; #[derive(Debug)] pub struct StorageServer { From 666d6e7177af63355d798f95066ed894c7febc5f Mon Sep 17 00:00:00 2001 From: Davis Muro Date: Fri, 2 May 2025 23:29:56 -0700 Subject: [PATCH 6/6] doc: update documentation on protocol module --- src/protocol/mod.rs | 21 ++------------------- 1 file changed, 2 insertions(+), 19 deletions(-) diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 569cc0e..7b92a0d 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -17,27 +17,10 @@ //! //! # Key Components //! -//! - [`Message`]: Structure representing a message from either a Client or Server. +//! - [`Request`]: Enum of all possible requests from a Client. +//! - [`Response`]: Enum of all possible responses from a Server. //! - [`ProtocolTransport`]: Abstraction over a bidirectional transport (e.g., TCP, TLS) used to exchange messages. //! -//! # Binary Format -//! -//! Protocol messages are serialized with a compact framing format: -//! -//! - Each message begins with a fixed-size header, containing the message type and payload length. -//! - The payload follows, encoded according to message-specific rules (e.g., row sets, error codes). -//! - All integers are little-endian by default. -//! -//! This format allows efficient streaming and multiplexing of messages over a single connection. -//! -//! # Extensibility -//! -//! The protocol is designed to be versioned and forward-compatible: -//! -//! - Message enums are tagged with discriminants for stable wire representation. -//! - Unknown or unsupported message types can be ignored or handled gracefully. -//! - Future versions can extend the format while preserving backward compatibility. -//! //! # See Also //! //! - [`storage`](crate::storage): Data layer that ultimately executes protocol-level queries.