Dispatcharr/apps/plugins/api_views.py
Dispatcharr 5b31440018 Updated Plugins
Added Import + Delete
Added Modal confirmations
Safer Defaults
2025-09-07 11:54:22 -05:00

306 lines
12 KiB
Python

import logging
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from rest_framework.decorators import api_view
from django.conf import settings
from django.core.files.uploadedfile import UploadedFile
import io
import os
import zipfile
import shutil
import tempfile
from apps.accounts.permissions import (
Authenticated,
permission_classes_by_method,
)
from .loader import PluginManager
from .models import PluginConfig
logger = logging.getLogger(__name__)
class PluginsListAPIView(APIView):
def get_permissions(self):
try:
return [
perm() for perm in permission_classes_by_method[self.request.method]
]
except KeyError:
return [Authenticated()]
def get(self, request):
pm = PluginManager.get()
# Ensure registry is up-to-date on each request
pm.discover_plugins()
return Response({"plugins": pm.list_plugins()})
class PluginReloadAPIView(APIView):
def get_permissions(self):
try:
return [
perm() for perm in permission_classes_by_method[self.request.method]
]
except KeyError:
return [Authenticated()]
def post(self, request):
pm = PluginManager.get()
pm.discover_plugins()
return Response({"success": True, "count": len(pm._registry)})
class PluginImportAPIView(APIView):
def get_permissions(self):
try:
return [
perm() for perm in permission_classes_by_method[self.request.method]
]
except KeyError:
return [Authenticated()]
def post(self, request):
file: UploadedFile = request.FILES.get("file")
if not file:
return Response({"success": False, "error": "Missing 'file' upload"}, status=status.HTTP_400_BAD_REQUEST)
pm = PluginManager.get()
plugins_dir = pm.plugins_dir
try:
zf = zipfile.ZipFile(file)
except zipfile.BadZipFile:
return Response({"success": False, "error": "Invalid zip file"}, status=status.HTTP_400_BAD_REQUEST)
# Extract to a temporary directory first to avoid server reload thrash
tmp_root = tempfile.mkdtemp(prefix="plugin_import_")
try:
file_members = [m for m in zf.infolist() if not m.is_dir()]
if not file_members:
shutil.rmtree(tmp_root, ignore_errors=True)
return Response({"success": False, "error": "Archive is empty"}, status=status.HTTP_400_BAD_REQUEST)
for member in file_members:
name = member.filename
if not name or name.endswith("/"):
continue
# Normalize and prevent path traversal
norm = os.path.normpath(name)
if norm.startswith("..") or os.path.isabs(norm):
shutil.rmtree(tmp_root, ignore_errors=True)
return Response({"success": False, "error": "Unsafe path in archive"}, status=status.HTTP_400_BAD_REQUEST)
dest_path = os.path.join(tmp_root, norm)
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
with zf.open(member, 'r') as src, open(dest_path, 'wb') as dst:
shutil.copyfileobj(src, dst)
# Find candidate directory containing plugin.py or __init__.py
candidates = []
for dirpath, dirnames, filenames in os.walk(tmp_root):
has_pluginpy = "plugin.py" in filenames
has_init = "__init__.py" in filenames
if has_pluginpy or has_init:
depth = len(os.path.relpath(dirpath, tmp_root).split(os.sep))
candidates.append((0 if has_pluginpy else 1, depth, dirpath))
if not candidates:
shutil.rmtree(tmp_root, ignore_errors=True)
return Response({"success": False, "error": "Invalid plugin: missing plugin.py or package __init__.py"}, status=status.HTTP_400_BAD_REQUEST)
candidates.sort()
chosen = candidates[0][2]
# Determine plugin key: prefer chosen folder name; if chosen is tmp_root, use zip base name
base_name = os.path.splitext(getattr(file, "name", "plugin"))[0]
plugin_key = os.path.basename(chosen.rstrip(os.sep))
if chosen.rstrip(os.sep) == tmp_root.rstrip(os.sep):
plugin_key = base_name
plugin_key = plugin_key.replace(" ", "_").lower()
final_dir = os.path.join(plugins_dir, plugin_key)
if os.path.exists(final_dir):
# If final dir exists but contains a valid plugin, refuse; otherwise clear it
if os.path.exists(os.path.join(final_dir, "plugin.py")) or os.path.exists(os.path.join(final_dir, "__init__.py")):
shutil.rmtree(tmp_root, ignore_errors=True)
return Response({"success": False, "error": f"Plugin '{plugin_key}' already exists"}, status=status.HTTP_400_BAD_REQUEST)
try:
shutil.rmtree(final_dir)
except Exception:
pass
# Move chosen directory into final location
if chosen.rstrip(os.sep) == tmp_root.rstrip(os.sep):
# Move all contents into final_dir
os.makedirs(final_dir, exist_ok=True)
for item in os.listdir(tmp_root):
shutil.move(os.path.join(tmp_root, item), os.path.join(final_dir, item))
else:
shutil.move(chosen, final_dir)
# Cleanup temp
shutil.rmtree(tmp_root, ignore_errors=True)
target_dir = final_dir
finally:
try:
shutil.rmtree(tmp_root, ignore_errors=True)
except Exception:
pass
# Reload discovery and validate plugin entry
pm.discover_plugins()
plugin = pm._registry.get(plugin_key)
if not plugin:
# Cleanup the copied folder to avoid leaving invalid plugin behind
try:
shutil.rmtree(target_dir, ignore_errors=True)
except Exception:
pass
return Response({"success": False, "error": "Invalid plugin: missing Plugin class in plugin.py or __init__.py"}, status=status.HTTP_400_BAD_REQUEST)
# Extra validation: ensure Plugin.run exists
instance = getattr(plugin, "instance", None)
run_method = getattr(instance, "run", None)
if not callable(run_method):
try:
shutil.rmtree(target_dir, ignore_errors=True)
except Exception:
pass
return Response({"success": False, "error": "Invalid plugin: Plugin class must define a callable run(action, params, context)"}, status=status.HTTP_400_BAD_REQUEST)
# Find DB config to return enabled/ever_enabled
try:
cfg = PluginConfig.objects.get(key=plugin_key)
enabled = cfg.enabled
ever_enabled = getattr(cfg, "ever_enabled", False)
except PluginConfig.DoesNotExist:
enabled = False
ever_enabled = False
return Response({
"success": True,
"plugin": {
"key": plugin.key,
"name": plugin.name,
"version": plugin.version,
"description": plugin.description,
"enabled": enabled,
"ever_enabled": ever_enabled,
"fields": plugin.fields or [],
"actions": plugin.actions or [],
}
})
class PluginSettingsAPIView(APIView):
def get_permissions(self):
try:
return [
perm() for perm in permission_classes_by_method[self.request.method]
]
except KeyError:
return [Authenticated()]
def post(self, request, key):
pm = PluginManager.get()
data = request.data or {}
settings = data.get("settings", {})
try:
updated = pm.update_settings(key, settings)
return Response({"success": True, "settings": updated})
except Exception as e:
return Response({"success": False, "error": str(e)}, status=status.HTTP_400_BAD_REQUEST)
class PluginRunAPIView(APIView):
def get_permissions(self):
try:
return [
perm() for perm in permission_classes_by_method[self.request.method]
]
except KeyError:
return [Authenticated()]
def post(self, request, key):
pm = PluginManager.get()
action = request.data.get("action")
params = request.data.get("params", {})
if not action:
return Response({"success": False, "error": "Missing 'action'"}, status=status.HTTP_400_BAD_REQUEST)
# Respect plugin enabled flag
try:
cfg = PluginConfig.objects.get(key=key)
if not cfg.enabled:
return Response({"success": False, "error": "Plugin is disabled"}, status=status.HTTP_403_FORBIDDEN)
except PluginConfig.DoesNotExist:
return Response({"success": False, "error": "Plugin not found"}, status=status.HTTP_404_NOT_FOUND)
try:
result = pm.run_action(key, action, params)
return Response({"success": True, "result": result})
except PermissionError as e:
return Response({"success": False, "error": str(e)}, status=status.HTTP_403_FORBIDDEN)
except Exception as e:
logger.exception("Plugin action failed")
return Response({"success": False, "error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
class PluginEnabledAPIView(APIView):
def get_permissions(self):
try:
return [
perm() for perm in permission_classes_by_method[self.request.method]
]
except KeyError:
return [Authenticated()]
def post(self, request, key):
enabled = request.data.get("enabled")
if enabled is None:
return Response({"success": False, "error": "Missing 'enabled' boolean"}, status=status.HTTP_400_BAD_REQUEST)
try:
cfg = PluginConfig.objects.get(key=key)
cfg.enabled = bool(enabled)
# Mark that this plugin has been enabled at least once
if cfg.enabled and not cfg.ever_enabled:
cfg.ever_enabled = True
cfg.save(update_fields=["enabled", "ever_enabled", "updated_at"])
return Response({"success": True, "enabled": cfg.enabled, "ever_enabled": cfg.ever_enabled})
except PluginConfig.DoesNotExist:
return Response({"success": False, "error": "Plugin not found"}, status=status.HTTP_404_NOT_FOUND)
class PluginDeleteAPIView(APIView):
def get_permissions(self):
try:
return [
perm() for perm in permission_classes_by_method[self.request.method]
]
except KeyError:
return [Authenticated()]
def delete(self, request, key):
pm = PluginManager.get()
plugins_dir = pm.plugins_dir
target_dir = os.path.join(plugins_dir, key)
# Safety: ensure path inside plugins_dir
abs_plugins = os.path.abspath(plugins_dir) + os.sep
abs_target = os.path.abspath(target_dir)
if not abs_target.startswith(abs_plugins):
return Response({"success": False, "error": "Invalid plugin path"}, status=status.HTTP_400_BAD_REQUEST)
# Remove files
if os.path.isdir(target_dir):
try:
shutil.rmtree(target_dir)
except Exception as e:
return Response({"success": False, "error": f"Failed to delete plugin files: {e}"}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# Remove DB record
try:
PluginConfig.objects.filter(key=key).delete()
except Exception:
pass
# Reload registry
pm.discover_plugins()
return Response({"success": True})