Added working message commands

Also fixed up the subscription forwarder which was sending the wrong
type. Oops
This commit is contained in:
2026-05-31 14:16:49 -07:00
parent 0e3600c31f
commit 598a8624f5
6 changed files with 286 additions and 27 deletions
Generated
+75 -2
View File
@@ -81,6 +81,12 @@ version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "base64ct"
version = "1.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2af50177e190e07a26ab74f8b1efbfe2ef87da2116221318cb1c2e82baf7de06"
[[package]]
name = "bcrypt"
version = "0.19.1"
@@ -278,6 +284,12 @@ dependencies = [
"uuid",
]
[[package]]
name = "const-oid"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8"
[[package]]
name = "const-oid"
version = "0.10.2"
@@ -399,6 +411,16 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2657f61fb1dd8bf37a8d51093cc7cee4e77125b22f7753f49b289f831bec2bae"
[[package]]
name = "der"
version = "0.7.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7c1832837b905bbfb5101e07cc24c8deddf52f93225eee6ead5f4d63d53ddcb"
dependencies = [
"const-oid 0.9.6",
"zeroize",
]
[[package]]
name = "deranged"
version = "0.5.8"
@@ -478,7 +500,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1dd6dbb5841937940781866fa1281a1ff7bd3bf827091440879f9994983d5c2"
dependencies = [
"block-buffer",
"const-oid",
"const-oid 0.10.2",
"crypto-common",
"ctutils",
]
@@ -503,6 +525,16 @@ dependencies = [
"syn",
]
[[package]]
name = "ed25519"
version = "2.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53"
dependencies = [
"pkcs8",
"signature",
]
[[package]]
name = "either"
version = "1.15.0"
@@ -546,10 +578,13 @@ dependencies = [
[[package]]
name = "fedichat"
version = "0.1.0"
source = "git+https://git.firechicken.net/fedichat/fedichat-lib#49cbd905ceecb7bf7be463f81836742e3a7ddc24"
source = "git+https://git.firechicken.net/fedichat/fedichat-lib#27cff2ced9f1879ddea03d2db3a0c5c5252b8cee"
dependencies = [
"ed25519",
"rmp-serde",
"serde",
"serde_bytes",
"thiserror 2.0.18",
"time",
"uuid",
]
@@ -1027,6 +1062,16 @@ version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd"
[[package]]
name = "pkcs8"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7"
dependencies = [
"der",
"spki",
]
[[package]]
name = "postgres-protocol"
version = "0.6.11"
@@ -1199,6 +1244,15 @@ dependencies = [
"rand_core 0.9.5",
]
[[package]]
name = "rand_core"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [
"getrandom 0.2.17",
]
[[package]]
name = "rand_core"
version = "0.9.5"
@@ -1493,6 +1547,15 @@ dependencies = [
"libc",
]
[[package]]
name = "signature"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77549399552de45a898a580c1b41d445bf730df867cc44e6c0233bbc4b8329de"
dependencies = [
"rand_core 0.6.4",
]
[[package]]
name = "siphasher"
version = "1.0.3"
@@ -1521,6 +1584,16 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "spki"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d91ed6c858b01f942cd56b37a94b3e0a1798290327d1236e4d9cf4eaca44d29d"
dependencies = [
"base64ct",
"der",
]
[[package]]
name = "stringprep"
version = "0.1.5"
@@ -7,7 +7,7 @@ CREATE TABLE messages (
id BIGSERIAL PRIMARY KEY,
room room_id NOT NULL,
body TEXT NOT NULL,
signature TEXT NOT NULL,
signature BYTEA NOT NULL,
client_timestamp BIGINT NOT NULL,
server_timestamp BIGINT NOT NULL,
username user_t NOT NULL,
+82 -18
View File
@@ -1,4 +1,6 @@
use fedichat::message::{Relevance,TaggedMessage};
use rmp_serde::encode::Serializer;
use serde::Serialize;
use fedichat::client::{ClientMessage,SignedClientMessage,ServerMessage,ServerError};
use fedichat::state::{StatePermissionKey,StatePermissionValue};
@@ -9,7 +11,6 @@ use std::collections::{HashMap,HashSet};
use std::fs::File;
use std::io::{Read,Write};
use std::sync::{Arc};
use thiserror::Error;
use tokio::sync::{broadcast,mpsc,RwLock};
use tokio::select;
use uuid::Uuid;
@@ -18,7 +19,7 @@ use crate::db;
use crate::state::{self,State};
use quinn::ConnectionError;
use tracing::{warn,error,debug,instrument};
use tracing::{warn,error,debug,instrument,trace};
pub struct Client {
statehandle: Arc<RwLock<State>>,
@@ -122,8 +123,8 @@ impl Client {
ServerMessage::Error(e)
}
};
// TODO: This needs to override the default options to use a map
let buf = match rmp_serde::to_vec(&result) {
let mut buf = Vec::new();
match result.serialize(&mut Serializer::new(&mut buf).with_struct_map()) {
Ok(b) => b,
Err(e) => {
error!("Could not serialize message to client");
@@ -153,7 +154,7 @@ impl Client {
// kicks are going to work.
if let Some(relevance) = result.get_relevance() {
if self.subscriptions.contains(&relevance) {
match self.message_ack.send(relevance).await {
match self.message_ack.send(relevance.clone()).await {
Ok(()) => (),
Err(e) => {
error!("Could not send broadcast ACK. Server likely in a broken state.");
@@ -163,7 +164,14 @@ impl Client {
let send = self.quic_connection.open_uni();
// This is duplicated code, maybe could break into a function
// TODO: This needs to override the default options to use a map
let buf = match rmp_serde::to_vec(&result) {
let mut buf = Vec::new();
trace!("Forwarding message {:?}",result);
let message = match relevance {
Relevance::Post(_) => ServerMessage::Post(result),
Relevance::Message(_) => ServerMessage::MessagePub(result),
Relevance::State(_,_) => ServerMessage::StatePub(result)
};
match message.serialize(&mut Serializer::new(&mut buf).with_struct_map()) {
Ok(b) => b,
Err(e) => {
error!("Could not serialize message to client");
@@ -173,7 +181,14 @@ impl Client {
};
match send.await {
Ok(mut send) => match send.write_all(&buf).await {
Ok(()) => match send.finish() {
Ok(()) => (),
Err(e) => {
debug!("Error encountered while sending subscription: {e}");
}
},
Err(e) => debug!("Could not send message to client: {:?}",e)
},
Err(e) => debug!("Could not send message to client: {:?}",e),
@@ -244,6 +259,36 @@ impl Client {
Err(ServerError::MessageNotForwardable)
}
} else {
//TODO Check to see if the message has any relevane. If it does then rebroadcast
// The federation handler will be listening to the broadcasts and should
// forward relevant messages along
let tagged_message;
if let Some(r) = message.message.get_relevance() {
// check to make sure we're joined
let authed = match r {
Relevance::Message(id) | Relevance::State(id,_) =>
self.statehandle.read().await.is_joined(id.into(),fedichat::User{name: username.clone(),server:config.hostname.clone()}).await,
Relevance::Post(_) => true
};
if authed {
debug!("Sending post");
tagged_message = Some(message.clone().tag(user.clone(),my_addr.clone()));
// This is a safe unwrap because the tagged message is set to Some()
// right above
self.message_send.send(tagged_message.clone().unwrap())
.map_err(|e| {
warn!("Could not forward message locally from {} with error `{e}`",username);
ServerError::Generic
})?;
} else {
debug!("Failed to send broadcast message, user {} not joined to room",&username);
tagged_message = None;
}
} else {
tagged_message = None;
}
match message.message {
Auth{
@@ -274,11 +319,19 @@ impl Client {
// Should it be one message type or multiple? How does end-to-end
// encryption work here? It could be done in a hacky way with extra html tags
Message {
body,
room_id,
id,
ref body,
ref room_id,
id: _,
} => {
unimplemented!()
if self.statehandle.read().await.is_joined(room_id.clone().into(),user).await {
// Just has to push the message to the database
// Should return the message id
//
// try! should never fail
db::insert_message(self.get_db().await?,tagged_message.ok_or(ServerError::Generic)?,body,room_id).await
} else {
Err(ServerError::NotJoined(room_id.clone()))
}
},
// Private message/invite mechanism
MessagePost {
@@ -286,13 +339,17 @@ impl Client {
body: ref _body,
user: ref _other_user,
} => {
let tagged = message.tag(user,my_addr);
match self.message_send.send(tagged) {
Ok(_) => Ok(ServerMessage::Ok),
// Note: we are a receiver so there should always be at least one
// This error should never happen
Err(_e) => Err(ServerError::Generic)
}
//NOTE: The logic to forward this is handled before this match statement
// so there is nothing left to do here
Ok(ServerMessage::Ok)
//let tagged = message.tag(user,my_addr);
//match self.message_send.send(tagged) {
// Ok(_) => Ok(ServerMessage::Ok),
// // Note: we are a receiver so there should always be at least one
// // This error should never happen
// Err(_e) => Err(ServerError::Generic)
//}
},
// Replace the body of the message with a new one
MessageEdit {
@@ -558,7 +615,14 @@ impl Client {
FetchMessages {
count,
end,
} => {unimplemented!()}
room_id
} => {
if self.statehandle.read().await.is_joined(room_id.clone().into(),user).await {
db::get_messages(self.get_db().await?,room_id,config.hostname.clone(),count,end).await
} else {
Err(ServerError::NotJoined(room_id))
}
}
}
+66
View File
@@ -8,6 +8,7 @@ use diesel_async::AsyncPgConnection;
use diesel_async::RunQueryDsl;//, AsyncConnection};
use diesel_async::pooled_connection::deadpool::Object;
use fedichat::client::{ServerMessage,ServerError,AuthMethod};
use fedichat::message::TaggedMessage;
use rand::distr::{Alphanumeric, SampleString};
use rand::rngs::{SysRng,StdRng};
use rand::prelude::*;
@@ -125,3 +126,68 @@ pub async fn gen_token(mut connection: Object<AsyncPgConnection>, user: String)
}
}
#[instrument(skip_all)]
pub async fn insert_message(
mut connection: Object<AsyncPgConnection>,
message: TaggedMessage,
body: &String,
room_id: &fedichat::RoomId
) -> Result<ServerMessage,ServerError> {
let insertable = models::NewMessage{
room: models::RoomId::from_fedichat(room_id,message.target.0),
body: body.clone(),
signature: message.signature.to_vec(),
client_timestamp: message.client_timestamp.unix_timestamp(),
server_timestamp: message.server_timestamp.unix_timestamp(),
username: message.user.into()
};
let result = diesel::insert_into(schema::messages::table)
.values(&insertable)
.returning(models::Message::as_returning())
.get_result(&mut *connection)
.await;
match result {
// TODO: might need to check this? I don't know what it means
Ok(models::Message{id, ..}) => {
// Oh no type conversion
Ok(ServerMessage::OkMessage(fedichat::message::MessageId(id as u64)))
},
// TODO: Probably actually check the error
Err(_e) => Err(ServerError::UserAlreadyExists)
}
}
// Maybe TODO: There is kind of no point storing the server name as part of the
// message ID right? All messages stored locally are for local rooms
#[instrument(skip_all)]
pub async fn get_messages(
mut connection: Object<AsyncPgConnection>,
room_id_s: fedichat::RoomId,
server: String,
count: u64,
end: fedichat::message::MessageId,
) -> Result<ServerMessage,ServerError> {
use schema::messages::dsl::*;
let result = messages
.filter(room.eq(models::RoomId::from_fedichat(&room_id_s,server)))
.order(id.desc())
.filter(id.lt(end.0 as i64))
.select(models::Message::as_select())
.limit(count as i64)
.load(&mut *connection)
.await;
match result {
// If we have more than 0 rows then authentication is successful
Ok(m) => {
Ok(ServerMessage::Messages(m.into_iter().map(|x| x.into()).collect()))
},
_ => Err(ServerError::Generic)
}
}
+60 -4
View File
@@ -5,6 +5,7 @@ use diesel::expression::AsExpression;
use diesel_derive_composite::Composite;
use diesel::pg::Pg;
use time::OffsetDateTime;
// Gets around broken macro in diesel_derive_composite
type MyArr = diesel::sql_types::Array<diesel::sql_types::BigInt>;
@@ -17,9 +18,26 @@ pub struct UserT {
#[diesel_composite(sql_type = diesel::sql_types::Text)]
pub server: String
}
impl Into<fedichat::User> for UserT {
fn into(self) -> fedichat::User {
fedichat::User {
name: self.name,
server: self.server
}
}
}
impl From<fedichat::User> for UserT {
fn from(other: fedichat::User) -> Self {
UserT {
name: other.name,
server: other.server
}
}
}
#[derive(Debug, Composite, FromSqlRow, AsExpression)]
#[diesel(sql_type = crate::db::schema::sql_types::UserT)]
#[diesel(sql_type = crate::db::schema::sql_types::RoomId)]
pub struct RoomId {
#[diesel_composite(sql_type = MyArr)]
pub coordinates: Vec<i64>,
@@ -27,6 +45,15 @@ pub struct RoomId {
pub server: String
}
impl RoomId {
pub fn from_fedichat(other: &fedichat::RoomId, server: String) -> Self {
RoomId {
coordinates: other.coordinates.clone(),
server
}
}
}
#[derive(Queryable, Selectable)]
#[diesel(table_name = crate::db::schema::groups)]
pub struct Group {
@@ -62,13 +89,42 @@ pub struct NewUser {
pub password: String,
}
#[derive(Queryable, Selectable)]
#[derive(Queryable, Selectable,QueryableByName)]
#[diesel(table_name = crate::db::schema::messages)]
pub struct Messages {
pub struct Message {
pub id: i64,
pub room: RoomId,
pub body: String,
pub signature: String,
pub signature: Vec<u8>,
pub client_timestamp: i64,
pub server_timestamp: i64,
pub username: UserT,
pub edited: Option<bool>
}
impl Into<fedichat::message::TaggedMessage> for Message {
fn into(self) -> fedichat::message::TaggedMessage {
fedichat::message::TaggedMessage {
signature: self.signature.into(),
client_timestamp: OffsetDateTime::from_unix_timestamp(self.client_timestamp).expect("Invalid timestamp conversion"),
server_timestamp: OffsetDateTime::from_unix_timestamp(self.server_timestamp).expect("Invalid timestamp conversion"),
target: fedichat::ServerAddr(self.room.server),
user: self.username.into(),
message: fedichat::client::ClientMessage::Message {
body: self.body,
room_id: fedichat::RoomId{ coordinates: self.room.coordinates},
id: Some(fedichat::message::MessageId(self.id as u64))
}
}
}
}
#[derive(Insertable)]
#[diesel(table_name = crate::db::schema::messages)]
pub struct NewMessage {
pub room: RoomId,
pub body: String,
pub signature: Vec<u8>,
pub client_timestamp: i64,
pub server_timestamp: i64,
pub username: UserT
+1 -1
View File
@@ -31,7 +31,7 @@ diesel::table! {
id -> Int8,
room -> RoomId,
body -> Text,
signature -> Text,
signature -> Bytea,
client_timestamp -> Int8,
server_timestamp -> Int8,
username -> UserT,