Compare commits
4 commits
2ab33b82d0
...
7f6db2de40
Author | SHA1 | Date | |
---|---|---|---|
7f6db2de40 | |||
fbb8f5a7c5 | |||
aef400ff51 | |||
6e5892a209 |
5 changed files with 70 additions and 21 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -491,7 +491,7 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "datatrash"
|
name = "datatrash"
|
||||||
version = "2.3.5"
|
version = "2.3.6"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"actix-files",
|
"actix-files",
|
||||||
"actix-governor",
|
"actix-governor",
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "datatrash"
|
name = "datatrash"
|
||||||
version = "2.3.5"
|
version = "2.3.6"
|
||||||
authors = ["neri"]
|
authors = ["neri"]
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
use futures_util::TryStreamExt;
|
use futures_util::TryStreamExt;
|
||||||
use sqlx::{postgres::PgPool, Row};
|
use sqlx::{postgres::PgPool, Row};
|
||||||
use std::cmp::max;
|
use std::cmp::max;
|
||||||
|
use std::error::Error;
|
||||||
|
use std::fmt::Display;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use time::ext::NumericalStdDuration;
|
use time::ext::NumericalStdDuration;
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
|
@ -8,26 +10,27 @@ use tokio::fs;
|
||||||
use tokio::sync::mpsc::Receiver;
|
use tokio::sync::mpsc::Receiver;
|
||||||
use tokio::time::timeout;
|
use tokio::time::timeout;
|
||||||
|
|
||||||
pub(crate) async fn delete_old_files(mut receiver: Receiver<()>, db: PgPool, files_dir: PathBuf) {
|
pub(crate) async fn delete_old_files(
|
||||||
|
mut receiver: Receiver<()>,
|
||||||
|
db: PgPool,
|
||||||
|
files_dir: PathBuf,
|
||||||
|
) -> Result<(), DeletionError> {
|
||||||
loop {
|
loop {
|
||||||
wait_for_file_expiry(&mut receiver, &db).await;
|
wait_for_file_expiry(&mut receiver, &db).await?;
|
||||||
|
|
||||||
let now = OffsetDateTime::now_utc();
|
let now = OffsetDateTime::now_utc();
|
||||||
let mut rows = sqlx::query("SELECT file_id FROM files WHERE files.valid_till < $1")
|
let mut rows = sqlx::query("SELECT file_id FROM files WHERE files.valid_till < $1")
|
||||||
.bind(now)
|
.bind(now)
|
||||||
.fetch(&db);
|
.fetch(&db);
|
||||||
while let Some(row) = rows.try_next().await.expect("could not load expired files") {
|
while let Some(row) = rows.try_next().await? {
|
||||||
let file_id: String = row.try_get("file_id").expect("we selected this column");
|
let file_id: String = row.try_get("file_id").expect("we selected this column");
|
||||||
delete_content(&file_id, &files_dir)
|
delete_content(&file_id, &files_dir).await?
|
||||||
.await
|
|
||||||
.expect("could not delete file");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sqlx::query("DELETE FROM files WHERE valid_till < $1")
|
sqlx::query("DELETE FROM files WHERE valid_till < $1")
|
||||||
.bind(now)
|
.bind(now)
|
||||||
.execute(&db)
|
.execute(&db)
|
||||||
.await
|
.await?;
|
||||||
.expect("could not delete expired files from database");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -46,19 +49,24 @@ pub(crate) async fn delete_by_id(
|
||||||
|
|
||||||
async fn delete_content(file_id: &str, files_dir: &Path) -> Result<(), std::io::Error> {
|
async fn delete_content(file_id: &str, files_dir: &Path) -> Result<(), std::io::Error> {
|
||||||
let path = files_dir.join(file_id);
|
let path = files_dir.join(file_id);
|
||||||
if fs::remove_file(&path).await.is_ok() {
|
if fs::try_exists(&path).await? {
|
||||||
log::info!("delete file {}", file_id);
|
fs::remove_file(&path).await?;
|
||||||
|
log::info!("deleted file {}", file_id);
|
||||||
|
} else {
|
||||||
|
log::warn!("expiring file {} was missing from the filesystem", file_id);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn wait_for_file_expiry(receiver: &mut Receiver<()>, db: &PgPool) {
|
async fn wait_for_file_expiry(
|
||||||
|
receiver: &mut Receiver<()>,
|
||||||
|
db: &PgPool,
|
||||||
|
) -> Result<(), DeletionError> {
|
||||||
let valid_till: (Option<OffsetDateTime>,) =
|
let valid_till: (Option<OffsetDateTime>,) =
|
||||||
sqlx::query_as("SELECT MIN(valid_till) as min from files")
|
sqlx::query_as("SELECT MIN(valid_till) as min from files")
|
||||||
.fetch_one(db)
|
.fetch_one(db)
|
||||||
.await
|
.await?;
|
||||||
.expect("could not fetch expiring files from database");
|
let next_timeout = match valid_till.0 {
|
||||||
let next_timeout: std::time::Duration = match valid_till.0 {
|
|
||||||
Some(valid_till) => (max(
|
Some(valid_till) => (max(
|
||||||
0,
|
0,
|
||||||
valid_till.unix_timestamp() - OffsetDateTime::now_utc().unix_timestamp(),
|
valid_till.unix_timestamp() - OffsetDateTime::now_utc().unix_timestamp(),
|
||||||
|
@ -67,4 +75,41 @@ async fn wait_for_file_expiry(receiver: &mut Receiver<()>, db: &PgPool) {
|
||||||
None => 1_u64.std_days(),
|
None => 1_u64.std_days(),
|
||||||
};
|
};
|
||||||
let _ = timeout(next_timeout, receiver.recv()).await;
|
let _ = timeout(next_timeout, receiver.recv()).await;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum DeletionError {
|
||||||
|
Db(sqlx::Error),
|
||||||
|
Fs(std::io::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<sqlx::Error> for DeletionError {
|
||||||
|
fn from(value: sqlx::Error) -> Self {
|
||||||
|
DeletionError::Db(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<std::io::Error> for DeletionError {
|
||||||
|
fn from(value: std::io::Error) -> Self {
|
||||||
|
DeletionError::Fs(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for DeletionError {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
DeletionError::Db(_) => write!(f, "Failed to fetch expired files from database"),
|
||||||
|
DeletionError::Fs(_) => write!(f, "Failed to delete file from filesystem"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Error for DeletionError {
|
||||||
|
fn source(&self) -> Option<&(dyn Error + 'static)> {
|
||||||
|
match self {
|
||||||
|
DeletionError::Db(err) => Some(err),
|
||||||
|
DeletionError::Fs(err) => Some(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -123,9 +123,11 @@ async fn main() -> std::io::Result<()> {
|
||||||
.bind(bind_address)?
|
.bind(bind_address)?
|
||||||
.run();
|
.run();
|
||||||
|
|
||||||
// exit when http_server exits OR when deleter panics
|
// exit when http_server exits OR when deleter errors
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
result = http_server => result,
|
result = http_server => result,
|
||||||
_ = deleter => panic!("deleter never returns")
|
result = deleter => {
|
||||||
|
result?.map(|_| unreachable!("deletion runs infinitely")).expect("deletion may not fail")
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,7 @@ use actix_web::http::header::LOCATION;
|
||||||
use actix_web::{error, web, Error, HttpRequest, HttpResponse};
|
use actix_web::{error, web, Error, HttpRequest, HttpResponse};
|
||||||
use rand::{distributions::Slice, Rng};
|
use rand::{distributions::Slice, Rng};
|
||||||
use sqlx::postgres::PgPool;
|
use sqlx::postgres::PgPool;
|
||||||
use std::path::PathBuf;
|
use std::path::{Path, PathBuf};
|
||||||
use tokio::fs::{self, OpenOptions};
|
use tokio::fs::{self, OpenOptions};
|
||||||
use tokio::sync::mpsc::Sender;
|
use tokio::sync::mpsc::Sender;
|
||||||
|
|
||||||
|
@ -46,7 +46,7 @@ pub async fn upload(
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
insert_file_metadata(&file_id, file_name, &upload_config, db).await?;
|
insert_file_metadata(&file_id, file_name, &file_path, &upload_config, db).await?;
|
||||||
|
|
||||||
log::info!(
|
log::info!(
|
||||||
"{} create new file {} (valid_till: {}, content_type: {}, delete_on_download: {})",
|
"{} create new file {} (valid_till: {}, content_type: {}, delete_on_download: {})",
|
||||||
|
@ -69,6 +69,7 @@ pub async fn upload(
|
||||||
async fn insert_file_metadata(
|
async fn insert_file_metadata(
|
||||||
file_id: &String,
|
file_id: &String,
|
||||||
file_name: String,
|
file_name: String,
|
||||||
|
file_path: &Path,
|
||||||
upload_config: &UploadConfig,
|
upload_config: &UploadConfig,
|
||||||
db: web::Data<sqlx::Pool<sqlx::Postgres>>,
|
db: web::Data<sqlx::Pool<sqlx::Postgres>>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
@ -85,7 +86,8 @@ async fn insert_file_metadata(
|
||||||
.await;
|
.await;
|
||||||
if let Err(db_err) = db_insert {
|
if let Err(db_err) = db_insert {
|
||||||
log::error!("could not insert into datebase {:?}", db_err);
|
log::error!("could not insert into datebase {:?}", db_err);
|
||||||
if let Err(file_err) = fs::remove_file(file_name).await {
|
|
||||||
|
if let Err(file_err) = fs::remove_file(file_path).await {
|
||||||
log::error!("could not remove file {:?}", file_err);
|
log::error!("could not remove file {:?}", file_err);
|
||||||
}
|
}
|
||||||
return Err(error::ErrorInternalServerError(
|
return Err(error::ErrorInternalServerError(
|
||||||
|
|
Loading…
Reference in a new issue