This commit is contained in:
Qing
2023-12-30 23:36:44 +08:00
parent 85c3397b97
commit c4abda3942
35 changed files with 969 additions and 854 deletions

View File

@@ -1,18 +1,19 @@
# Copy from https://github.com/silentsokolov/flask-thumbnails/blob/master/flask_thumbnails/thumbnail.py
import os
from datetime import datetime
import cv2
import time
from io import BytesIO
from pathlib import Path
import numpy as np
# from watchdog.events import FileSystemEventHandler
# from watchdog.observers import Observer
from typing import List
from PIL import Image, ImageOps, PngImagePlugin
from loguru import logger
from fastapi import FastAPI, UploadFile, HTTPException
from starlette.responses import FileResponse
from ..schema import (
MediasResponse,
MediasRequest,
MediaFileRequest,
MediaTab,
MediaThumbnailFileRequest,
)
LARGE_ENOUGH_NUMBER = 100
PngImagePlugin.MAX_TEXT_CHUNK = LARGE_ENOUGH_NUMBER * (1024**2)
@@ -21,132 +22,87 @@ from .utils import aspect_to_string, generate_filename, glob_img
class FileManager:
def __init__(self, app=None):
def __init__(self, app: FastAPI, input_dir: Path, output_dir: Path):
self.app = app
self._default_root_directory = "media"
self._default_thumbnail_directory = "media"
self._default_root_url = "/"
self._default_thumbnail_root_url = "/"
self._default_format = "JPEG"
self.output_dir: Path = None
if app is not None:
self.init_app(app)
self.input_dir: Path = input_dir
self.output_dir: Path = output_dir
self.image_dir_filenames = []
self.output_dir_filenames = []
if not self.thumbnail_directory.exists():
self.thumbnail_directory.mkdir(parents=True)
self.image_dir_observer = None
self.output_dir_observer = None
# fmt: off
self.app.add_api_route("/api/v1/save_image", self.api_save_image, methods=["POST"])
self.app.add_api_route("/api/v1/medias", self.api_medias, methods=["POST"], response_model=List[MediasResponse])
self.app.add_api_route("/api/v1/media_file", self.api_media_file, methods=["POST"], response_model=None)
self.app.add_api_route("/api/v1/media_thumbnail_file", self.api_media_thumbnail_file, methods=["POST"], response_model=None)
# fmt: on
self.modified_time = {
"image": datetime.utcnow(),
"output": datetime.utcnow(),
}
def api_save_image(self, file: UploadFile):
filename = file.filename
origin_image_bytes = file.file.read()
with open(self.output_dir / filename, "wb") as fw:
fw.write(origin_image_bytes)
# def start(self):
# self.image_dir_filenames = self._media_names(self.root_directory)
# self.output_dir_filenames = self._media_names(self.output_dir)
#
# logger.info(f"Start watching image directory: {self.root_directory}")
# self.image_dir_observer = Observer()
# self.image_dir_observer.schedule(self, self.root_directory, recursive=False)
# self.image_dir_observer.start()
#
# logger.info(f"Start watching output directory: {self.output_dir}")
# self.output_dir_observer = Observer()
# self.output_dir_observer.schedule(self, self.output_dir, recursive=False)
# self.output_dir_observer.start()
def api_medias(self, req: MediasRequest) -> List[MediasResponse]:
img_dir = self._get_dir(req.tab)
return self._media_names(img_dir)
def on_modified(self, event):
if not os.path.isdir(event.src_path):
return
if event.src_path == str(self.root_directory):
logger.info(f"Image directory {event.src_path} modified")
self.image_dir_filenames = self._media_names(self.root_directory)
self.modified_time["image"] = datetime.utcnow()
elif event.src_path == str(self.output_dir):
logger.info(f"Output directory {event.src_path} modified")
self.output_dir_filenames = self._media_names(self.output_dir)
self.modified_time["output"] = datetime.utcnow()
def api_media_file(self, req: MediaFileRequest) -> FileResponse:
file_path = self._get_file(req.tab, req.filename)
return FileResponse(file_path)
def init_app(self, app):
if self.app is None:
self.app = app
app.thumbnail_instance = self
if not hasattr(app, "extensions"):
app.extensions = {}
if "thumbnail" in app.extensions:
raise RuntimeError("Flask-thumbnail extension already initialized")
app.extensions["thumbnail"] = self
app.config.setdefault("THUMBNAIL_MEDIA_ROOT", self._default_root_directory)
app.config.setdefault(
"THUMBNAIL_MEDIA_THUMBNAIL_ROOT", self._default_thumbnail_directory
def api_media_thumbnail_file(self, req: MediaThumbnailFileRequest) -> FileResponse:
img_dir = self._get_dir(req.tab)
thumb_filename, (width, height) = self.get_thumbnail(
img_dir, req.filename, width=req.width, height=req.height
)
app.config.setdefault("THUMBNAIL_MEDIA_URL", self._default_root_url)
app.config.setdefault(
"THUMBNAIL_MEDIA_THUMBNAIL_URL", self._default_thumbnail_root_url
thumbnail_filepath = self.thumbnail_directory / thumb_filename
return FileResponse(
thumbnail_filepath,
headers={
"X-Width": str(width),
"X-Height": str(height),
},
)
app.config.setdefault("THUMBNAIL_DEFAULT_FORMAT", self._default_format)
@property
def root_directory(self):
path = self.app.config["THUMBNAIL_MEDIA_ROOT"]
if os.path.isabs(path):
return path
def _get_dir(self, tab: MediaTab) -> Path:
if tab == "input":
return self.input_dir
elif tab == "output":
return self.output_dir
else:
return os.path.join(self.app.root_path, path)
raise HTTPException(status_code=422, detail=f"tab not found: {tab}")
def _get_file(self, tab: MediaTab, filename: str) -> Path:
file_path = self._get_dir(tab) / filename
if not file_path.exists():
raise HTTPException(status_code=422, detail=f"file not found: {file_path}")
return file_path
@property
def thumbnail_directory(self):
path = self.app.config["THUMBNAIL_MEDIA_THUMBNAIL_ROOT"]
if os.path.isabs(path):
return path
else:
return os.path.join(self.app.root_path, path)
@property
def root_url(self):
return self.app.config["THUMBNAIL_MEDIA_URL"]
@property
def media_names(self):
# return self.image_dir_filenames
return self._media_names(self.root_directory)
@property
def output_media_names(self):
return self._media_names(self.output_dir)
# return self.output_dir_filenames
def thumbnail_directory(self) -> Path:
return self.output_dir / "thumbnails"
@staticmethod
def _media_names(directory: Path):
def _media_names(directory: Path) -> List[MediasResponse]:
names = sorted([it.name for it in glob_img(directory)])
res = []
for name in names:
path = os.path.join(directory, name)
img = Image.open(path)
res.append(
{
"name": name,
"height": img.height,
"width": img.width,
"ctime": os.path.getctime(path),
"mtime": os.path.getmtime(path),
}
MediasResponse(
name=name,
height=img.height,
width=img.width,
ctime=os.path.getctime(path),
mtime=os.path.getmtime(path),
)
)
return res
@property
def thumbnail_url(self):
return self.app.config["THUMBNAIL_MEDIA_THUMBNAIL_URL"]
def get_thumbnail(
self, directory: Path, original_filename: str, width, height, **options
):
@@ -161,7 +117,10 @@ class FileManager:
image = Image.open(BytesIO(storage.read(original_filepath)))
# keep ratio resize
if width is not None:
if not width and not height:
width = 256
if width != 0:
height = int(image.height * width / image.width)
else:
width = int(image.width * height / image.height)
@@ -180,18 +139,15 @@ class FileManager:
thumbnail_filepath = os.path.join(
self.thumbnail_directory, original_path, thumbnail_filename
)
thumbnail_url = os.path.join(
self.thumbnail_url, original_path, thumbnail_filename
)
if storage.exists(thumbnail_filepath):
return thumbnail_url, (width, height)
return thumbnail_filepath, (width, height)
try:
image.load()
except (IOError, OSError):
self.app.logger.warning("Thumbnail not load image: %s", original_filepath)
return thumbnail_url, (width, height)
return thumbnail_filepath, (width, height)
# get original image format
options["format"] = options.get("format", image.format)
@@ -203,7 +159,7 @@ class FileManager:
raw_data = self.get_raw_data(image, **options)
storage.save(thumbnail_filepath, raw_data)
return thumbnail_url, (width, height)
return thumbnail_filepath, (width, height)
def get_raw_data(self, image, **options):
data = {
@@ -246,7 +202,7 @@ class FileManager:
if image.format:
return image.format
return self.app.config["THUMBNAIL_DEFAULT_FORMAT"]
return "JPEG"
def _create_thumbnail(self, image, size, crop="fit", background=None):
try: