rename to iopaint
This commit is contained in:
1
iopaint/file_manager/__init__.py
Normal file
1
iopaint/file_manager/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .file_manager import FileManager
|
||||
222
iopaint/file_manager/file_manager.py
Normal file
222
iopaint/file_manager/file_manager.py
Normal file
@@ -0,0 +1,222 @@
|
||||
import os
|
||||
from io import BytesIO
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
from PIL import Image, ImageOps, PngImagePlugin
|
||||
from fastapi import FastAPI, UploadFile, HTTPException
|
||||
from starlette.responses import FileResponse
|
||||
|
||||
from ..schema import MediasResponse, MediaTab
|
||||
|
||||
LARGE_ENOUGH_NUMBER = 100
|
||||
PngImagePlugin.MAX_TEXT_CHUNK = LARGE_ENOUGH_NUMBER * (1024**2)
|
||||
from .storage_backends import FilesystemStorageBackend
|
||||
from .utils import aspect_to_string, generate_filename, glob_img
|
||||
|
||||
|
||||
class FileManager:
|
||||
def __init__(self, app: FastAPI, input_dir: Path, output_dir: Path):
|
||||
self.app = app
|
||||
self.input_dir: Path = input_dir
|
||||
self.output_dir: Path = output_dir
|
||||
|
||||
self.image_dir_filenames = []
|
||||
self.output_dir_filenames = []
|
||||
if not self.thumbnail_directory.exists():
|
||||
self.thumbnail_directory.mkdir(parents=True)
|
||||
|
||||
# fmt: off
|
||||
self.app.add_api_route("/api/v1/save_image", self.api_save_image, methods=["POST"])
|
||||
self.app.add_api_route("/api/v1/medias", self.api_medias, methods=["GET"], response_model=List[MediasResponse])
|
||||
self.app.add_api_route("/api/v1/media_file", self.api_media_file, methods=["GET"])
|
||||
self.app.add_api_route("/api/v1/media_thumbnail_file", self.api_media_thumbnail_file, methods=["GET"])
|
||||
# fmt: on
|
||||
|
||||
def api_save_image(self, file: UploadFile):
|
||||
filename = file.filename
|
||||
origin_image_bytes = file.file.read()
|
||||
with open(self.output_dir / filename, "wb") as fw:
|
||||
fw.write(origin_image_bytes)
|
||||
|
||||
def api_medias(self, tab: MediaTab) -> List[MediasResponse]:
|
||||
img_dir = self._get_dir(tab)
|
||||
return self._media_names(img_dir)
|
||||
|
||||
def api_media_file(self, tab: MediaTab, filename: str) -> FileResponse:
|
||||
file_path = self._get_file(tab, filename)
|
||||
return FileResponse(file_path, media_type="image/png")
|
||||
|
||||
# tab=${tab}?filename=${filename.name}?width=${width}&height=${height}
|
||||
def api_media_thumbnail_file(
|
||||
self, tab: MediaTab, filename: str, width: int, height: int
|
||||
) -> FileResponse:
|
||||
img_dir = self._get_dir(tab)
|
||||
thumb_filename, (width, height) = self.get_thumbnail(
|
||||
img_dir, filename, width=width, height=height
|
||||
)
|
||||
thumbnail_filepath = self.thumbnail_directory / thumb_filename
|
||||
return FileResponse(
|
||||
thumbnail_filepath,
|
||||
headers={
|
||||
"X-Width": str(width),
|
||||
"X-Height": str(height),
|
||||
},
|
||||
media_type="image/jpeg",
|
||||
)
|
||||
|
||||
def _get_dir(self, tab: MediaTab) -> Path:
|
||||
if tab == "input":
|
||||
return self.input_dir
|
||||
elif tab == "output":
|
||||
return self.output_dir
|
||||
else:
|
||||
raise HTTPException(status_code=422, detail=f"tab not found: {tab}")
|
||||
|
||||
def _get_file(self, tab: MediaTab, filename: str) -> Path:
|
||||
file_path = self._get_dir(tab) / filename
|
||||
if not file_path.exists():
|
||||
raise HTTPException(status_code=422, detail=f"file not found: {file_path}")
|
||||
return file_path
|
||||
|
||||
@property
|
||||
def thumbnail_directory(self) -> Path:
|
||||
return self.output_dir / "thumbnails"
|
||||
|
||||
@staticmethod
|
||||
def _media_names(directory: Path) -> List[MediasResponse]:
|
||||
names = sorted([it.name for it in glob_img(directory)])
|
||||
res = []
|
||||
for name in names:
|
||||
path = os.path.join(directory, name)
|
||||
img = Image.open(path)
|
||||
res.append(
|
||||
MediasResponse(
|
||||
name=name,
|
||||
height=img.height,
|
||||
width=img.width,
|
||||
ctime=os.path.getctime(path),
|
||||
mtime=os.path.getmtime(path),
|
||||
)
|
||||
)
|
||||
return res
|
||||
|
||||
def get_thumbnail(
|
||||
self, directory: Path, original_filename: str, width, height, **options
|
||||
):
|
||||
directory = Path(directory)
|
||||
storage = FilesystemStorageBackend(self.app)
|
||||
crop = options.get("crop", "fit")
|
||||
background = options.get("background")
|
||||
quality = options.get("quality", 90)
|
||||
|
||||
original_path, original_filename = os.path.split(original_filename)
|
||||
original_filepath = os.path.join(directory, original_path, original_filename)
|
||||
image = Image.open(BytesIO(storage.read(original_filepath)))
|
||||
|
||||
# keep ratio resize
|
||||
if not width and not height:
|
||||
width = 256
|
||||
|
||||
if width != 0:
|
||||
height = int(image.height * width / image.width)
|
||||
else:
|
||||
width = int(image.width * height / image.height)
|
||||
|
||||
thumbnail_size = (width, height)
|
||||
|
||||
thumbnail_filename = generate_filename(
|
||||
directory,
|
||||
original_filename,
|
||||
aspect_to_string(thumbnail_size),
|
||||
crop,
|
||||
background,
|
||||
quality,
|
||||
)
|
||||
|
||||
thumbnail_filepath = os.path.join(
|
||||
self.thumbnail_directory, original_path, thumbnail_filename
|
||||
)
|
||||
|
||||
if storage.exists(thumbnail_filepath):
|
||||
return thumbnail_filepath, (width, height)
|
||||
|
||||
try:
|
||||
image.load()
|
||||
except (IOError, OSError):
|
||||
self.app.logger.warning("Thumbnail not load image: %s", original_filepath)
|
||||
return thumbnail_filepath, (width, height)
|
||||
|
||||
# get original image format
|
||||
options["format"] = options.get("format", image.format)
|
||||
|
||||
image = self._create_thumbnail(
|
||||
image, thumbnail_size, crop, background=background
|
||||
)
|
||||
|
||||
raw_data = self.get_raw_data(image, **options)
|
||||
storage.save(thumbnail_filepath, raw_data)
|
||||
|
||||
return thumbnail_filepath, (width, height)
|
||||
|
||||
def get_raw_data(self, image, **options):
|
||||
data = {
|
||||
"format": self._get_format(image, **options),
|
||||
"quality": options.get("quality", 90),
|
||||
}
|
||||
|
||||
_file = BytesIO()
|
||||
image.save(_file, **data)
|
||||
return _file.getvalue()
|
||||
|
||||
@staticmethod
|
||||
def colormode(image, colormode="RGB"):
|
||||
if colormode == "RGB" or colormode == "RGBA":
|
||||
if image.mode == "RGBA":
|
||||
return image
|
||||
if image.mode == "LA":
|
||||
return image.convert("RGBA")
|
||||
return image.convert(colormode)
|
||||
|
||||
if colormode == "GRAY":
|
||||
return image.convert("L")
|
||||
|
||||
return image.convert(colormode)
|
||||
|
||||
@staticmethod
|
||||
def background(original_image, color=0xFF):
|
||||
size = (max(original_image.size),) * 2
|
||||
image = Image.new("L", size, color)
|
||||
image.paste(
|
||||
original_image,
|
||||
tuple(map(lambda x: (x[0] - x[1]) / 2, zip(size, original_image.size))),
|
||||
)
|
||||
|
||||
return image
|
||||
|
||||
def _get_format(self, image, **options):
|
||||
if options.get("format"):
|
||||
return options.get("format")
|
||||
if image.format:
|
||||
return image.format
|
||||
|
||||
return "JPEG"
|
||||
|
||||
def _create_thumbnail(self, image, size, crop="fit", background=None):
|
||||
try:
|
||||
resample = Image.Resampling.LANCZOS
|
||||
except AttributeError: # pylint: disable=raise-missing-from
|
||||
resample = Image.ANTIALIAS
|
||||
|
||||
if crop == "fit":
|
||||
image = ImageOps.fit(image, size, resample)
|
||||
else:
|
||||
image = image.copy()
|
||||
image.thumbnail(size, resample=resample)
|
||||
|
||||
if background is not None:
|
||||
image = self.background(image)
|
||||
|
||||
image = self.colormode(image)
|
||||
|
||||
return image
|
||||
46
iopaint/file_manager/storage_backends.py
Normal file
46
iopaint/file_manager/storage_backends.py
Normal file
@@ -0,0 +1,46 @@
|
||||
# Copy from https://github.com/silentsokolov/flask-thumbnails/blob/master/flask_thumbnails/storage_backends.py
|
||||
import errno
|
||||
import os
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class BaseStorageBackend(ABC):
|
||||
def __init__(self, app=None):
|
||||
self.app = app
|
||||
|
||||
@abstractmethod
|
||||
def read(self, filepath, mode="rb", **kwargs):
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def exists(self, filepath):
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def save(self, filepath, data):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class FilesystemStorageBackend(BaseStorageBackend):
|
||||
def read(self, filepath, mode="rb", **kwargs):
|
||||
with open(filepath, mode) as f: # pylint: disable=unspecified-encoding
|
||||
return f.read()
|
||||
|
||||
def exists(self, filepath):
|
||||
return os.path.exists(filepath)
|
||||
|
||||
def save(self, filepath, data):
|
||||
directory = os.path.dirname(filepath)
|
||||
|
||||
if not os.path.exists(directory):
|
||||
try:
|
||||
os.makedirs(directory)
|
||||
except OSError as e:
|
||||
if e.errno != errno.EEXIST:
|
||||
raise
|
||||
|
||||
if not os.path.isdir(directory):
|
||||
raise IOError("{} is not a directory".format(directory))
|
||||
|
||||
with open(filepath, "wb") as f:
|
||||
f.write(data)
|
||||
65
iopaint/file_manager/utils.py
Normal file
65
iopaint/file_manager/utils.py
Normal file
@@ -0,0 +1,65 @@
|
||||
# Copy from: https://github.com/silentsokolov/flask-thumbnails/blob/master/flask_thumbnails/utils.py
|
||||
import hashlib
|
||||
from pathlib import Path
|
||||
|
||||
from typing import Union
|
||||
|
||||
|
||||
def generate_filename(directory: Path, original_filename, *options) -> str:
|
||||
text = str(directory.absolute()) + original_filename
|
||||
for v in options:
|
||||
text += "%s" % v
|
||||
md5_hash = hashlib.md5()
|
||||
md5_hash.update(text.encode("utf-8"))
|
||||
return md5_hash.hexdigest() + ".jpg"
|
||||
|
||||
|
||||
def parse_size(size):
|
||||
if isinstance(size, int):
|
||||
# If the size parameter is a single number, assume square aspect.
|
||||
return [size, size]
|
||||
|
||||
if isinstance(size, (tuple, list)):
|
||||
if len(size) == 1:
|
||||
# If single value tuple/list is provided, exand it to two elements
|
||||
return size + type(size)(size)
|
||||
return size
|
||||
|
||||
try:
|
||||
thumbnail_size = [int(x) for x in size.lower().split("x", 1)]
|
||||
except ValueError:
|
||||
raise ValueError( # pylint: disable=raise-missing-from
|
||||
"Bad thumbnail size format. Valid format is INTxINT."
|
||||
)
|
||||
|
||||
if len(thumbnail_size) == 1:
|
||||
# If the size parameter only contains a single integer, assume square aspect.
|
||||
thumbnail_size.append(thumbnail_size[0])
|
||||
|
||||
return thumbnail_size
|
||||
|
||||
|
||||
def aspect_to_string(size):
|
||||
if isinstance(size, str):
|
||||
return size
|
||||
|
||||
return "x".join(map(str, size))
|
||||
|
||||
|
||||
IMG_SUFFIX = {".jpg", ".jpeg", ".png", ".JPG", ".JPEG", ".PNG"}
|
||||
|
||||
|
||||
def glob_img(p: Union[Path, str], recursive: bool = False):
|
||||
p = Path(p)
|
||||
if p.is_file() and p.suffix in IMG_SUFFIX:
|
||||
yield p
|
||||
else:
|
||||
if recursive:
|
||||
files = Path(p).glob("**/*.*")
|
||||
else:
|
||||
files = Path(p).glob("*.*")
|
||||
|
||||
for it in files:
|
||||
if it.suffix not in IMG_SUFFIX:
|
||||
continue
|
||||
yield it
|
||||
Reference in New Issue
Block a user