Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ description = "A file-backed row database engine built on B-Trees"
about = "A B-Tree storage engine."

[dependencies]
bincode = { version = "2.0.1", features = ["derive"] }
clap = { version = "4.5.37", features = ["derive"] }
env_logger = "0.11.8"
log = "0.4.27"
Expand Down
73 changes: 54 additions & 19 deletions src/bin/cryo_cli.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,40 @@
use clap::Parser;
use std::{
error::Error,
fs::OpenOptions,
io::{self, BufRead, Write},
net::{SocketAddr, TcpStream},
path::PathBuf,
str::FromStr,
};

use cryo::{
Command,
storage::{StorageEngine, btree::BTree, pager::Pager},
protocol::{ProtocolTransport, Response},
statement::print_row,
storage::Row,
};

#[derive(Parser)]
#[command(version, about, long_about = None)]
struct Cli {
/// Path to storage directory
path: PathBuf,
/// Address of the Cryo server.
address: Option<SocketAddr>,
}

fn main() -> Result<(), Box<dyn Error>> {
// Initialize env_logger; For logging to STDOUT/STDERR
env_logger::init();

let cli = Cli::parse();
let mut stdio = io::stdin().lock();
let mut stdout = io::stdout().lock();
let cli = Cli::parse();

if !cli.path.is_dir() {
panic!("'{:?}' is not a directory", cli.path);
}
let store = cli.path.join("cryo.db");
let mut pager = Pager::open(store)?;
let mut btree = BTree::new(&mut pager);
let address = cli
.address
.unwrap_or(SocketAddr::from_str("127.0.0.1:8000")?);
let stream = TcpStream::connect(address)?;
let mut transport = ProtocolTransport::new(stream);

loop {
let mut s = String::default();
Expand All @@ -41,18 +45,49 @@ fn main() -> Result<(), Box<dyn Error>> {
stdio.read_line(&mut s)?;

match <&str as TryInto<Command>>::try_into(s.as_str()) {
Ok(cmd) => match cmd {
c if c == Command::Exit => {
btree.execute(c)?;
break;
Ok(cmd) => {
let mut path: Option<PathBuf> = None;
if let Command::Structure(out) = cmd.clone() {
path = out;
}
c => {
btree.execute(c)?;
if let Err(e) = transport.write_command(cmd) {
eprintln!("failed to write request: {e}");
continue;
}
},

match transport.read_response()? {
Response::Ok => {}
Response::Pong => println!("PONG"),
Response::Query { mut rows } => {
while !rows.is_empty() {
let row: Row = rows.as_slice().try_into()?;
rows.drain(0..row.as_bytes().len());

println!("{}", print_row(&row))
}
}
Response::Structure { out } => {
if let Some(path) = path {
let mut f = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(path)?;
f.write_all(out.as_bytes())?;
} else {
println!("Structure:\n{out}");
}
}
Response::Err { code, description } => {
eprintln!("error({code:?}): {description}")
}
Response::ConnectionClosed => {
println!("connection closed");
return Ok(());
}
}
}
Err(e) => eprintln!("error: {e}"),
}
}

Ok(())
}
22 changes: 22 additions & 0 deletions src/bin/cryo_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::{error::Error, net::SocketAddr, path::PathBuf};

use clap::Parser;
use cryo::protocol::StorageServer;

#[derive(Debug, Parser)]
struct Cli {
/// Path to storage directory
path: PathBuf,
/// Listen for new connection at address
address: SocketAddr,
}

fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();

let cli = Cli::parse();
let server = StorageServer::new(cli.address, cli.path.join("cryo.db"))?;

server.listen()?;
Ok(())
}
3 changes: 3 additions & 0 deletions src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub enum Command {
/// Requests the storage engine to write out a representation
/// of it's storage structure.
Structure(Option<PathBuf>),
/// Checks if the connect is still open.
Ping,
}

impl TryInto<Command> for &str {
Expand All @@ -78,6 +80,7 @@ impl TryInto<Command> for &str {
fn try_into(self) -> Result<Command, Self::Error> {
match self.trim() {
".exit" => Ok(Command::Exit),
".ping" => Ok(Command::Ping),
s if s.starts_with(".structure") => {
let parts = s.split(' ').collect::<Vec<&str>>();

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod command;
pub mod protocol;
pub mod statement;
pub mod storage;

Expand Down
38 changes: 38 additions & 0 deletions src/protocol/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//! Client-server communication protocol.
//!
//! This module defines the communication protocol used between Cryo clients and servers,
//! including message formats, encoding strategies, and transport abstractions. It provides
//! the foundational types and logic required to serialize, deserialize, and interpret
//! requests and responses over the network.
//!
//! # Overview
//!
//! The protocol layer is responsible for defining how structured queries and control
//! commands are exchanged between the client and the Cryo server. It ensures compatibility,
//! extensibility, and robustness of communication across distributed components.
//!
//! Messages are encoded using a binary format optimized for low-latency parsing and minimal
//! overhead. This module includes both low-level message definitions and higher-level
//! abstractions for sending and receiving protocol messages over a network stream.
//!
//! # Key Components
//!
//! - [`Request`]: Enum of all possible requests from a Client.
//! - [`Response`]: Enum of all possible responses from a Server.
//! - [`ProtocolTransport`]: Abstraction over a bidirectional transport (e.g., TCP, TLS) used to exchange messages.
//!
//! # See Also
//!
//! - [`storage`](crate::storage): Data layer that ultimately executes protocol-level queries.
mod request;
mod response;
mod server;
mod thread;
mod transport;

use thread::ThreadPool;

pub use request::Request;
pub use response::Response;
pub use server::StorageServer;
pub use transport::ProtocolTransport;
96 changes: 96 additions & 0 deletions src/protocol/request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use bincode::{Decode, Encode};

use crate::{Command, storage::Row};

#[derive(Debug, Encode, Decode, PartialEq, Eq)]
pub enum QueryKind {
Select,
Insert,
Delete,
Update,
}

#[derive(Debug, Encode, Decode, PartialEq, Eq)]
pub enum Request {
Query { kind: QueryKind, row: Vec<u8> },
CloseConnection,
Populate(usize),
PrintStructure,
Ping,
}

impl From<Command> for Request {
fn from(value: Command) -> Self {
match value {
Command::Statement(statement) => {
let kind = match statement {
crate::Statement::Insert { .. } => QueryKind::Insert,
crate::Statement::Update { .. } => QueryKind::Update,
crate::Statement::Select => QueryKind::Select,
crate::Statement::Delete { .. } => QueryKind::Delete,
};

let row: Row = statement.into();
Request::Query {
kind,
row: row.as_bytes(),
}
}
Command::Exit => Request::CloseConnection,
Command::Populate(i) => Request::Populate(i),
Command::Structure(_) => Request::PrintStructure,
Command::Ping => Request::Ping,
}
}
}

#[cfg(test)]
mod tests {
use crate::Statement;

use super::*;

#[test]
fn query_statement_command() {
let statement = Statement::Insert {
id: 0,
username: vec![],
email: vec![],
};
let command = Command::Statement(statement.clone());
let row: Row = statement.into();
let request: Request = command.into();

assert_eq!(
request,
Request::Query {
kind: QueryKind::Insert,
row: row.as_bytes()
}
)
}

#[test]
fn request_exit_command() {
let command = Command::Exit;
let request: Request = command.into();

assert_eq!(request, Request::CloseConnection)
}

#[test]
fn request_populate_command() {
let command = Command::Populate(1);
let request: Request = command.into();

assert_eq!(request, Request::Populate(1))
}

#[test]
fn request_structure_command() {
let command = Command::Structure(None);
let request: Request = command.into();

assert_eq!(request, Request::PrintStructure)
}
}
25 changes: 25 additions & 0 deletions src/protocol/response.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use bincode::{Decode, Encode};

#[derive(Debug, Encode, Decode, PartialEq, Eq)]
pub enum Response {
Ok,
Pong,
Query {
rows: Vec<u8>,
},
Structure {
out: String,
},
Err {
code: ResponseError,
description: String,
},
ConnectionClosed,
}

#[derive(Debug, Encode, Decode, PartialEq, Eq)]
pub enum ResponseError {
Query,
Read,
Command,
}
Loading