From 598a8624f5df60494d18808fbaaef8b142062333 Mon Sep 17 00:00:00 2001 From: Waylon Cude Date: Sun, 31 May 2026 14:16:49 -0700 Subject: [PATCH] Added working message commands Also fixed up the subscription forwarder which was sending the wrong type. Oops --- Cargo.lock | 77 ++++++++++++- .../up.sql | 2 +- src/client.rs | 102 ++++++++++++++---- src/db/mod.rs | 66 ++++++++++++ src/db/models.rs | 64 ++++++++++- src/db/schema.rs | 2 +- 6 files changed, 286 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1151d2b..4d90dbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -81,6 +81,12 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "base64ct" +version = "1.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06" + [[package]] name = "bcrypt" version = "0.19.1" @@ -278,6 +284,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "const-oid" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" + [[package]] name = "const-oid" version = "0.10.2" @@ -399,6 +411,16 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2657f61fb1dd8bf37a8d51093cc7cee4e77125b22f7753f49b289f831bec2bae" +[[package]] +name = "der" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb" +dependencies = [ + "const-oid 0.9.6", + "zeroize", +] + [[package]] name = "deranged" version = "0.5.8" @@ -478,7 +500,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1dd6dbb5841937940781866fa1281a1ff7bd3bf827091440879f9994983d5c2" dependencies = [ "block-buffer", - "const-oid", + "const-oid 0.10.2", "crypto-common", "ctutils", ] @@ -503,6 +525,16 @@ dependencies = [ "syn", ] +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "pkcs8", + "signature", +] + [[package]] name = "either" version = "1.15.0" @@ -546,10 +578,13 @@ dependencies = [ [[package]] name = "fedichat" version = "0.1.0" -source = "git+https://git.firechicken.net/fedichat/fedichat-lib#49cbd905ceecb7bf7be463f81836742e3a7ddc24" +source = "git+https://git.firechicken.net/fedichat/fedichat-lib#27cff2ced9f1879ddea03d2db3a0c5c5252b8cee" dependencies = [ + "ed25519", + "rmp-serde", "serde", "serde_bytes", + "thiserror 2.0.18", "time", "uuid", ] @@ -1027,6 +1062,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" +[[package]] +name = "pkcs8" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" +dependencies = [ + "der", + "spki", +] + [[package]] name = "postgres-protocol" version = "0.6.11" @@ -1199,6 +1244,15 @@ dependencies = [ "rand_core 0.9.5", ] +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.17", +] + [[package]] name = "rand_core" version = "0.9.5" @@ -1493,6 +1547,15 @@ dependencies = [ "libc", ] +[[package]] +name = "signature" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "siphasher" version = "1.0.3" @@ -1521,6 +1584,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "spki" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d" +dependencies = [ + "base64ct", + "der", +] + [[package]] name = "stringprep" version = "0.1.5" diff --git a/migrations/2026-05-18-182028-0000_create_messages/up.sql b/migrations/2026-05-18-182028-0000_create_messages/up.sql index 137ff1d..491b7e7 100644 --- a/migrations/2026-05-18-182028-0000_create_messages/up.sql +++ b/migrations/2026-05-18-182028-0000_create_messages/up.sql @@ -7,7 +7,7 @@ CREATE TABLE messages ( id BIGSERIAL PRIMARY KEY, room room_id NOT NULL, body TEXT NOT NULL, - signature TEXT NOT NULL, + signature BYTEA NOT NULL, client_timestamp BIGINT NOT NULL, server_timestamp BIGINT NOT NULL, username user_t NOT NULL, diff --git a/src/client.rs b/src/client.rs index 2605f17..c143777 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,4 +1,6 @@ use fedichat::message::{Relevance,TaggedMessage}; +use rmp_serde::encode::Serializer; +use serde::Serialize; use fedichat::client::{ClientMessage,SignedClientMessage,ServerMessage,ServerError}; use fedichat::state::{StatePermissionKey,StatePermissionValue}; @@ -9,7 +11,6 @@ use std::collections::{HashMap,HashSet}; use std::fs::File; use std::io::{Read,Write}; use std::sync::{Arc}; -use thiserror::Error; use tokio::sync::{broadcast,mpsc,RwLock}; use tokio::select; use uuid::Uuid; @@ -18,7 +19,7 @@ use crate::db; use crate::state::{self,State}; use quinn::ConnectionError; -use tracing::{warn,error,debug,instrument}; +use tracing::{warn,error,debug,instrument,trace}; pub struct Client { statehandle: Arc>, @@ -122,8 +123,8 @@ impl Client { ServerMessage::Error(e) } }; - // TODO: This needs to override the default options to use a map - let buf = match rmp_serde::to_vec(&result) { + let mut buf = Vec::new(); + match result.serialize(&mut Serializer::new(&mut buf).with_struct_map()) { Ok(b) => b, Err(e) => { error!("Could not serialize message to client"); @@ -153,7 +154,7 @@ impl Client { // kicks are going to work. if let Some(relevance) = result.get_relevance() { if self.subscriptions.contains(&relevance) { - match self.message_ack.send(relevance).await { + match self.message_ack.send(relevance.clone()).await { Ok(()) => (), Err(e) => { error!("Could not send broadcast ACK. Server likely in a broken state."); @@ -163,7 +164,14 @@ impl Client { let send = self.quic_connection.open_uni(); // This is duplicated code, maybe could break into a function // TODO: This needs to override the default options to use a map - let buf = match rmp_serde::to_vec(&result) { + let mut buf = Vec::new(); + trace!("Forwarding message {:?}",result); + let message = match relevance { + Relevance::Post(_) => ServerMessage::Post(result), + Relevance::Message(_) => ServerMessage::MessagePub(result), + Relevance::State(_,_) => ServerMessage::StatePub(result) + }; + match message.serialize(&mut Serializer::new(&mut buf).with_struct_map()) { Ok(b) => b, Err(e) => { error!("Could not serialize message to client"); @@ -173,7 +181,14 @@ impl Client { }; match send.await { Ok(mut send) => match send.write_all(&buf).await { - Ok(()) => (), + Ok(()) => match send.finish() { + Ok(()) => (), + Err(e) => { + debug!("Error encountered while sending subscription: {e}"); + + } + + }, Err(e) => debug!("Could not send message to client: {:?}",e) }, Err(e) => debug!("Could not send message to client: {:?}",e), @@ -244,6 +259,36 @@ impl Client { Err(ServerError::MessageNotForwardable) } } else { + //TODO Check to see if the message has any relevane. If it does then rebroadcast + // The federation handler will be listening to the broadcasts and should + // forward relevant messages along + let tagged_message; + if let Some(r) = message.message.get_relevance() { + // check to make sure we're joined + let authed = match r { + Relevance::Message(id) | Relevance::State(id,_) => + self.statehandle.read().await.is_joined(id.into(),fedichat::User{name: username.clone(),server:config.hostname.clone()}).await, + Relevance::Post(_) => true + + }; + if authed { + debug!("Sending post"); + tagged_message = Some(message.clone().tag(user.clone(),my_addr.clone())); + // This is a safe unwrap because the tagged message is set to Some() + // right above + self.message_send.send(tagged_message.clone().unwrap()) + .map_err(|e| { + warn!("Could not forward message locally from {} with error `{e}`",username); + ServerError::Generic + + })?; + } else { + debug!("Failed to send broadcast message, user {} not joined to room",&username); + tagged_message = None; + } + } else { + tagged_message = None; + } match message.message { Auth{ @@ -274,11 +319,19 @@ impl Client { // Should it be one message type or multiple? How does end-to-end // encryption work here? It could be done in a hacky way with extra html tags Message { - body, - room_id, - id, + ref body, + ref room_id, + id: _, } => { - unimplemented!() + if self.statehandle.read().await.is_joined(room_id.clone().into(),user).await { + // Just has to push the message to the database + // Should return the message id + // + // try! should never fail + db::insert_message(self.get_db().await?,tagged_message.ok_or(ServerError::Generic)?,body,room_id).await + } else { + Err(ServerError::NotJoined(room_id.clone())) + } }, // Private message/invite mechanism MessagePost { @@ -286,13 +339,17 @@ impl Client { body: ref _body, user: ref _other_user, } => { - let tagged = message.tag(user,my_addr); - match self.message_send.send(tagged) { - Ok(_) => Ok(ServerMessage::Ok), - // Note: we are a receiver so there should always be at least one - // This error should never happen - Err(_e) => Err(ServerError::Generic) - } + + //NOTE: The logic to forward this is handled before this match statement + // so there is nothing left to do here + Ok(ServerMessage::Ok) + //let tagged = message.tag(user,my_addr); + //match self.message_send.send(tagged) { + // Ok(_) => Ok(ServerMessage::Ok), + // // Note: we are a receiver so there should always be at least one + // // This error should never happen + // Err(_e) => Err(ServerError::Generic) + //} }, // Replace the body of the message with a new one MessageEdit { @@ -558,7 +615,14 @@ impl Client { FetchMessages { count, end, - } => {unimplemented!()} + room_id + } => { + if self.statehandle.read().await.is_joined(room_id.clone().into(),user).await { + db::get_messages(self.get_db().await?,room_id,config.hostname.clone(),count,end).await + } else { + Err(ServerError::NotJoined(room_id)) + } + } } diff --git a/src/db/mod.rs b/src/db/mod.rs index 68ce196..df736b5 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -8,6 +8,7 @@ use diesel_async::AsyncPgConnection; use diesel_async::RunQueryDsl;//, AsyncConnection}; use diesel_async::pooled_connection::deadpool::Object; use fedichat::client::{ServerMessage,ServerError,AuthMethod}; +use fedichat::message::TaggedMessage; use rand::distr::{Alphanumeric, SampleString}; use rand::rngs::{SysRng,StdRng}; use rand::prelude::*; @@ -125,3 +126,68 @@ pub async fn gen_token(mut connection: Object, user: String) } } +#[instrument(skip_all)] +pub async fn insert_message( + mut connection: Object, + message: TaggedMessage, + body: &String, + room_id: &fedichat::RoomId +) -> Result { + let insertable = models::NewMessage{ + room: models::RoomId::from_fedichat(room_id,message.target.0), + body: body.clone(), + signature: message.signature.to_vec(), + client_timestamp: message.client_timestamp.unix_timestamp(), + server_timestamp: message.server_timestamp.unix_timestamp(), + username: message.user.into() + }; + let result = diesel::insert_into(schema::messages::table) + .values(&insertable) + .returning(models::Message::as_returning()) + .get_result(&mut *connection) + .await; + + match result { + // TODO: might need to check this? I don't know what it means + Ok(models::Message{id, ..}) => { + // Oh no type conversion + Ok(ServerMessage::OkMessage(fedichat::message::MessageId(id as u64))) + }, + // TODO: Probably actually check the error + Err(_e) => Err(ServerError::UserAlreadyExists) + } +} + +// Maybe TODO: There is kind of no point storing the server name as part of the +// message ID right? All messages stored locally are for local rooms + +#[instrument(skip_all)] +pub async fn get_messages( + mut connection: Object, + room_id_s: fedichat::RoomId, + server: String, + count: u64, + end: fedichat::message::MessageId, + +) -> Result { + use schema::messages::dsl::*; + let result = messages + .filter(room.eq(models::RoomId::from_fedichat(&room_id_s,server))) + .order(id.desc()) + .filter(id.lt(end.0 as i64)) + .select(models::Message::as_select()) + .limit(count as i64) + .load(&mut *connection) + .await; + + + + match result { + // If we have more than 0 rows then authentication is successful + Ok(m) => { + Ok(ServerMessage::Messages(m.into_iter().map(|x| x.into()).collect())) + + }, + _ => Err(ServerError::Generic) + } +} diff --git a/src/db/models.rs b/src/db/models.rs index a828e10..59281e8 100644 --- a/src/db/models.rs +++ b/src/db/models.rs @@ -5,6 +5,7 @@ use diesel::expression::AsExpression; use diesel_derive_composite::Composite; use diesel::pg::Pg; +use time::OffsetDateTime; // Gets around broken macro in diesel_derive_composite type MyArr = diesel::sql_types::Array; @@ -17,9 +18,26 @@ pub struct UserT { #[diesel_composite(sql_type = diesel::sql_types::Text)] pub server: String } +impl Into for UserT { + fn into(self) -> fedichat::User { + fedichat::User { + name: self.name, + server: self.server + } + } +} + +impl From for UserT { + fn from(other: fedichat::User) -> Self { + UserT { + name: other.name, + server: other.server + } + } +} #[derive(Debug, Composite, FromSqlRow, AsExpression)] -#[diesel(sql_type = crate::db::schema::sql_types::UserT)] +#[diesel(sql_type = crate::db::schema::sql_types::RoomId)] pub struct RoomId { #[diesel_composite(sql_type = MyArr)] pub coordinates: Vec, @@ -27,6 +45,15 @@ pub struct RoomId { pub server: String } +impl RoomId { + pub fn from_fedichat(other: &fedichat::RoomId, server: String) -> Self { + RoomId { + coordinates: other.coordinates.clone(), + server + } + } +} + #[derive(Queryable, Selectable)] #[diesel(table_name = crate::db::schema::groups)] pub struct Group { @@ -62,13 +89,42 @@ pub struct NewUser { pub password: String, } -#[derive(Queryable, Selectable)] +#[derive(Queryable, Selectable,QueryableByName)] #[diesel(table_name = crate::db::schema::messages)] -pub struct Messages { +pub struct Message { pub id: i64, pub room: RoomId, pub body: String, - pub signature: String, + pub signature: Vec, + pub client_timestamp: i64, + pub server_timestamp: i64, + pub username: UserT, + pub edited: Option +} + +impl Into for Message { + fn into(self) -> fedichat::message::TaggedMessage { + fedichat::message::TaggedMessage { + signature: self.signature.into(), + client_timestamp: OffsetDateTime::from_unix_timestamp(self.client_timestamp).expect("Invalid timestamp conversion"), + server_timestamp: OffsetDateTime::from_unix_timestamp(self.server_timestamp).expect("Invalid timestamp conversion"), + target: fedichat::ServerAddr(self.room.server), + user: self.username.into(), + message: fedichat::client::ClientMessage::Message { + body: self.body, + room_id: fedichat::RoomId{ coordinates: self.room.coordinates}, + id: Some(fedichat::message::MessageId(self.id as u64)) + } + } + } +} + +#[derive(Insertable)] +#[diesel(table_name = crate::db::schema::messages)] +pub struct NewMessage { + pub room: RoomId, + pub body: String, + pub signature: Vec, pub client_timestamp: i64, pub server_timestamp: i64, pub username: UserT diff --git a/src/db/schema.rs b/src/db/schema.rs index b1588f7..c6ba6a2 100644 --- a/src/db/schema.rs +++ b/src/db/schema.rs @@ -31,7 +31,7 @@ diesel::table! { id -> Int8, room -> RoomId, body -> Text, - signature -> Text, + signature -> Bytea, client_timestamp -> Int8, server_timestamp -> Int8, username -> UserT,