Files
cbwebreader/comic/rest.py
Ajurna dd5817419b Remove ThePdfReader.vue and migrate PDF handling to pymupdf
This commit removes the frontend component ThePdfReader.vue and replaces its functionality with a backend implementation based on pymupdf. Also includes package updates, refactors PDF archive handling, and adjusts security settings to support development on localhost.
2025-05-21 22:30:34 +01:00

653 lines
28 KiB
Python

from http.client import HTTPResponse
from pathlib import Path
from typing import Union, Optional, Dict, Iterable, List
from uuid import UUID
from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.auth.password_validation import validate_password
from django.core.exceptions import ValidationError
from django.db.models import Case, When, F, PositiveSmallIntegerField, FileField, QuerySet
from django.http import FileResponse
from drf_yasg.utils import swagger_auto_schema
from rest_framework import viewsets, serializers, mixins, permissions, status, renderers
from rest_framework.decorators import action
from rest_framework.generics import get_object_or_404
from rest_framework.pagination import PageNumberPagination
from rest_framework.request import Request
from rest_framework.response import Response
from comic import models
from comic.processing import generate_directory
from comic.util import generate_breadcrumbs_from_path
class UserSerializer(serializers.ModelSerializer):
classification = serializers.SlugRelatedField(many=False, read_only=True, slug_field='allowed_to_read',
source='usermisc')
class Meta:
model = User
fields = ['id', 'username', 'email', 'is_superuser', 'classification']
class AdminPasswordResetSerializer(serializers.Serializer):
username = serializers.CharField()
password = serializers.CharField(required=False)
class ClassificationSerializer(serializers.Serializer):
classification = serializers.IntegerField()
def validate_classification(self, data: int) -> int:
if data in models.Directory.Classification:
return data
raise serializers.ValidationError('Invalid Classification sent.')
class UserViewSet(viewsets.ModelViewSet): # pylint: disable=too-many-ancestors
"""
API endpoint that allows users to be viewed or edited.
"""
queryset = User.objects.all().order_by('username')
serializer_class = UserSerializer
permission_classes = [permissions.IsAdminUser]
@action(methods=['patch'], detail=True, serializer_class=AdminPasswordResetSerializer)
def reset_password(self, request: Request, pk: int) -> Response:
"""
This will return a new password set on the user.
"""
target_user = get_object_or_404(User, id=pk)
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
if target_user.username == serializer.data['username']:
password = User.objects.make_random_password()
target_user.set_password(password)
target_user.save()
resp_serializer = self.get_serializer({
'username': target_user.username,
'password': password
})
return Response(resp_serializer.data)
return Response({'errors': ['Invalid request']}, status.HTTP_400_BAD_REQUEST)
@action(methods=['patch'], detail=True, serializer_class=ClassificationSerializer)
def set_classification(self, request: Request, pk: int) -> Response:
"""
API Endpoint that will set the classification on the specified user.
"""
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
misc, _ = models.UserMisc.objects.get_or_create(user_id=pk)
misc.allowed_to_read = serializer.data['classification']
misc.save()
return Response(data={'classification': misc.allowed_to_read})
return Response(serializer.errors, status.HTTP_400_BAD_REQUEST)
class BrowseFileField(serializers.FileField):
def to_representation(self, value: Optional[FileField]) -> Optional[str]:
if not value:
return None
return Path(settings.MEDIA_URL, value.name).as_posix()
class BrowseSerializer(serializers.Serializer):
selector = serializers.UUIDField()
title = serializers.CharField()
progress = serializers.IntegerField()
total = serializers.IntegerField()
type = serializers.CharField()
thumbnail = BrowseFileField()
classification = serializers.IntegerField()
finished = serializers.BooleanField()
unread = serializers.BooleanField()
class BreadcrumbSerializer(serializers.Serializer):
id = serializers.IntegerField()
selector = serializers.UUIDField()
name = serializers.CharField()
class BrowseViewSet(viewsets.GenericViewSet):
serializer_class = BrowseSerializer
permission_classes = [permissions.IsAuthenticated]
lookup_field = 'selector'
def get_queryset(self) -> None:
return
def list(self, request: Request) -> Response:
serializer = self.get_serializer(generate_directory(request.user), many=True)
return Response(serializer.data)
@swagger_auto_schema(responses={status.HTTP_200_OK: BrowseSerializer(many=True)})
def retrieve(self, request: Request, selector: UUID) -> Response:
directory = models.Directory.objects.get(selector=selector)
serializer = self.get_serializer(generate_directory(request.user, directory), many=True)
return Response(serializer.data)
@swagger_auto_schema(responses={status.HTTP_200_OK: BreadcrumbSerializer(many=True)})
@action(methods=['get'], detail=True, serializer_class=BreadcrumbSerializer)
def breadcrumbs(self, _request: Request, selector: UUID) -> Response:
queryset = []
comic = False
try:
directory = models.Directory.objects.get(selector=selector)
except models.Directory.DoesNotExist:
comic = models.ComicBook.objects.get(selector=selector)
directory = comic.directory
for index, item in enumerate(generate_breadcrumbs_from_path(directory, comic)):
queryset.append({
"id": index,
"selector": item.selector,
"name": item.name,
})
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
class GenerateThumbnailSerializer(serializers.Serializer):
selector = serializers.UUIDField()
thumbnail = BrowseFileField()
class GenerateThumbnailViewSet(viewsets.ViewSet):
permission_classes = [permissions.IsAuthenticated]
serializer_class = GenerateThumbnailSerializer
lookup_field = 'selector'
@swagger_auto_schema(responses={status.HTTP_200_OK: GenerateThumbnailSerializer()})
def retrieve(self, _request: Request, selector: UUID) -> Response:
try:
directory = models.Directory.objects.get(selector=selector)
if not directory.thumbnail:
directory.generate_thumbnail()
return Response(
self.serializer_class({
"selector": directory.selector,
"thumbnail": directory.thumbnail
}).data
)
except models.Directory.DoesNotExist:
comic = models.ComicBook.objects.get(selector=selector)
if not comic.thumbnail:
comic.generate_thumbnail()
return Response(
self.serializer_class({
"selector": comic.selector,
"thumbnail": comic.thumbnail
}).data
)
class DirectionSerializer(serializers.Serializer):
route = serializers.ChoiceField(choices=['read', 'browse'])
selector = serializers.UUIDField(required=False)
class ReadSerializer(serializers.Serializer):
selector = serializers.UUIDField()
title = serializers.CharField()
last_read_page = serializers.IntegerField()
prev_comic = DirectionSerializer()
next_comic = DirectionSerializer()
pages = serializers.IntegerField()
class TypeSerializer(serializers.Serializer):
type = serializers.CharField()
class ReadPageSerializer(serializers.Serializer):
page = serializers.IntegerField(source='last_read_page')
class ReadViewSet(viewsets.GenericViewSet):
serializer_class = ReadSerializer
permission_classes = [permissions.IsAuthenticated]
lookup_field = 'selector'
@swagger_auto_schema(responses={status.HTTP_200_OK: ReadSerializer()})
def retrieve(self, request: Request, selector: UUID) -> Response:
comic = get_object_or_404(models.ComicBook, selector=selector)
_, _ = models.UserMisc.objects.get_or_create(user=request.user)
comic_status, _ = models.ComicStatus.objects.get_or_create(comic=comic, user=request.user)
comic_list = list(models.ComicBook.objects.filter(directory=comic.directory).order_by('file_name'))
comic_index = comic_list.index(comic)
current_page_count = comic.get_page_count()
if comic.page_count != current_page_count:
comic.page_count = current_page_count
comic.save()
try:
prev_comic = {'route': 'browse', 'selector': comic.directory.selector} if comic_index == 0 else \
{'route': 'read', 'selector': comic_list[comic_index - 1].selector}
except AttributeError:
prev_comic = {'route': 'browse'}
try:
next_comic = {'route': 'browse', 'selector': comic.directory.selector} \
if comic_index + 1 == len(comic_list) \
else {'route': 'read', 'selector': comic_list[comic_index + 1].selector}
except AttributeError:
next_comic = {'route': 'browse'}
data = {
"selector": comic.selector,
"title": comic.file_name,
"last_read_page": comic_status.last_read_page,
"prev_comic": prev_comic,
"next_comic": next_comic,
"pages": comic.page_count,
}
serializer = self.serializer_class(data)
return Response(serializer.data)
@swagger_auto_schema(responses={status.HTTP_200_OK: 'PDF Binary Data',
status.HTTP_400_BAD_REQUEST: 'User below classification allowed'})
@action(methods=['get'], detail=True)
def pdf(self, request: Request, selector: UUID) -> Union[FileResponse, Response, HTTPResponse]:
book = models.ComicBook.objects.get(selector=selector)
misc, _ = models.UserMisc.objects.get_or_create(user=request.user)
try:
if book.directory.classification > misc.allowed_to_read:
return Response(status=400, data={'errors': 'Not allowed to read.'})
except AttributeError:
pass
return FileResponse(open(book.get_pdf(), 'rb'), content_type='application/pdf')
@swagger_auto_schema(responses={status.HTTP_200_OK: TypeSerializer()})
@action(methods=['get'], detail=True)
def type(self, _request: Request, selector: UUID) -> Response:
book = models.ComicBook.objects.get(selector=selector)
return Response({'type': book.file_name.split('.')[-1].lower()})
@swagger_auto_schema(responses={status.HTTP_200_OK: ReadPageSerializer()}, request_body=ReadPageSerializer)
@action(methods=['put'], detail=True, serializer_class=ReadPageSerializer)
def set_page(self, request: Request, selector: UUID) -> Response:
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
comic_status, _ = models.ComicStatus.objects.annotate(page_count=F('comic__page_count')) \
.get_or_create(comic_id=selector, user=request.user)
comic_status.last_read_page = serializer.data['page']
comic_status.unread = False
if comic_status.page_count - 1 == comic_status.last_read_page:
comic_status.finished = True
else:
comic_status.finished = False
comic_status.save()
return Response(self.get_serializer(comic_status).data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class PassthroughRenderer(renderers.BaseRenderer): # pylint: disable=too-few-public-methods
"""
Return data as-is. View should supply a Response.
"""
media_type = '*/*'
format = ''
def render(self, data: bytes, accepted_media_type: Optional[str] = None, renderer_context: Optional[str] = None) \
-> bytes:
return data
class ImageViewSet(viewsets.ViewSet):
queryset = models.ComicBook.objects.all()
lookup_field = 'page'
renderer_classes = [PassthroughRenderer]
@swagger_auto_schema(responses={status.HTTP_200_OK: "A Binary Image response"})
def retrieve(self, _request: Request, selector: UUID, page: int) -> FileResponse:
book = models.ComicBook.objects.get(selector=selector)
img, content = book.get_image(int(page) - 1)
self.renderer_classes[0].media_type = content
return FileResponse(img, content_type=content)
class StandardResultsSetPagination(PageNumberPagination):
page_size = 10
page_size_query_param = 'page_size'
max_page_size = 100
class RecentComicsSerializer(serializers.ModelSerializer):
unread = serializers.BooleanField()
finished = serializers.BooleanField()
last_read_page = serializers.IntegerField()
class Meta:
model = models.ComicBook
fields = ['file_name', 'date_added', 'selector', 'page_count', 'unread', 'finished', 'last_read_page']
class SearchTextQuerySerializer(serializers.Serializer):
search_text = serializers.CharField(required=False)
class RecentComicsView(mixins.ListModelMixin, viewsets.GenericViewSet):
queryset = models.ComicBook.objects.all()
serializer_class = RecentComicsSerializer
pagination_class = StandardResultsSetPagination
permission_classes = [permissions.IsAuthenticated]
def get_queryset(self) -> QuerySet[models.ComicBook]:
user = self.request.user
if "search_text" in self.request.query_params:
query = models.ComicBook.objects.filter(file_name__icontains=self.request.query_params["search_text"])
else:
query = models.ComicBook.objects.all()
query = query.annotate(
unread=Case(When(comicstatus__user=user, then='comicstatus__unread')),
finished=Case(When(comicstatus__user=user, then='comicstatus__finished')),
last_read_page=Case(When(comicstatus__user=user, then='comicstatus__last_read_page')) + 1,
classification=Case(
When(directory__isnull=True, then=models.Directory.Classification.C_18),
default=F('directory__classification'),
output_field=PositiveSmallIntegerField(choices=models.Directory.Classification.choices)
)
)
query = query.order_by('-date_added')
return query
@swagger_auto_schema(query_serializer=SearchTextQuerySerializer())
def list(self, request: Request, *args, **kwargs) -> Response:
return super().list(request, *args, **kwargs)
class HistorySerializer(serializers.ModelSerializer):
last_read_time = serializers.DateTimeField()
finished = serializers.BooleanField()
last_read_page = serializers.IntegerField()
class Meta:
model = models.ComicBook
fields = ['file_name', 'selector', 'page_count', 'last_read_time', 'finished', 'last_read_page']
class HistoryViewSet(mixins.ListModelMixin, viewsets.GenericViewSet):
queryset = models.ComicBook.objects.all()
serializer_class = HistorySerializer
pagination_class = StandardResultsSetPagination
permission_classes = [permissions.IsAuthenticated]
def get_queryset(self) -> QuerySet[models.ComicBook]:
user = self.request.user
if "search_text" in self.request.query_params:
query = models.ComicBook.objects.filter(file_name__icontains=self.request.query_params["search_text"])
else:
query = models.ComicBook.objects.all()
query = query.annotate(
last_read_page=Case(When(comicstatus__user=user, then='comicstatus__last_read_page')) + 1,
finished=Case(When(comicstatus__user=user, then='comicstatus__finished')),
unread=Case(When(comicstatus__user=user, then='comicstatus__unread')),
last_read_time=Case(When(comicstatus__user=user, then='comicstatus__updated')),
classification=Case(
When(directory__isnull=True, then=models.Directory.Classification.C_18),
default=F('directory__classification'),
output_field=PositiveSmallIntegerField(choices=models.Directory.Classification.choices)
)
)
query = query.filter(comicstatus__unread=False)
query = query.order_by('-comicstatus__updated')
return query
@swagger_auto_schema(query_serializer=SearchTextQuerySerializer())
def list(self, request: Request, *args, **kwargs) -> Response:
return super().list(request, *args, **kwargs)
class ActionSerializer(serializers.Serializer):
selectors = serializers.ListSerializer(child=serializers.UUIDField())
class ActionViewSet(viewsets.GenericViewSet):
permission_classes = [permissions.IsAuthenticated]
lookup_field = 'action'
serializer_class = ActionSerializer
@action(detail=False, methods=['PUT'])
def mark_read(self, request: Request) -> Response:
serializer = ActionSerializer(data=request.data)
if serializer.is_valid():
comics = self.get_comics(serializer.data['selectors'])
comic_status = models.ComicStatus.objects.filter(comic__selector__in=comics, user=request.user)
comic_status = comic_status.annotate(total_pages=F('comic__page_count'))
status_to_update = []
for c_status in comic_status:
c_status.last_read_page = c_status.total_pages - 1
c_status.unread = False
c_status.finished = True
status_to_update.append(c_status)
comics.remove(str(c_status.comic_id))
for new_status in comics:
comic = models.ComicBook.objects.get(selector=new_status)
obj, _ = models.ComicStatus.objects.get_or_create(comic=comic, user=request.user)
obj.unread = False
obj.finished = True
obj.last_read_page = comic.total_pages
status_to_update.append(obj)
models.ComicStatus.objects.bulk_update(status_to_update, ['unread', 'finished', 'last_read_page'])
return Response({'status': 'marked_read'})
return Response(serializer.errors,
status=status.HTTP_400_BAD_REQUEST)
@action(detail=False, methods=['PUT'])
def mark_unread(self, request: Request) -> Response:
serializer = ActionSerializer(data=request.data)
if serializer.is_valid():
comics = self.get_comics(serializer.data['selectors'])
comic_status = models.ComicStatus.objects.filter(comic__selector__in=comics, user=request.user)
status_to_update = []
for c_status in comic_status:
c_status.last_read_page = 0
c_status.unread = True
c_status.finished = False
status_to_update.append(c_status)
comics.remove(str(c_status.comic_id))
for new_status in comics:
comic = models.ComicBook.objects.get(selector=new_status)
obj, _ = models.ComicStatus.objects.get_or_create(comic=comic, user=request.user)
obj.unread = True
obj.finished = False
obj.last_read_page = 0
status_to_update.append(obj)
models.ComicStatus.objects.bulk_update(status_to_update, ['unread', 'finished', 'last_read_page'])
return Response({'status': 'marked_unread'})
return Response(serializer.errors,
status=status.HTTP_400_BAD_REQUEST)
def get_comics(self, selectors: Iterable[Union[str, UUID]]) -> List[str]:
data = set()
data = data.union(
set(models.ComicBook.objects.filter(selector__in=selectors).values_list('selector', flat=True)))
directories = models.Directory.objects.filter(selector__in=selectors)
if directories:
for directory in directories:
data = data.union(
set(models.ComicBook.objects.filter(directory=directory).values_list('selector', flat=True)))
data = data.union(self.get_comics(models.Directory.objects.filter(
parent__in=directories).values_list('selector', flat=True)))
return [str(x) for x in data]
class RSSSerializer(serializers.Serializer):
feed_id = serializers.UUIDField()
class AccountSerializer(serializers.ModelSerializer):
class Meta:
model = User
fields = ['username', 'email', 'is_superuser']
class UpdateEmailSerializer(serializers.Serializer):
username = serializers.CharField()
email = serializers.EmailField()
password = serializers.CharField()
class PasswordResetSerializer(serializers.Serializer):
username = serializers.CharField()
old_password = serializers.CharField()
new_password = serializers.CharField(required=False)
new_password_confirm = serializers.CharField(required=False)
def validate_new_password(self, data: str) -> str:
if data == '':
return data
try:
validate_password(data)
except ValidationError as err:
raise serializers.ValidationError(err)
return data
def validate(self, attrs: Dict[str, str]) -> Dict[str, str]:
super().validate(attrs)
if attrs['new_password'] != attrs['new_password_confirm']:
raise serializers.ValidationError('New passwords do not match')
return attrs
class AccountViewSet(viewsets.GenericViewSet):
permission_classes = [permissions.IsAuthenticated]
lookup_field = 'username'
serializer_class = AccountSerializer
@swagger_auto_schema(responses={200: AccountSerializer()})
@action(detail=False, methods=['PATCH'], serializer_class=PasswordResetSerializer)
def reset_password(self, request: Request) -> Response:
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
if request.user.username != serializer.data['username']:
return Response(status=status.HTTP_400_BAD_REQUEST, data={'errors': 'Username does not match account'})
if not request.user.check_password(serializer.data['old_password']):
return Response(status=status.HTTP_400_BAD_REQUEST, data={'errors': 'Password Incorrect'})
request.user.set_password(serializer.data['new_password'])
request.user.save()
return Response(AccountSerializer(request.user).data)
return Response({"errors": serializer.errors}, status.HTTP_400_BAD_REQUEST)
def list(self, request: Request) -> Response:
serializer = self.get_serializer(request.user)
return Response(serializer.data)
@swagger_auto_schema(responses={200: AccountSerializer()})
@action(detail=False, methods=['PATCH'], serializer_class=UpdateEmailSerializer)
def update_email(self, request: Request) -> Response:
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
if request.user.username != serializer.data['username']:
return Response(status=status.HTTP_400_BAD_REQUEST, data={'errors': 'Username does not match account'})
if not request.user.check_password(serializer.data['password']):
return Response(status=status.HTTP_400_BAD_REQUEST, data={'errors': 'Password Incorrect'})
request.user.email = serializer.data['email']
request.user.save()
account = AccountSerializer(request.user)
return Response(account.data)
return Response(serializer.errors, status.HTTP_400_BAD_REQUEST)
@swagger_auto_schema(responses={status.HTTP_200_OK: RSSSerializer()})
@action(methods=['get'], detail=False, serializer_class=RSSSerializer)
def feed_id(self, request: Request) -> Response:
"""
Return the RSS feed id needed to get users RSS Feed.
"""
user_misc = get_object_or_404(models.UserMisc, user=request.user)
serializer = self.get_serializer(user_misc)
return Response(serializer.data)
class DirectorySerializer(serializers.ModelSerializer):
class Meta:
model = models.Directory
fields = ['selector', 'classification']
extra_kwargs = {
'selector': {'validators': []},
}
class DirectoryViewSet(mixins.UpdateModelMixin, viewsets.GenericViewSet):
serializer_class = DirectorySerializer
queryset = models.Directory.objects.all()
permission_classes = [permissions.IsAdminUser]
lookup_field = 'selector'
@swagger_auto_schema(responses={200: DirectorySerializer(many=True)})
def update(self, request: Request, selector: UUID) -> Response: # pylint: disable=arguments-differ
"""
This will set the classification of a directory and all it's children.
"""
main_parent = get_object_or_404(models.Directory, selector=selector)
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
main_parent.classification = serializer.data['classification']
to_update = {main_parent}
to_visit = {main_parent}
while to_visit:
parent = to_visit.pop()
for child in models.Directory.objects.filter(parent=parent):
child.classification = serializer.data['classification']
to_visit.add(child)
to_update.add(child)
models.Directory.objects.bulk_update(to_update, fields=['classification'])
data = models.Directory.objects.filter(directory__in=to_update)
response = self.get_serializer(data, many=True)
return Response(response.data)
return Response(serializer.errors, status.HTTP_400_BAD_REQUEST)
def partial_update(self, request: Request, *args, **kwargs) -> Response:
"""
This will set the classification of a directory and none of its children.
"""
return super().update(request, *args, **kwargs)
class InitialSetupSerializer(serializers.Serializer):
username = serializers.CharField()
email = serializers.EmailField(required=False, allow_blank=True)
password = serializers.CharField()
class InitialSetupRequired(serializers.Serializer):
required = serializers.BooleanField()
class InitialSetup(viewsets.GenericViewSet):
permission_classes = [permissions.AllowAny]
@swagger_auto_schema(responses={status.HTTP_200_OK: InitialSetupRequired(many=False)})
@action(detail=False, methods=['get'], serializer_class=InitialSetupRequired)
def required(self, _request: Request) -> Response:
serializer = self.get_serializer({'required': User.objects.count() == 0})
return Response(serializer.data)
@action(methods=['post'], detail=False, serializer_class=InitialSetupSerializer)
def create_user(self, request: Request) -> Response:
if User.objects.count() == 0:
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
admin = User(
username=serializer.data['username'],
email=serializer.data['email'],
is_superuser=True,
is_active=True,
is_staff=True
)
admin.set_password(serializer.data['password'])
admin.save()
return Response(serializer.data, status.HTTP_201_CREATED)
return Response(serializer.errors, status.HTTP_400_BAD_REQUEST)
return Response({}, status.HTTP_400_BAD_REQUEST)