State implementation and initial db work

State system is fully implemented and passing tests. I added in a
handful of unit tests to test most of the available operations.

Other work has involved getting the interface between client and server
right and starting on the actual message handler. This involves starting
work on the db as well
This commit is contained in:
2026-05-24 22:53:52 -07:00
parent c92ee309a9
commit eaeb293915
8 changed files with 1639 additions and 242 deletions
Generated
+45 -2
View File
@@ -81,6 +81,19 @@ version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "bcrypt"
version = "0.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24ae5479c93d3720e4c1dbd6b945b97457c50cb672781104768190371df1a905"
dependencies = [
"base64",
"blowfish",
"getrandom 0.4.2",
"subtle",
"zeroize",
]
[[package]]
name = "bitflags"
version = "1.3.2"
@@ -102,6 +115,16 @@ dependencies = [
"hybrid-array",
]
[[package]]
name = "blowfish"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62ce3946557b35e71d1bbe07ec385073ce9eda05043f95de134eb578fcf1a298"
dependencies = [
"byteorder",
"cipher",
]
[[package]]
name = "bumpalo"
version = "3.20.2"
@@ -159,6 +182,16 @@ dependencies = [
"rand_core 0.10.1",
]
[[package]]
name = "cipher"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8cf2a2c93cd704877c0858356ed03480ff301ee950b43f1cbe4573b088bfa6c"
dependencies = [
"crypto-common",
"inout",
]
[[package]]
name = "clap"
version = "4.6.1"
@@ -225,6 +258,7 @@ dependencies = [
name = "confetti"
version = "0.1.0"
dependencies = [
"bcrypt",
"clap",
"ctrlc-async",
"diesel",
@@ -283,9 +317,9 @@ dependencies = [
[[package]]
name = "crypto-common"
version = "0.2.1"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77727bb15fa921304124b128af125e7e3b968275d1b108b379190264f4423710"
checksum = "ce6e4c961d6cd6c9a86db418387425e8bdeaf05b3c8bc1411e6dca4c252f1453"
dependencies = [
"hybrid-array",
]
@@ -686,6 +720,15 @@ dependencies = [
"serde_core",
]
[[package]]
name = "inout"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4250ce6452e92010fdf7268ccc5d14faa80bb12fc741938534c58f16804e03c7"
dependencies = [
"hybrid-array",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.2"
+2
View File
@@ -19,3 +19,5 @@ diesel-async = { version = "0.9.0", features = ["postgres","deadpool"] }
ctrlc-async = { version = "3.2.2", features = ["termination"] }
rmp-serde = "1.3.1"
diesel-derive-composite = "0.1.0"
bcrypt = "0.19.1"
#postcard = {version = "1.1.3", features = ["use-std"]}
+123 -20
View File
@@ -1,16 +1,16 @@
use fedichat::RoomId;
use fedichat::message::{Relevance,TaggedMessage};
use fedichat::client::{ClientMessage,SignedClientMessage,ServerMessage,ServerError};
use fedichat::state::StatePath;
use diesel_async::AsyncPgConnection;
use diesel_async::pooled_connection::deadpool::Pool;
use diesel_async::pooled_connection::deadpool::{Pool,Object};
use rmp_serde;
use std::collections::HashSet;
use std::sync::{Arc};
use thiserror::Error;
use tokio::sync::{broadcast,mpsc,RwLock};
use tokio::select;
use crate::Coordinate;
use crate::db;
use crate::state::State;
use tracing::{warn,error,debug};
@@ -73,6 +73,7 @@ impl Client {
}
pub async fn run(mut self,config: crate::config::Config) {
// This can probably use a refactoring at some point, this function is huge
loop {
// Wait for either a client to send a message, a message that we might need to
@@ -104,13 +105,14 @@ impl Client {
}
}
};
let result = match self.handle_message(message).await {
let result = match self.handle_message(&config,message).await {
Ok(res) => res,
Err(e) => {
warn!("Failed to handle client message with error {}",e);
continue;
warn!("Failed to handle client message with error {:?}",e);
ServerMessage::Error(e)
}
};
// TODO: This needs to override the default options to use a map
let buf = match rmp_serde::to_vec(&result) {
Ok(b) => b,
Err(e) => {
@@ -137,6 +139,8 @@ impl Client {
};
// Check if the message is in our subscriptions
// If it is then send it down to the user
// Probably should check that we're still in a room maybe? Not sure how
// kicks are going to work.
if let Some(relevance) = result.get_relevance() {
if self.subscriptions.contains(&relevance) {
match self.message_ack.send(relevance).await {
@@ -148,6 +152,7 @@ impl Client {
};
let send = self.quic_connection.open_uni();
// This is duplicated code, maybe could break into a function
// TODO: This needs to override the default options to use a map
let buf = match rmp_serde::to_vec(&result) {
Ok(b) => b,
Err(e) => {
@@ -165,7 +170,6 @@ impl Client {
}
}
}
unimplemented!()
}
_result = self.close_handle.recv() => {
// Maybe TODO do I need to check the result?
@@ -178,8 +182,15 @@ impl Client {
}
// This opens up a quic connection to the remote server, serializes our message, and
// processes the response. Should use a long-lived connection but for the prototype its
// going to open a new one each time.
async fn forward(&self, server: &fedichat::ServerAddr, message: TaggedMessage) -> Result<ServerMessage,ServerError>{
unimplemented!()
}
// Handles message and send back the right response.
async fn handle_message(&mut self, message: SignedClientMessage) -> Result<ServerMessage,HandlerError> {
async fn handle_message(&mut self, config: &crate::config::Config, message: SignedClientMessage) -> Result<ServerMessage,ServerError> {
// 3 states
// waiting on challenge
// waiting on auth
@@ -197,13 +208,28 @@ impl Client {
}
// hmm users should probably be able to update state and do everything else still
// right?? Is forcing the to immediately complete the challenge too much?
// right?? Is forcing them to immediately complete the challenge too much?
} else {
unimplemented!()
}
} else if let Some(ref username) = self.username {
let user = fedichat::User {
name: username.clone(),
server: config.hostname.clone()
};
use fedichat::client::ClientMessage::*;
// Forward the message if it is addressed to a remote server
if let Some(servername) = message.target.clone()
&& servername != fedichat::ServerAddr(config.hostname.clone()) {
if message.message.is_forwardable() {
let user = self.get_user(config)?;
self.forward(&servername,message.tag(user,fedichat::ServerAddr(config.hostname.clone()))).await
} else {
Err(ServerError::MessageNotForwardable)
}
} else {
match message.message {
Auth{
@@ -228,10 +254,11 @@ impl Client {
},
// Should it be one message type or multiple? How does end-to-end
// encryption work here? It could be done in a hacky way with extra tags
// encryption work here? It could be done in a hacky way with extra html tags
Message {
body,
room_id,
id,
} => {
unimplemented!()
},
@@ -254,7 +281,7 @@ impl Client {
StateCreate {
room_id,
path,
ty,
content,
permissions,
} => {unimplemented!()},
StateWrite {
@@ -282,9 +309,17 @@ impl Client {
} => {unimplemented!()},
PermissionAdd {
permission,
path,
room_id
} => {unimplemented!()},
PermissionRead {
path,
room_id
} => {unimplemented!()},
PermissionDelete {
permission,
path,
room_id
} => {unimplemented!()},
// Groups really should have a way to add permissions by user
// specifically for who can join or invite others
@@ -350,16 +385,52 @@ impl Client {
bytes,
} => {unimplemented!()},
// Join and subscribe
Join {
RoomJoin {
room_id,
} => {unimplemented!()},
} => {
let coords: Coordinate = room_id.clone().into();
if !self.statehandle.read().await.room_exists(coords.clone()) {
Ok(ServerMessage::Error(ServerError::RoomNotFound(room_id.clone())))
} else {
self.statehandle.write().await.join(coords,user).await?;
Ok(ServerMessage::Ok)
}
},
RoomCreate {
room_id,
} => {
let coords: Coordinate = room_id.clone().into();
if self.statehandle.read().await.room_exists(coords.clone()) {
Ok(ServerMessage::Error(ServerError::RoomExists(room_id.clone())))
} else {
// This surely does not deadlock
self.statehandle.write().await.create_room(coords,user)?;
Ok(ServerMessage::Ok)
}
},
SubscribeMessages {
room_id,
} => {unimplemented!()},
} => {
// TODO: Has to check if user is joined to room first
self.subscriptions.insert(Relevance::Message(room_id));
Ok(ServerMessage::Ok)
},
SubscribeState {
room_id,
state,
} => {unimplemented!()},
} => {
// TODO: Has to check if user is joined to room first
if self.statehandle.read().await.is_joined(room_id.clone().into(),user).await {
self.subscriptions.insert(Relevance::State(room_id,state));
Ok(ServerMessage::Ok)
} else {
Err(ServerError::NotJoined(room_id))
}
},
FetchMessages {
count,
end,
@@ -367,14 +438,46 @@ impl Client {
}
}
} else if let ClientMessage::Auth {username,password} = message.message {
unimplemented!()
} else if let ClientMessage::UserCreate {username,password} = message.message {
unimplemented!()
} else if let ClientMessage::Auth {ref username,ref password} = message.message {
match db::verify_user(self.get_db().await?,username,password).await? {
true => {
self.username = Some(username.clone());
Ok(ServerMessage::Ok)
},
false => {
Err(ServerError::AuthenticationFailed)
}
}
} else if let ClientMessage::UserCreate {username,password} = &message.message {
if let Some(ref code) = config.account_creation_code {
self.in_challenge = Some((code.to_string(),message));
Err(ServerError::ChallengeInviteCode)
} else {
db::maybe_create_user(self.get_db().await ?,username,password).await
}
} else {
return Ok(ServerMessage::Error(ServerError::NotAuthenticated));
}
}
fn get_user(&self,config: &crate::config::Config) -> Result<fedichat::User,ServerError> {
Ok(fedichat::User {
name: self.username.clone().ok_or(ServerError::NotAuthenticated)?,
server: config.hostname.clone()
})
}
async fn get_db(&self) -> Result<Object<AsyncPgConnection>,ServerError> {
match self.db_handle.get().await {
Ok(conn) => Ok(conn),
Err(e) => {
error!("Could not connect to database server: {e}");
// This error is not relevant to the client in any way
Err(ServerError::Generic)
}
}
}
}
+3
View File
@@ -7,6 +7,9 @@ use thiserror::Error;
#[derive(Clone,Serialize,Deserialize)]
pub struct Config {
pub hostname: String,
//NOTE: Changing the federation port breaks federation
// Changing the client port is also probably a bad idea. There might
// need to be .wellknown support at some point
pub port: u16,
pub federation_port: u16,
pub listen_address: String,
+66
View File
@@ -1,2 +1,68 @@
use diesel::prelude::*;
pub mod models;
pub mod schema;
use bcrypt::{DEFAULT_COST, hash, verify};
use diesel_async::AsyncPgConnection;
use diesel_async::RunQueryDsl;//, AsyncConnection};
use diesel_async::pooled_connection::deadpool::Object;
use fedichat::client::{ServerMessage,ServerError};
use tracing::{instrument,warn};
#[instrument(skip_all)]
pub async fn maybe_create_user(mut connection: Object<AsyncPgConnection>, username: &str, password: &str) -> Result<ServerMessage,ServerError> {
let password = match hash(password,DEFAULT_COST) {
Ok(val) => val,
Err(e) => {
warn!("Error encountered while generating password hash");
warn!("{e}");
return Err(ServerError::Generic)
}
};
let username = username.to_string();
let user = models::NewUser{ username, password};
let result = diesel::insert_into(schema::users::table)
.values(&user)
.execute(&mut *connection)
.await;
match result {
// TODO: might need to check this? I don't know what it means
Ok(_) => Ok(ServerMessage::Ok),
// TODO: Probably actually check the error
Err(_e) => Err(ServerError::UserAlreadyExists)
}
}
/// Try to authenticate a user against information in the database
#[instrument(skip_all)]
pub async fn verify_user(mut connection: Object<AsyncPgConnection>, user: &str, pass: &str) -> Result<bool,ServerError> {
use schema::users::dsl::*;
let result = users
.filter(username.eq(user))
.select(models::User::as_select())
.first(&mut *connection)
.await;
match result {
// If we have more than 0 rows then authentication is successful
Ok(user) => match verify(pass,&user.password) {
Ok(val) => Ok(val),
Err(e) => {
warn!("Error encountered while generating password hash");
warn!("{e}");
return Err(ServerError::Generic)
}
},
// TODO: Probably actually check the error
_ => Err(ServerError::AuthenticationFailed)
}
}
+7
View File
@@ -55,6 +55,13 @@ pub struct User {
pub password: String,
}
#[derive(Insertable)]
#[diesel(table_name = crate::db::schema::users)]
pub struct NewUser {
pub username: String,
pub password: String,
}
#[derive(Queryable, Selectable)]
#[diesel(table_name = crate::db::schema::messages)]
pub struct Messages {
+18 -16
View File
@@ -6,7 +6,6 @@ mod state;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::pooled_connection::deadpool::Pool;
use diesel_async::{AsyncConnection,AsyncPgConnection};
use quinn::rustls::pki_types::{PrivateKeyDer,CertificateDer,pem::PemObject};
use quinn::Endpoint;
use std::io;
@@ -21,25 +20,32 @@ use tokio::sync::{RwLock,broadcast,mpsc};
use crate::config::Config;
use crate::state::State;
use crate::state::{State,StateError};
#[derive(Hash,Eq,PartialEq,Clone,Serialize,Deserialize)]
#[derive(Hash,Eq,PartialEq,Clone,Serialize,Deserialize,Debug)]
pub struct Coordinate(Vec<i64>);
impl From<fedichat::RoomId> for Coordinate {
fn from(other: fedichat::RoomId) -> Coordinate {
Coordinate(other.coordinates)
}
}
#[tokio::main]
#[instrument]
async fn main() -> ExitCode {
// NOTE: This doesn't work as you can only initialize the global logger once
// Initial logger so we have something during config
tracing::subscriber::set_global_default(
tracing_subscriber::fmt().with_max_level(Level::WARN).finish()
).expect("Failed to setup logger");
//tracing::subscriber::set_global_default(
// tracing_subscriber::fmt().with_max_level(Level::WARN).finish()
//).expect("Failed to setup logger");
// Read in config
let config = match Config::load() {
Ok(c) => c,
Err(e) => {
error!("Problem while reading config file");
error!("{:?}",e);
eprintln!("Problem while reading config file");
eprintln!("{:?}",e);
return ExitCode::FAILURE;
}
};
@@ -54,7 +60,7 @@ async fn main() -> ExitCode {
"warn" => Level::WARN,
"error" => Level::ERROR,
_ => {
warn!("Invalid loglevel in config: {}",&loglevel);
eprintln!("Invalid loglevel in config: {}",&loglevel);
Level::INFO
},
}
@@ -120,9 +126,7 @@ async fn main() -> ExitCode {
let state = match State::load_from_file(&config.statefile) {
Ok(state) => state,
// Create file if it does not exist
Err(e) => {
match e.kind() {
io::ErrorKind::NotFound => {
Err(StateError::IOError(e)) if e.kind() == io::ErrorKind::NotFound => {
match fs::File::create(&config.statefile) {
// If the statefile is writable then create an empty state
// and use that
@@ -136,13 +140,11 @@ async fn main() -> ExitCode {
}
},
_ => {
Err(e) => {
error!("Could not open or create statefile. Check your config.");
error!("{:?}",e);
return ExitCode::FAILURE;
}
}
},
};
let statehandle = Arc::new(RwLock::new(state));
@@ -201,7 +203,7 @@ async fn main() -> ExitCode {
}
//Save state
match statehandle.write().await.write_to_file(&config.statefile) {
match statehandle.write().await.write_to_file(&config.statefile).await {
Ok(()) => debug!("Successfully wrote state to {:?}",config.statefile),
Err(e) => {
error!("Problem while writing to statefile");
+1207 -36
View File
File diff suppressed because it is too large Load Diff