Files
cbwebreader/comic/rest.py
Ajurna c5633bf54a New Frontend in Vue with drf interface (#72)
* frontend rewrite with vie initial commit

* got ComicCard.vue working nice.

* got TheComicList.vue working.

* added router and basic config

* getting jwt stuff working.

* login with jwt now working.

* implemented browse api call

* implemented browse api recievers

* jwt token is now updating automatically.

* removed code for jwt testing.

* enabled browsing

* breadcrumbs working

* adding django webpack loader

* linking up navigation

* fixes for ComicCard.vue stying

* added thumbnail view

* added thumbnail generation and handling.

* detached breadcrumbs

* fix breadcrumbs

* added first stages of reader

* reader view is working.

* reader is now working with keyboard shortcuts

* implemented setting read page.

* implemented pagination on comic reader.

* hide elements that shouldn't be shown.

* fixed the ComicCard.vue to use as little space as possible.

* fix navbar browse link

* added RecentView.vue and added manual option for breadcrumbs

* updated rest api to handle recent comics.

* most functionality of recent comics done

* modified comicstatus relation to use uuid relation and implemented mark read and unread for batches.

* added functions to TheRecentTable.vue

* added feed link to TheRecentTable.vue

* fixes for comicstatus updates.

* added constraints to comicstatus

* update to python packages.

* some changes for django 4, also removed django-recaptcha2 as it doesnt support django 4.

* some fixes and updates to ComicCard.vue

* cleaned up generate_directory. fixed bug where pages not visible on first call.

* cleaned up generate_directory. fixed bug where pages not visible on first call.

* cleaned up generate_directory. fixed bug where pages not visible on first call.

* cleaned up generate_directory.

* added silk stubs

* fix for re-requesting thumbnail after getting it already.

* fix for removing stale comics.
adding leeway to access token.

* mark read and unread

* added filtering to comic list.

* stored filtering state.

* stored filtering state.

* added next functionality to login.

* cleanup LoginView.vue

* bump font-awesome.

* working on AccountView.vue

* fixed form submission on LoginView.vue

* account page should now be working.

* hide users option if not superuser.

* added pdf support

* make pdf resize.

* added touch controls to pdf reader

* added touch controls to comic reader

* beginnings of routing between issues.

* fixes for navigating pages.

* fixes for navigating pages.

* fixes for navigating pages.

* renamed HomeView.vue to BrowseView.vue

* stubs for users page added. api ready

* users page further functinality

* fix for notification

* fix for notification

* moved messages to parent.

* form to add users

* added error handling

* removed console logging

* classification in base directory should be lowest

* renamed usermisc to classification to be more consistent with what it does.

* renamed usermisc to classification to be more consistent with what it does.

* added functionality to change classification of directories.

* merged rss_id api into account api.

* merged breadcrumbs api into browse api.

* clears some warnings from console.

* fixed read/unread rendering.

* added build script and starting lint

* fixing lint errors

* fixing lint errors

* fixing lint errors

* fixing lint errors

* fixing lint errors

* fixing lint errors

* fixing lint errors

* fixing lint errors

* fixing navigation bugs

* cleanup and fixes

* fixed generated tooltips over calling.

* fixed classifications.

* initial setup now working

* fix navbar branding

* fix favicon

* added beta build script.

* fixes to get ready for production

* optimisations for loading new comics.

* added loading indicators to TheComicList.vue

* lint fixes

* made two methods static. may use them elsewhere.

* fix for scanning files.

* version updates.

* fixes for production

* fixes for production

Co-authored-by: Peter Dwyer <peter.dwyer@clanwilliamhealth.com>
2022-08-25 15:42:20 +01:00

759 lines
31 KiB
Python

import mimetypes
from itertools import chain
from pathlib import Path
from typing import Union, NamedTuple, List
from uuid import UUID
from django.conf import settings
from django.contrib.auth.models import User, Group
from django.contrib.auth.password_validation import validate_password
from django.core.exceptions import ValidationError
from django.db.models import Count, Case, When, F, PositiveSmallIntegerField, Q
from django.http import FileResponse
from drf_yasg.utils import swagger_auto_schema
from rest_framework import viewsets, serializers, mixins, permissions, status, renderers
from rest_framework.decorators import action
from rest_framework.generics import get_object_or_404
from rest_framework.pagination import PageNumberPagination
from rest_framework.request import Request
from rest_framework.response import Response
from comic import models
from comic.errors import NotCompatibleArchive
from comic.util import generate_breadcrumbs_from_path
class UserSerializer(serializers.ModelSerializer):
classification = serializers.SlugRelatedField(many=False, read_only=True, slug_field='allowed_to_read',
source='usermisc')
class Meta:
model = User
fields = ['id', 'username', 'email', 'is_superuser', 'classification']
class AdminPasswordResetSerializer(serializers.Serializer):
username = serializers.CharField()
password = serializers.CharField(required=False)
class ClassificationSerializer(serializers.Serializer):
classification = serializers.IntegerField()
def validate_classification(self, data):
if data in models.Directory.Classification:
return data
raise serializers.ValidationError('Invalid Classification sent.')
class UserViewSet(viewsets.ModelViewSet):
"""
API endpoint that allows users to be viewed or edited.
"""
queryset = User.objects.all().order_by('username')
serializer_class = UserSerializer
permission_classes = [permissions.IsAdminUser]
@action(methods=['patch'], detail=True, serializer_class=AdminPasswordResetSerializer)
def reset_password(self, request: Request, pk: int) -> Response:
"""
This will return a new password set on the user.
"""
target_user = get_object_or_404(User, id=pk)
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
if target_user.username == serializer.data['username']:
password = User.objects.make_random_password()
target_user.set_password(password)
resp_serializer = self.get_serializer({
'username': target_user.username,
'password': password
})
return Response(resp_serializer.data)
return Response({'errors': ['Invalid request']}, status.HTTP_400_BAD_REQUEST)
@action(methods=['patch'], detail=True, serializer_class=ClassificationSerializer)
def set_classification(self, request: Request, pk: int) -> Response:
"""
API Endpoint that will set the classification on the specified user.
"""
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
misc, _ = models.UserMisc.objects.get_or_create(user_id=pk)
misc.allowed_to_read = serializer.data['classification']
misc.save()
return Response(data={'classification': misc.allowed_to_read})
else:
return Response(serializer.errors, status.HTTP_400_BAD_REQUEST)
class UserMiscSerializer(serializers.ModelSerializer):
class Meta:
model = models.UserMisc
fields = ['user', 'feed_id', 'allowed_to_read']
class UserMiscViewSet(viewsets.ModelViewSet):
queryset = models.UserMisc.objects.all()
serializer_class = UserMiscSerializer
permission_classes = [permissions.IsAdminUser]
class GroupSerializer(serializers.HyperlinkedModelSerializer):
class Meta:
model = Group
fields = ['url', 'name']
class GroupViewSet(viewsets.ModelViewSet):
"""
API endpoint that allows groups to be viewed or edited.
"""
queryset = Group.objects.all()
serializer_class = GroupSerializer
permission_classes = [permissions.IsAuthenticated]
class BrowseFileField(serializers.FileField):
def to_representation(self, value):
if not value:
return None
return Path(settings.MEDIA_URL, value.name).as_posix()
class BrowseSerializer(serializers.Serializer):
selector = serializers.UUIDField()
title = serializers.CharField()
progress = serializers.IntegerField()
total = serializers.IntegerField()
type = serializers.CharField()
thumbnail = BrowseFileField()
classification = serializers.IntegerField()
finished = serializers.BooleanField()
unread = serializers.BooleanField()
class BreadcrumbSerializer(serializers.Serializer):
id = serializers.IntegerField()
selector = serializers.UUIDField()
name = serializers.CharField()
class ArchiveFile(NamedTuple):
file_name: str
mime_type: str
class BrowseViewSet(viewsets.GenericViewSet):
serializer_class = BrowseSerializer
permission_classes = [permissions.IsAuthenticated]
lookup_field = 'selector'
def list(self, request):
serializer = self.get_serializer(self.generate_directory(request.user), many=True)
return Response(serializer.data)
def retrieve(self, request, selector: UUID):
directory = models.Directory.objects.get(selector=selector)
serializer = self.get_serializer(self.generate_directory(request.user, directory), many=True)
return Response(serializer.data)
@swagger_auto_schema(responses={status.HTTP_200_OK: BreadcrumbSerializer(many=True)})
@action(methods=['get'], detail=True, serializer_class=BreadcrumbSerializer)
def breadcrumbs(self, _request: Request, selector: UUID) -> Response:
queryset = []
comic = False
try:
directory = models.Directory.objects.get(selector=selector)
except models.Directory.DoesNotExist:
comic = models.ComicBook.objects.get(selector=selector)
directory = comic.directory
for index, item in enumerate(generate_breadcrumbs_from_path(directory, comic)):
queryset.append({
"id": index,
"selector": item.selector,
"name": item.name,
})
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
@staticmethod
def clean_directories(directories, dir_path, directory=None):
dir_db_set = set([Path(settings.COMIC_BOOK_VOLUME, x.path) for x in directories])
dir_list = set([x for x in sorted(dir_path.glob('*')) if x.is_dir()])
# Create new directories db instances
for new_directory in dir_list - dir_db_set:
models.Directory(name=new_directory.name, parent=directory).save()
# Remove stale db instances
for stale_directory in dir_db_set - dir_list:
models.Directory.objects.get(name=stale_directory.name, parent=directory).delete()
@staticmethod
def get_archive_files(archive) -> List[ArchiveFile]:
return [
ArchiveFile(x, mimetypes.guess_type(x)[0]) for x in sorted(archive.namelist())
if not x.endswith('/') and mimetypes.guess_type(x)[0]
]
@staticmethod
def clean_files(files, user, dir_path, directory=None):
file_list = set([x for x in sorted(dir_path.glob('*')) if x.is_file()])
files_db_set = set([Path(dir_path, x.file_name) for x in files])
# Parse new comics
books_to_add = []
for new_comic in file_list - files_db_set:
if new_comic.suffix.lower() in settings.SUPPORTED_FILES:
books_to_add.append(
models.ComicBook(file_name=new_comic.name, directory=directory)
)
models.ComicBook.objects.bulk_create(books_to_add)
pages_to_add = []
status_to_add = []
for book in books_to_add:
status_to_add.append(models.ComicStatus(user=user, comic=book))
try:
archive, archive_type = book.get_archive()
if archive_type == 'archive':
pages_to_add.extend([
models.ComicPage(
Comic=book, index=idx, page_file_name=page.file_name, content_type=page.mime_type
) for idx, page in enumerate(BrowseViewSet.get_archive_files(archive))
])
elif archive_type == 'pdf':
pages_to_add.extend([
models.ComicPage(
Comic=book, index=idx, page_file_name=idx + 1, content_type='application/pdf'
) for idx in range(archive.page_count)
])
except NotCompatibleArchive:
pass
models.ComicStatus.objects.bulk_create(status_to_add)
models.ComicPage.objects.bulk_create(pages_to_add)
# Remove stale comic instances
for stale_comic in files_db_set - file_list:
models.ComicBook.objects.get(file_name=stale_comic.name, directory=directory).delete()
def generate_directory(self, user: User, directory=None):
"""
:type user: User
:type directory: Directory
"""
dir_path = Path(settings.COMIC_BOOK_VOLUME, directory.path) if directory else settings.COMIC_BOOK_VOLUME
files = []
dir_db_query = models.Directory.objects.filter(parent=directory)
self.clean_directories(dir_db_query, dir_path, directory)
file_db_query = models.ComicBook.objects.filter(directory=directory)
self.clean_files(file_db_query, user, dir_path, directory)
dir_db_query = dir_db_query.annotate(
total=Count('comicbook', distinct=True),
progress=Count('comicbook__comicstatus', Q(comicbook__comicstatus__finished=True,
comicbook__comicstatus__user=user), distinct=True),
finished=Q(total=F('progress')),
unread=Q(total__gt=F('progress'))
)
files.extend(dir_db_query)
# Create Missing Status
new_status = [models.ComicStatus(comic=file, user=user) for file in
file_db_query.exclude(comicstatus__in=models.ComicStatus.objects.filter(
comic__in=file_db_query, user=user))]
models.ComicStatus.objects.bulk_create(new_status)
file_db_query = file_db_query.annotate(
total=Count('comicpage', distinct=True),
progress=F('comicstatus__last_read_page') + 1,
finished=F('comicstatus__finished'),
unread=F('comicstatus__unread'),
user=F('comicstatus__user'),
classification=Case(
When(directory__isnull=True, then=models.Directory.Classification.C_G),
default=F('directory__classification'),
output_field=PositiveSmallIntegerField(choices=models.Directory.Classification.choices)
)
).filter(Q(user__isnull=True) | Q(user=user.id))
files.extend(file_db_query)
for file in chain(file_db_query, dir_db_query):
if file.thumbnail and not Path(file.thumbnail.path).exists():
file.thumbnail.delete()
file.save()
files.sort(key=lambda x: x.title)
files.sort(key=lambda x: x.type, reverse=True)
return files
class GenerateThumbnailSerializer(serializers.Serializer):
selector = serializers.UUIDField()
thumbnail = BrowseFileField()
class GenerateThumbnailViewSet(viewsets.ViewSet):
permission_classes = [permissions.IsAuthenticated]
serializer_class = GenerateThumbnailSerializer
lookup_field = 'selector'
def retrieve(self, _request, selector: UUID):
try:
directory = models.Directory.objects.get(selector=selector)
if not directory.thumbnail:
directory.generate_thumbnail()
return Response(
self.serializer_class({
"selector": directory.selector,
"thumbnail": directory.thumbnail
}).data
)
except models.Directory.DoesNotExist:
comic = models.ComicBook.objects.get(selector=selector)
if not comic.thumbnail:
comic.generate_thumbnail()
return Response(
self.serializer_class({
"selector": comic.selector,
"thumbnail": comic.thumbnail
}).data
)
class PageSerializer(serializers.Serializer):
index = serializers.IntegerField()
page_file_name = serializers.CharField()
content_type = serializers.CharField()
class DirectionSerializer(serializers.Serializer):
route = serializers.ChoiceField(choices=['read', 'browse'])
selector = serializers.UUIDField(required=False)
class ReadSerializer(serializers.Serializer):
selector = serializers.UUIDField()
title = serializers.CharField()
last_read_page = serializers.IntegerField()
prev_comic = DirectionSerializer()
next_comic = DirectionSerializer()
pages = PageSerializer(many=True)
class TypeSerializer(serializers.Serializer):
type = serializers.CharField()
class ReadPageSerializer(serializers.Serializer):
page = serializers.IntegerField(source='last_read_page')
class ReadViewSet(viewsets.GenericViewSet):
serializer_class = ReadSerializer
permission_classes = [permissions.IsAuthenticated]
lookup_field = 'selector'
@swagger_auto_schema(responses={status.HTTP_200_OK: ReadSerializer()})
def retrieve(self, request: Request, selector: UUID) -> Response:
comic = get_object_or_404(models.ComicBook, selector=selector)
misc, _ = models.UserMisc.objects.get_or_create(user=request.user)
pages = models.ComicPage.objects.filter(Comic=comic)
comic_status, _ = models.ComicStatus.objects.get_or_create(comic=comic, user=request.user)
comic_list = list(models.ComicBook.objects.filter(directory=comic.directory).order_by('file_name'))
comic_index = comic_list.index(comic)
try:
prev_comic = {'route': 'browse', 'selector': comic.directory.selector} if comic_index == 0 else \
{'route': 'read', 'selector': comic_list[comic_index-1].selector}
except AttributeError:
prev_comic = {'route': 'browse'}
try:
next_comic = {'route': 'browse', 'selector': comic.directory.selector} if comic_index+1 == len(comic_list) \
else {'route': 'read', 'selector': comic_list[comic_index+1].selector}
except AttributeError:
next_comic = {'route': 'browse'}
data = {
"selector": comic.selector,
"title": comic.file_name,
"last_read_page": comic_status.last_read_page,
"prev_comic": prev_comic,
"next_comic": next_comic,
"pages": pages,
}
serializer = self.serializer_class(data)
return Response(serializer.data)
@swagger_auto_schema(responses={status.HTTP_200_OK: 'PDF Binary Data',
status.HTTP_400_BAD_REQUEST: 'User below classification allowed'})
@action(methods=['get'], detail=True)
def pdf(self, request: Request, selector: UUID) -> Union[FileResponse, Response]:
book = models.ComicBook.objects.get(selector=selector)
misc, _ = models.UserMisc.objects.get_or_create(user=request.user)
try:
if book.directory.classification > misc.allowed_to_read:
return Response(status=400, data={'errors': 'Not allowed to read.'})
except AttributeError:
pass
return FileResponse(open(book.get_pdf(), 'rb'), content_type='application/pdf')
@swagger_auto_schema(responses={status.HTTP_200_OK: TypeSerializer()})
@action(methods=['get'], detail=True)
def type(self, _request: Request, selector: UUID) -> Response:
book = models.ComicBook.objects.get(selector=selector)
return Response({'type': book.file_name.split('.')[-1].lower()})
@swagger_auto_schema(responses={status.HTTP_200_OK: ReadPageSerializer()}, request_body=ReadPageSerializer)
@action(methods=['put'], detail=True, serializer_class=ReadPageSerializer)
def set_page(self, request: Request, selector: UUID) -> Response:
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
comic_status, _ = models.ComicStatus.objects.annotate(page_count=Count('comic__comicpage'))\
.get_or_create(comic_id=selector, user=request.user)
comic_status.last_read_page = serializer.data['page']
comic_status.unread = False
if comic_status.page_count-1 == comic_status.last_read_page:
comic_status.finished = True
else:
comic_status.finished = False
comic_status.save()
return Response(self.get_serializer(comic_status).data)
else:
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class PassthroughRenderer(renderers.BaseRenderer):
"""
Return data as-is. View should supply a Response.
"""
media_type = '*/*'
format = ''
def render(self, data, accepted_media_type=None, renderer_context=None):
return data
class ImageViewSet(viewsets.ViewSet):
queryset = models.ComicPage.objects.all()
lookup_field = 'page'
renderer_classes = [PassthroughRenderer]
def retrieve(self, _request, parent_lookup_selector, page):
book = models.ComicBook.objects.get(selector=parent_lookup_selector)
img, content = book.get_image(int(page))
self.renderer_classes[0].media_type = content
response = FileResponse(img, content_type=content)
return response
class StandardResultsSetPagination(PageNumberPagination):
page_size = 10
page_size_query_param = 'page_size'
max_page_size = 100
class RecentComicsSerializer(serializers.ModelSerializer):
total_pages = serializers.IntegerField()
unread = serializers.BooleanField()
finished = serializers.BooleanField()
last_read_page = serializers.IntegerField()
class Meta:
model = models.ComicBook
fields = ['file_name', 'date_added', 'selector', 'total_pages', 'unread', 'finished', 'last_read_page']
class RecentComicsView(mixins.ListModelMixin, viewsets.GenericViewSet):
queryset = models.ComicBook.objects.all()
serializer_class = RecentComicsSerializer
pagination_class = StandardResultsSetPagination
permission_classes = [permissions.IsAuthenticated]
def get_queryset(self):
user = self.request.user
if "search_text" in self.request.query_params:
query = models.ComicBook.objects.filter(file_name__icontains=self.request.query_params["search_text"])
else:
query = models.ComicBook.objects.all()
query = query.annotate(
total_pages=Count('comicpage'),
unread=Case(When(comicstatus__user=user, then='comicstatus__unread')),
finished=Case(When(comicstatus__user=user, then='comicstatus__finished')),
last_read_page=Case(When(comicstatus__user=user, then='comicstatus__last_read_page')) + 1,
classification=Case(
When(directory__isnull=True, then=models.Directory.Classification.C_18),
default=F('directory__classification'),
output_field=PositiveSmallIntegerField(choices=models.Directory.Classification.choices)
)
)
query = query.order_by('-date_added')
return query
class ActionSerializer(serializers.Serializer):
selectors = serializers.ListSerializer(child=serializers.UUIDField())
class ActionViewSet(viewsets.GenericViewSet):
permission_classes = [permissions.IsAuthenticated]
lookup_field = 'action'
serializer_class = ActionSerializer
@action(detail=False, methods=['PUT'])
def mark_read(self, request):
serializer = ActionSerializer(data=request.data)
if serializer.is_valid():
comics = self.get_comics(serializer.data['selectors'])
comic_status = models.ComicStatus.objects.filter(comic__selector__in=comics, user=request.user)
comic_status = comic_status.annotate(total_pages=Count('comic__comicpage'))
status_to_update = []
for c_status in comic_status:
c_status.last_read_page = c_status.total_pages-1
c_status.unread = False
c_status.finished = True
status_to_update.append(c_status)
comics.remove(str(c_status.comic_id))
for new_status in comics:
comic = models.ComicBook.objects.annotate(
total_pages=Count('comicpage')).get(selector=new_status)
obj, _ = models.ComicStatus.objects.get_or_create(comic=comic, user=request.user)
obj.unread = False
obj.finished = True
obj.last_read_page = comic.total_pages
status_to_update.append(obj)
models.ComicStatus.objects.bulk_update(status_to_update, ['unread', 'finished', 'last_read_page'])
return Response({'status': 'marked_read'})
else:
return Response(serializer.errors,
status=status.HTTP_400_BAD_REQUEST)
@action(detail=False, methods=['PUT'])
def mark_unread(self, request):
serializer = ActionSerializer(data=request.data)
if serializer.is_valid():
serializer = ActionSerializer(data=request.data)
if serializer.is_valid():
comics = self.get_comics(serializer.data['selectors'])
comic_status = models.ComicStatus.objects.filter(comic__selector__in=comics, user=request.user)
status_to_update = []
for c_status in comic_status:
c_status.last_read_page = 0
c_status.unread = True
c_status.finished = False
status_to_update.append(c_status)
comics.remove(str(c_status.comic_id))
for new_status in comics:
comic = models.ComicBook.objects.get(selector=new_status)
obj, _ = models.ComicStatus.objects.get_or_create(comic=comic, user=request.user)
obj.unread = True
obj.finished = False
obj.last_read_page = 0
status_to_update.append(obj)
models.ComicStatus.objects.bulk_update(status_to_update, ['unread', 'finished', 'last_read_page'])
return Response({'status': 'marked_unread'})
else:
return Response(serializer.errors,
status=status.HTTP_400_BAD_REQUEST)
def get_comics(self, selectors):
data = set()
data = data.union(
set(models.ComicBook.objects.filter(selector__in=selectors).values_list('selector', flat=True)))
directories = models.Directory.objects.filter(selector__in=selectors)
if directories:
for directory in directories:
data = data.union(
set(models.ComicBook.objects.filter(directory=directory).values_list('selector', flat=True)))
data = data.union(self.get_comics(models.Directory.objects.filter(
parent__in=directories).values_list('selector', flat=True)))
return [str(x) for x in data]
class RSSSerializer(serializers.Serializer):
feed_id = serializers.UUIDField()
class AccountSerializer(serializers.ModelSerializer):
class Meta:
model = User
fields = ['username', 'email', 'is_superuser']
class UpdateEmailSerializer(serializers.Serializer):
username = serializers.CharField()
email = serializers.EmailField()
password = serializers.CharField()
class PasswordResetSerializer(serializers.Serializer):
username = serializers.CharField()
old_password = serializers.CharField()
new_password = serializers.CharField(required=False)
new_password_confirm = serializers.CharField(required=False)
def validate_new_password(self, data):
if data == '':
return data
try:
validate_password(data)
except ValidationError as e:
raise serializers.ValidationError(e)
return data
def validate(self, attrs):
super().validate(attrs)
if attrs['new_password'] != attrs['new_password_confirm']:
raise serializers.ValidationError('New passwords do not match')
return attrs
class AccountViewSet(viewsets.GenericViewSet):
permission_classes = [permissions.IsAuthenticated]
lookup_field = 'username'
serializer_class = AccountSerializer
@swagger_auto_schema(responses={200: AccountSerializer()})
@action(detail=False, methods=['PATCH'], serializer_class=PasswordResetSerializer)
def reset_password(self, request: Request) -> Response:
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
if request.user.username != serializer.data['username']:
return Response(status=status.HTTP_400_BAD_REQUEST, data={'errors': 'Username does not match account'})
if not request.user.check_password(serializer.data['old_password']):
return Response(status=status.HTTP_400_BAD_REQUEST, data={'errors': 'Password Incorrect'})
request.user.set_password(serializer.data['new_password'])
request.user.save()
return Response(AccountSerializer(request.user).data)
else:
return Response({"errors": serializer.errors}, status.HTTP_400_BAD_REQUEST)
def list(self, request):
serializer = self.get_serializer(request.user)
return Response(serializer.data)
@swagger_auto_schema(responses={200: AccountSerializer()})
@action(detail=False, methods=['PATCH'], serializer_class=UpdateEmailSerializer)
def update_email(self, request: Request) -> Response:
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
if request.user.username != serializer.data['username']:
return Response(status=status.HTTP_400_BAD_REQUEST, data={'errors': 'Username does not match account'})
if not request.user.check_password(serializer.data['password']):
return Response(status=status.HTTP_400_BAD_REQUEST, data={'errors': 'Password Incorrect'})
request.user.email = serializer.data['email']
request.user.save()
account = AccountSerializer(request.user)
return Response(account.data)
else:
return Response(serializer.errors, status.HTTP_400_BAD_REQUEST)
@swagger_auto_schema(responses={status.HTTP_200_OK: RSSSerializer()})
@action(methods=['get'], detail=False, serializer_class=RSSSerializer)
def feed_id(self, request: Request):
"""
Return the RSS feed id needed to get users RSS Feed.
"""
user_misc = get_object_or_404(models.UserMisc, user=request.user)
serializer = self.get_serializer(user_misc)
return Response(serializer.data)
class DirectorySerializer(serializers.ModelSerializer):
class Meta:
model = models.Directory
fields = ['selector', 'classification']
extra_kwargs = {
'selector': {'validators': []},
}
class DirectoryViewSet(mixins.UpdateModelMixin, viewsets.GenericViewSet):
serializer_class = DirectorySerializer
queryset = models.Directory.objects.all()
permission_classes = [permissions.IsAdminUser]
lookup_field = 'selector'
@swagger_auto_schema(responses={200: DirectorySerializer(many=True)})
def update(self, request: Request, selector: UUID) -> Response:
"""
This will set the classification of a directory and all it's children.
"""
main_parent = get_object_or_404(models.Directory, selector=selector)
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
main_parent.classification = serializer.data['classification']
to_update = {main_parent}
to_visit = {main_parent}
while to_visit:
parent = to_visit.pop()
for child in models.Directory.objects.filter(parent=parent):
child.classification = serializer.data['classification']
to_visit.add(child)
to_update.add(child)
models.Directory.objects.bulk_update(to_update, fields=['classification'])
data = models.Directory.objects.filter(directory__in=to_update)
response = self.get_serializer(data, many=True)
return Response(response.data)
else:
return Response(serializer.errors, status.HTTP_400_BAD_REQUEST)
def partial_update(self, request, *args, **kwargs):
"""
This will set the classification of a directory and none of its children.
"""
return super().update(request, *args, **kwargs)
class InitialSetupSerializer(serializers.Serializer):
username = serializers.CharField()
email = serializers.EmailField(required=False, allow_blank=True)
password = serializers.CharField()
class InitialSetupRequired(serializers.Serializer):
required = serializers.BooleanField()
class InitialSetup(viewsets.GenericViewSet):
permission_classes = [permissions.AllowAny]
@swagger_auto_schema(responses={status.HTTP_200_OK: InitialSetupRequired(many=False)})
@action(detail=False, methods=['get'], serializer_class=InitialSetupRequired)
def required(self, _request: Request) -> Response:
serializer = self.get_serializer({'required': User.objects.count() == 0})
return Response(serializer.data)
@action(methods=['post'], detail=False, serializer_class=InitialSetupSerializer)
def create_user(self, request: Request) -> Response:
if User.objects.count() == 0:
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
admin = User(
username=serializer.data['username'],
email=serializer.data['email'],
is_superuser=True,
is_active=True,
is_staff=True
)
admin.set_password(serializer.data['password'])
admin.save()
return Response(serializer.data, status.HTTP_201_CREATED)
else:
return Response(serializer.errors, status.HTTP_400_BAD_REQUEST)
else:
return Response({}, status.HTTP_400_BAD_REQUEST)