import mimetypes from itertools import chain from pathlib import Path from typing import Union, NamedTuple, List from uuid import UUID from django.conf import settings from django.contrib.auth.models import User from django.contrib.auth.password_validation import validate_password from django.core.exceptions import ValidationError from django.db.models import Count, Case, When, F, PositiveSmallIntegerField, Q from django.http import FileResponse from drf_yasg.utils import swagger_auto_schema from rest_framework import viewsets, serializers, mixins, permissions, status, renderers from rest_framework.decorators import action from rest_framework.generics import get_object_or_404 from rest_framework.pagination import PageNumberPagination from rest_framework.request import Request from rest_framework.response import Response from comic import models from comic.errors import NotCompatibleArchive from comic.util import generate_breadcrumbs_from_path class UserSerializer(serializers.ModelSerializer): classification = serializers.SlugRelatedField(many=False, read_only=True, slug_field='allowed_to_read', source='usermisc') class Meta: model = User fields = ['id', 'username', 'email', 'is_superuser', 'classification'] class AdminPasswordResetSerializer(serializers.Serializer): username = serializers.CharField() password = serializers.CharField(required=False) class ClassificationSerializer(serializers.Serializer): classification = serializers.IntegerField() def validate_classification(self, data): if data in models.Directory.Classification: return data raise serializers.ValidationError('Invalid Classification sent.') class UserViewSet(viewsets.ModelViewSet): """ API endpoint that allows users to be viewed or edited. """ queryset = User.objects.all().order_by('username') serializer_class = UserSerializer permission_classes = [permissions.IsAdminUser] @action(methods=['patch'], detail=True, serializer_class=AdminPasswordResetSerializer) def reset_password(self, request: Request, pk: int) -> Response: """ This will return a new password set on the user. """ target_user = get_object_or_404(User, id=pk) serializer = self.get_serializer(data=request.data) if serializer.is_valid(): if target_user.username == serializer.data['username']: password = User.objects.make_random_password() target_user.set_password(password) resp_serializer = self.get_serializer({ 'username': target_user.username, 'password': password }) return Response(resp_serializer.data) return Response({'errors': ['Invalid request']}, status.HTTP_400_BAD_REQUEST) @action(methods=['patch'], detail=True, serializer_class=ClassificationSerializer) def set_classification(self, request: Request, pk: int) -> Response: """ API Endpoint that will set the classification on the specified user. """ serializer = self.get_serializer(data=request.data) if serializer.is_valid(): misc, _ = models.UserMisc.objects.get_or_create(user_id=pk) misc.allowed_to_read = serializer.data['classification'] misc.save() return Response(data={'classification': misc.allowed_to_read}) else: return Response(serializer.errors, status.HTTP_400_BAD_REQUEST) class BrowseFileField(serializers.FileField): def to_representation(self, value): if not value: return None return Path(settings.MEDIA_URL, value.name).as_posix() class BrowseSerializer(serializers.Serializer): selector = serializers.UUIDField() title = serializers.CharField() progress = serializers.IntegerField() total = serializers.IntegerField() type = serializers.CharField() thumbnail = BrowseFileField() classification = serializers.IntegerField() finished = serializers.BooleanField() unread = serializers.BooleanField() class BreadcrumbSerializer(serializers.Serializer): id = serializers.IntegerField() selector = serializers.UUIDField() name = serializers.CharField() class ArchiveFile(NamedTuple): file_name: str mime_type: str class BrowseViewSet(viewsets.GenericViewSet): serializer_class = BrowseSerializer permission_classes = [permissions.IsAuthenticated] lookup_field = 'selector' def list(self, request): serializer = self.get_serializer(self.generate_directory(request.user), many=True) return Response(serializer.data) def retrieve(self, request, selector: UUID): directory = models.Directory.objects.get(selector=selector) serializer = self.get_serializer(self.generate_directory(request.user, directory), many=True) return Response(serializer.data) @swagger_auto_schema(responses={status.HTTP_200_OK: BreadcrumbSerializer(many=True)}) @action(methods=['get'], detail=True, serializer_class=BreadcrumbSerializer) def breadcrumbs(self, _request: Request, selector: UUID) -> Response: queryset = [] comic = False try: directory = models.Directory.objects.get(selector=selector) except models.Directory.DoesNotExist: comic = models.ComicBook.objects.get(selector=selector) directory = comic.directory for index, item in enumerate(generate_breadcrumbs_from_path(directory, comic)): queryset.append({ "id": index, "selector": item.selector, "name": item.name, }) serializer = self.get_serializer(queryset, many=True) return Response(serializer.data) @staticmethod def clean_directories(directories, dir_path, directory=None): dir_db_set = set([Path(settings.COMIC_BOOK_VOLUME, x.path) for x in directories]) dir_list = set([x for x in sorted(dir_path.glob('*')) if x.is_dir()]) # Create new directories db instances for new_directory in dir_list - dir_db_set: models.Directory(name=new_directory.name, parent=directory).save() # Remove stale db instances for stale_directory in dir_db_set - dir_list: models.Directory.objects.get(name=stale_directory.name, parent=directory).delete() @staticmethod def get_archive_files(archive) -> List[ArchiveFile]: return [ ArchiveFile(x, mimetypes.guess_type(x)[0]) for x in sorted(archive.namelist()) if not x.endswith('/') and mimetypes.guess_type(x)[0] ] @staticmethod def clean_files(files, user, dir_path, directory=None): file_list = set([x for x in sorted(dir_path.glob('*')) if x.is_file()]) files_db_set = set([Path(dir_path, x.file_name) for x in files]) # Parse new comics books_to_add = [] for new_comic in file_list - files_db_set: if new_comic.suffix.lower() in settings.SUPPORTED_FILES: books_to_add.append( models.ComicBook(file_name=new_comic.name, directory=directory) ) models.ComicBook.objects.bulk_create(books_to_add) pages_to_add = [] status_to_add = [] for book in books_to_add: status_to_add.append(models.ComicStatus(user=user, comic=book)) try: archive, archive_type = book.get_archive() if archive_type == 'archive': pages_to_add.extend([ models.ComicPage( Comic=book, index=idx, page_file_name=page.file_name, content_type=page.mime_type ) for idx, page in enumerate(BrowseViewSet.get_archive_files(archive)) ]) elif archive_type == 'pdf': pages_to_add.extend([ models.ComicPage( Comic=book, index=idx, page_file_name=idx + 1, content_type='application/pdf' ) for idx in range(archive.page_count) ]) except NotCompatibleArchive: pass models.ComicStatus.objects.bulk_create(status_to_add) models.ComicPage.objects.bulk_create(pages_to_add) # Remove stale comic instances for stale_comic in files_db_set - file_list: models.ComicBook.objects.get(file_name=stale_comic.name, directory=directory).delete() def generate_directory(self, user: User, directory=None): """ :type user: User :type directory: Directory """ dir_path = Path(settings.COMIC_BOOK_VOLUME, directory.path) if directory else settings.COMIC_BOOK_VOLUME files = [] dir_db_query = models.Directory.objects.filter(parent=directory) self.clean_directories(dir_db_query, dir_path, directory) file_db_query = models.ComicBook.objects.filter(directory=directory) self.clean_files(file_db_query, user, dir_path, directory) dir_db_query = dir_db_query.annotate( total=Count('comicbook', distinct=True), progress=Count('comicbook__comicstatus', Q(comicbook__comicstatus__finished=True, comicbook__comicstatus__user=user), distinct=True), finished=Q(total=F('progress')), unread=Q(total__gt=F('progress')) ) files.extend(dir_db_query) # Create Missing Status new_status = [models.ComicStatus(comic=file, user=user) for file in file_db_query.exclude(comicstatus__in=models.ComicStatus.objects.filter( comic__in=file_db_query, user=user))] models.ComicStatus.objects.bulk_create(new_status) file_db_query = file_db_query.annotate( total=Count('comicpage', distinct=True), progress=F('comicstatus__last_read_page') + 1, finished=F('comicstatus__finished'), unread=F('comicstatus__unread'), user=F('comicstatus__user'), classification=Case( When(directory__isnull=True, then=models.Directory.Classification.C_G), default=F('directory__classification'), output_field=PositiveSmallIntegerField(choices=models.Directory.Classification.choices) ) ).filter(Q(user__isnull=True) | Q(user=user.id)) files.extend(file_db_query) for file in chain(file_db_query, dir_db_query): if file.thumbnail and not Path(file.thumbnail.path).exists(): file.thumbnail.delete() file.save() files.sort(key=lambda x: x.title) files.sort(key=lambda x: x.type, reverse=True) return files class GenerateThumbnailSerializer(serializers.Serializer): selector = serializers.UUIDField() thumbnail = BrowseFileField() class GenerateThumbnailViewSet(viewsets.ViewSet): permission_classes = [permissions.IsAuthenticated] serializer_class = GenerateThumbnailSerializer lookup_field = 'selector' def retrieve(self, _request, selector: UUID): try: directory = models.Directory.objects.get(selector=selector) if not directory.thumbnail: directory.generate_thumbnail() return Response( self.serializer_class({ "selector": directory.selector, "thumbnail": directory.thumbnail }).data ) except models.Directory.DoesNotExist: comic = models.ComicBook.objects.get(selector=selector) if not comic.thumbnail: comic.generate_thumbnail() return Response( self.serializer_class({ "selector": comic.selector, "thumbnail": comic.thumbnail }).data ) class PageSerializer(serializers.Serializer): index = serializers.IntegerField() page_file_name = serializers.CharField() content_type = serializers.CharField() class DirectionSerializer(serializers.Serializer): route = serializers.ChoiceField(choices=['read', 'browse']) selector = serializers.UUIDField(required=False) class ReadSerializer(serializers.Serializer): selector = serializers.UUIDField() title = serializers.CharField() last_read_page = serializers.IntegerField() prev_comic = DirectionSerializer() next_comic = DirectionSerializer() pages = PageSerializer(many=True) class TypeSerializer(serializers.Serializer): type = serializers.CharField() class ReadPageSerializer(serializers.Serializer): page = serializers.IntegerField(source='last_read_page') class ReadViewSet(viewsets.GenericViewSet): serializer_class = ReadSerializer permission_classes = [permissions.IsAuthenticated] lookup_field = 'selector' @swagger_auto_schema(responses={status.HTTP_200_OK: ReadSerializer()}) def retrieve(self, request: Request, selector: UUID) -> Response: comic = get_object_or_404(models.ComicBook, selector=selector) misc, _ = models.UserMisc.objects.get_or_create(user=request.user) pages = models.ComicPage.objects.filter(Comic=comic) comic_status, _ = models.ComicStatus.objects.get_or_create(comic=comic, user=request.user) comic_list = list(models.ComicBook.objects.filter(directory=comic.directory).order_by('file_name')) comic_index = comic_list.index(comic) try: prev_comic = {'route': 'browse', 'selector': comic.directory.selector} if comic_index == 0 else \ {'route': 'read', 'selector': comic_list[comic_index-1].selector} except AttributeError: prev_comic = {'route': 'browse'} try: next_comic = {'route': 'browse', 'selector': comic.directory.selector} if comic_index+1 == len(comic_list) \ else {'route': 'read', 'selector': comic_list[comic_index+1].selector} except AttributeError: next_comic = {'route': 'browse'} data = { "selector": comic.selector, "title": comic.file_name, "last_read_page": comic_status.last_read_page, "prev_comic": prev_comic, "next_comic": next_comic, "pages": pages, } serializer = self.serializer_class(data) return Response(serializer.data) @swagger_auto_schema(responses={status.HTTP_200_OK: 'PDF Binary Data', status.HTTP_400_BAD_REQUEST: 'User below classification allowed'}) @action(methods=['get'], detail=True) def pdf(self, request: Request, selector: UUID) -> Union[FileResponse, Response]: book = models.ComicBook.objects.get(selector=selector) misc, _ = models.UserMisc.objects.get_or_create(user=request.user) try: if book.directory.classification > misc.allowed_to_read: return Response(status=400, data={'errors': 'Not allowed to read.'}) except AttributeError: pass return FileResponse(open(book.get_pdf(), 'rb'), content_type='application/pdf') @swagger_auto_schema(responses={status.HTTP_200_OK: TypeSerializer()}) @action(methods=['get'], detail=True) def type(self, _request: Request, selector: UUID) -> Response: book = models.ComicBook.objects.get(selector=selector) return Response({'type': book.file_name.split('.')[-1].lower()}) @swagger_auto_schema(responses={status.HTTP_200_OK: ReadPageSerializer()}, request_body=ReadPageSerializer) @action(methods=['put'], detail=True, serializer_class=ReadPageSerializer) def set_page(self, request: Request, selector: UUID) -> Response: serializer = self.get_serializer(data=request.data) if serializer.is_valid(): comic_status, _ = models.ComicStatus.objects.annotate(page_count=Count('comic__comicpage'))\ .get_or_create(comic_id=selector, user=request.user) comic_status.last_read_page = serializer.data['page'] comic_status.unread = False if comic_status.page_count-1 == comic_status.last_read_page: comic_status.finished = True else: comic_status.finished = False comic_status.save() return Response(self.get_serializer(comic_status).data) else: return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) class PassthroughRenderer(renderers.BaseRenderer): """ Return data as-is. View should supply a Response. """ media_type = '*/*' format = '' def render(self, data, accepted_media_type=None, renderer_context=None): return data class ImageViewSet(viewsets.ViewSet): queryset = models.ComicPage.objects.all() lookup_field = 'page' renderer_classes = [PassthroughRenderer] def retrieve(self, _request, parent_lookup_selector, page): book = models.ComicBook.objects.get(selector=parent_lookup_selector) img, content = book.get_image(int(page)) self.renderer_classes[0].media_type = content response = FileResponse(img, content_type=content) return response class StandardResultsSetPagination(PageNumberPagination): page_size = 10 page_size_query_param = 'page_size' max_page_size = 100 class RecentComicsSerializer(serializers.ModelSerializer): total_pages = serializers.IntegerField() unread = serializers.BooleanField() finished = serializers.BooleanField() last_read_page = serializers.IntegerField() class Meta: model = models.ComicBook fields = ['file_name', 'date_added', 'selector', 'total_pages', 'unread', 'finished', 'last_read_page'] class RecentComicsView(mixins.ListModelMixin, viewsets.GenericViewSet): queryset = models.ComicBook.objects.all() serializer_class = RecentComicsSerializer pagination_class = StandardResultsSetPagination permission_classes = [permissions.IsAuthenticated] def get_queryset(self): user = self.request.user if "search_text" in self.request.query_params: query = models.ComicBook.objects.filter(file_name__icontains=self.request.query_params["search_text"]) else: query = models.ComicBook.objects.all() query = query.annotate( total_pages=Count('comicpage'), unread=Case(When(comicstatus__user=user, then='comicstatus__unread')), finished=Case(When(comicstatus__user=user, then='comicstatus__finished')), last_read_page=Case(When(comicstatus__user=user, then='comicstatus__last_read_page')) + 1, classification=Case( When(directory__isnull=True, then=models.Directory.Classification.C_18), default=F('directory__classification'), output_field=PositiveSmallIntegerField(choices=models.Directory.Classification.choices) ) ) query = query.order_by('-date_added') return query class ActionSerializer(serializers.Serializer): selectors = serializers.ListSerializer(child=serializers.UUIDField()) class ActionViewSet(viewsets.GenericViewSet): permission_classes = [permissions.IsAuthenticated] lookup_field = 'action' serializer_class = ActionSerializer @action(detail=False, methods=['PUT']) def mark_read(self, request): serializer = ActionSerializer(data=request.data) if serializer.is_valid(): comics = self.get_comics(serializer.data['selectors']) comic_status = models.ComicStatus.objects.filter(comic__selector__in=comics, user=request.user) comic_status = comic_status.annotate(total_pages=Count('comic__comicpage')) status_to_update = [] for c_status in comic_status: c_status.last_read_page = c_status.total_pages-1 c_status.unread = False c_status.finished = True status_to_update.append(c_status) comics.remove(str(c_status.comic_id)) for new_status in comics: comic = models.ComicBook.objects.annotate( total_pages=Count('comicpage')).get(selector=new_status) obj, _ = models.ComicStatus.objects.get_or_create(comic=comic, user=request.user) obj.unread = False obj.finished = True obj.last_read_page = comic.total_pages status_to_update.append(obj) models.ComicStatus.objects.bulk_update(status_to_update, ['unread', 'finished', 'last_read_page']) return Response({'status': 'marked_read'}) else: return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @action(detail=False, methods=['PUT']) def mark_unread(self, request): serializer = ActionSerializer(data=request.data) if serializer.is_valid(): serializer = ActionSerializer(data=request.data) if serializer.is_valid(): comics = self.get_comics(serializer.data['selectors']) comic_status = models.ComicStatus.objects.filter(comic__selector__in=comics, user=request.user) status_to_update = [] for c_status in comic_status: c_status.last_read_page = 0 c_status.unread = True c_status.finished = False status_to_update.append(c_status) comics.remove(str(c_status.comic_id)) for new_status in comics: comic = models.ComicBook.objects.get(selector=new_status) obj, _ = models.ComicStatus.objects.get_or_create(comic=comic, user=request.user) obj.unread = True obj.finished = False obj.last_read_page = 0 status_to_update.append(obj) models.ComicStatus.objects.bulk_update(status_to_update, ['unread', 'finished', 'last_read_page']) return Response({'status': 'marked_unread'}) else: return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) def get_comics(self, selectors): data = set() data = data.union( set(models.ComicBook.objects.filter(selector__in=selectors).values_list('selector', flat=True))) directories = models.Directory.objects.filter(selector__in=selectors) if directories: for directory in directories: data = data.union( set(models.ComicBook.objects.filter(directory=directory).values_list('selector', flat=True))) data = data.union(self.get_comics(models.Directory.objects.filter( parent__in=directories).values_list('selector', flat=True))) return [str(x) for x in data] class RSSSerializer(serializers.Serializer): feed_id = serializers.UUIDField() class AccountSerializer(serializers.ModelSerializer): class Meta: model = User fields = ['username', 'email', 'is_superuser'] class UpdateEmailSerializer(serializers.Serializer): username = serializers.CharField() email = serializers.EmailField() password = serializers.CharField() class PasswordResetSerializer(serializers.Serializer): username = serializers.CharField() old_password = serializers.CharField() new_password = serializers.CharField(required=False) new_password_confirm = serializers.CharField(required=False) def validate_new_password(self, data): if data == '': return data try: validate_password(data) except ValidationError as e: raise serializers.ValidationError(e) return data def validate(self, attrs): super().validate(attrs) if attrs['new_password'] != attrs['new_password_confirm']: raise serializers.ValidationError('New passwords do not match') return attrs class AccountViewSet(viewsets.GenericViewSet): permission_classes = [permissions.IsAuthenticated] lookup_field = 'username' serializer_class = AccountSerializer @swagger_auto_schema(responses={200: AccountSerializer()}) @action(detail=False, methods=['PATCH'], serializer_class=PasswordResetSerializer) def reset_password(self, request: Request) -> Response: serializer = self.get_serializer(data=request.data) if serializer.is_valid(): if request.user.username != serializer.data['username']: return Response(status=status.HTTP_400_BAD_REQUEST, data={'errors': 'Username does not match account'}) if not request.user.check_password(serializer.data['old_password']): return Response(status=status.HTTP_400_BAD_REQUEST, data={'errors': 'Password Incorrect'}) request.user.set_password(serializer.data['new_password']) request.user.save() return Response(AccountSerializer(request.user).data) else: return Response({"errors": serializer.errors}, status.HTTP_400_BAD_REQUEST) def list(self, request): serializer = self.get_serializer(request.user) return Response(serializer.data) @swagger_auto_schema(responses={200: AccountSerializer()}) @action(detail=False, methods=['PATCH'], serializer_class=UpdateEmailSerializer) def update_email(self, request: Request) -> Response: serializer = self.get_serializer(data=request.data) if serializer.is_valid(): if request.user.username != serializer.data['username']: return Response(status=status.HTTP_400_BAD_REQUEST, data={'errors': 'Username does not match account'}) if not request.user.check_password(serializer.data['password']): return Response(status=status.HTTP_400_BAD_REQUEST, data={'errors': 'Password Incorrect'}) request.user.email = serializer.data['email'] request.user.save() account = AccountSerializer(request.user) return Response(account.data) else: return Response(serializer.errors, status.HTTP_400_BAD_REQUEST) @swagger_auto_schema(responses={status.HTTP_200_OK: RSSSerializer()}) @action(methods=['get'], detail=False, serializer_class=RSSSerializer) def feed_id(self, request: Request): """ Return the RSS feed id needed to get users RSS Feed. """ user_misc = get_object_or_404(models.UserMisc, user=request.user) serializer = self.get_serializer(user_misc) return Response(serializer.data) class DirectorySerializer(serializers.ModelSerializer): class Meta: model = models.Directory fields = ['selector', 'classification'] extra_kwargs = { 'selector': {'validators': []}, } class DirectoryViewSet(mixins.UpdateModelMixin, viewsets.GenericViewSet): serializer_class = DirectorySerializer queryset = models.Directory.objects.all() permission_classes = [permissions.IsAdminUser] lookup_field = 'selector' @swagger_auto_schema(responses={200: DirectorySerializer(many=True)}) def update(self, request: Request, selector: UUID) -> Response: """ This will set the classification of a directory and all it's children. """ main_parent = get_object_or_404(models.Directory, selector=selector) serializer = self.get_serializer(data=request.data) if serializer.is_valid(): main_parent.classification = serializer.data['classification'] to_update = {main_parent} to_visit = {main_parent} while to_visit: parent = to_visit.pop() for child in models.Directory.objects.filter(parent=parent): child.classification = serializer.data['classification'] to_visit.add(child) to_update.add(child) models.Directory.objects.bulk_update(to_update, fields=['classification']) data = models.Directory.objects.filter(directory__in=to_update) response = self.get_serializer(data, many=True) return Response(response.data) else: return Response(serializer.errors, status.HTTP_400_BAD_REQUEST) def partial_update(self, request, *args, **kwargs): """ This will set the classification of a directory and none of its children. """ return super().update(request, *args, **kwargs) class InitialSetupSerializer(serializers.Serializer): username = serializers.CharField() email = serializers.EmailField(required=False, allow_blank=True) password = serializers.CharField() class InitialSetupRequired(serializers.Serializer): required = serializers.BooleanField() class InitialSetup(viewsets.GenericViewSet): permission_classes = [permissions.AllowAny] @swagger_auto_schema(responses={status.HTTP_200_OK: InitialSetupRequired(many=False)}) @action(detail=False, methods=['get'], serializer_class=InitialSetupRequired) def required(self, _request: Request) -> Response: serializer = self.get_serializer({'required': User.objects.count() == 0}) return Response(serializer.data) @action(methods=['post'], detail=False, serializer_class=InitialSetupSerializer) def create_user(self, request: Request) -> Response: if User.objects.count() == 0: serializer = self.get_serializer(data=request.data) if serializer.is_valid(): admin = User( username=serializer.data['username'], email=serializer.data['email'], is_superuser=True, is_active=True, is_staff=True ) admin.set_password(serializer.data['password']) admin.save() return Response(serializer.data, status.HTTP_201_CREATED) else: return Response(serializer.errors, status.HTTP_400_BAD_REQUEST) else: return Response({}, status.HTTP_400_BAD_REQUEST)