2022-02-26 23:34:57 +00:00
|
|
|
use futures_util::TryStreamExt;
|
2021-03-09 18:59:10 +00:00
|
|
|
use sqlx::{postgres::PgPool, Row};
|
2022-02-27 00:50:29 +00:00
|
|
|
use std::cmp::max;
|
2023-05-24 08:53:07 +00:00
|
|
|
use std::error::Error;
|
|
|
|
use std::fmt::Display;
|
2022-02-26 23:34:57 +00:00
|
|
|
use std::path::{Path, PathBuf};
|
2022-02-27 00:50:29 +00:00
|
|
|
use time::ext::NumericalStdDuration;
|
2022-08-18 21:20:56 +00:00
|
|
|
use time::OffsetDateTime;
|
2022-02-26 23:34:57 +00:00
|
|
|
use tokio::fs;
|
|
|
|
use tokio::sync::mpsc::Receiver;
|
|
|
|
use tokio::time::timeout;
|
2020-07-09 17:27:24 +00:00
|
|
|
|
2023-05-24 08:53:07 +00:00
|
|
|
pub(crate) async fn delete_old_files(
|
|
|
|
mut receiver: Receiver<()>,
|
|
|
|
db: PgPool,
|
|
|
|
files_dir: PathBuf,
|
|
|
|
) -> Result<(), DeletionError> {
|
2020-07-09 17:27:24 +00:00
|
|
|
loop {
|
2023-05-24 08:53:07 +00:00
|
|
|
wait_for_file_expiry(&mut receiver, &db).await?;
|
2020-07-13 13:29:40 +00:00
|
|
|
|
2022-02-27 00:50:29 +00:00
|
|
|
let now = OffsetDateTime::now_utc();
|
2021-03-09 18:59:10 +00:00
|
|
|
let mut rows = sqlx::query("SELECT file_id FROM files WHERE files.valid_till < $1")
|
2020-07-13 13:29:40 +00:00
|
|
|
.bind(now)
|
2020-07-12 00:26:11 +00:00
|
|
|
.fetch(&db);
|
2023-05-24 08:53:07 +00:00
|
|
|
while let Some(row) = rows.try_next().await? {
|
2021-03-09 18:59:10 +00:00
|
|
|
let file_id: String = row.try_get("file_id").expect("we selected this column");
|
2023-05-24 08:53:07 +00:00
|
|
|
delete_content(&file_id, &files_dir).await?
|
2020-07-09 17:27:24 +00:00
|
|
|
}
|
2020-07-13 13:29:40 +00:00
|
|
|
|
2020-07-12 00:26:11 +00:00
|
|
|
sqlx::query("DELETE FROM files WHERE valid_till < $1")
|
2020-07-13 13:29:40 +00:00
|
|
|
.bind(now)
|
2020-07-09 17:27:24 +00:00
|
|
|
.execute(&db)
|
2023-05-24 08:53:07 +00:00
|
|
|
.await?;
|
2020-07-09 17:27:24 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-04-04 01:38:29 +00:00
|
|
|
pub(crate) async fn delete_by_id(
|
|
|
|
db: &PgPool,
|
|
|
|
file_id: &str,
|
|
|
|
files_dir: &Path,
|
|
|
|
) -> Result<(), sqlx::Error> {
|
2021-09-11 00:08:47 +00:00
|
|
|
delete_content(file_id, files_dir).await?;
|
2021-04-04 01:38:29 +00:00
|
|
|
sqlx::query("DELETE FROM files WHERE file_id = $1")
|
|
|
|
.bind(file_id)
|
|
|
|
.execute(db)
|
|
|
|
.await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn delete_content(file_id: &str, files_dir: &Path) -> Result<(), std::io::Error> {
|
|
|
|
let path = files_dir.join(file_id);
|
2023-05-24 08:14:16 +00:00
|
|
|
if fs::try_exists(&path).await? {
|
|
|
|
fs::remove_file(&path).await?;
|
|
|
|
log::info!("deleted file {}", file_id);
|
|
|
|
} else {
|
|
|
|
log::warn!("expiring file {} was missing from the filesystem", file_id);
|
2021-04-04 01:38:29 +00:00
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2023-05-24 08:53:07 +00:00
|
|
|
async fn wait_for_file_expiry(
|
|
|
|
receiver: &mut Receiver<()>,
|
|
|
|
db: &PgPool,
|
|
|
|
) -> Result<(), DeletionError> {
|
2022-02-27 00:50:29 +00:00
|
|
|
let valid_till: (Option<OffsetDateTime>,) =
|
2021-09-12 16:51:14 +00:00
|
|
|
sqlx::query_as("SELECT MIN(valid_till) as min from files")
|
2021-09-24 20:51:13 +00:00
|
|
|
.fetch_one(db)
|
2023-05-24 08:53:07 +00:00
|
|
|
.await?;
|
2023-05-24 08:14:16 +00:00
|
|
|
let next_timeout = match valid_till.0 {
|
2022-02-27 00:50:29 +00:00
|
|
|
Some(valid_till) => (max(
|
|
|
|
0,
|
|
|
|
valid_till.unix_timestamp() - OffsetDateTime::now_utc().unix_timestamp(),
|
|
|
|
) as u64)
|
|
|
|
.std_seconds(),
|
|
|
|
None => 1_u64.std_days(),
|
2020-07-09 17:27:24 +00:00
|
|
|
};
|
2022-02-27 00:50:29 +00:00
|
|
|
let _ = timeout(next_timeout, receiver.recv()).await;
|
2023-05-24 08:53:07 +00:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub enum DeletionError {
|
|
|
|
Db(sqlx::Error),
|
|
|
|
Fs(std::io::Error),
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<sqlx::Error> for DeletionError {
|
|
|
|
fn from(value: sqlx::Error) -> Self {
|
|
|
|
DeletionError::Db(value)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<std::io::Error> for DeletionError {
|
|
|
|
fn from(value: std::io::Error) -> Self {
|
|
|
|
DeletionError::Fs(value)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Display for DeletionError {
|
|
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
|
|
match self {
|
|
|
|
DeletionError::Db(_) => write!(f, "Failed to fetch expired files from database"),
|
|
|
|
DeletionError::Fs(_) => write!(f, "Failed to delete file from filesystem"),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Error for DeletionError {
|
|
|
|
fn source(&self) -> Option<&(dyn Error + 'static)> {
|
|
|
|
match self {
|
|
|
|
DeletionError::Db(err) => Some(err),
|
|
|
|
DeletionError::Fs(err) => Some(err),
|
|
|
|
}
|
|
|
|
}
|
2020-07-09 17:27:24 +00:00
|
|
|
}
|