Added lots of debugging + token auth
Now you dont have to send your password over the wire every time and can make do with a token instead. There's lots of debugging info now to make it easier for me to fix bugs with the server
This commit is contained in:
@@ -1,3 +1,4 @@
|
|||||||
/target
|
/target
|
||||||
confetti.toml
|
confetti.toml
|
||||||
|
confetti.state
|
||||||
.env
|
.env
|
||||||
|
|||||||
Generated
+2
@@ -266,9 +266,11 @@ dependencies = [
|
|||||||
"diesel-derive-composite",
|
"diesel-derive-composite",
|
||||||
"fedichat",
|
"fedichat",
|
||||||
"quinn",
|
"quinn",
|
||||||
|
"rand 0.10.1",
|
||||||
"rmp-serde",
|
"rmp-serde",
|
||||||
"serde",
|
"serde",
|
||||||
"thiserror 2.0.18",
|
"thiserror 2.0.18",
|
||||||
|
"time",
|
||||||
"tokio",
|
"tokio",
|
||||||
"toml",
|
"toml",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
|||||||
+4
-2
@@ -6,8 +6,8 @@ edition = "2024"
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
diesel = { version = "2.3.9", features = ["postgres_backend"] }
|
diesel = { version = "2.3.9", features = ["postgres_backend"] }
|
||||||
quinn = "0.11.9"
|
quinn = "0.11.9"
|
||||||
fedichat = {git = "https://git.firechicken.net/fedichat/fedichat-lib"}
|
#fedichat = {git = "https://git.firechicken.net/fedichat/fedichat-lib"}
|
||||||
#fedichat = {path = "../fedichat-lib"}
|
fedichat = {path = "../fedichat-lib"}
|
||||||
tracing = "0.1.44"
|
tracing = "0.1.44"
|
||||||
tracing-subscriber = "0.3.23"
|
tracing-subscriber = "0.3.23"
|
||||||
clap = { version = "4.6.1", features = ["derive"] }
|
clap = { version = "4.6.1", features = ["derive"] }
|
||||||
@@ -20,4 +20,6 @@ ctrlc-async = { version = "3.2.2", features = ["termination"] }
|
|||||||
rmp-serde = "1.3.1"
|
rmp-serde = "1.3.1"
|
||||||
diesel-derive-composite = "0.1.0"
|
diesel-derive-composite = "0.1.0"
|
||||||
bcrypt = "0.19.1"
|
bcrypt = "0.19.1"
|
||||||
|
time = { version = "0.3.47", features = ["serde"] }
|
||||||
|
rand = "0.10.1"
|
||||||
#postcard = {version = "1.1.3", features = ["use-std"]}
|
#postcard = {version = "1.1.3", features = ["use-std"]}
|
||||||
|
|||||||
@@ -19,4 +19,5 @@ admins = []
|
|||||||
url = "my.db.server"
|
url = "my.db.server"
|
||||||
user = "confetti"
|
user = "confetti"
|
||||||
password = "my_secret_password"
|
password = "my_secret_password"
|
||||||
|
db = "confetti"
|
||||||
num_connections = 8
|
num_connections = 8
|
||||||
|
|||||||
@@ -0,0 +1,2 @@
|
|||||||
|
-- This file should undo anything in `up.sql`
|
||||||
|
DROP TABLE tokens
|
||||||
@@ -0,0 +1,7 @@
|
|||||||
|
-- Your SQL goes here
|
||||||
|
CREATE TABLE tokens (
|
||||||
|
id SERIAL PRIMARY KEY,
|
||||||
|
username TEXT NOT NULL,
|
||||||
|
token TEXT NOT NULL,
|
||||||
|
expiry BIGINT NOT NULL
|
||||||
|
)
|
||||||
+46
-6
@@ -14,7 +14,8 @@ use crate::Coordinate;
|
|||||||
use crate::db;
|
use crate::db;
|
||||||
use crate::state::{self,State};
|
use crate::state::{self,State};
|
||||||
|
|
||||||
use tracing::{warn,error,debug};
|
use quinn::ConnectionError;
|
||||||
|
use tracing::{warn,error,debug,instrument};
|
||||||
|
|
||||||
#[derive(Error,Debug)]
|
#[derive(Error,Debug)]
|
||||||
enum HandlerError {
|
enum HandlerError {
|
||||||
@@ -73,9 +74,11 @@ impl Client {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[instrument(skip_all)]
|
||||||
pub async fn run(mut self,config: crate::config::Config) {
|
pub async fn run(mut self,config: crate::config::Config) {
|
||||||
// This can probably use a refactoring at some point, this function is huge
|
// This can probably use a refactoring at some point, this function is huge
|
||||||
|
|
||||||
|
debug!("Running handler thread");
|
||||||
loop {
|
loop {
|
||||||
// Wait for either a client to send a message, a message that we might need to
|
// Wait for either a client to send a message, a message that we might need to
|
||||||
// forward, or for the graceful shutdown signal
|
// forward, or for the graceful shutdown signal
|
||||||
@@ -85,10 +88,19 @@ impl Client {
|
|||||||
//}
|
//}
|
||||||
// Incoming connection, read it to end and put it in a message
|
// Incoming connection, read it to end and put it in a message
|
||||||
result = self.quic_connection.accept_bi() => {
|
result = self.quic_connection.accept_bi() => {
|
||||||
|
debug!("Handling incoming message");
|
||||||
match result {
|
match result {
|
||||||
|
Err(ConnectionError::ConnectionClosed(_))
|
||||||
|
| Err(ConnectionError::TimedOut)
|
||||||
|
| Err(ConnectionError::Reset)
|
||||||
|
| Err(ConnectionError::LocallyClosed)
|
||||||
|
| Err(ConnectionError::ApplicationClosed(_))=> {
|
||||||
|
debug!("Connection closed");
|
||||||
|
return;
|
||||||
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!("Failed to accept incoming connection: {:?}",e);
|
warn!("Failed to accept incoming connection: {:?}",e);
|
||||||
continue;
|
return;
|
||||||
},
|
},
|
||||||
Ok((mut send,mut recv)) => {
|
Ok((mut send,mut recv)) => {
|
||||||
let message: fedichat::client::SignedClientMessage = match recv.read_to_end(config.max_message_len_kb*1024).await {
|
let message: fedichat::client::SignedClientMessage = match recv.read_to_end(config.max_message_len_kb*1024).await {
|
||||||
@@ -191,12 +203,15 @@ impl Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Handles message and send back the right response.
|
// Handles message and send back the right response.
|
||||||
|
#[instrument(skip_all)]
|
||||||
async fn handle_message(&mut self, config: &crate::config::Config, message: SignedClientMessage) -> Result<ServerMessage,ServerError> {
|
async fn handle_message(&mut self, config: &crate::config::Config, message: SignedClientMessage) -> Result<ServerMessage,ServerError> {
|
||||||
// 3 states
|
// 3 states
|
||||||
// waiting on challenge
|
// waiting on challenge
|
||||||
// waiting on auth
|
// waiting on auth
|
||||||
// normal operation, no reauth allowed
|
// normal operation, no reauth allowed
|
||||||
|
|
||||||
|
debug!("Handling message: {:?}",&message);
|
||||||
|
|
||||||
if let Some((ref challenge,ref saved_msg)) = self.in_challenge {
|
if let Some((ref challenge,ref saved_msg)) = self.in_challenge {
|
||||||
if let ClientMessage::ChallengeAnswer {response} = message.message {
|
if let ClientMessage::ChallengeAnswer {response} = message.message {
|
||||||
// If we successfully complete the challenge then perform the given action
|
// If we successfully complete the challenge then perform the given action
|
||||||
@@ -255,6 +270,10 @@ impl Client {
|
|||||||
return Ok(ServerMessage::Error(ServerError::NotInChallenge))
|
return Ok(ServerMessage::Error(ServerError::NotInChallenge))
|
||||||
},
|
},
|
||||||
|
|
||||||
|
CreateToken => {
|
||||||
|
Ok(ServerMessage::Token(db::gen_token(self.get_db().await?, username.clone()).await?))
|
||||||
|
},
|
||||||
|
|
||||||
// Should it be one message type or multiple? How does end-to-end
|
// Should it be one message type or multiple? How does end-to-end
|
||||||
// encryption work here? It could be done in a hacky way with extra html tags
|
// encryption work here? It could be done in a hacky way with extra html tags
|
||||||
Message {
|
Message {
|
||||||
@@ -452,6 +471,18 @@ impl Client {
|
|||||||
Ok(ServerMessage::Ok)
|
Ok(ServerMessage::Ok)
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
RoomLeave {
|
||||||
|
room_id,
|
||||||
|
} => {
|
||||||
|
let coords: Coordinate = room_id.clone().into();
|
||||||
|
|
||||||
|
if !self.statehandle.read().await.room_exists(coords.clone()) {
|
||||||
|
Ok(ServerMessage::Error(ServerError::RoomNotFound(room_id.clone())))
|
||||||
|
} else {
|
||||||
|
self.statehandle.write().await.leave(coords,user).await?;
|
||||||
|
Ok(ServerMessage::Ok)
|
||||||
|
}
|
||||||
|
},
|
||||||
RoomCreate {
|
RoomCreate {
|
||||||
room_id,
|
room_id,
|
||||||
} => {
|
} => {
|
||||||
@@ -470,15 +501,17 @@ impl Client {
|
|||||||
SubscribeMessages {
|
SubscribeMessages {
|
||||||
room_id,
|
room_id,
|
||||||
} => {
|
} => {
|
||||||
// TODO: Has to check if user is joined to room first
|
if self.statehandle.read().await.is_joined(room_id.clone().into(),user).await {
|
||||||
self.subscriptions.insert(Relevance::Message(room_id));
|
self.subscriptions.insert(Relevance::Message(room_id));
|
||||||
Ok(ServerMessage::Ok)
|
Ok(ServerMessage::Ok)
|
||||||
|
} else {
|
||||||
|
Err(ServerError::NotJoined(room_id))
|
||||||
|
}
|
||||||
},
|
},
|
||||||
SubscribeState {
|
SubscribeState {
|
||||||
room_id,
|
room_id,
|
||||||
state,
|
state,
|
||||||
} => {
|
} => {
|
||||||
// TODO: Has to check if user is joined to room first
|
|
||||||
if self.statehandle.read().await.is_joined(room_id.clone().into(),user).await {
|
if self.statehandle.read().await.is_joined(room_id.clone().into(),user).await {
|
||||||
self.subscriptions.insert(Relevance::State(room_id,state));
|
self.subscriptions.insert(Relevance::State(room_id,state));
|
||||||
Ok(ServerMessage::Ok)
|
Ok(ServerMessage::Ok)
|
||||||
@@ -511,6 +544,7 @@ impl Client {
|
|||||||
|
|
||||||
Err(ServerError::ChallengeInviteCode)
|
Err(ServerError::ChallengeInviteCode)
|
||||||
} else {
|
} else {
|
||||||
|
self.username = Some(username.clone());
|
||||||
db::maybe_create_user(self.get_db().await ?,username,password).await
|
db::maybe_create_user(self.get_db().await ?,username,password).await
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@@ -578,6 +612,12 @@ impl Client {
|
|||||||
operation: state::StateOperation
|
operation: state::StateOperation
|
||||||
) -> Result<Option<state::StateNode>,ServerError>
|
) -> Result<Option<state::StateNode>,ServerError>
|
||||||
{
|
{
|
||||||
|
// check if user is joined to the given room. If not return early
|
||||||
|
if !self.statehandle.read().await.is_joined(coord.clone().into(),user.clone()).await {
|
||||||
|
return Err(ServerError::NotJoined(coord.clone().to_roomid()))
|
||||||
|
}
|
||||||
|
|
||||||
|
// If they are then process the action
|
||||||
let unlockable = self.statehandle.read().await.to_unlockable_operation(operation).await?;
|
let unlockable = self.statehandle.read().await.to_unlockable_operation(operation).await?;
|
||||||
let locks = unlockable.get_locks();
|
let locks = unlockable.get_locks();
|
||||||
let room_id = coord.clone().into();
|
let room_id = coord.clone().into();
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ pub struct DBConfig {
|
|||||||
pub url: String,
|
pub url: String,
|
||||||
pub user: String,
|
pub user: String,
|
||||||
pub password: String,
|
pub password: String,
|
||||||
|
pub db_name: String,
|
||||||
pub num_connections: usize
|
pub num_connections: usize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+46
-34
@@ -5,7 +5,7 @@ use std::sync::Arc;
|
|||||||
use fedichat::message::{Relevance,TaggedMessage};
|
use fedichat::message::{Relevance,TaggedMessage};
|
||||||
use diesel_async::pooled_connection::deadpool::Pool;
|
use diesel_async::pooled_connection::deadpool::Pool;
|
||||||
use diesel_async::{AsyncPgConnection};
|
use diesel_async::{AsyncPgConnection};
|
||||||
use tracing::{instrument,warn};
|
use tracing::{instrument,warn,trace,debug};
|
||||||
|
|
||||||
#[instrument(skip_all)]
|
#[instrument(skip_all)]
|
||||||
pub async fn client_handler(
|
pub async fn client_handler(
|
||||||
@@ -16,48 +16,60 @@ pub async fn client_handler(
|
|||||||
message_ack_send: mpsc::Sender<Relevance>,
|
message_ack_send: mpsc::Sender<Relevance>,
|
||||||
db_handle: Pool<AsyncPgConnection>,
|
db_handle: Pool<AsyncPgConnection>,
|
||||||
endpoint: quinn::Endpoint,
|
endpoint: quinn::Endpoint,
|
||||||
close_handle: broadcast::Receiver<()>,
|
mut close_handle: broadcast::Receiver<()>,
|
||||||
config: crate::config::Config
|
config: crate::config::Config
|
||||||
) {
|
) {
|
||||||
let mut thread_handles = Vec::new();
|
let mut thread_handles = Vec::new();
|
||||||
|
|
||||||
// Start iterating over incoming connections.
|
// Start iterating over incoming connections.
|
||||||
while let Some(conn) = endpoint.accept().await {
|
|
||||||
let connection = match conn.await {
|
|
||||||
Ok(conn) => conn,
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Error while processing client connection");
|
|
||||||
warn!("{:?}",e);
|
|
||||||
continue
|
|
||||||
|
|
||||||
}
|
debug!("Starting client handler");
|
||||||
};
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
Some(conn) = endpoint.accept() => {
|
||||||
|
let connection = match conn.await {
|
||||||
|
Ok(conn) => conn,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Error while processing client connection");
|
||||||
|
warn!("{:?}",e);
|
||||||
|
continue
|
||||||
|
|
||||||
let statehandle = statehandle.clone();
|
}
|
||||||
let message_send = message_send.clone();
|
};
|
||||||
let message_recv = message_recv.resubscribe();
|
debug!("Handling new client connection from {}",connection.remote_address());
|
||||||
let close_handle = close_handle.resubscribe();
|
|
||||||
let db_handle = db_handle.clone();
|
|
||||||
let message_ack_send = message_ack_send.clone();
|
|
||||||
let config = config.clone();
|
|
||||||
|
|
||||||
thread_handles.push(tokio::spawn(async move {
|
let statehandle = statehandle.clone();
|
||||||
// listen for the first bidirectional stream
|
let message_send = message_send.clone();
|
||||||
// create client
|
let message_recv = message_recv.resubscribe();
|
||||||
let client = Client::new(
|
let close_handle = close_handle.resubscribe();
|
||||||
statehandle,
|
let db_handle = db_handle.clone();
|
||||||
(message_send,message_recv),
|
let message_ack_send = message_ack_send.clone();
|
||||||
message_ack_send,
|
let config = config.clone();
|
||||||
db_handle,
|
|
||||||
connection,
|
thread_handles.push(tokio::spawn(async move {
|
||||||
close_handle
|
debug!("Creating client handler");
|
||||||
);
|
// listen for the first bidirectional stream
|
||||||
// run client
|
// create client
|
||||||
client.run(config)
|
let client = Client::new(
|
||||||
}));
|
statehandle,
|
||||||
|
(message_send,message_recv),
|
||||||
|
message_ack_send,
|
||||||
|
db_handle,
|
||||||
|
connection,
|
||||||
|
close_handle
|
||||||
|
);
|
||||||
|
debug!("Running client handler");
|
||||||
|
// run client
|
||||||
|
client.run(config).await
|
||||||
|
}));
|
||||||
|
},
|
||||||
|
Ok(()) = close_handle.recv() => break,
|
||||||
|
// Connection error of some sort
|
||||||
|
else => break,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
debug!("Exiting client handler");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+82
-23
@@ -7,9 +7,15 @@ use bcrypt::{DEFAULT_COST, hash, verify};
|
|||||||
use diesel_async::AsyncPgConnection;
|
use diesel_async::AsyncPgConnection;
|
||||||
use diesel_async::RunQueryDsl;//, AsyncConnection};
|
use diesel_async::RunQueryDsl;//, AsyncConnection};
|
||||||
use diesel_async::pooled_connection::deadpool::Object;
|
use diesel_async::pooled_connection::deadpool::Object;
|
||||||
use fedichat::client::{ServerMessage,ServerError};
|
use fedichat::client::{ServerMessage,ServerError,AuthMethod};
|
||||||
|
use rand::distr::{Alphanumeric, SampleString};
|
||||||
|
use rand::rngs::{SysRng,StdRng};
|
||||||
|
use rand::prelude::*;
|
||||||
|
|
||||||
|
use time::{OffsetDateTime,Duration};
|
||||||
use tracing::{instrument,warn};
|
use tracing::{instrument,warn};
|
||||||
|
|
||||||
|
|
||||||
#[instrument(skip_all)]
|
#[instrument(skip_all)]
|
||||||
pub async fn maybe_create_user(mut connection: Object<AsyncPgConnection>, username: &str, password: &str) -> Result<ServerMessage,ServerError> {
|
pub async fn maybe_create_user(mut connection: Object<AsyncPgConnection>, username: &str, password: &str) -> Result<ServerMessage,ServerError> {
|
||||||
let password = match hash(password,DEFAULT_COST) {
|
let password = match hash(password,DEFAULT_COST) {
|
||||||
@@ -37,32 +43,85 @@ pub async fn maybe_create_user(mut connection: Object<AsyncPgConnection>, userna
|
|||||||
|
|
||||||
/// Try to authenticate a user against information in the database
|
/// Try to authenticate a user against information in the database
|
||||||
#[instrument(skip_all)]
|
#[instrument(skip_all)]
|
||||||
pub async fn verify_user(mut connection: Object<AsyncPgConnection>, user: &str, pass: &str) -> Result<bool,ServerError> {
|
pub async fn verify_user(mut connection: Object<AsyncPgConnection>, user: &str, pass: &AuthMethod) -> Result<bool,ServerError> {
|
||||||
use schema::users::dsl::*;
|
|
||||||
|
|
||||||
let result = users
|
match pass {
|
||||||
.filter(username.eq(user))
|
AuthMethod::Password(pass) => {
|
||||||
.select(models::User::as_select())
|
use schema::users::dsl::*;
|
||||||
.first(&mut *connection)
|
let result = users
|
||||||
|
.filter(username.eq(user))
|
||||||
|
.select(models::User::as_select())
|
||||||
|
.first(&mut *connection)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
match result {
|
||||||
|
// If we have more than 0 rows then authentication is successful
|
||||||
|
Ok(user) => match verify(pass,&user.password) {
|
||||||
|
Ok(val) => Ok(val),
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Error encountered while generating password hash");
|
||||||
|
warn!("{e}");
|
||||||
|
return Err(ServerError::Generic)
|
||||||
|
}
|
||||||
|
|
||||||
|
},
|
||||||
|
// TODO: Probably actually check the error
|
||||||
|
_ => Err(ServerError::AuthenticationFailed)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
AuthMethod::Token(tk) => {
|
||||||
|
use schema::tokens::dsl::*;
|
||||||
|
let result = tokens
|
||||||
|
.filter(username.eq(user))
|
||||||
|
.filter(token.eq(tk))
|
||||||
|
// Check that expiry is in the future
|
||||||
|
.filter(expiry.gt(OffsetDateTime::now_utc().unix_timestamp()))
|
||||||
|
.select(models::Token::as_select())
|
||||||
|
.first(&mut *connection)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
match result {
|
||||||
|
// If we have more than 0 rows then authentication is successful
|
||||||
|
// Only need one token, no need to hash or do anything else
|
||||||
|
Ok(_) => Ok(true),
|
||||||
|
// TODO: Probably actually check the error
|
||||||
|
_ => Err(ServerError::AuthenticationFailed)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a new token for the current user with an expiry set 3 months into the future
|
||||||
|
pub async fn gen_token(mut connection: Object<AsyncPgConnection>, user: String)
|
||||||
|
-> Result<String,ServerError>
|
||||||
|
{
|
||||||
|
let mut rng = StdRng::try_from_rng(&mut SysRng).unwrap();
|
||||||
|
let new_token = Alphanumeric.sample_string(&mut rng, 32);
|
||||||
|
let expiry = OffsetDateTime::now_utc() + Duration::days(90);
|
||||||
|
|
||||||
|
|
||||||
|
let insertable = models::NewToken {
|
||||||
|
username: user,
|
||||||
|
token: new_token.clone(),
|
||||||
|
expiry: expiry.unix_timestamp()
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = diesel::insert_into(schema::tokens::table)
|
||||||
|
.values(&insertable)
|
||||||
|
.execute(&mut *connection)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
match result {
|
match result {
|
||||||
// If we have more than 0 rows then authentication is successful
|
// TODO: might need to check this? I don't know what it means
|
||||||
Ok(user) => match verify(pass,&user.password) {
|
Ok(_) => Ok(new_token),
|
||||||
Ok(val) => Ok(val),
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Error encountered while generating password hash");
|
|
||||||
warn!("{e}");
|
|
||||||
return Err(ServerError::Generic)
|
|
||||||
}
|
|
||||||
|
|
||||||
},
|
|
||||||
// TODO: Probably actually check the error
|
// TODO: Probably actually check the error
|
||||||
_ => Err(ServerError::AuthenticationFailed)
|
Err(_e) => Err(ServerError::UserAlreadyExists)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -73,3 +73,19 @@ pub struct Messages {
|
|||||||
pub server_timestamp: i64,
|
pub server_timestamp: i64,
|
||||||
pub username: UserT
|
pub username: UserT
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Queryable, Selectable)]
|
||||||
|
#[diesel(table_name = crate::db::schema::tokens)]
|
||||||
|
pub struct Token {
|
||||||
|
pub id: i32,
|
||||||
|
pub username: String,
|
||||||
|
pub token: String,
|
||||||
|
pub expiry: i64,
|
||||||
|
}
|
||||||
|
#[derive(Insertable)]
|
||||||
|
#[diesel(table_name = crate::db::schema::tokens)]
|
||||||
|
pub struct NewToken {
|
||||||
|
pub username: String,
|
||||||
|
pub token: String,
|
||||||
|
pub expiry: i64,
|
||||||
|
}
|
||||||
|
|||||||
+10
-1
@@ -48,6 +48,15 @@ diesel::table! {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
diesel::table! {
|
||||||
|
tokens (id) {
|
||||||
|
id -> Int4,
|
||||||
|
username -> Text,
|
||||||
|
token -> Text,
|
||||||
|
expiry -> Int8,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
diesel::table! {
|
diesel::table! {
|
||||||
users (id) {
|
users (id) {
|
||||||
id -> Int4,
|
id -> Int4,
|
||||||
@@ -56,4 +65,4 @@ diesel::table! {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
diesel::allow_tables_to_appear_in_same_query!(groups, messages, roles, users,);
|
diesel::allow_tables_to_appear_in_same_query!(groups, messages, roles, tokens, users,);
|
||||||
|
|||||||
+26
-9
@@ -15,7 +15,7 @@ use std::process::ExitCode;
|
|||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use serde::{Deserialize,Serialize};
|
use serde::{Deserialize,Serialize};
|
||||||
use tracing::{error,instrument,warn,debug,Level};
|
use tracing::{error,instrument,warn,debug,info,Level};
|
||||||
use tokio::sync::{RwLock,broadcast,mpsc};
|
use tokio::sync::{RwLock,broadcast,mpsc};
|
||||||
|
|
||||||
|
|
||||||
@@ -25,6 +25,12 @@ use crate::state::{State,StateError};
|
|||||||
#[derive(Hash,Eq,PartialEq,Clone,Serialize,Deserialize,Debug)]
|
#[derive(Hash,Eq,PartialEq,Clone,Serialize,Deserialize,Debug)]
|
||||||
pub struct Coordinate(Vec<i64>);
|
pub struct Coordinate(Vec<i64>);
|
||||||
|
|
||||||
|
impl Coordinate {
|
||||||
|
pub fn to_roomid(self) -> fedichat::RoomId {
|
||||||
|
fedichat::RoomId{coordinates: self.0}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<fedichat::RoomId> for Coordinate {
|
impl From<fedichat::RoomId> for Coordinate {
|
||||||
fn from(other: fedichat::RoomId) -> Coordinate {
|
fn from(other: fedichat::RoomId) -> Coordinate {
|
||||||
Coordinate(other.coordinates)
|
Coordinate(other.coordinates)
|
||||||
@@ -75,8 +81,12 @@ async fn main() -> ExitCode {
|
|||||||
|
|
||||||
|
|
||||||
// Set up database connection
|
// Set up database connection
|
||||||
let db_config = AsyncDieselConnectionManager::<diesel_async::AsyncPgConnection>::new(config.database.url.clone());
|
let db_string = format!("postgres://{}:{}@{}/{}",config.database.user,config.database.password,config.database.url,config.database.db_name);
|
||||||
let db_pool = match Pool::builder(db_config).build() {
|
let db_config = AsyncDieselConnectionManager::<diesel_async::AsyncPgConnection>::new(db_string);
|
||||||
|
let db_pool = match Pool::builder(db_config)
|
||||||
|
.max_size(config.database.num_connections)
|
||||||
|
.build()
|
||||||
|
{
|
||||||
Ok(val) => val,
|
Ok(val) => val,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Error while creating database connection pool");
|
error!("Error while creating database connection pool");
|
||||||
@@ -101,6 +111,7 @@ async fn main() -> ExitCode {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
let quinn_config = match quinn::ServerConfig::with_single_cert(certs, key){
|
let quinn_config = match quinn::ServerConfig::with_single_cert(certs, key){
|
||||||
Ok(val) => val,
|
Ok(val) => val,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@@ -122,6 +133,8 @@ async fn main() -> ExitCode {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
debug!("Bound to {address} on port {}",config.port);
|
||||||
|
|
||||||
// Load or create new state
|
// Load or create new state
|
||||||
let state = match State::load_from_file(&config.statefile) {
|
let state = match State::load_from_file(&config.statefile) {
|
||||||
Ok(state) => state,
|
Ok(state) => state,
|
||||||
@@ -130,7 +143,10 @@ async fn main() -> ExitCode {
|
|||||||
match fs::File::create(&config.statefile) {
|
match fs::File::create(&config.statefile) {
|
||||||
// If the statefile is writable then create an empty state
|
// If the statefile is writable then create an empty state
|
||||||
// and use that
|
// and use that
|
||||||
Ok(_) => State::new(),
|
Ok(_) => {
|
||||||
|
debug!("Creating fresh state");
|
||||||
|
State::new()
|
||||||
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Could not open or create statefile. Check your config.");
|
error!("Could not open or create statefile. Check your config.");
|
||||||
error!("{:?}",e);
|
error!("{:?}",e);
|
||||||
@@ -153,9 +169,10 @@ async fn main() -> ExitCode {
|
|||||||
let (message_send,message_recv) = broadcast::channel(128);
|
let (message_send,message_recv) = broadcast::channel(128);
|
||||||
let (message_ack_send,message_ack_recv) = mpsc::channel(128);
|
let (message_ack_send,message_ack_recv) = mpsc::channel(128);
|
||||||
|
|
||||||
|
debug!("Setting ctrl-c handler");
|
||||||
match ctrlc_async::set_async_handler(
|
match ctrlc_async::set_async_handler(
|
||||||
async move { match close_send.send(()) {
|
async move { match close_send.send(()) {
|
||||||
Ok(_val) => (),
|
Ok(_val) => debug!("Propogating ctrl-c"),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Shutdown handler is broken. Cannot gracefully exit.");
|
error!("Shutdown handler is broken. Cannot gracefully exit.");
|
||||||
error!("{:?}",e);
|
error!("{:?}",e);
|
||||||
@@ -177,6 +194,7 @@ async fn main() -> ExitCode {
|
|||||||
|
|
||||||
// Create client listener
|
// Create client listener
|
||||||
|
|
||||||
|
debug!("Setting up client handler");
|
||||||
let statehandle_cloned = statehandle.clone();
|
let statehandle_cloned = statehandle.clone();
|
||||||
let config_cloned = config.clone();
|
let config_cloned = config.clone();
|
||||||
join_handles.push(tokio::spawn(async move {
|
join_handles.push(tokio::spawn(async move {
|
||||||
@@ -191,6 +209,8 @@ async fn main() -> ExitCode {
|
|||||||
).await;
|
).await;
|
||||||
} ));
|
} ));
|
||||||
|
|
||||||
|
info!("Successfully started confetti");
|
||||||
|
|
||||||
// Wait for child threads to exit
|
// Wait for child threads to exit
|
||||||
for handle in join_handles {
|
for handle in join_handles {
|
||||||
match handle.await {
|
match handle.await {
|
||||||
@@ -202,6 +222,7 @@ async fn main() -> ExitCode {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
info!("Shutting down");
|
||||||
//Save state
|
//Save state
|
||||||
match statehandle.write().await.write_to_file(&config.statefile).await {
|
match statehandle.write().await.write_to_file(&config.statefile).await {
|
||||||
Ok(()) => debug!("Successfully wrote state to {:?}",config.statefile),
|
Ok(()) => debug!("Successfully wrote state to {:?}",config.statefile),
|
||||||
@@ -211,9 +232,5 @@ async fn main() -> ExitCode {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// Should never reach this
|
|
||||||
// How should I gracefully shutdown? Can I trap ctrlc
|
|
||||||
ExitCode::SUCCESS
|
ExitCode::SUCCESS
|
||||||
}
|
}
|
||||||
|
|||||||
+16
-2
@@ -9,6 +9,7 @@ use std::sync::Arc;
|
|||||||
use rmp_serde::{decode,encode};
|
use rmp_serde::{decode,encode};
|
||||||
use fedichat::state::{StatePermissionKey,StatePermissionValue};
|
use fedichat::state::{StatePermissionKey,StatePermissionValue};
|
||||||
use fedichat::client::ServerError;
|
use fedichat::client::ServerError;
|
||||||
|
use tracing::{warn,debug};
|
||||||
|
|
||||||
// These ergonomics kind of suck, might get reworked
|
// These ergonomics kind of suck, might get reworked
|
||||||
//
|
//
|
||||||
@@ -178,7 +179,8 @@ impl State {
|
|||||||
|
|
||||||
pub async fn is_joined(&self, coord: Coordinate, user: fedichat::User) -> bool {
|
pub async fn is_joined(&self, coord: Coordinate, user: fedichat::User) -> bool {
|
||||||
match self.get_bakedin(coord.clone(),"members".to_owned()).await {
|
match self.get_bakedin(coord.clone(),"members".to_owned()).await {
|
||||||
Ok(val) => val.0.contains_key(&StatePermissionKey::User(user.clone())),
|
// There surely is a way to do this without taking ownership of user
|
||||||
|
Ok(val) => val.0.contains_key(&StatePermissionKey::User(user)),
|
||||||
// Dunno if I should hide this error
|
// Dunno if I should hide this error
|
||||||
Err(_e) => false
|
Err(_e) => false
|
||||||
}
|
}
|
||||||
@@ -187,6 +189,8 @@ impl State {
|
|||||||
// Get all banned users. If we are not banned, add our name into the joined list
|
// Get all banned users. If we are not banned, add our name into the joined list
|
||||||
pub async fn join(&self, coord: Coordinate, user: fedichat::User) -> Result<(),StateError> {
|
pub async fn join(&self, coord: Coordinate, user: fedichat::User) -> Result<(),StateError> {
|
||||||
let bans = self.get_bakedin(coord.clone(),"banned".to_owned()).await?;
|
let bans = self.get_bakedin(coord.clone(),"banned".to_owned()).await?;
|
||||||
|
debug!("Banned user {:?} trying to join {:?}",user,coord);
|
||||||
|
debug!("BANLIST: {:?}",bans);
|
||||||
if bans.0.contains_key(&StatePermissionKey::User(user.clone())) {
|
if bans.0.contains_key(&StatePermissionKey::User(user.clone())) {
|
||||||
return Err(StateError::Banned(coord));
|
return Err(StateError::Banned(coord));
|
||||||
}
|
}
|
||||||
@@ -198,7 +202,17 @@ impl State {
|
|||||||
path: fedichat::state::StatePath(vec!["room".to_owned(),"members".to_owned()]),
|
path: fedichat::state::StatePath(vec!["room".to_owned(),"members".to_owned()]),
|
||||||
op: Operation::PermAdd(permission)
|
op: Operation::PermAdd(permission)
|
||||||
}).await.map(|_| ())
|
}).await.map(|_| ())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn leave(&self, coord: Coordinate, user: fedichat::User) -> Result<(),StateError> {
|
||||||
|
|
||||||
|
//let permission = fedichat::state::StatePermission(StatePermissionKey::User(user),StatePermissionValue::None);
|
||||||
|
// Remove ourselves from the joined list
|
||||||
|
self.execute_operation(UnlockedOperation {
|
||||||
|
room: coord.clone(),
|
||||||
|
path: fedichat::state::StatePath(vec!["room".to_owned(),"members".to_owned()]),
|
||||||
|
op: Operation::PermDel(StatePermissionKey::User(user))
|
||||||
|
}).await.map(|_| ())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get a baked in state value. Useful for checking if a user is joined to a room
|
// Get a baked in state value. Useful for checking if a user is joined to a room
|
||||||
@@ -708,7 +722,6 @@ impl StateNode {
|
|||||||
let banned_perms = fedichat::state::PermissionTable(
|
let banned_perms = fedichat::state::PermissionTable(
|
||||||
HashMap::from([
|
HashMap::from([
|
||||||
(StatePermissionKey::Operator,StatePermissionValue::Owner),
|
(StatePermissionKey::Operator,StatePermissionValue::Owner),
|
||||||
(StatePermissionKey::User(operator),StatePermissionValue::None),
|
|
||||||
(StatePermissionKey::Everyone,StatePermissionValue::Read),
|
(StatePermissionKey::Everyone,StatePermissionValue::Read),
|
||||||
])
|
])
|
||||||
);
|
);
|
||||||
@@ -838,6 +851,7 @@ pub enum StateError {
|
|||||||
|
|
||||||
impl From<StateError> for ServerError {
|
impl From<StateError> for ServerError {
|
||||||
fn from(value: StateError) -> ServerError {
|
fn from(value: StateError) -> ServerError {
|
||||||
|
warn!("StateError found: {}",value);
|
||||||
// TODO:
|
// TODO:
|
||||||
// Probably needs a pass, throwing away all this error information feels wrong.
|
// Probably needs a pass, throwing away all this error information feels wrong.
|
||||||
// Maybe this should debug print every time it is called lol
|
// Maybe this should debug print every time it is called lol
|
||||||
|
|||||||
Reference in New Issue
Block a user