mirror of
https://github.com/ajurna/cbwebreader.git
synced 2025-12-06 06:17:17 +00:00
This commit removes the frontend component ThePdfReader.vue and replaces its functionality with a backend implementation based on pymupdf. Also includes package updates, refactors PDF archive handling, and adjusts security settings to support development on localhost.
653 lines
28 KiB
Python
653 lines
28 KiB
Python
from http.client import HTTPResponse
|
|
from pathlib import Path
|
|
from typing import Union, Optional, Dict, Iterable, List
|
|
from uuid import UUID
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth.models import User
|
|
from django.contrib.auth.password_validation import validate_password
|
|
from django.core.exceptions import ValidationError
|
|
from django.db.models import Case, When, F, PositiveSmallIntegerField, FileField, QuerySet
|
|
from django.http import FileResponse
|
|
from drf_yasg.utils import swagger_auto_schema
|
|
from rest_framework import viewsets, serializers, mixins, permissions, status, renderers
|
|
from rest_framework.decorators import action
|
|
from rest_framework.generics import get_object_or_404
|
|
from rest_framework.pagination import PageNumberPagination
|
|
from rest_framework.request import Request
|
|
from rest_framework.response import Response
|
|
|
|
from comic import models
|
|
from comic.processing import generate_directory
|
|
from comic.util import generate_breadcrumbs_from_path
|
|
|
|
|
|
class UserSerializer(serializers.ModelSerializer):
    """Serialize a Django ``User`` together with a read-only ``classification`` value."""

    # Read-only slug taken from ``allowed_to_read`` on the related ``usermisc`` object.
    classification = serializers.SlugRelatedField(
        many=False,
        read_only=True,
        slug_field='allowed_to_read',
        source='usermisc',
    )

    class Meta:
        model = User
        fields = ['id', 'username', 'email', 'is_superuser', 'classification']
|
|
|
|
|
|
class AdminPasswordResetSerializer(serializers.Serializer):
    """Request/response shape for the admin password reset action."""

    # Required on input.
    username = serializers.CharField()
    # Optional on input.
    password = serializers.CharField(required=False)
|
|
|
|
|
|
class ClassificationSerializer(serializers.Serializer):
    """Validate an integer classification against ``models.Directory.Classification``."""

    classification = serializers.IntegerField()

    def validate_classification(self, data: int) -> int:
        """Return ``data`` unchanged when it is a known classification, else reject it."""
        if data not in models.Directory.Classification:
            raise serializers.ValidationError('Invalid Classification sent.')
        return data
|
|
|
|
|
|
class UserViewSet(viewsets.ModelViewSet):  # pylint: disable=too-many-ancestors
    """
    API endpoint that allows users to be viewed or edited.
    """
    queryset = User.objects.all().order_by('username')
    serializer_class = UserSerializer
    permission_classes = [permissions.IsAdminUser]

    @action(methods=['patch'], detail=True, serializer_class=AdminPasswordResetSerializer)
    def reset_password(self, request: Request, pk: int) -> Response:
        """
        This will return a new password set on the user.
        """
        # Local import: ``BaseUserManager.make_random_password()`` was deprecated in
        # Django 4.2 and removed in 5.1; ``get_random_string`` is the documented
        # replacement and exists in every supported Django version.
        from django.utils.crypto import get_random_string

        target_user = get_object_or_404(User, id=pk)
        serializer = self.get_serializer(data=request.data)

        # The username in the body must match the user addressed by the URL; this
        # acts as a confirmation that the admin is resetting the intended account.
        if serializer.is_valid() and target_user.username == serializer.data['username']:
            # Same length and alphabet that ``make_random_password()`` used by default
            # (look-alike characters such as i/l/o/0/1 are left out).
            password = get_random_string(
                10,
                'abcdefghjkmnpqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ23456789',
            )
            target_user.set_password(password)
            target_user.save()
            resp_serializer = self.get_serializer({
                'username': target_user.username,
                'password': password,
            })
            return Response(resp_serializer.data)

        return Response({'errors': ['Invalid request']}, status.HTTP_400_BAD_REQUEST)

    @action(methods=['patch'], detail=True, serializer_class=ClassificationSerializer)
    def set_classification(self, request: Request, pk: int) -> Response:
        """
        API Endpoint that will set the classification on the specified user.
        """
        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status.HTTP_400_BAD_REQUEST)

        # Resolve the user first so an unknown pk yields a 404 instead of a
        # database integrity error from creating a UserMisc row for a missing user.
        target_user = get_object_or_404(User, id=pk)
        misc, _ = models.UserMisc.objects.get_or_create(user=target_user)
        misc.allowed_to_read = serializer.data['classification']
        misc.save()
        return Response(data={'classification': misc.allowed_to_read})
|
|
|
|
|
|
class BrowseFileField(serializers.FileField):
    """File field whose output is the file name joined onto ``settings.MEDIA_URL``."""

    def to_representation(self, value: Optional[FileField]) -> Optional[str]:
        """Return the POSIX-style media path for ``value``, or ``None`` when it is falsy."""
        if value:
            media_path = Path(settings.MEDIA_URL, value.name)
            return media_path.as_posix()
        return None
|
|
|
|
|
|
class BrowseSerializer(serializers.Serializer):
    """Output shape for one entry returned by the browse endpoints."""

    # Identity and display
    selector = serializers.UUIDField()
    title = serializers.CharField()

    # Reading progress counters
    progress = serializers.IntegerField()
    total = serializers.IntegerField()

    # Entry kind and presentation
    type = serializers.CharField()
    thumbnail = BrowseFileField()
    classification = serializers.IntegerField()

    # Read-state flags
    finished = serializers.BooleanField()
    unread = serializers.BooleanField()
|
|
|
|
|
|
class BreadcrumbSerializer(serializers.Serializer):
    """Output shape for a single breadcrumb entry: position, selector and display name."""

    id = serializers.IntegerField()
    selector = serializers.UUIDField()
    name = serializers.CharField()
|
|
|
|
|
|
# Browse endpoints: list the root, list one directory, and build breadcrumbs.
# (Documented with comments rather than a class docstring so the generated API
# descriptions stay unchanged.)
class BrowseViewSet(viewsets.GenericViewSet):
    serializer_class = BrowseSerializer
    permission_classes = [permissions.IsAuthenticated]
    lookup_field = 'selector'

    def get_queryset(self) -> None:
        # No queryset is used: every action builds its own data.
        return

    def list(self, request: Request) -> Response:
        # Entries produced by ``generate_directory`` for the requesting user with
        # no directory argument.
        serializer = self.get_serializer(generate_directory(request.user), many=True)
        return Response(serializer.data)

    @swagger_auto_schema(responses={status.HTTP_200_OK: BrowseSerializer(many=True)})
    def retrieve(self, request: Request, selector: UUID) -> Response:
        # An unknown selector is a client error (404), not a server error.
        directory = get_object_or_404(models.Directory, selector=selector)
        serializer = self.get_serializer(generate_directory(request.user, directory), many=True)
        return Response(serializer.data)

    @swagger_auto_schema(responses={status.HTTP_200_OK: BreadcrumbSerializer(many=True)})
    @action(methods=['get'], detail=True, serializer_class=BreadcrumbSerializer)
    def breadcrumbs(self, _request: Request, selector: UUID) -> Response:
        # The selector may name either a directory or a comic. ``comic`` stays
        # ``False`` for a directory, which is what the helper is given in that case.
        comic = False
        try:
            directory = models.Directory.objects.get(selector=selector)
        except models.Directory.DoesNotExist:
            # Not a directory: fall back to a comic, and answer 404 if it is neither.
            comic = get_object_or_404(models.ComicBook, selector=selector)
            directory = comic.directory

        crumbs = [
            {"id": index, "selector": item.selector, "name": item.name}
            for index, item in enumerate(generate_breadcrumbs_from_path(directory, comic))
        ]
        serializer = self.get_serializer(crumbs, many=True)
        return Response(serializer.data)
|
|
|
|
|
|
class GenerateThumbnailSerializer(serializers.Serializer):
    """Output shape pairing an item's selector with its thumbnail path."""

    selector = serializers.UUIDField()
    thumbnail = BrowseFileField()
|
|
|
|
|
|
# Returns the thumbnail of a directory or comic, generating it first when missing.
class GenerateThumbnailViewSet(viewsets.ViewSet):
    permission_classes = [permissions.IsAuthenticated]
    serializer_class = GenerateThumbnailSerializer
    lookup_field = 'selector'

    @swagger_auto_schema(responses={status.HTTP_200_OK: GenerateThumbnailSerializer()})
    def retrieve(self, _request: Request, selector: UUID) -> Response:
        # The selector may name a directory or a comic; try the directory first.
        # Only the lookup sits inside ``try`` so errors raised while generating a
        # thumbnail are never mistaken for "directory not found".
        try:
            item = models.Directory.objects.get(selector=selector)
        except models.Directory.DoesNotExist:
            # Neither a directory nor a comic -> 404 instead of an unhandled error.
            item = get_object_or_404(models.ComicBook, selector=selector)

        if not item.thumbnail:
            item.generate_thumbnail()

        payload = {
            "selector": item.selector,
            "thumbnail": item.thumbnail,
        }
        return Response(self.serializer_class(payload).data)
|
|
|
|
|
|
class DirectionSerializer(serializers.Serializer):
    """Navigation target: a route name (``read`` or ``browse``) and an optional selector."""

    route = serializers.ChoiceField(choices=['read', 'browse'])
    selector = serializers.UUIDField(required=False)
|
|
|
|
|
|
class ReadSerializer(serializers.Serializer):
    """Output shape for the reader view of a single comic."""

    selector = serializers.UUIDField()
    title = serializers.CharField()
    last_read_page = serializers.IntegerField()

    # Where the previous/next navigation controls should lead.
    prev_comic = DirectionSerializer()
    next_comic = DirectionSerializer()

    pages = serializers.IntegerField()
|
|
|
|
|
|
class TypeSerializer(serializers.Serializer):
    """Output shape carrying a single ``type`` string."""

    type = serializers.CharField()
|
|
|
|
|
|
class ReadPageSerializer(serializers.Serializer):
    """Expose the ``last_read_page`` attribute under the external name ``page``."""

    page = serializers.IntegerField(source='last_read_page')
|
|
|
|
|
|
# Reader endpoints: comic metadata with navigation, raw PDF, file type and
# page-progress updates. (Comments are used instead of new docstrings so the
# generated API descriptions stay unchanged.)
class ReadViewSet(viewsets.GenericViewSet):
    serializer_class = ReadSerializer
    permission_classes = [permissions.IsAuthenticated]
    lookup_field = 'selector'

    @swagger_auto_schema(responses={status.HTTP_200_OK: ReadSerializer()})
    def retrieve(self, request: Request, selector: UUID) -> Response:
        comic = get_object_or_404(models.ComicBook, selector=selector)
        # Make sure the per-user rows exist before anything reads them.
        models.UserMisc.objects.get_or_create(user=request.user)
        comic_status, _ = models.ComicStatus.objects.get_or_create(comic=comic, user=request.user)

        # Siblings in the same directory, ordered by file name, drive prev/next.
        comic_list = list(models.ComicBook.objects.filter(directory=comic.directory).order_by('file_name'))
        comic_index = comic_list.index(comic)

        # Refresh the stored page count when it no longer matches the computed one.
        current_page_count = comic.get_page_count()
        if comic.page_count != current_page_count:
            comic.page_count = current_page_count
            comic.save()

        # At either end of the list navigation goes back to browsing the
        # containing directory; a comic without a directory gets no selector.
        browse_target = {'route': 'browse'}
        if comic.directory is not None:
            browse_target['selector'] = comic.directory.selector

        if comic_index == 0:
            prev_comic = browse_target
        else:
            prev_comic = {'route': 'read', 'selector': comic_list[comic_index - 1].selector}

        if comic_index + 1 == len(comic_list):
            next_comic = browse_target
        else:
            next_comic = {'route': 'read', 'selector': comic_list[comic_index + 1].selector}

        data = {
            "selector": comic.selector,
            "title": comic.file_name,
            "last_read_page": comic_status.last_read_page,
            "prev_comic": prev_comic,
            "next_comic": next_comic,
            "pages": comic.page_count,
        }
        serializer = self.serializer_class(data)
        return Response(serializer.data)

    @swagger_auto_schema(responses={status.HTTP_200_OK: 'PDF Binary Data',
                                    status.HTTP_400_BAD_REQUEST: 'User below classification allowed'})
    @action(methods=['get'], detail=True)
    def pdf(self, request: Request, selector: UUID) -> Union[FileResponse, Response, HTTPResponse]:
        # Unknown selector -> 404 rather than an unhandled DoesNotExist.
        book = get_object_or_404(models.ComicBook, selector=selector)
        misc, _ = models.UserMisc.objects.get_or_create(user=request.user)

        # Comics without a directory carry no classification to check.
        directory = book.directory
        if directory is not None and directory.classification > misc.allowed_to_read:
            return Response(status=400, data={'errors': 'Not allowed to read.'})

        # FileResponse takes ownership of the file object and closes it when done.
        return FileResponse(open(book.get_pdf(), 'rb'), content_type='application/pdf')

    @swagger_auto_schema(responses={status.HTTP_200_OK: TypeSerializer()})
    @action(methods=['get'], detail=True)
    def type(self, _request: Request, selector: UUID) -> Response:
        book = get_object_or_404(models.ComicBook, selector=selector)
        # Lower-cased text after the last dot of the file name.
        return Response({'type': book.file_name.split('.')[-1].lower()})

    @swagger_auto_schema(responses={status.HTTP_200_OK: ReadPageSerializer()}, request_body=ReadPageSerializer)
    @action(methods=['put'], detail=True, serializer_class=ReadPageSerializer)
    def set_page(self, request: Request, selector: UUID) -> Response:
        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        # Read the page count from the comic itself. An annotation placed on the
        # queryset is not present on an object freshly created by get_or_create,
        # so relying on it fails the first time a user reports progress.
        comic = get_object_or_404(models.ComicBook, selector=selector)
        comic_status, _ = models.ComicStatus.objects.get_or_create(comic=comic, user=request.user)

        comic_status.last_read_page = serializer.data['page']
        comic_status.unread = False
        # Pages are zero-based, so the last page index is ``page_count - 1``.
        comic_status.finished = comic.page_count - 1 == comic_status.last_read_page
        comic_status.save()
        return Response(self.get_serializer(comic_status).data)
|
|
|
|
|
|
class PassthroughRenderer(renderers.BaseRenderer):  # pylint: disable=too-few-public-methods
    """
    Return data as-is. View should supply a Response.
    """
    media_type = '*/*'
    format = ''

    def render(
        self,
        data: bytes,
        accepted_media_type: Optional[str] = None,
        renderer_context: Optional[str] = None,
    ) -> bytes:
        """Hand ``data`` back untouched; the other arguments are ignored."""
        return data
|
|
|
|
|
|
# Serves a single page image of a comic as a binary file response.
class ImageViewSet(viewsets.ViewSet):
    queryset = models.ComicBook.objects.all()
    lookup_field = 'page'
    renderer_classes = [PassthroughRenderer]

    @swagger_auto_schema(responses={status.HTTP_200_OK: "A Binary Image response"})
    def retrieve(self, _request: Request, selector: UUID, page: int) -> FileResponse:
        # Unknown selector -> 404 rather than an unhandled DoesNotExist.
        book = get_object_or_404(models.ComicBook, selector=selector)
        # ``page`` is one-based in the URL; ``get_image`` is given a zero-based index.
        img, content = book.get_image(int(page) - 1)
        # NOTE(review): this mutates an attribute on the renderer *class*, which is
        # shared by every request handled in this process. Kept as-is because content
        # negotiation may rely on it; consider a per-response approach instead.
        self.renderer_classes[0].media_type = content
        return FileResponse(img, content_type=content)
|
|
|
|
|
|
class StandardResultsSetPagination(PageNumberPagination):
    """Page-number pagination: 10 items by default, client-adjustable up to 100."""

    page_size = 10
    # Query parameter a client may use to request a different page size.
    page_size_query_param = 'page_size'
    max_page_size = 100
|
|
|
|
|
|
class RecentComicsSerializer(serializers.ModelSerializer):
    """Serialize a ``ComicBook`` plus three explicitly declared read-state fields."""

    # Declared explicitly rather than derived from the model by ``Meta.fields``.
    unread = serializers.BooleanField()
    finished = serializers.BooleanField()
    last_read_page = serializers.IntegerField()

    class Meta:
        model = models.ComicBook
        fields = [
            'file_name',
            'date_added',
            'selector',
            'page_count',
            'unread',
            'finished',
            'last_read_page',
        ]
|
|
|
|
|
|
class SearchTextQuerySerializer(serializers.Serializer):
    """Query-string schema with a single optional ``search_text`` parameter."""

    search_text = serializers.CharField(required=False)
|
|
|
|
|
|
# Paginated list of comics, newest first, optionally filtered by ``search_text``.
class RecentComicsView(mixins.ListModelMixin, viewsets.GenericViewSet):
    queryset = models.ComicBook.objects.all()
    serializer_class = RecentComicsSerializer
    pagination_class = StandardResultsSetPagination
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self) -> QuerySet[models.ComicBook]:
        user = self.request.user
        params = self.request.query_params

        # Start from every comic and narrow by file name when a search term is given.
        comics = models.ComicBook.objects.all()
        if "search_text" in params:
            comics = comics.filter(file_name__icontains=params["search_text"])

        # Comics without a directory are annotated with the C_18 classification.
        classification = Case(
            When(directory__isnull=True, then=models.Directory.Classification.C_18),
            default=F('directory__classification'),
            output_field=PositiveSmallIntegerField(choices=models.Directory.Classification.choices)
        )

        # Read-state values are taken from the requesting user's ComicStatus rows;
        # ``last_read_page`` is shifted by one.
        comics = comics.annotate(
            unread=Case(When(comicstatus__user=user, then='comicstatus__unread')),
            finished=Case(When(comicstatus__user=user, then='comicstatus__finished')),
            last_read_page=Case(When(comicstatus__user=user, then='comicstatus__last_read_page')) + 1,
            classification=classification,
        )

        return comics.order_by('-date_added')

    @swagger_auto_schema(query_serializer=SearchTextQuerySerializer())
    def list(self, request: Request, *args, **kwargs) -> Response:
        # Overridden only to attach the query-parameter schema for the API docs.
        return super().list(request, *args, **kwargs)
|
|
|
|
|
|
class HistorySerializer(serializers.ModelSerializer):
    """Serialize a ``ComicBook`` plus three explicitly declared reading-history fields."""

    # Declared explicitly rather than derived from the model by ``Meta.fields``.
    last_read_time = serializers.DateTimeField()
    finished = serializers.BooleanField()
    last_read_page = serializers.IntegerField()

    class Meta:
        model = models.ComicBook
        fields = [
            'file_name',
            'selector',
            'page_count',
            'last_read_time',
            'finished',
            'last_read_page',
        ]
|
|
|
|
|
|
# Paginated reading history: comics with a non-unread status, most recently updated first.
class HistoryViewSet(mixins.ListModelMixin, viewsets.GenericViewSet):
    queryset = models.ComicBook.objects.all()
    serializer_class = HistorySerializer
    pagination_class = StandardResultsSetPagination
    permission_classes = [permissions.IsAuthenticated]

    def get_queryset(self) -> QuerySet[models.ComicBook]:
        user = self.request.user
        params = self.request.query_params

        # Start from every comic and narrow by file name when a search term is given.
        comics = models.ComicBook.objects.all()
        if "search_text" in params:
            comics = comics.filter(file_name__icontains=params["search_text"])

        # Comics without a directory are annotated with the C_18 classification.
        classification = Case(
            When(directory__isnull=True, then=models.Directory.Classification.C_18),
            default=F('directory__classification'),
            output_field=PositiveSmallIntegerField(choices=models.Directory.Classification.choices)
        )

        # Read-state values are taken from the requesting user's ComicStatus rows;
        # ``last_read_page`` is shifted by one.
        comics = comics.annotate(
            last_read_page=Case(When(comicstatus__user=user, then='comicstatus__last_read_page')) + 1,
            finished=Case(When(comicstatus__user=user, then='comicstatus__finished')),
            unread=Case(When(comicstatus__user=user, then='comicstatus__unread')),
            last_read_time=Case(When(comicstatus__user=user, then='comicstatus__updated')),
            classification=classification,
        )

        # Keep only comics with a status that is not unread.
        comics = comics.filter(comicstatus__unread=False)
        return comics.order_by('-comicstatus__updated')

    @swagger_auto_schema(query_serializer=SearchTextQuerySerializer())
    def list(self, request: Request, *args, **kwargs) -> Response:
        # Overridden only to attach the query-parameter schema for the API docs.
        return super().list(request, *args, **kwargs)
|
|
|
|
|
|
class ActionSerializer(serializers.Serializer):
    """Request body holding the list of selectors (UUIDs) a bulk action applies to."""

    selectors = serializers.ListSerializer(child=serializers.UUIDField())
|
|
|
|
|
|
# Bulk actions that mark comics (or everything under selected directories) as
# read or unread for the requesting user.
class ActionViewSet(viewsets.GenericViewSet):
    permission_classes = [permissions.IsAuthenticated]
    lookup_field = 'action'
    serializer_class = ActionSerializer

    @action(detail=False, methods=['PUT'])
    def mark_read(self, request: Request) -> Response:
        # Mark every selected comic as finished, positioned on its last page.
        return self._mark(request, read=True)

    @action(detail=False, methods=['PUT'])
    def mark_unread(self, request: Request) -> Response:
        # Reset every selected comic to unread, positioned on the first page.
        return self._mark(request, read=False)

    def _mark(self, request: Request, *, read: bool) -> Response:
        # Shared implementation of mark_read / mark_unread.
        serializer = ActionSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors,
                            status=status.HTTP_400_BAD_REQUEST)

        selectors = self.get_comics(serializer.data['selectors'])
        # Selectors that still need a status row once existing rows are handled.
        remaining = set(selectors)
        status_to_update = []

        existing = models.ComicStatus.objects.filter(comic__selector__in=selectors, user=request.user)
        existing = existing.annotate(total_pages=F('comic__page_count'))
        for c_status in existing:
            self._apply_state(c_status, c_status.total_pages, read)
            status_to_update.append(c_status)
            # ``comic_id`` is compared with selector strings here, as the
            # original code did. ``discard`` tolerates an unexpected duplicate row.
            remaining.discard(str(c_status.comic_id))

        for selector in remaining:
            comic = models.ComicBook.objects.get(selector=selector)
            obj, _ = models.ComicStatus.objects.get_or_create(comic=comic, user=request.user)
            # The comic's page total is ``page_count``, the attribute used for
            # ComicBook everywhere else in this module.
            self._apply_state(obj, comic.page_count, read)
            status_to_update.append(obj)

        models.ComicStatus.objects.bulk_update(status_to_update, ['unread', 'finished', 'last_read_page'])
        return Response({'status': 'marked_read' if read else 'marked_unread'})

    @staticmethod
    def _apply_state(comic_status, page_count: int, read: bool) -> None:
        # Set the read-state fields on one ComicStatus instance (not saved here).
        comic_status.unread = not read
        comic_status.finished = read
        # Pages are zero-based; never go below zero for a comic with no pages.
        comic_status.last_read_page = max(page_count - 1, 0) if read else 0

    def get_comics(self, selectors: Iterable[Union[str, UUID]]) -> List[str]:
        # Expand ``selectors`` (comics and/or directories) into the selectors of
        # every comic they cover, descending into sub-directories recursively.
        # Values are normalised to ``str`` before entering the set so a comic
        # reached by more than one route is returned only once.
        found = {
            str(selector)
            for selector in models.ComicBook.objects.filter(selector__in=selectors).values_list('selector', flat=True)
        }

        directories = models.Directory.objects.filter(selector__in=selectors)
        if directories:
            # Comics directly inside the selected directories.
            found.update(
                str(selector)
                for selector in models.ComicBook.objects.filter(directory__in=directories).values_list(
                    'selector', flat=True)
            )
            # Comics anywhere below them, via the child directories.
            child_selectors = models.Directory.objects.filter(parent__in=directories).values_list(
                'selector', flat=True)
            found.update(self.get_comics(child_selectors))

        return list(found)
|
|
|
|
|
|
class RSSSerializer(serializers.Serializer):
    """Output shape carrying a single ``feed_id`` UUID."""

    feed_id = serializers.UUIDField()
|
|
|
|
|
|
class AccountSerializer(serializers.ModelSerializer):
    """Serialize the username, email and superuser flag of a ``User``."""

    class Meta:
        model = User
        fields = [
            'username',
            'email',
            'is_superuser',
        ]
|
|
|
|
|
|
class UpdateEmailSerializer(serializers.Serializer):
    """Request body for changing the email address on an account."""

    username = serializers.CharField()
    email = serializers.EmailField()
    password = serializers.CharField()
|
|
|
|
|
|
class PasswordResetSerializer(serializers.Serializer):
    """Request body for a user changing their own password."""

    username = serializers.CharField()
    old_password = serializers.CharField()
    new_password = serializers.CharField(required=False)
    new_password_confirm = serializers.CharField(required=False)

    def validate_new_password(self, data: str) -> str:
        """Run Django's configured password validators against the new password.

        Raises:
            serializers.ValidationError: carrying the individual validator messages.
        """
        if data == '':
            return data
        try:
            validate_password(data)
        except ValidationError as err:
            # Pass the list of messages on; wrapping the Django exception object
            # itself would stringify the whole list into one message.
            raise serializers.ValidationError(err.messages) from err
        return data

    def validate(self, attrs: Dict[str, str]) -> Dict[str, str]:
        """Require a new password and check that the confirmation matches it.

        Both fields are declared optional, so they may be absent from ``attrs``;
        ``.get`` is used so a missing field becomes a validation error instead
        of a ``KeyError``.

        Raises:
            serializers.ValidationError: when the new password is missing or the
                two values differ.
        """
        super().validate(attrs)
        new_password = attrs.get('new_password')
        if new_password is None:
            raise serializers.ValidationError({'new_password': 'This field is required.'})
        if new_password != attrs.get('new_password_confirm'):
            raise serializers.ValidationError('New passwords do not match')
        return attrs
|
|
|
|
|
|
# Endpoints for the signed-in user's own account: details, password, email and
# RSS feed id. (Comments are used instead of new docstrings so the generated API
# descriptions stay unchanged.)
class AccountViewSet(viewsets.GenericViewSet):
    permission_classes = [permissions.IsAuthenticated]
    lookup_field = 'username'
    serializer_class = AccountSerializer

    @swagger_auto_schema(responses={200: AccountSerializer()})
    @action(detail=False, methods=['PATCH'], serializer_class=PasswordResetSerializer)
    def reset_password(self, request: Request) -> Response:
        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            return Response({"errors": serializer.errors}, status.HTTP_400_BAD_REQUEST)

        # The body must name the signed-in account and prove knowledge of the
        # current password before anything is changed.
        if request.user.username != serializer.data['username']:
            return Response(status=status.HTTP_400_BAD_REQUEST, data={'errors': 'Username does not match account'})
        if not request.user.check_password(serializer.data['old_password']):
            return Response(status=status.HTTP_400_BAD_REQUEST, data={'errors': 'Password Incorrect'})

        request.user.set_password(serializer.data['new_password'])
        request.user.save()
        return Response(AccountSerializer(request.user).data)

    def list(self, request: Request) -> Response:
        # "List" returns the single account of the requesting user.
        serializer = self.get_serializer(request.user)
        return Response(serializer.data)

    @swagger_auto_schema(responses={200: AccountSerializer()})
    @action(detail=False, methods=['PATCH'], serializer_class=UpdateEmailSerializer)
    def update_email(self, request: Request) -> Response:
        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status.HTTP_400_BAD_REQUEST)

        # Same identity checks as for a password change.
        if request.user.username != serializer.data['username']:
            return Response(status=status.HTTP_400_BAD_REQUEST, data={'errors': 'Username does not match account'})
        if not request.user.check_password(serializer.data['password']):
            return Response(status=status.HTTP_400_BAD_REQUEST, data={'errors': 'Password Incorrect'})

        request.user.email = serializer.data['email']
        request.user.save()
        account = AccountSerializer(request.user)
        return Response(account.data)

    @swagger_auto_schema(responses={status.HTTP_200_OK: RSSSerializer()})
    @action(methods=['get'], detail=False, serializer_class=RSSSerializer)
    def feed_id(self, request: Request) -> Response:
        """
        Return the RSS feed id needed to get users RSS Feed.
        """
        # Create the row on demand, as the reader endpoints in this module do, so
        # a user whose UserMisc row does not exist yet is not answered with a 404.
        user_misc, _ = models.UserMisc.objects.get_or_create(user=request.user)
        serializer = self.get_serializer(user_misc)
        return Response(serializer.data)
|
|
|
|
|
|
class DirectorySerializer(serializers.ModelSerializer):
    """Serialize a directory's selector and classification."""

    class Meta:
        model = models.Directory
        fields = ['selector', 'classification']
        # Replace the default validators on ``selector`` with an empty list.
        extra_kwargs = {
            'selector': {
                'validators': [],
            },
        }
|
|
|
|
|
|
# Admin-only endpoints for changing the classification of directories.
class DirectoryViewSet(mixins.UpdateModelMixin, viewsets.GenericViewSet):
    serializer_class = DirectorySerializer
    queryset = models.Directory.objects.all()
    permission_classes = [permissions.IsAdminUser]
    lookup_field = 'selector'

    @swagger_auto_schema(responses={200: DirectorySerializer(many=True)})
    def update(self, request: Request, selector: UUID) -> Response:  # pylint: disable=arguments-differ
        """
        This will set the classification of a directory and all its children.
        """
        main_parent = get_object_or_404(models.Directory, selector=selector)

        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status.HTTP_400_BAD_REQUEST)

        classification = serializer.data['classification']
        main_parent.classification = classification

        # Walk the directory tree from the target downwards, collecting every
        # directory whose classification changes, in visit order.
        to_update = [main_parent]
        to_visit = [main_parent]
        while to_visit:
            parent = to_visit.pop()
            for child in models.Directory.objects.filter(parent=parent):
                child.classification = classification
                to_visit.append(child)
                to_update.append(child)

        models.Directory.objects.bulk_update(to_update, fields=['classification'])

        # Respond with the directories that were actually updated. The previous
        # ``directory__in`` lookup did not select them.
        # NOTE(review): ``directory`` looks like the reverse name of the ``parent``
        # relation (i.e. it selected parents of updated rows) - confirm in models.
        response = self.get_serializer(to_update, many=True)
        return Response(response.data)

    def partial_update(self, request: Request, *args, **kwargs) -> Response:
        """
        This will set the classification of a directory and none of its children.
        """
        # Bypass the recursive ``update`` above and use the stock mixin behaviour.
        return super().update(request, *args, **kwargs)
|
|
|
|
|
|
class InitialSetupSerializer(serializers.Serializer):
    """Request body for creating the first user; the email is optional and may be blank."""

    username = serializers.CharField()
    email = serializers.EmailField(
        required=False,
        allow_blank=True,
    )
    password = serializers.CharField()
|
|
|
|
|
|
class InitialSetupRequired(serializers.Serializer):
    """Output shape carrying a single boolean ``required`` flag."""

    required = serializers.BooleanField()
|
|
|
|
|
|
# Unauthenticated first-run endpoints: report whether setup is needed and create
# the first (superuser) account while no users exist.
class InitialSetup(viewsets.GenericViewSet):
    permission_classes = [permissions.AllowAny]

    @swagger_auto_schema(responses={status.HTTP_200_OK: InitialSetupRequired(many=False)})
    @action(detail=False, methods=['get'], serializer_class=InitialSetupRequired)
    def required(self, _request: Request) -> Response:
        # Setup is required exactly when there are no users at all.
        serializer = self.get_serializer({'required': not User.objects.exists()})
        return Response(serializer.data)

    @action(methods=['post'], detail=False, serializer_class=InitialSetupSerializer)
    def create_user(self, request: Request) -> Response:
        # This endpoint is open to anyone, so it must refuse once any user exists.
        if User.objects.exists():
            return Response({}, status.HTTP_400_BAD_REQUEST)

        serializer = self.get_serializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status.HTTP_400_BAD_REQUEST)

        admin = User(
            username=serializer.data['username'],
            # ``email`` is optional and is absent from ``serializer.data`` when the
            # client omits it; default to an empty string instead of a KeyError.
            email=serializer.data.get('email', ''),
            is_superuser=True,
            is_active=True,
            is_staff=True
        )
        admin.set_password(serializer.data['password'])
        admin.save()
        # NOTE(review): the response echoes the submitted data, including the
        # plain-text password. Kept for compatibility; consider omitting it.
        return Response(serializer.data, status.HTTP_201_CREATED)
|