diff --git a/.gitignore b/.gitignore index cdde1f0..c0a569f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target confetti.toml +confetti.state .env diff --git a/Cargo.lock b/Cargo.lock index bb1d864..71984bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -266,9 +266,11 @@ dependencies = [ "diesel-derive-composite", "fedichat", "quinn", + "rand 0.10.1", "rmp-serde", "serde", "thiserror 2.0.18", + "time", "tokio", "toml", "tracing", diff --git a/Cargo.toml b/Cargo.toml index a3a6273..3a73735 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,8 +6,8 @@ edition = "2024" [dependencies] diesel = { version = "2.3.9", features = ["postgres_backend"] } quinn = "0.11.9" -fedichat = {git = "https://git.firechicken.net/fedichat/fedichat-lib"} -#fedichat = {path = "../fedichat-lib"} +#fedichat = {git = "https://git.firechicken.net/fedichat/fedichat-lib"} +fedichat = {path = "../fedichat-lib"} tracing = "0.1.44" tracing-subscriber = "0.3.23" clap = { version = "4.6.1", features = ["derive"] } @@ -20,4 +20,6 @@ ctrlc-async = { version = "3.2.2", features = ["termination"] } rmp-serde = "1.3.1" diesel-derive-composite = "0.1.0" bcrypt = "0.19.1" +time = { version = "0.3.47", features = ["serde"] } +rand = "0.10.1" #postcard = {version = "1.1.3", features = ["use-std"]} diff --git a/confetti.toml.example b/confetti.toml.example index 67ad4a3..063102d 100644 --- a/confetti.toml.example +++ b/confetti.toml.example @@ -19,4 +19,5 @@ admins = [] url = "my.db.server" user = "confetti" password = "my_secret_password" +db = "confetti" num_connections = 8 diff --git a/migrations/2026-05-31-000742-0000_tokens/down.sql b/migrations/2026-05-31-000742-0000_tokens/down.sql new file mode 100644 index 0000000..05d1f58 --- /dev/null +++ b/migrations/2026-05-31-000742-0000_tokens/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +DROP TABLE tokens diff --git a/migrations/2026-05-31-000742-0000_tokens/up.sql b/migrations/2026-05-31-000742-0000_tokens/up.sql new file mode 100644 index 0000000..b564cf2 --- /dev/null +++ b/migrations/2026-05-31-000742-0000_tokens/up.sql @@ -0,0 +1,7 @@ +-- Your SQL goes here +CREATE TABLE tokens ( + id SERIAL PRIMARY KEY, + username TEXT NOT NULL, + token TEXT NOT NULL, + expiry BIGINT NOT NULL +) diff --git a/src/client.rs b/src/client.rs index ca1df49..be3f613 100644 --- a/src/client.rs +++ b/src/client.rs @@ -14,7 +14,8 @@ use crate::Coordinate; use crate::db; use crate::state::{self,State}; -use tracing::{warn,error,debug}; +use quinn::ConnectionError; +use tracing::{warn,error,debug,instrument}; #[derive(Error,Debug)] enum HandlerError { @@ -73,9 +74,11 @@ impl Client { } + #[instrument(skip_all)] pub async fn run(mut self,config: crate::config::Config) { // This can probably use a refactoring at some point, this function is huge + debug!("Running handler thread"); loop { // Wait for either a client to send a message, a message that we might need to // forward, or for the graceful shutdown signal @@ -85,10 +88,19 @@ impl Client { //} // Incoming connection, read it to end and put it in a message result = self.quic_connection.accept_bi() => { + debug!("Handling incoming message"); match result { + Err(ConnectionError::ConnectionClosed(_)) + | Err(ConnectionError::TimedOut) + | Err(ConnectionError::Reset) + | Err(ConnectionError::LocallyClosed) + | Err(ConnectionError::ApplicationClosed(_))=> { + debug!("Connection closed"); + return; + }, Err(e) => { warn!("Failed to accept incoming connection: {:?}",e); - continue; + return; }, Ok((mut send,mut recv)) => { let message: fedichat::client::SignedClientMessage = match recv.read_to_end(config.max_message_len_kb*1024).await { @@ -191,12 +203,15 @@ impl Client { } // Handles message and send back the right response. + #[instrument(skip_all)] async fn handle_message(&mut self, config: &crate::config::Config, message: SignedClientMessage) -> Result { // 3 states // waiting on challenge // waiting on auth // normal operation, no reauth allowed + debug!("Handling message: {:?}",&message); + if let Some((ref challenge,ref saved_msg)) = self.in_challenge { if let ClientMessage::ChallengeAnswer {response} = message.message { // If we successfully complete the challenge then perform the given action @@ -255,6 +270,10 @@ impl Client { return Ok(ServerMessage::Error(ServerError::NotInChallenge)) }, + CreateToken => { + Ok(ServerMessage::Token(db::gen_token(self.get_db().await?, username.clone()).await?)) + }, + // Should it be one message type or multiple? How does end-to-end // encryption work here? It could be done in a hacky way with extra html tags Message { @@ -452,6 +471,18 @@ impl Client { Ok(ServerMessage::Ok) } }, + RoomLeave { + room_id, + } => { + let coords: Coordinate = room_id.clone().into(); + + if !self.statehandle.read().await.room_exists(coords.clone()) { + Ok(ServerMessage::Error(ServerError::RoomNotFound(room_id.clone()))) + } else { + self.statehandle.write().await.leave(coords,user).await?; + Ok(ServerMessage::Ok) + } + }, RoomCreate { room_id, } => { @@ -470,15 +501,17 @@ impl Client { SubscribeMessages { room_id, } => { - // TODO: Has to check if user is joined to room first - self.subscriptions.insert(Relevance::Message(room_id)); - Ok(ServerMessage::Ok) + if self.statehandle.read().await.is_joined(room_id.clone().into(),user).await { + self.subscriptions.insert(Relevance::Message(room_id)); + Ok(ServerMessage::Ok) + } else { + Err(ServerError::NotJoined(room_id)) + } }, SubscribeState { room_id, state, } => { - // TODO: Has to check if user is joined to room first if self.statehandle.read().await.is_joined(room_id.clone().into(),user).await { self.subscriptions.insert(Relevance::State(room_id,state)); Ok(ServerMessage::Ok) @@ -511,6 +544,7 @@ impl Client { Err(ServerError::ChallengeInviteCode) } else { + self.username = Some(username.clone()); db::maybe_create_user(self.get_db().await ?,username,password).await } } else { @@ -578,6 +612,12 @@ impl Client { operation: state::StateOperation ) -> Result,ServerError> { + // check if user is joined to the given room. If not return early + if !self.statehandle.read().await.is_joined(coord.clone().into(),user.clone()).await { + return Err(ServerError::NotJoined(coord.clone().to_roomid())) + } + + // If they are then process the action let unlockable = self.statehandle.read().await.to_unlockable_operation(operation).await?; let locks = unlockable.get_locks(); let room_id = coord.clone().into(); diff --git a/src/config.rs b/src/config.rs index e304d8f..27980d0 100644 --- a/src/config.rs +++ b/src/config.rs @@ -29,6 +29,7 @@ pub struct DBConfig { pub url: String, pub user: String, pub password: String, + pub db_name: String, pub num_connections: usize } diff --git a/src/connection.rs b/src/connection.rs index 82de96d..76d9998 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use fedichat::message::{Relevance,TaggedMessage}; use diesel_async::pooled_connection::deadpool::Pool; use diesel_async::{AsyncPgConnection}; -use tracing::{instrument,warn}; +use tracing::{instrument,warn,trace,debug}; #[instrument(skip_all)] pub async fn client_handler( @@ -16,48 +16,60 @@ pub async fn client_handler( message_ack_send: mpsc::Sender, db_handle: Pool, endpoint: quinn::Endpoint, - close_handle: broadcast::Receiver<()>, + mut close_handle: broadcast::Receiver<()>, config: crate::config::Config ) { let mut thread_handles = Vec::new(); // Start iterating over incoming connections. - while let Some(conn) = endpoint.accept().await { - let connection = match conn.await { - Ok(conn) => conn, - Err(e) => { - warn!("Error while processing client connection"); - warn!("{:?}",e); - continue - } - }; - + debug!("Starting client handler"); + loop { + tokio::select! { + Some(conn) = endpoint.accept() => { + let connection = match conn.await { + Ok(conn) => conn, + Err(e) => { + warn!("Error while processing client connection"); + warn!("{:?}",e); + continue - let statehandle = statehandle.clone(); - let message_send = message_send.clone(); - let message_recv = message_recv.resubscribe(); - let close_handle = close_handle.resubscribe(); - let db_handle = db_handle.clone(); - let message_ack_send = message_ack_send.clone(); - let config = config.clone(); + } + }; + debug!("Handling new client connection from {}",connection.remote_address()); + - thread_handles.push(tokio::spawn(async move { - // listen for the first bidirectional stream - // create client - let client = Client::new( - statehandle, - (message_send,message_recv), - message_ack_send, - db_handle, - connection, - close_handle - ); - // run client - client.run(config) - })); + let statehandle = statehandle.clone(); + let message_send = message_send.clone(); + let message_recv = message_recv.resubscribe(); + let close_handle = close_handle.resubscribe(); + let db_handle = db_handle.clone(); + let message_ack_send = message_ack_send.clone(); + let config = config.clone(); + + thread_handles.push(tokio::spawn(async move { + debug!("Creating client handler"); + // listen for the first bidirectional stream + // create client + let client = Client::new( + statehandle, + (message_send,message_recv), + message_ack_send, + db_handle, + connection, + close_handle + ); + debug!("Running client handler"); + // run client + client.run(config).await + })); + }, + Ok(()) = close_handle.recv() => break, + // Connection error of some sort + else => break, + } } - + debug!("Exiting client handler"); } diff --git a/src/db/mod.rs b/src/db/mod.rs index 2aa57d9..68ce196 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -7,9 +7,15 @@ use bcrypt::{DEFAULT_COST, hash, verify}; use diesel_async::AsyncPgConnection; use diesel_async::RunQueryDsl;//, AsyncConnection}; use diesel_async::pooled_connection::deadpool::Object; -use fedichat::client::{ServerMessage,ServerError}; +use fedichat::client::{ServerMessage,ServerError,AuthMethod}; +use rand::distr::{Alphanumeric, SampleString}; +use rand::rngs::{SysRng,StdRng}; +use rand::prelude::*; + +use time::{OffsetDateTime,Duration}; use tracing::{instrument,warn}; + #[instrument(skip_all)] pub async fn maybe_create_user(mut connection: Object, username: &str, password: &str) -> Result { let password = match hash(password,DEFAULT_COST) { @@ -37,32 +43,85 @@ pub async fn maybe_create_user(mut connection: Object, userna /// Try to authenticate a user against information in the database #[instrument(skip_all)] -pub async fn verify_user(mut connection: Object, user: &str, pass: &str) -> Result { - use schema::users::dsl::*; +pub async fn verify_user(mut connection: Object, user: &str, pass: &AuthMethod) -> Result { - let result = users - .filter(username.eq(user)) - .select(models::User::as_select()) - .first(&mut *connection) + match pass { + AuthMethod::Password(pass) => { + use schema::users::dsl::*; + let result = users + .filter(username.eq(user)) + .select(models::User::as_select()) + .first(&mut *connection) + .await; + + + + match result { + // If we have more than 0 rows then authentication is successful + Ok(user) => match verify(pass,&user.password) { + Ok(val) => Ok(val), + Err(e) => { + warn!("Error encountered while generating password hash"); + warn!("{e}"); + return Err(ServerError::Generic) + } + + }, + // TODO: Probably actually check the error + _ => Err(ServerError::AuthenticationFailed) + } + }, + AuthMethod::Token(tk) => { + use schema::tokens::dsl::*; + let result = tokens + .filter(username.eq(user)) + .filter(token.eq(tk)) + // Check that expiry is in the future + .filter(expiry.gt(OffsetDateTime::now_utc().unix_timestamp())) + .select(models::Token::as_select()) + .first(&mut *connection) + .await; + + + + match result { + // If we have more than 0 rows then authentication is successful + // Only need one token, no need to hash or do anything else + Ok(_) => Ok(true), + // TODO: Probably actually check the error + _ => Err(ServerError::AuthenticationFailed) + } + + } + + } +} + +/// Create a new token for the current user with an expiry set 3 months into the future +pub async fn gen_token(mut connection: Object, user: String) + -> Result +{ + let mut rng = StdRng::try_from_rng(&mut SysRng).unwrap(); + let new_token = Alphanumeric.sample_string(&mut rng, 32); + let expiry = OffsetDateTime::now_utc() + Duration::days(90); + + + let insertable = models::NewToken { + username: user, + token: new_token.clone(), + expiry: expiry.unix_timestamp() + }; + + let result = diesel::insert_into(schema::tokens::table) + .values(&insertable) + .execute(&mut *connection) .await; - - match result { - // If we have more than 0 rows then authentication is successful - Ok(user) => match verify(pass,&user.password) { - Ok(val) => Ok(val), - Err(e) => { - warn!("Error encountered while generating password hash"); - warn!("{e}"); - return Err(ServerError::Generic) - } - - }, + // TODO: might need to check this? I don't know what it means + Ok(_) => Ok(new_token), // TODO: Probably actually check the error - _ => Err(ServerError::AuthenticationFailed) + Err(_e) => Err(ServerError::UserAlreadyExists) } - - - } + diff --git a/src/db/models.rs b/src/db/models.rs index 7187e9e..a828e10 100644 --- a/src/db/models.rs +++ b/src/db/models.rs @@ -73,3 +73,19 @@ pub struct Messages { pub server_timestamp: i64, pub username: UserT } + +#[derive(Queryable, Selectable)] +#[diesel(table_name = crate::db::schema::tokens)] +pub struct Token { + pub id: i32, + pub username: String, + pub token: String, + pub expiry: i64, +} +#[derive(Insertable)] +#[diesel(table_name = crate::db::schema::tokens)] +pub struct NewToken { + pub username: String, + pub token: String, + pub expiry: i64, +} diff --git a/src/db/schema.rs b/src/db/schema.rs index 182da84..b1588f7 100644 --- a/src/db/schema.rs +++ b/src/db/schema.rs @@ -48,6 +48,15 @@ diesel::table! { } } +diesel::table! { + tokens (id) { + id -> Int4, + username -> Text, + token -> Text, + expiry -> Int8, + } +} + diesel::table! { users (id) { id -> Int4, @@ -56,4 +65,4 @@ diesel::table! { } } -diesel::allow_tables_to_appear_in_same_query!(groups, messages, roles, users,); +diesel::allow_tables_to_appear_in_same_query!(groups, messages, roles, tokens, users,); diff --git a/src/main.rs b/src/main.rs index a287ef0..976fd13 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ use std::process::ExitCode; use std::str::FromStr; use std::sync::Arc; use serde::{Deserialize,Serialize}; -use tracing::{error,instrument,warn,debug,Level}; +use tracing::{error,instrument,warn,debug,info,Level}; use tokio::sync::{RwLock,broadcast,mpsc}; @@ -25,6 +25,12 @@ use crate::state::{State,StateError}; #[derive(Hash,Eq,PartialEq,Clone,Serialize,Deserialize,Debug)] pub struct Coordinate(Vec); +impl Coordinate { + pub fn to_roomid(self) -> fedichat::RoomId { + fedichat::RoomId{coordinates: self.0} + } +} + impl From for Coordinate { fn from(other: fedichat::RoomId) -> Coordinate { Coordinate(other.coordinates) @@ -75,8 +81,12 @@ async fn main() -> ExitCode { // Set up database connection - let db_config = AsyncDieselConnectionManager::::new(config.database.url.clone()); - let db_pool = match Pool::builder(db_config).build() { + let db_string = format!("postgres://{}:{}@{}/{}",config.database.user,config.database.password,config.database.url,config.database.db_name); + let db_config = AsyncDieselConnectionManager::::new(db_string); + let db_pool = match Pool::builder(db_config) + .max_size(config.database.num_connections) + .build() + { Ok(val) => val, Err(e) => { error!("Error while creating database connection pool"); @@ -101,6 +111,7 @@ async fn main() -> ExitCode { } }; + let quinn_config = match quinn::ServerConfig::with_single_cert(certs, key){ Ok(val) => val, Err(e) => { @@ -122,6 +133,8 @@ async fn main() -> ExitCode { } }; + debug!("Bound to {address} on port {}",config.port); + // Load or create new state let state = match State::load_from_file(&config.statefile) { Ok(state) => state, @@ -130,7 +143,10 @@ async fn main() -> ExitCode { match fs::File::create(&config.statefile) { // If the statefile is writable then create an empty state // and use that - Ok(_) => State::new(), + Ok(_) => { + debug!("Creating fresh state"); + State::new() + }, Err(e) => { error!("Could not open or create statefile. Check your config."); error!("{:?}",e); @@ -153,9 +169,10 @@ async fn main() -> ExitCode { let (message_send,message_recv) = broadcast::channel(128); let (message_ack_send,message_ack_recv) = mpsc::channel(128); + debug!("Setting ctrl-c handler"); match ctrlc_async::set_async_handler( async move { match close_send.send(()) { - Ok(_val) => (), + Ok(_val) => debug!("Propogating ctrl-c"), Err(e) => { error!("Shutdown handler is broken. Cannot gracefully exit."); error!("{:?}",e); @@ -177,6 +194,7 @@ async fn main() -> ExitCode { // Create client listener + debug!("Setting up client handler"); let statehandle_cloned = statehandle.clone(); let config_cloned = config.clone(); join_handles.push(tokio::spawn(async move { @@ -191,6 +209,8 @@ async fn main() -> ExitCode { ).await; } )); + info!("Successfully started confetti"); + // Wait for child threads to exit for handle in join_handles { match handle.await { @@ -202,6 +222,7 @@ async fn main() -> ExitCode { } } + info!("Shutting down"); //Save state match statehandle.write().await.write_to_file(&config.statefile).await { Ok(()) => debug!("Successfully wrote state to {:?}",config.statefile), @@ -211,9 +232,5 @@ async fn main() -> ExitCode { } } - - - // Should never reach this - // How should I gracefully shutdown? Can I trap ctrlc ExitCode::SUCCESS } diff --git a/src/state.rs b/src/state.rs index 8a86fe1..9cb7f77 100644 --- a/src/state.rs +++ b/src/state.rs @@ -9,6 +9,7 @@ use std::sync::Arc; use rmp_serde::{decode,encode}; use fedichat::state::{StatePermissionKey,StatePermissionValue}; use fedichat::client::ServerError; +use tracing::{warn,debug}; // These ergonomics kind of suck, might get reworked // @@ -178,7 +179,8 @@ impl State { pub async fn is_joined(&self, coord: Coordinate, user: fedichat::User) -> bool { match self.get_bakedin(coord.clone(),"members".to_owned()).await { - Ok(val) => val.0.contains_key(&StatePermissionKey::User(user.clone())), + // There surely is a way to do this without taking ownership of user + Ok(val) => val.0.contains_key(&StatePermissionKey::User(user)), // Dunno if I should hide this error Err(_e) => false } @@ -187,6 +189,8 @@ impl State { // Get all banned users. If we are not banned, add our name into the joined list pub async fn join(&self, coord: Coordinate, user: fedichat::User) -> Result<(),StateError> { let bans = self.get_bakedin(coord.clone(),"banned".to_owned()).await?; + debug!("Banned user {:?} trying to join {:?}",user,coord); + debug!("BANLIST: {:?}",bans); if bans.0.contains_key(&StatePermissionKey::User(user.clone())) { return Err(StateError::Banned(coord)); } @@ -198,7 +202,17 @@ impl State { path: fedichat::state::StatePath(vec!["room".to_owned(),"members".to_owned()]), op: Operation::PermAdd(permission) }).await.map(|_| ()) + } + pub async fn leave(&self, coord: Coordinate, user: fedichat::User) -> Result<(),StateError> { + + //let permission = fedichat::state::StatePermission(StatePermissionKey::User(user),StatePermissionValue::None); + // Remove ourselves from the joined list + self.execute_operation(UnlockedOperation { + room: coord.clone(), + path: fedichat::state::StatePath(vec!["room".to_owned(),"members".to_owned()]), + op: Operation::PermDel(StatePermissionKey::User(user)) + }).await.map(|_| ()) } // Get a baked in state value. Useful for checking if a user is joined to a room @@ -708,7 +722,6 @@ impl StateNode { let banned_perms = fedichat::state::PermissionTable( HashMap::from([ (StatePermissionKey::Operator,StatePermissionValue::Owner), - (StatePermissionKey::User(operator),StatePermissionValue::None), (StatePermissionKey::Everyone,StatePermissionValue::Read), ]) ); @@ -838,6 +851,7 @@ pub enum StateError { impl From for ServerError { fn from(value: StateError) -> ServerError { + warn!("StateError found: {}",value); // TODO: // Probably needs a pass, throwing away all this error information feels wrong. // Maybe this should debug print every time it is called lol