More plumbing, client partially implemented

Working through client implementation. The broadcast recieve + send is
working now, just have to get through all the incoming message types and
state and db work now
This commit is contained in:
2026-05-17 13:45:02 -07:00
parent 42bcebb50c
commit 907c6a8fb0
7 changed files with 363 additions and 34 deletions
Generated
+4 -4
View File
@@ -231,6 +231,7 @@ dependencies = [
"diesel-async", "diesel-async",
"fedichat", "fedichat",
"quinn", "quinn",
"rmp-serde",
"serde", "serde",
"thiserror 2.0.18", "thiserror 2.0.18",
"tokio", "tokio",
@@ -486,9 +487,8 @@ dependencies = [
[[package]] [[package]]
name = "fedichat" name = "fedichat"
version = "0.1.0" version = "0.1.0"
source = "git+https://git.firechicken.net/fedichat/fedichat-lib#ba358085728410eb76681dc675a126871b5bd5da" source = "git+https://git.firechicken.net/fedichat/fedichat-lib#a3f54705495d9aa54ffe80501e1f68876e3e9480"
dependencies = [ dependencies = [
"rmp-serde",
"serde", "serde",
"serde_bytes", "serde_bytes",
"time", "time",
@@ -2248,9 +2248,9 @@ checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650"
[[package]] [[package]]
name = "winnow" name = "winnow"
version = "1.0.2" version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ee1708bef14716a11bae175f579062d4554d95be2c6829f518df847b7b3fdd0" checksum = "0592e1c9d151f854e6fd382574c3a0855250e1d9b2f99d9281c6e6391af352f1"
[[package]] [[package]]
name = "wit-bindgen" name = "wit-bindgen"
+2
View File
@@ -7,6 +7,7 @@ edition = "2024"
diesel = { version = "2.3.9" } diesel = { version = "2.3.9" }
quinn = "0.11.9" quinn = "0.11.9"
fedichat = {git = "https://git.firechicken.net/fedichat/fedichat-lib"} fedichat = {git = "https://git.firechicken.net/fedichat/fedichat-lib"}
#fedichat = {path = "../fedichat-lib"}
tracing = "0.1.44" tracing = "0.1.44"
tracing-subscriber = "0.3.23" tracing-subscriber = "0.3.23"
clap = { version = "4.6.1", features = ["derive"] } clap = { version = "4.6.1", features = ["derive"] }
@@ -16,3 +17,4 @@ thiserror = "2.0.18"
toml = "1.1.2" toml = "1.1.2"
diesel-async = { version = "0.9.0", features = ["postgres","deadpool"] } diesel-async = { version = "0.9.0", features = ["postgres","deadpool"] }
ctrlc-async = { version = "3.2.2", features = ["termination"] } ctrlc-async = { version = "3.2.2", features = ["termination"] }
rmp-serde = "1.3.1"
+3
View File
@@ -7,6 +7,9 @@ keyfile = "/etc/letsencrypt/confetti/privkey.pem"
media_directory = "/srv/confetti/media" media_directory = "/srv/confetti/media"
statefile = "./confetti.state" statefile = "./confetti.state"
loglevel = "debug" loglevel = "debug"
max_message_len_kb = 10000
# Optional
# account_creation_code = "password1"
[database] [database]
url = "my.db.server" url = "my.db.server"
+308 -19
View File
@@ -1,19 +1,24 @@
use fedichat::RoomId; use fedichat::RoomId;
use fedichat::client::TaggedClientMessage; use fedichat::message::{Relevance,TaggedMessage};
use fedichat::client::{ClientMessage,SignedClientMessage,ServerMessage,ServerError};
use fedichat::state::StatePath; use fedichat::state::StatePath;
use diesel_async::AsyncPgConnection; use diesel_async::AsyncPgConnection;
use diesel_async::pooled_connection::deadpool::Pool; use diesel_async::pooled_connection::deadpool::Pool;
use tokio::sync::{broadcast,mpsc,RwLock}; use rmp_serde;
use tokio::select;
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::{Arc}; use std::sync::{Arc};
use quinn::{SendStream,RecvStream}; use thiserror::Error;
use tokio::sync::{broadcast,mpsc,RwLock};
use tokio::select;
use crate::state::State; use crate::state::State;
pub enum MessageId { use tracing::{warn,error,debug};
State(RoomId,StatePath),
Messages(RoomId) #[derive(Error,Debug)]
enum HandlerError {
#[error("Foo")]
Foo
} }
pub struct Client { pub struct Client {
@@ -23,30 +28,31 @@ pub struct Client {
// Same with state changes // Same with state changes
// Potentially could be turned into a more efficient localised filter maybe // Potentially could be turned into a more efficient localised filter maybe
// Remote messages can come through the same channel maybe? // Remote messages can come through the same channel maybe?
message_send: broadcast::Sender<TaggedClientMessage>, message_send: broadcast::Sender<TaggedMessage>,
// This probably could be a more specific type // This probably could be a more specific type
// But we should be filtering and sending back only stuff that matters // But we should be filtering and sending back only stuff that matters
message_recv: broadcast::Receiver<TaggedClientMessage>, message_recv: broadcast::Receiver<TaggedMessage>,
// Sends back match if anything filtered matched the client filters // Sends back match if anything filtered matched the client filters
// Connections are closed after a period of inactivity so this should be fine, // Connections are closed after a period of inactivity so this should be fine,
// if client is connected acks are sent, if inactive connection is closed // if client is connected acks are sent, if inactive connection is closed
message_ack: mpsc::Sender<MessageId>, message_ack: mpsc::Sender<Relevance>,
db_handle: Pool<AsyncPgConnection>, db_handle: Pool<AsyncPgConnection>,
// Filled once user is authed // Filled once user is authed
username: Option<String>, username: Option<String>,
// how do I keep track of activity??? maybe there's a hashmap that gets updated // how do I keep track of activity??? maybe there's a hashmap that gets updated
subscriptions: HashSet<MessageId>, subscriptions: HashSet<Relevance>,
quic_connection: quinn::Connection, quic_connection: quinn::Connection,
close_handle: broadcast::Receiver<()> close_handle: broadcast::Receiver<()>,
in_challenge: Option<(String,SignedClientMessage)>
} }
impl Client { impl Client {
pub fn new( pub fn new(
statehandle: Arc<RwLock<State>>, statehandle: Arc<RwLock<State>>,
(message_send,message_recv): ( (message_send,message_recv): (
broadcast::Sender<TaggedClientMessage>, broadcast::Sender<TaggedMessage>,
broadcast::Receiver<TaggedClientMessage>), broadcast::Receiver<TaggedMessage>),
message_ack: mpsc::Sender<MessageId>, message_ack: mpsc::Sender<Relevance>,
db_handle: Pool<AsyncPgConnection>, db_handle: Pool<AsyncPgConnection>,
quic_connection: quinn::Connection, quic_connection: quinn::Connection,
close_handle: broadcast::Receiver<()> close_handle: broadcast::Receiver<()>
@@ -58,6 +64,7 @@ impl Client {
message_ack, message_ack,
db_handle, db_handle,
username: None, username: None,
in_challenge: None,
subscriptions: HashSet::new(), subscriptions: HashSet::new(),
quic_connection, quic_connection,
close_handle close_handle
@@ -65,17 +72,99 @@ impl Client {
} }
pub async fn run(mut self) { pub async fn run(mut self,config: crate::config::Config) {
let mut chunk_arr = [0u8; 1024];
let mut serde_buf: Vec<u8> = Vec::new();
loop { loop {
// Wait for either a client to send a message, a message that we might need to
// forward, or for the graceful shutdown signal
select!{ select!{
//result = self.quic_recv.read(&mut chunk_arr) => { //result = self.quic_recv.read(&mut chunk_arr) => {
// unimplemented!() // unimplemented!()
//} //}
// Incoming connection, read it to end and put it in a message
result = self.quic_connection.accept_bi() => {
match result {
Err(e) => {
warn!("Failed to accept incoming connection: {:?}",e);
continue;
},
Ok((mut send,mut recv)) => {
let message: fedichat::client::SignedClientMessage = match recv.read_to_end(config.max_message_len_kb*1024).await {
Err(e) => {
warn!("Error while reading client request: {:?}",e);
continue;
},
Ok(bytes) => {
match rmp_serde::from_slice(&bytes) {
Ok(message) => message,
Err(e) => {
warn!("Could not parse message from client: {:?}",e);
continue;
}
}
}
};
let result = match self.handle_message(message).await {
Ok(res) => res,
Err(e) => {
warn!("Failed to handle client message with error {}",e);
continue;
}
};
let buf = match rmp_serde::to_vec(&result) {
Ok(b) => b,
Err(e) => {
error!("Could not serialize message to client");
error!("{:?}",e);
continue;
}
};
match send.write_all(&buf).await {
Ok(()) => (),
Err(e) => debug!("Could not send message to client: {:?}",e)
}
}
}
}
result = self.message_recv.recv() => { result = self.message_recv.recv() => {
let result = match result {
Ok(val) => val,
Err(e) => {
warn!("Problem while processing broadcast stream: {:?}",e);
continue;
}
};
// Check if the message is in our subscriptions
// If it is then send it down to the user
if let Some(relevance) = result.get_relevance() {
if self.subscriptions.contains(&relevance) {
match self.message_ack.send(relevance).await {
Ok(()) => (),
Err(e) => {
error!("Could not send broadcast ACK. Server likely in a broken state.");
error!("{}",e);
}
};
let send = self.quic_connection.open_uni();
// This is duplicated code, maybe could break into a function
let buf = match rmp_serde::to_vec(&result) {
Ok(b) => b,
Err(e) => {
error!("Could not serialize message to client");
error!("{:?}",e);
continue;
}
};
match send.await {
Ok(mut send) => match send.write_all(&buf).await {
Ok(()) => (),
Err(e) => debug!("Could not send message to client: {:?}",e)
},
Err(e) => debug!("Could not send message to client: {:?}",e),
}
}
}
unimplemented!() unimplemented!()
} }
_result = self.close_handle.recv() => { _result = self.close_handle.recv() => {
@@ -88,4 +177,204 @@ impl Client {
// Do any cleanup that needs done // Do any cleanup that needs done
} }
// Handles message and send back the right response.
async fn handle_message(&mut self, message: SignedClientMessage) -> Result<ServerMessage,HandlerError> {
// 3 states
// waiting on challenge
// waiting on auth
// normal operation, no reauth allowed
if let Some((ref challenge,ref saved_msg)) = self.in_challenge {
if let ClientMessage::ChallengeAnswer {response} = message.message {
// If we successfully complete the challenge then perform the given action
if response == *challenge {
unimplemented!()
// If we fail the challenge then reset the challenge
} else {
unimplemented!()
}
// hmm users should probably be able to update state and do everything else still
// right?? Is forcing the to immediately complete the challenge too much?
} else {
unimplemented!()
}
} else if let Some(ref username) = self.username {
use fedichat::client::ClientMessage::*;
match message.message {
Auth{
username: _username,
password: _password
} => {
return Ok(ServerMessage::Error(ServerError::AlreadyAuthenticated))
},
// Maybe ask for email too? Or a potential invite code
UserCreate {
username: _username,
password: _password,
} => {
return Ok(ServerMessage::Error(ServerError::AlreadyAuthenticated))
},
// Used to require accounts to complete some kind of challenge. Simplest
// is giving a password/invite code to create an account or join a room
ChallengeAnswer {
response: _,
} => {
return Ok(ServerMessage::Error(ServerError::NotInChallenge))
},
// Should it be one message type or multiple? How does end-to-end
// encryption work here? It could be done in a hacky way with extra tags
Message {
body,
room_id,
} => {
unimplemented!()
},
// Private message/invite mechanism
MessagePost {
body,
user,
} => {unimplemented!()},
// Replace the body of the message with a new one
MessageEdit {
body,
id,
room_id,
} => {unimplemented!()},
MessageDelete {
id,
room_id,
} => {unimplemented!()},
// State Actions
StateCreate {
room_id,
path,
ty,
permissions,
} => {unimplemented!()},
StateWrite {
room_id,
path,
content,
} => {unimplemented!()},
StateDelete {
room_id,
path,
} => {unimplemented!()},
StateAppend {
room_id,
path,
content,
} => {unimplemented!()},
StateMove {
room_id,
path,
target,
} => {unimplemented!()},
StateRead {
room_id,
path,
} => {unimplemented!()},
PermissionAdd {
permission,
} => {unimplemented!()},
PermissionDelete {
permission,
} => {unimplemented!()},
// Groups really should have a way to add permissions by user
// specifically for who can join or invite others
//
// Maybe make a group -> role -> member hierarchy?
//
//
// Could always do this through a bot that owns a group??
GroupCreate {
group,
users,
} => {unimplemented!()},
// Only the creator of a group or a server admin can delete groups
GroupDelete {
group,
} => {unimplemented!()},
// Only the creator of a group or a server admin can delete groups
// same with adding, though there should be a way to add group officers
// at some point
GroupUserAdd {
group,
users,
} => {unimplemented!()},
GroupUserRemove {
group,
users,
} => {unimplemented!()},
GroupRoleCreate {
group,
role,
} => {unimplemented!()},
GroupRoleDelete {
group,
role,
} => {unimplemented!()},
GroupRoleUserAdd {
group,
role,
users,
} => {unimplemented!()},
GroupRoleUserRemove {
group,
role,
users,
} => {unimplemented!()},
// Should work like discord roles
// Can control who can invite to the group
// Can be used with permissions to make rooms that are private for individual roles
GroupRolePowerAdd {
group,
role,
power,
} => {unimplemented!()},
GroupRolePowerRemove {
group,
role,
power,
} => {unimplemented!()},
// Returns an ID to use for message sending
// The server can potentially use the current username to associate media uploads
// with users
MediaUpload {
bytes,
} => {unimplemented!()},
// Join and subscribe
Join {
room_id,
} => {unimplemented!()},
SubscribeMessages {
room_id,
} => {unimplemented!()},
SubscribeState {
room_id,
state,
} => {unimplemented!()},
FetchMessages {
count,
end,
} => {unimplemented!()}
}
} else if let ClientMessage::Auth {username,password} = message.message {
unimplemented!()
} else if let ClientMessage::UserCreate {username,password} = message.message {
unimplemented!()
} else {
return Ok(ServerMessage::Error(ServerError::NotAuthenticated));
}
}
} }
+4 -2
View File
@@ -14,8 +14,10 @@ pub struct Config {
pub certfile: String, pub certfile: String,
pub keyfile: String, pub keyfile: String,
pub statefile: String, pub statefile: String,
pub loglevel: String, pub loglevel: Option<String>,
pub media_directory: String pub media_directory: String,
pub max_message_len_kb: usize,
pub account_creation_code: Option<String>
} }
#[derive(Clone,Serialize,Deserialize)] #[derive(Clone,Serialize,Deserialize)]
pub struct DBConfig { pub struct DBConfig {
+9 -7
View File
@@ -1,8 +1,8 @@
use crate::client::{Client,MessageId}; use crate::client::{Client};
use crate::state::State; use crate::state::State;
use tokio::sync::{mpsc,RwLock,broadcast}; use tokio::sync::{mpsc,RwLock,broadcast};
use std::sync::Arc; use std::sync::Arc;
use fedichat::client::TaggedClientMessage; use fedichat::message::{Relevance,TaggedMessage};
use diesel_async::pooled_connection::deadpool::Pool; use diesel_async::pooled_connection::deadpool::Pool;
use diesel_async::{AsyncPgConnection}; use diesel_async::{AsyncPgConnection};
use tracing::{instrument,warn}; use tracing::{instrument,warn};
@@ -11,12 +11,13 @@ use tracing::{instrument,warn};
pub async fn client_handler( pub async fn client_handler(
statehandle: Arc<RwLock<State>>, statehandle: Arc<RwLock<State>>,
(message_send,message_recv): ( (message_send,message_recv): (
broadcast::Sender<TaggedClientMessage>, broadcast::Sender<TaggedMessage>,
broadcast::Receiver<TaggedClientMessage>), broadcast::Receiver<TaggedMessage>),
message_ack_send: mpsc::Sender<MessageId>, message_ack_send: mpsc::Sender<Relevance>,
db_handle: Pool<AsyncPgConnection>, db_handle: Pool<AsyncPgConnection>,
endpoint: quinn::Endpoint, endpoint: quinn::Endpoint,
close_handle: broadcast::Receiver<()> close_handle: broadcast::Receiver<()>,
config: crate::config::Config
) { ) {
let mut thread_handles = Vec::new(); let mut thread_handles = Vec::new();
@@ -39,6 +40,7 @@ pub async fn client_handler(
let close_handle = close_handle.resubscribe(); let close_handle = close_handle.resubscribe();
let db_handle = db_handle.clone(); let db_handle = db_handle.clone();
let message_ack_send = message_ack_send.clone(); let message_ack_send = message_ack_send.clone();
let config = config.clone();
thread_handles.push(tokio::spawn(async move { thread_handles.push(tokio::spawn(async move {
// listen for the first bidirectional stream // listen for the first bidirectional stream
@@ -52,7 +54,7 @@ pub async fn client_handler(
close_handle close_handle
); );
// run client // run client
client.run() client.run(config)
})); }));
} }
+33 -2
View File
@@ -15,7 +15,7 @@ use std::process::ExitCode;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use serde::{Deserialize,Serialize}; use serde::{Deserialize,Serialize};
use tracing::{error,instrument,warn,debug}; use tracing::{error,instrument,warn,debug,Level};
use tokio::sync::{RwLock,broadcast,mpsc}; use tokio::sync::{RwLock,broadcast,mpsc};
@@ -28,6 +28,11 @@ pub struct Coordinate(Vec<i64>);
#[tokio::main] #[tokio::main]
#[instrument] #[instrument]
async fn main() -> ExitCode { async fn main() -> ExitCode {
// Initial logger so we have something during config
tracing::subscriber::set_global_default(
tracing_subscriber::fmt().with_max_level(Level::WARN).finish()
).expect("Failed to setup logger");
// Read in config // Read in config
let config = match Config::load() { let config = match Config::load() {
Ok(c) => c, Ok(c) => c,
@@ -38,6 +43,30 @@ async fn main() -> ExitCode {
} }
}; };
let level = match config.loglevel {
Some(ref s) => {
let loglevel = s.to_lowercase();
match loglevel.as_str() {
"trace" => Level::TRACE,
"debug" => Level::DEBUG,
"info" => Level::INFO,
"warn" => Level::WARN,
"error" => Level::ERROR,
_ => {
warn!("Invalid loglevel in config: {}",&loglevel);
Level::INFO
},
}
},
// Default to info level
None => Level::INFO
};
tracing::subscriber::set_global_default(
tracing_subscriber::fmt().with_max_level(level).finish()
).expect("Failed to setup logger");
// Set up database connection // Set up database connection
let db_config = AsyncDieselConnectionManager::<diesel_async::AsyncPgConnection>::new(config.database.url.clone()); let db_config = AsyncDieselConnectionManager::<diesel_async::AsyncPgConnection>::new(config.database.url.clone());
let db_pool = match Pool::builder(db_config).build() { let db_pool = match Pool::builder(db_config).build() {
@@ -146,6 +175,7 @@ async fn main() -> ExitCode {
// Create client listener // Create client listener
let statehandle_cloned = statehandle.clone(); let statehandle_cloned = statehandle.clone();
let config_cloned = config.clone();
join_handles.push(tokio::spawn(async move { join_handles.push(tokio::spawn(async move {
connection::client_handler( connection::client_handler(
statehandle_cloned, statehandle_cloned,
@@ -153,7 +183,8 @@ async fn main() -> ExitCode {
message_ack_send, message_ack_send,
db_pool, db_pool,
endpoint, endpoint,
close_recv.resubscribe() close_recv.resubscribe(),
config_cloned
).await; ).await;
} )); } ));