mirror of
https://github.com/Dispatcharr/Dispatcharr.git
synced 2026-01-23 18:54:58 +00:00
246 lines
9 KiB
Python
246 lines
9 KiB
Python
from django.contrib.auth import authenticate, login, logout
|
|
from django.contrib.auth.models import Group, Permission
|
|
from django.http import JsonResponse, HttpResponse
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from rest_framework.decorators import api_view, permission_classes, action
|
|
from rest_framework.response import Response
|
|
from rest_framework import viewsets, status
|
|
from drf_yasg.utils import swagger_auto_schema
|
|
from drf_yasg import openapi
|
|
import json
|
|
from .permissions import IsAdmin, Authenticated
|
|
from dispatcharr.utils import network_access_allowed
|
|
|
|
from .models import User
|
|
from .serializers import UserSerializer, GroupSerializer, PermissionSerializer
|
|
from rest_framework_simplejwt.views import TokenObtainPairView, TokenRefreshView
|
|
|
|
|
|
class TokenObtainPairView(TokenObtainPairView):
    # JWT token-obtain endpoint: subclasses (and shadows at module level) the
    # SimpleJWT view of the same name, adding a "UI" network-access gate and a
    # last_login update after a successful token issue.
    # Documentation is kept in comments rather than docstrings because DRF /
    # drf-yasg can surface view docstrings in generated API descriptions.

    def post(self, request, *args, **kwargs):
        # Reject callers that fail the "UI" network-access check up front.
        if not network_access_allowed(request, "UI"):
            return Response({"error": "Forbidden"}, status=status.HTTP_403_FORBIDDEN)

        # The parent view produces the actual token response.
        token_response = super().post(request, *args, **kwargs)
        if token_response.status_code != 200:
            return token_response

        login_name = request.data.get("username")
        if login_name:
            self._stamp_last_login(login_name)

        return token_response

    @staticmethod
    def _stamp_last_login(login_name):
        # Best-effort: set last_login to "now" for the user with this username.
        from django.utils import timezone

        try:
            account = User.objects.get(username=login_name)
            account.last_login = timezone.now()
            account.save(update_fields=["last_login"])
        except User.DoesNotExist:
            # Token issue succeeded but no matching row was found; the
            # response is still returned unchanged by the caller.
            pass
|
|
|
|
|
|
class TokenRefreshView(TokenRefreshView):
    # JWT token-refresh endpoint: subclasses (and shadows at module level) the
    # SimpleJWT view of the same name, adding a "UI" network-access gate.
    # Documentation is kept in comments rather than docstrings because DRF /
    # drf-yasg can surface view docstrings in generated API descriptions.

    def post(self, request, *args, **kwargs):
        # Callers that fail the "UI" network-access check get a 403. The body
        # says "Forbidden" so the message matches the status code and the
        # identical check in the token-obtain view ("Unauthorized" is the
        # phrase associated with 401, not 403).
        if not network_access_allowed(request, "UI"):
            return Response({"error": "Forbidden"}, status=status.HTTP_403_FORBIDDEN)

        # Everything else is handled by the parent refresh view.
        return super().post(request, *args, **kwargs)
|
|
|
|
|
|
@csrf_exempt  # In production, consider CSRF protection strategies or ensure this endpoint is only accessible when no superuser exists.
def initialize_superuser(request):
    """Report whether a superuser exists and, on POST, create the first one.

    Responses:
      * ``{"superuser_exists": True}`` whenever a superuser already exists
        (for any HTTP method, without reading the body), and after a
        successful creation.
      * ``{"superuser_exists": False}`` for non-POST requests while no
        superuser exists.
      * 400 with ``{"error": ...}`` when the POST body is not valid JSON, is
        not a JSON object, or lacks ``username`` / ``password``.
      * 500 with ``{"error": str(exc)}`` for any other failure while handling
        the POST (e.g. the account could not be created).

    The POST body is a JSON object with ``username`` and ``password``
    (required) and ``email`` (optional, defaults to ``""``). The account is
    created with ``user_level=10``.
    """
    # If a superuser already exists, always indicate that.
    if User.objects.filter(is_superuser=True).exists():
        return JsonResponse({"superuser_exists": True})

    # Anything other than POST is a status probe: no superuser exists yet.
    if request.method != "POST":
        return JsonResponse({"superuser_exists": False})

    try:
        data = json.loads(request.body)

        # A JSON array/string/number has no .get(); treat it as a client
        # error rather than letting AttributeError surface as a 500.
        if not isinstance(data, dict):
            return JsonResponse(
                {"error": "Request body must be a JSON object."}, status=400
            )

        username = data.get("username")
        password = data.get("password")
        email = data.get("email", "")
        if not username or not password:
            return JsonResponse(
                {"error": "Username and password are required."}, status=400
            )

        # Create the superuser
        User.objects.create_superuser(
            username=username, password=password, email=email, user_level=10
        )
        return JsonResponse({"superuser_exists": True})
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Malformed or undecodable body is the caller's mistake, not a server fault.
        return JsonResponse({"error": "Invalid JSON body."}, status=400)
    except Exception as e:
        # Preserved from the original contract: any other failure is reported
        # as a 500 carrying the exception text.
        # NOTE(review): this exposes internal error messages to an
        # unauthenticated caller — consider logging and returning a generic message.
        return JsonResponse({"error": str(e)}, status=500)
|
|
|
|
|
|
# 🔹 1) Authentication APIs
|
|
class AuthViewSet(viewsets.ViewSet):
    """Handles user login and logout"""

    # Inside the methods below, the bare names `login` / `logout` resolve to
    # the functions imported from django.contrib.auth at module level; a class
    # body is not an enclosing scope for its methods, so the same-named
    # methods do not shadow them.

    @swagger_auto_schema(
        operation_description="Authenticate and log in a user",
        request_body=openapi.Schema(
            type=openapi.TYPE_OBJECT,
            required=["username", "password"],
            properties={
                "username": openapi.Schema(type=openapi.TYPE_STRING),
                "password": openapi.Schema(
                    type=openapi.TYPE_STRING, format=openapi.FORMAT_PASSWORD
                ),
            },
        ),
        responses={200: "Login successful", 400: "Invalid credentials"},
    )
    def login(self, request):
        """Logs in a user and returns user details"""
        credentials = request.data
        user = authenticate(
            request,
            username=credentials.get("username"),
            password=credentials.get("password"),
        )

        # Guard clause: failed authentication is reported as a 400.
        if not user:
            return Response({"error": "Invalid credentials"}, status=400)

        login(request, user)

        # Update last_login timestamp
        from django.utils import timezone

        user.last_login = timezone.now()
        user.save(update_fields=["last_login"])

        profile = {
            "id": user.id,
            "username": user.username,
            "email": user.email,
            "groups": list(user.groups.values_list("name", flat=True)),
        }
        return Response({"message": "Login successful", "user": profile})

    @swagger_auto_schema(
        operation_description="Log out the current user",
        responses={200: "Logout successful"},
    )
    def logout(self, request):
        """Logs out the authenticated user"""
        logout(request)
        return Response({"message": "Logout successful"})
|
|
|
|
|
|
# 🔹 2) User Management APIs
|
|
class UserViewSet(viewsets.ModelViewSet):
    """Handles CRUD operations for Users"""

    # Explanatory notes in this class are `#` comments rather than new
    # docstrings because DRF / drf-yasg can surface view and action
    # docstrings in generated API descriptions.

    queryset = User.objects.all()
    serializer_class = UserSerializer

    def get_permissions(self):
        # The self-service actions only require an authenticated caller;
        # every other action on this viewset is admin-only.
        if self.action in ["me", "preferences"]:
            return [Authenticated()]

        return [IsAdmin()]

    # The overrides below only delegate to ModelViewSet; they exist so each
    # operation can carry its own swagger description.

    @swagger_auto_schema(
        operation_description="Retrieve a list of users",
        responses={200: UserSerializer(many=True)},
    )
    def list(self, request, *args, **kwargs):
        return super().list(request, *args, **kwargs)

    @swagger_auto_schema(operation_description="Retrieve a specific user by ID")
    def retrieve(self, request, *args, **kwargs):
        return super().retrieve(request, *args, **kwargs)

    @swagger_auto_schema(operation_description="Create a new user")
    def create(self, request, *args, **kwargs):
        return super().create(request, *args, **kwargs)

    @swagger_auto_schema(operation_description="Update a user")
    def update(self, request, *args, **kwargs):
        return super().update(request, *args, **kwargs)

    @swagger_auto_schema(operation_description="Delete a user")
    def destroy(self, request, *args, **kwargs):
        return super().destroy(request, *args, **kwargs)

    @swagger_auto_schema(
        method="get",
        operation_description="Get active user information",
    )
    @action(detail=False, methods=["get"], url_path="me")
    def me(self, request):
        # Serialize the requesting user.
        user = request.user
        serializer = UserSerializer(user)
        return Response(serializer.data)

    @swagger_auto_schema(
        method="get",
        operation_description="Get current user's preferences (custom_properties)",
    )
    @swagger_auto_schema(
        method="patch",
        operation_description="Update current user's preferences (custom_properties)",
    )
    @action(detail=False, methods=["get", "patch"], url_path="me/preferences")
    def preferences(self, request):
        # GET returns the requesting user's custom_properties ({} when unset).
        # PATCH shallow-merges the request body into custom_properties, saves
        # only that field, and returns the merged result.
        user = request.user
        if request.method == "GET":
            return Response(user.custom_properties or {})

        data = request.data or {}

        # The merge below needs a mapping. A JSON array or scalar body would
        # otherwise blow up inside dict.update() and surface as a 500, so
        # reject it as a client error instead.
        if not isinstance(data, dict):
            return Response(
                {"error": "Preferences must be a JSON object."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Merge incoming data into existing custom_properties (shallow merge)
        props = user.custom_properties or {}
        props.update(data)
        user.custom_properties = props
        user.save(update_fields=["custom_properties"])
        return Response(user.custom_properties)
|
|
|
|
|
|
# 🔹 3) Group Management APIs
|
|
class GroupViewSet(viewsets.ModelViewSet):
    """Handles CRUD operations for Groups"""

    # Every action on this viewset uses the Authenticated permission class.
    permission_classes = [Authenticated]
    serializer_class = GroupSerializer
    queryset = Group.objects.all()

    # The overrides below add no logic of their own: each one delegates to
    # ModelViewSet and exists only to attach a swagger description.
    # (Comments are used instead of new docstrings because DRF / drf-yasg can
    # surface view docstrings in generated API descriptions.)

    @swagger_auto_schema(
        operation_description="Retrieve a list of groups",
        responses={200: GroupSerializer(many=True)},
    )
    def list(self, request, *args, **kwargs):
        return super().list(request, *args, **kwargs)

    @swagger_auto_schema(
        operation_description="Retrieve a specific group by ID",
    )
    def retrieve(self, request, *args, **kwargs):
        return super().retrieve(request, *args, **kwargs)

    @swagger_auto_schema(
        operation_description="Create a new group",
    )
    def create(self, request, *args, **kwargs):
        return super().create(request, *args, **kwargs)

    @swagger_auto_schema(
        operation_description="Update a group",
    )
    def update(self, request, *args, **kwargs):
        return super().update(request, *args, **kwargs)

    @swagger_auto_schema(
        operation_description="Delete a group",
    )
    def destroy(self, request, *args, **kwargs):
        return super().destroy(request, *args, **kwargs)
|
|
|
|
|
|
# 🔹 4) Permissions List API
|
|
@swagger_auto_schema(
    method="get",
    operation_description="Retrieve a list of all permissions",
    responses={200: PermissionSerializer(many=True)},
)
@api_view(["GET"])
@permission_classes([Authenticated])
def list_permissions(request):
    """Returns a list of all available permissions"""
    # Serialize every Permission row; access is limited by the Authenticated
    # permission class applied in the decorators above.
    payload = PermissionSerializer(Permission.objects.all(), many=True).data
    return Response(payload)
|