mirror of
https://github.com/ajurna/cbwebreader.git
synced 2025-12-06 22:27:19 +00:00
* resetting a users password forgot to save. * resetting a users password forgot to save.
118 lines
4.8 KiB
Python
118 lines
4.8 KiB
Python
import mimetypes
|
|
import zipfile
|
|
from itertools import chain
|
|
from pathlib import Path
|
|
from typing import NamedTuple, List, Optional, Union
|
|
|
|
import rarfile
|
|
from django.conf import settings
|
|
from django.contrib.auth.models import User
|
|
from django.db.models import Count, Q, F, Case, When, PositiveSmallIntegerField, QuerySet, ExpressionWrapper, \
|
|
IntegerField
|
|
|
|
from comic import models
|
|
from comic.errors import NotCompatibleArchive
|
|
|
|
|
|
def generate_directory(user: User, directory: Optional[models.Directory] = None) -> List[QuerySet]:
|
|
dir_path = Path(settings.COMIC_BOOK_VOLUME, directory.path) if directory else settings.COMIC_BOOK_VOLUME
|
|
files = []
|
|
|
|
dir_db_query = models.Directory.objects.filter(parent=directory)
|
|
clean_directories(dir_db_query, dir_path, directory)
|
|
|
|
file_db_query = models.ComicBook.objects.filter(directory=directory)
|
|
clean_files(file_db_query, user, dir_path, directory)
|
|
|
|
dir_db_query = dir_db_query.annotate(
|
|
total=Count('comicbook', distinct=True),
|
|
progress=Count('comicbook__comicstatus', Q(comicbook__comicstatus__finished=True,
|
|
comicbook__comicstatus__user=user), distinct=True),
|
|
finished=ExpressionWrapper(Q(total=F('progress')), output_field=IntegerField()),
|
|
unread=ExpressionWrapper(Q(total__gt=F('progress')), output_field=IntegerField()),
|
|
)
|
|
files.extend(dir_db_query)
|
|
|
|
# Create Missing Status
|
|
new_status = [models.ComicStatus(comic=file, user=user) for file in
|
|
file_db_query.exclude(comicstatus__in=models.ComicStatus.objects.filter(
|
|
comic__in=file_db_query, user=user))]
|
|
models.ComicStatus.objects.bulk_create(new_status)
|
|
|
|
file_db_query = file_db_query.annotate(
|
|
progress=F('comicstatus__last_read_page') + 1,
|
|
finished=F('comicstatus__finished'),
|
|
unread=F('comicstatus__unread'),
|
|
user=F('comicstatus__user'),
|
|
classification=Case(
|
|
When(directory__isnull=True, then=models.Directory.Classification.C_G),
|
|
default=F('directory__classification'),
|
|
output_field=PositiveSmallIntegerField(choices=models.Directory.Classification.choices)
|
|
)
|
|
).filter(Q(user__isnull=True) | Q(user=user.id))
|
|
|
|
files.extend(file_db_query)
|
|
|
|
for file in chain(file_db_query, dir_db_query):
|
|
if file.thumbnail and not Path(file.thumbnail.path).exists():
|
|
file.thumbnail.delete()
|
|
file.save()
|
|
files.sort(key=lambda x: x.title)
|
|
files.sort(key=lambda x: x.type, reverse=True)
|
|
return files
|
|
|
|
|
|
def clean_directories(directories: QuerySet, dir_path: Path, directory: Optional[models.Directory] = None) -> None:
|
|
dir_db_set = set(Path(settings.COMIC_BOOK_VOLUME, x.path) for x in directories)
|
|
dir_list = set(x for x in sorted(dir_path.glob('*')) if x.is_dir())
|
|
# Create new directories db instances
|
|
for new_directory in dir_list - dir_db_set:
|
|
models.Directory(name=new_directory.name, parent=directory).save()
|
|
|
|
# Remove stale db instances
|
|
for stale_directory in dir_db_set - dir_list:
|
|
models.Directory.objects.get(name=stale_directory.name, parent=directory).delete()
|
|
|
|
|
|
def clean_files(files: QuerySet, user: User, dir_path: Path, directory: Optional[models.Directory] = None) -> None:
|
|
file_list = set(x for x in sorted(dir_path.glob('*')) if x.is_file())
|
|
files_db_set = set(Path(dir_path, x.file_name) for x in files)
|
|
|
|
# Parse new comics
|
|
books_to_add = []
|
|
for new_comic in file_list - files_db_set:
|
|
if new_comic.suffix.lower() in settings.SUPPORTED_FILES:
|
|
new_book = models.ComicBook(file_name=new_comic.name, directory=directory)
|
|
archive, archive_type = new_book.get_archive()
|
|
try:
|
|
if archive_type == 'archive':
|
|
new_book.page_count = len(get_archive_files(archive))
|
|
elif archive_type == 'pdf':
|
|
new_book.page_count = archive.page_count
|
|
except NotCompatibleArchive:
|
|
pass
|
|
books_to_add.append(new_book)
|
|
models.ComicBook.objects.bulk_create(books_to_add)
|
|
|
|
status_to_add = []
|
|
for book in books_to_add:
|
|
status_to_add.append(models.ComicStatus(user=user, comic=book))
|
|
|
|
models.ComicStatus.objects.bulk_create(status_to_add)
|
|
|
|
# Remove stale comic instances
|
|
for stale_comic in files_db_set - file_list:
|
|
models.ComicBook.objects.get(file_name=stale_comic.name, directory=directory).delete()
|
|
|
|
|
|
class ArchiveFile(NamedTuple):
|
|
file_name: str
|
|
mime_type: str
|
|
|
|
|
|
def get_archive_files(archive: Union[zipfile.ZipFile, rarfile.RarFile]) -> List[ArchiveFile]:
|
|
return [
|
|
ArchiveFile(x, mimetypes.guess_type(x)[0]) for x in sorted(archive.namelist())
|
|
if not x.endswith('/') and mimetypes.guess_type(x)[0]
|
|
]
|