diff --git a/Cargo.lock b/Cargo.lock index d646d98..a4223ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -231,6 +231,7 @@ dependencies = [ "diesel-async", "fedichat", "quinn", + "rmp-serde", "serde", "thiserror 2.0.18", "tokio", @@ -486,9 +487,8 @@ dependencies = [ [[package]] name = "fedichat" version = "0.1.0" -source = "git+https://git.firechicken.net/fedichat/fedichat-lib#ba358085728410eb76681dc675a126871b5bd5da" +source = "git+https://git.firechicken.net/fedichat/fedichat-lib#a3f54705495d9aa54ffe80501e1f68876e3e9480" dependencies = [ - "rmp-serde", "serde", "serde_bytes", "time", @@ -2248,9 +2248,9 @@ checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" [[package]] name = "winnow" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ee1708bef14716a11bae175f579062d4554d95be2c6829f518df847b7b3fdd0" +checksum = "0592e1c9d151f854e6fd382574c3a0855250e1d9b2f99d9281c6e6391af352f1" [[package]] name = "wit-bindgen" diff --git a/Cargo.toml b/Cargo.toml index 40333b9..26a98c6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2024" diesel = { version = "2.3.9" } quinn = "0.11.9" fedichat = {git = "https://git.firechicken.net/fedichat/fedichat-lib"} +#fedichat = {path = "../fedichat-lib"} tracing = "0.1.44" tracing-subscriber = "0.3.23" clap = { version = "4.6.1", features = ["derive"] } @@ -16,3 +17,4 @@ thiserror = "2.0.18" toml = "1.1.2" diesel-async = { version = "0.9.0", features = ["postgres","deadpool"] } ctrlc-async = { version = "3.2.2", features = ["termination"] } +rmp-serde = "1.3.1" diff --git a/confetti.toml.example b/confetti.toml.example index 2716108..7253108 100644 --- a/confetti.toml.example +++ b/confetti.toml.example @@ -7,6 +7,9 @@ keyfile = "/etc/letsencrypt/confetti/privkey.pem" media_directory = "/srv/confetti/media" statefile = "./confetti.state" loglevel = "debug" +max_message_len_kb = 10000 +# Optional +# account_creation_code = "password1" [database] url = "my.db.server" diff --git a/src/client.rs b/src/client.rs index aafa07b..e2fc3f1 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,19 +1,24 @@ use fedichat::RoomId; -use fedichat::client::TaggedClientMessage; +use fedichat::message::{Relevance,TaggedMessage}; +use fedichat::client::{ClientMessage,SignedClientMessage,ServerMessage,ServerError}; use fedichat::state::StatePath; use diesel_async::AsyncPgConnection; use diesel_async::pooled_connection::deadpool::Pool; -use tokio::sync::{broadcast,mpsc,RwLock}; -use tokio::select; +use rmp_serde; use std::collections::HashSet; use std::sync::{Arc}; -use quinn::{SendStream,RecvStream}; +use thiserror::Error; +use tokio::sync::{broadcast,mpsc,RwLock}; +use tokio::select; use crate::state::State; -pub enum MessageId { - State(RoomId,StatePath), - Messages(RoomId) +use tracing::{warn,error,debug}; + +#[derive(Error,Debug)] +enum HandlerError { + #[error("Foo")] + Foo } pub struct Client { @@ -23,30 +28,31 @@ pub struct Client { // Same with state changes // Potentially could be turned into a more efficient localised filter maybe // Remote messages can come through the same channel maybe? - message_send: broadcast::Sender, + message_send: broadcast::Sender, // This probably could be a more specific type // But we should be filtering and sending back only stuff that matters - message_recv: broadcast::Receiver, + message_recv: broadcast::Receiver, // Sends back match if anything filtered matched the client filters // Connections are closed after a period of inactivity so this should be fine, // if client is connected acks are sent, if inactive connection is closed - message_ack: mpsc::Sender, + message_ack: mpsc::Sender, db_handle: Pool, // Filled once user is authed username: Option, // how do I keep track of activity??? maybe there's a hashmap that gets updated - subscriptions: HashSet, + subscriptions: HashSet, quic_connection: quinn::Connection, - close_handle: broadcast::Receiver<()> + close_handle: broadcast::Receiver<()>, + in_challenge: Option<(String,SignedClientMessage)> } impl Client { pub fn new( statehandle: Arc>, (message_send,message_recv): ( - broadcast::Sender, - broadcast::Receiver), - message_ack: mpsc::Sender, + broadcast::Sender, + broadcast::Receiver), + message_ack: mpsc::Sender, db_handle: Pool, quic_connection: quinn::Connection, close_handle: broadcast::Receiver<()> @@ -58,6 +64,7 @@ impl Client { message_ack, db_handle, username: None, + in_challenge: None, subscriptions: HashSet::new(), quic_connection, close_handle @@ -65,17 +72,99 @@ impl Client { } - pub async fn run(mut self) { - let mut chunk_arr = [0u8; 1024]; - let mut serde_buf: Vec = Vec::new(); + pub async fn run(mut self,config: crate::config::Config) { loop { - + // Wait for either a client to send a message, a message that we might need to + // forward, or for the graceful shutdown signal select!{ //result = self.quic_recv.read(&mut chunk_arr) => { // unimplemented!() //} + // Incoming connection, read it to end and put it in a message + result = self.quic_connection.accept_bi() => { + match result { + Err(e) => { + warn!("Failed to accept incoming connection: {:?}",e); + continue; + }, + Ok((mut send,mut recv)) => { + let message: fedichat::client::SignedClientMessage = match recv.read_to_end(config.max_message_len_kb*1024).await { + Err(e) => { + warn!("Error while reading client request: {:?}",e); + continue; + }, + Ok(bytes) => { + match rmp_serde::from_slice(&bytes) { + Ok(message) => message, + Err(e) => { + warn!("Could not parse message from client: {:?}",e); + continue; + } + } + } + }; + let result = match self.handle_message(message).await { + Ok(res) => res, + Err(e) => { + warn!("Failed to handle client message with error {}",e); + continue; + } + }; + let buf = match rmp_serde::to_vec(&result) { + Ok(b) => b, + Err(e) => { + error!("Could not serialize message to client"); + error!("{:?}",e); + continue; + } + }; + match send.write_all(&buf).await { + Ok(()) => (), + Err(e) => debug!("Could not send message to client: {:?}",e) + } + } + } + + } result = self.message_recv.recv() => { + let result = match result { + Ok(val) => val, + Err(e) => { + warn!("Problem while processing broadcast stream: {:?}",e); + continue; + } + }; + // Check if the message is in our subscriptions + // If it is then send it down to the user + if let Some(relevance) = result.get_relevance() { + if self.subscriptions.contains(&relevance) { + match self.message_ack.send(relevance).await { + Ok(()) => (), + Err(e) => { + error!("Could not send broadcast ACK. Server likely in a broken state."); + error!("{}",e); + } + }; + let send = self.quic_connection.open_uni(); + // This is duplicated code, maybe could break into a function + let buf = match rmp_serde::to_vec(&result) { + Ok(b) => b, + Err(e) => { + error!("Could not serialize message to client"); + error!("{:?}",e); + continue; + } + }; + match send.await { + Ok(mut send) => match send.write_all(&buf).await { + Ok(()) => (), + Err(e) => debug!("Could not send message to client: {:?}",e) + }, + Err(e) => debug!("Could not send message to client: {:?}",e), + } + } + } unimplemented!() } _result = self.close_handle.recv() => { @@ -88,4 +177,204 @@ impl Client { // Do any cleanup that needs done } + + // Handles message and send back the right response. + async fn handle_message(&mut self, message: SignedClientMessage) -> Result { + // 3 states + // waiting on challenge + // waiting on auth + // normal operation, no reauth allowed + + if let Some((ref challenge,ref saved_msg)) = self.in_challenge { + if let ClientMessage::ChallengeAnswer {response} = message.message { + // If we successfully complete the challenge then perform the given action + if response == *challenge { + unimplemented!() + + // If we fail the challenge then reset the challenge + } else { + unimplemented!() + + } + // hmm users should probably be able to update state and do everything else still + // right?? Is forcing the to immediately complete the challenge too much? + } else { + unimplemented!() + + } + } else if let Some(ref username) = self.username { + use fedichat::client::ClientMessage::*; + match message.message { + + Auth{ + username: _username, + password: _password + } => { + return Ok(ServerMessage::Error(ServerError::AlreadyAuthenticated)) + }, + // Maybe ask for email too? Or a potential invite code + UserCreate { + username: _username, + password: _password, + } => { + return Ok(ServerMessage::Error(ServerError::AlreadyAuthenticated)) + }, + // Used to require accounts to complete some kind of challenge. Simplest + // is giving a password/invite code to create an account or join a room + ChallengeAnswer { + response: _, + } => { + return Ok(ServerMessage::Error(ServerError::NotInChallenge)) + }, + + // Should it be one message type or multiple? How does end-to-end + // encryption work here? It could be done in a hacky way with extra tags + Message { + body, + room_id, + } => { + unimplemented!() + }, + // Private message/invite mechanism + MessagePost { + body, + user, + } => {unimplemented!()}, + // Replace the body of the message with a new one + MessageEdit { + body, + id, + room_id, + } => {unimplemented!()}, + MessageDelete { + id, + room_id, + } => {unimplemented!()}, + // State Actions + StateCreate { + room_id, + path, + ty, + permissions, + } => {unimplemented!()}, + StateWrite { + room_id, + path, + content, + } => {unimplemented!()}, + StateDelete { + room_id, + path, + } => {unimplemented!()}, + StateAppend { + room_id, + path, + content, + } => {unimplemented!()}, + StateMove { + room_id, + path, + target, + } => {unimplemented!()}, + StateRead { + room_id, + path, + } => {unimplemented!()}, + PermissionAdd { + permission, + } => {unimplemented!()}, + PermissionDelete { + permission, + } => {unimplemented!()}, + // Groups really should have a way to add permissions by user + // specifically for who can join or invite others + // + // Maybe make a group -> role -> member hierarchy? + // + // + // Could always do this through a bot that owns a group?? + GroupCreate { + group, + users, + } => {unimplemented!()}, + // Only the creator of a group or a server admin can delete groups + GroupDelete { + group, + } => {unimplemented!()}, + // Only the creator of a group or a server admin can delete groups + // same with adding, though there should be a way to add group officers + // at some point + GroupUserAdd { + group, + users, + } => {unimplemented!()}, + GroupUserRemove { + group, + users, + } => {unimplemented!()}, + GroupRoleCreate { + group, + role, + } => {unimplemented!()}, + GroupRoleDelete { + group, + role, + } => {unimplemented!()}, + GroupRoleUserAdd { + group, + role, + users, + } => {unimplemented!()}, + GroupRoleUserRemove { + group, + role, + users, + } => {unimplemented!()}, + // Should work like discord roles + // Can control who can invite to the group + // Can be used with permissions to make rooms that are private for individual roles + GroupRolePowerAdd { + group, + role, + power, + } => {unimplemented!()}, + GroupRolePowerRemove { + group, + role, + power, + } => {unimplemented!()}, + // Returns an ID to use for message sending + // The server can potentially use the current username to associate media uploads + // with users + MediaUpload { + bytes, + } => {unimplemented!()}, + // Join and subscribe + Join { + room_id, + } => {unimplemented!()}, + SubscribeMessages { + room_id, + } => {unimplemented!()}, + SubscribeState { + room_id, + state, + } => {unimplemented!()}, + FetchMessages { + count, + end, + } => {unimplemented!()} + + + } + + } else if let ClientMessage::Auth {username,password} = message.message { + unimplemented!() + } else if let ClientMessage::UserCreate {username,password} = message.message { + unimplemented!() + } else { + return Ok(ServerMessage::Error(ServerError::NotAuthenticated)); + } + + } } diff --git a/src/config.rs b/src/config.rs index b2ab5a3..44dbd8a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -14,8 +14,10 @@ pub struct Config { pub certfile: String, pub keyfile: String, pub statefile: String, - pub loglevel: String, - pub media_directory: String + pub loglevel: Option, + pub media_directory: String, + pub max_message_len_kb: usize, + pub account_creation_code: Option } #[derive(Clone,Serialize,Deserialize)] pub struct DBConfig { diff --git a/src/connection.rs b/src/connection.rs index dd8ebe1..82de96d 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,8 +1,8 @@ -use crate::client::{Client,MessageId}; +use crate::client::{Client}; use crate::state::State; use tokio::sync::{mpsc,RwLock,broadcast}; use std::sync::Arc; -use fedichat::client::TaggedClientMessage; +use fedichat::message::{Relevance,TaggedMessage}; use diesel_async::pooled_connection::deadpool::Pool; use diesel_async::{AsyncPgConnection}; use tracing::{instrument,warn}; @@ -11,12 +11,13 @@ use tracing::{instrument,warn}; pub async fn client_handler( statehandle: Arc>, (message_send,message_recv): ( - broadcast::Sender, - broadcast::Receiver), - message_ack_send: mpsc::Sender, + broadcast::Sender, + broadcast::Receiver), + message_ack_send: mpsc::Sender, db_handle: Pool, endpoint: quinn::Endpoint, - close_handle: broadcast::Receiver<()> + close_handle: broadcast::Receiver<()>, + config: crate::config::Config ) { let mut thread_handles = Vec::new(); @@ -39,6 +40,7 @@ pub async fn client_handler( let close_handle = close_handle.resubscribe(); let db_handle = db_handle.clone(); let message_ack_send = message_ack_send.clone(); + let config = config.clone(); thread_handles.push(tokio::spawn(async move { // listen for the first bidirectional stream @@ -52,7 +54,7 @@ pub async fn client_handler( close_handle ); // run client - client.run() + client.run(config) })); } diff --git a/src/main.rs b/src/main.rs index f9a0a84..38530b6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ use std::process::ExitCode; use std::str::FromStr; use std::sync::Arc; use serde::{Deserialize,Serialize}; -use tracing::{error,instrument,warn,debug}; +use tracing::{error,instrument,warn,debug,Level}; use tokio::sync::{RwLock,broadcast,mpsc}; @@ -28,6 +28,11 @@ pub struct Coordinate(Vec); #[tokio::main] #[instrument] async fn main() -> ExitCode { + // Initial logger so we have something during config + tracing::subscriber::set_global_default( + tracing_subscriber::fmt().with_max_level(Level::WARN).finish() + ).expect("Failed to setup logger"); + // Read in config let config = match Config::load() { Ok(c) => c, @@ -38,6 +43,30 @@ async fn main() -> ExitCode { } }; + let level = match config.loglevel { + Some(ref s) => { + let loglevel = s.to_lowercase(); + match loglevel.as_str() { + "trace" => Level::TRACE, + "debug" => Level::DEBUG, + "info" => Level::INFO, + "warn" => Level::WARN, + "error" => Level::ERROR, + _ => { + warn!("Invalid loglevel in config: {}",&loglevel); + Level::INFO + }, + } + }, + // Default to info level + None => Level::INFO + }; + + tracing::subscriber::set_global_default( + tracing_subscriber::fmt().with_max_level(level).finish() + ).expect("Failed to setup logger"); + + // Set up database connection let db_config = AsyncDieselConnectionManager::::new(config.database.url.clone()); let db_pool = match Pool::builder(db_config).build() { @@ -146,6 +175,7 @@ async fn main() -> ExitCode { // Create client listener let statehandle_cloned = statehandle.clone(); + let config_cloned = config.clone(); join_handles.push(tokio::spawn(async move { connection::client_handler( statehandle_cloned, @@ -153,7 +183,8 @@ async fn main() -> ExitCode { message_ack_send, db_pool, endpoint, - close_recv.resubscribe() + close_recv.resubscribe(), + config_cloned ).await; } ));