Compare commits

...

4 commits

5 changed files with 70 additions and 21 deletions

2
Cargo.lock generated
View file

@ -491,7 +491,7 @@ dependencies = [
[[package]] [[package]]
name = "datatrash" name = "datatrash"
version = "2.3.5" version = "2.3.6"
dependencies = [ dependencies = [
"actix-files", "actix-files",
"actix-governor", "actix-governor",

View file

@ -1,6 +1,6 @@
[package] [package]
name = "datatrash" name = "datatrash"
version = "2.3.5" version = "2.3.6"
authors = ["neri"] authors = ["neri"]
edition = "2021" edition = "2021"

View file

@ -1,6 +1,8 @@
use futures_util::TryStreamExt; use futures_util::TryStreamExt;
use sqlx::{postgres::PgPool, Row}; use sqlx::{postgres::PgPool, Row};
use std::cmp::max; use std::cmp::max;
use std::error::Error;
use std::fmt::Display;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use time::ext::NumericalStdDuration; use time::ext::NumericalStdDuration;
use time::OffsetDateTime; use time::OffsetDateTime;
@ -8,26 +10,27 @@ use tokio::fs;
use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Receiver;
use tokio::time::timeout; use tokio::time::timeout;
pub(crate) async fn delete_old_files(mut receiver: Receiver<()>, db: PgPool, files_dir: PathBuf) { pub(crate) async fn delete_old_files(
mut receiver: Receiver<()>,
db: PgPool,
files_dir: PathBuf,
) -> Result<(), DeletionError> {
loop { loop {
wait_for_file_expiry(&mut receiver, &db).await; wait_for_file_expiry(&mut receiver, &db).await?;
let now = OffsetDateTime::now_utc(); let now = OffsetDateTime::now_utc();
let mut rows = sqlx::query("SELECT file_id FROM files WHERE files.valid_till < $1") let mut rows = sqlx::query("SELECT file_id FROM files WHERE files.valid_till < $1")
.bind(now) .bind(now)
.fetch(&db); .fetch(&db);
while let Some(row) = rows.try_next().await.expect("could not load expired files") { while let Some(row) = rows.try_next().await? {
let file_id: String = row.try_get("file_id").expect("we selected this column"); let file_id: String = row.try_get("file_id").expect("we selected this column");
delete_content(&file_id, &files_dir) delete_content(&file_id, &files_dir).await?
.await
.expect("could not delete file");
} }
sqlx::query("DELETE FROM files WHERE valid_till < $1") sqlx::query("DELETE FROM files WHERE valid_till < $1")
.bind(now) .bind(now)
.execute(&db) .execute(&db)
.await .await?;
.expect("could not delete expired files from database");
} }
} }
@ -46,19 +49,24 @@ pub(crate) async fn delete_by_id(
async fn delete_content(file_id: &str, files_dir: &Path) -> Result<(), std::io::Error> { async fn delete_content(file_id: &str, files_dir: &Path) -> Result<(), std::io::Error> {
let path = files_dir.join(file_id); let path = files_dir.join(file_id);
if fs::remove_file(&path).await.is_ok() { if fs::try_exists(&path).await? {
log::info!("delete file {}", file_id); fs::remove_file(&path).await?;
log::info!("deleted file {}", file_id);
} else {
log::warn!("expiring file {} was missing from the filesystem", file_id);
} }
Ok(()) Ok(())
} }
async fn wait_for_file_expiry(receiver: &mut Receiver<()>, db: &PgPool) { async fn wait_for_file_expiry(
receiver: &mut Receiver<()>,
db: &PgPool,
) -> Result<(), DeletionError> {
let valid_till: (Option<OffsetDateTime>,) = let valid_till: (Option<OffsetDateTime>,) =
sqlx::query_as("SELECT MIN(valid_till) as min from files") sqlx::query_as("SELECT MIN(valid_till) as min from files")
.fetch_one(db) .fetch_one(db)
.await .await?;
.expect("could not fetch expiring files from database"); let next_timeout = match valid_till.0 {
let next_timeout: std::time::Duration = match valid_till.0 {
Some(valid_till) => (max( Some(valid_till) => (max(
0, 0,
valid_till.unix_timestamp() - OffsetDateTime::now_utc().unix_timestamp(), valid_till.unix_timestamp() - OffsetDateTime::now_utc().unix_timestamp(),
@ -67,4 +75,41 @@ async fn wait_for_file_expiry(receiver: &mut Receiver<()>, db: &PgPool) {
None => 1_u64.std_days(), None => 1_u64.std_days(),
}; };
let _ = timeout(next_timeout, receiver.recv()).await; let _ = timeout(next_timeout, receiver.recv()).await;
Ok(())
}
#[derive(Debug)]
pub enum DeletionError {
Db(sqlx::Error),
Fs(std::io::Error),
}
impl From<sqlx::Error> for DeletionError {
fn from(value: sqlx::Error) -> Self {
DeletionError::Db(value)
}
}
impl From<std::io::Error> for DeletionError {
fn from(value: std::io::Error) -> Self {
DeletionError::Fs(value)
}
}
impl Display for DeletionError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DeletionError::Db(_) => write!(f, "Failed to fetch expired files from database"),
DeletionError::Fs(_) => write!(f, "Failed to delete file from filesystem"),
}
}
}
impl Error for DeletionError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
DeletionError::Db(err) => Some(err),
DeletionError::Fs(err) => Some(err),
}
}
} }

View file

@ -123,9 +123,11 @@ async fn main() -> std::io::Result<()> {
.bind(bind_address)? .bind(bind_address)?
.run(); .run();
// exit when http_server exits OR when deleter panics // exit when http_server exits OR when deleter errors
tokio::select! { tokio::select! {
result = http_server => result, result = http_server => result,
_ = deleter => panic!("deleter never returns") result = deleter => {
result?.map(|_| unreachable!("deletion runs infinitely")).expect("deletion may not fail")
},
} }
} }

View file

@ -9,7 +9,7 @@ use actix_web::http::header::LOCATION;
use actix_web::{error, web, Error, HttpRequest, HttpResponse}; use actix_web::{error, web, Error, HttpRequest, HttpResponse};
use rand::{distributions::Slice, Rng}; use rand::{distributions::Slice, Rng};
use sqlx::postgres::PgPool; use sqlx::postgres::PgPool;
use std::path::PathBuf; use std::path::{Path, PathBuf};
use tokio::fs::{self, OpenOptions}; use tokio::fs::{self, OpenOptions};
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
@ -46,7 +46,7 @@ pub async fn upload(
) )
}); });
insert_file_metadata(&file_id, file_name, &upload_config, db).await?; insert_file_metadata(&file_id, file_name, &file_path, &upload_config, db).await?;
log::info!( log::info!(
"{} create new file {} (valid_till: {}, content_type: {}, delete_on_download: {})", "{} create new file {} (valid_till: {}, content_type: {}, delete_on_download: {})",
@ -69,6 +69,7 @@ pub async fn upload(
async fn insert_file_metadata( async fn insert_file_metadata(
file_id: &String, file_id: &String,
file_name: String, file_name: String,
file_path: &Path,
upload_config: &UploadConfig, upload_config: &UploadConfig,
db: web::Data<sqlx::Pool<sqlx::Postgres>>, db: web::Data<sqlx::Pool<sqlx::Postgres>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
@ -85,7 +86,8 @@ async fn insert_file_metadata(
.await; .await;
if let Err(db_err) = db_insert { if let Err(db_err) = db_insert {
log::error!("could not insert into datebase {:?}", db_err); log::error!("could not insert into datebase {:?}", db_err);
if let Err(file_err) = fs::remove_file(file_name).await {
if let Err(file_err) = fs::remove_file(file_path).await {
log::error!("could not remove file {:?}", file_err); log::error!("could not remove file {:?}", file_err);
} }
return Err(error::ErrorInternalServerError( return Err(error::ErrorInternalServerError(