# apps/m3u/tasks.py

import gc
import gzip
import json
import logging
import os
import re
import time
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from asgiref.sync import async_to_sync
from celery import shared_task, current_app, group
from celery.app.control import Inspect
from celery.result import AsyncResult
from channels.layers import get_channel_layer
from django.conf import settings
from django.core.cache import cache
from django.db import transaction
from django.utils import timezone

from apps.channels.models import Stream, ChannelGroup, ChannelGroupM3UAccount
from core.models import CoreSettings, UserAgent
from core.utils import (
    RedisClient,
    acquire_task_lock,
    release_task_lock,
    natural_sort_key,
    log_system_event,
    send_websocket_update,
)
from core.xtream_codes import Client as XCClient

from .models import M3UAccount
from .utils import normalize_stream_url


logger = logging.getLogger(__name__)


BATCH_SIZE = 1500  # Optimized batch size for threading

m3u_dir = os.path.join(settings.MEDIA_ROOT, "cached_m3u")

def fetch_m3u_lines(account, use_cache=False):
    """Fetch M3U file lines efficiently."""
    os.makedirs(m3u_dir, exist_ok=True)
    file_path = os.path.join(m3u_dir, f"{account.id}.m3u")

    if account.server_url:
        if not use_cache or not os.path.exists(file_path):
            try:
                # Try to get an account-specific user agent first
                user_agent_obj = account.get_user_agent()
                user_agent = (
                    user_agent_obj.user_agent
                    if user_agent_obj
                    else "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                )

                logger.debug(
                    f"Using user agent: {user_agent} for M3U account: {account.name}"
                )
                headers = {"User-Agent": user_agent}
                logger.info(f"Fetching from URL {account.server_url}")

                # Set account status to FETCHING before starting the download
                account.status = M3UAccount.Status.FETCHING
                account.last_message = "Starting download..."
                account.save(update_fields=["status", "last_message"])

                response = requests.get(
                    account.server_url, headers=headers, stream=True
                )

                # Log the actual response details for debugging
                logger.debug(f"HTTP Response: {response.status_code} from {account.server_url}")
                logger.debug(f"Content-Type: {response.headers.get('content-type', 'Not specified')}")
                logger.debug(f"Content-Length: {response.headers.get('content-length', 'Not specified')}")
                logger.debug(f"Response headers: {dict(response.headers)}")

                # Check if we've been redirected to a different URL
                if hasattr(response, 'url') and response.url != account.server_url:
                    logger.warning(f"Request was redirected from {account.server_url} to {response.url}")

                # Check for ANY non-success status code FIRST (before raise_for_status)
                if response.status_code < 200 or response.status_code >= 300:
                    # For error responses, read the content immediately (not streaming)
                    try:
                        response_content = response.text[:1000]  # Capture up to 1000 characters
                        logger.error(f"Error response content: {response_content!r}")
                    except Exception as e:
                        logger.error(f"Could not read error response content: {e}")
                        response_content = "Could not read error response content"

                    # Provide specific messages for known non-standard codes
                    if response.status_code == 884:
                        error_msg = f"Server returned HTTP 884 (authentication/authorization failure) from URL: {account.server_url}. Server message: {response_content}"
                    elif response.status_code >= 800:
                        error_msg = f"Server returned non-standard HTTP status {response.status_code} from URL: {account.server_url}. Server message: {response_content}"
                    elif response.status_code == 404:
                        error_msg = f"M3U file not found (404) at URL: {account.server_url}. Server message: {response_content}"
                    elif response.status_code == 403:
                        error_msg = f"Access forbidden (403) to M3U file at URL: {account.server_url}. Server message: {response_content}"
                    elif response.status_code == 401:
                        error_msg = f"Authentication required (401) for M3U file at URL: {account.server_url}. Server message: {response_content}"
                    elif response.status_code == 500:
                        error_msg = f"Server error (500) while fetching M3U file from URL: {account.server_url}. Server message: {response_content}"
                    else:
                        error_msg = f"HTTP error ({response.status_code}) while fetching M3U file from URL: {account.server_url}. Server message: {response_content}"

                    logger.error(error_msg)
                    account.status = M3UAccount.Status.ERROR
                    account.last_message = error_msg
                    account.save(update_fields=["status", "last_message"])
                    send_m3u_update(
                        account.id,
                        "downloading",
                        100,
                        status="error",
                        error=error_msg,
                    )
                    return [], False

                # Only call raise_for_status if we have a success code (this should not raise now)
                response.raise_for_status()

                total_size = int(response.headers.get("Content-Length", 0))
                downloaded = 0
                start_time = time.time()
                last_update_time = start_time
                progress = 0
                temp_content = b""  # Store content temporarily to validate before saving
                has_content = False

                # First, collect the content and validate it
                send_m3u_update(account.id, "downloading", 0)
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        temp_content += chunk
                        has_content = True

                        downloaded += len(chunk)
                        elapsed_time = time.time() - start_time

                        # Calculate download speed in KB/s
                        speed = downloaded / elapsed_time / 1024  # in KB/s

                        # Calculate progress percentage
                        if total_size and total_size > 0:
                            progress = (downloaded / total_size) * 100

                            # Time remaining (in seconds)
                            time_remaining = (
                                (total_size - downloaded) / (speed * 1024)
                                if speed > 0
                                else 0
                            )

                        current_time = time.time()
                        if current_time - last_update_time >= 0.5:
                            last_update_time = current_time
                            if progress > 0:
                                # Update the account's last_message with detailed progress info
                                progress_msg = f"Downloading: {progress:.1f}% - {speed:.1f} KB/s - {time_remaining:.1f}s remaining"
                                account.last_message = progress_msg
                                account.save(update_fields=["last_message"])

                                send_m3u_update(
                                    account.id,
                                    "downloading",
                                    progress,
                                    speed=speed,
                                    elapsed_time=elapsed_time,
                                    time_remaining=time_remaining,
                                    message=progress_msg,
                                )

                # Check if we actually received any content
                logger.info(f"Download completed. Has content: {has_content}, Content length: {len(temp_content)} bytes")
                if not has_content or len(temp_content) == 0:
                    error_msg = f"Server responded successfully (HTTP {response.status_code}) but provided empty M3U file from URL: {account.server_url}"
                    logger.error(error_msg)
                    account.status = M3UAccount.Status.ERROR
                    account.last_message = error_msg
                    account.save(update_fields=["status", "last_message"])
                    send_m3u_update(
                        account.id,
                        "downloading",
                        100,
                        status="error",
                        error=error_msg,
                    )
                    return [], False

                # Basic validation: check if the content looks like an M3U file
                try:
                    content_str = temp_content.decode('utf-8', errors='ignore')
                    content_lines = content_str.strip().split('\n')

                    # Log the first few lines for debugging (be careful not to log too much)
                    preview_lines = content_lines[:5]
                    logger.info(f"Content preview (first 5 lines): {preview_lines}")
                    logger.info(f"Total lines in content: {len(content_lines)}")

                    # Check if it's a valid M3U file (should start with #EXTM3U or contain M3U-like content)
                    is_valid_m3u = False

                    # First, check if this looks like an error response disguised as 200 OK
                    content_lower = content_str.lower()
                    if any(error_indicator in content_lower for error_indicator in [
                        '<html', '<!doctype html', 'error', 'not found', '404', '403', '500',
                        'access denied', 'unauthorized', 'forbidden', 'invalid', 'expired'
                    ]):
                        logger.warning(f"Content appears to be an error response disguised as HTTP 200: {content_str[:200]!r}")
                        # Continue with M3U validation, but this gives us a clue

                    if content_lines and content_lines[0].strip().upper().startswith('#EXTM3U'):
                        is_valid_m3u = True
                        logger.info("Content validated as M3U: starts with #EXTM3U")
                    elif any(line.strip().startswith('#EXTINF:') for line in content_lines):
                        is_valid_m3u = True
                        logger.info("Content validated as M3U: contains #EXTINF entries")
                    elif any(line.strip().startswith('http') for line in content_lines):
                        # Has HTTP URLs; might be a simple M3U without headers
                        is_valid_m3u = True
                        logger.info("Content validated as M3U: contains HTTP URLs")
                    elif any(line.strip().startswith(('rtsp', 'rtp', 'udp')) for line in content_lines):
                        # Has RTSP/RTP/UDP URLs; might be a simple M3U without headers
                        is_valid_m3u = True
                        logger.info("Content validated as M3U: contains RTSP/RTP/UDP URLs")

                    if not is_valid_m3u:
                        # Log what we actually received for debugging
                        logger.error(f"Invalid M3U content received. First 200 characters: {content_str[:200]!r}")

                        # Try to provide more specific error messages based on content
                        if '<html' in content_lower or '<!doctype html' in content_lower:
                            error_msg = f"Server returned HTML page instead of M3U file from URL: {account.server_url}. This usually indicates an error or authentication issue."
                        elif 'error' in content_lower or 'not found' in content_lower:
                            error_msg = f"Server returned an error message instead of M3U file from URL: {account.server_url}. Content: {content_str[:100]}"
                        elif len(content_str.strip()) == 0:
                            error_msg = f"Server returned completely empty response from URL: {account.server_url}"
                        else:
                            error_msg = f"Server provided invalid M3U content from URL: {account.server_url}. Content does not appear to be a valid M3U file."
                        logger.error(error_msg)
                        account.status = M3UAccount.Status.ERROR
                        account.last_message = error_msg
                        account.save(update_fields=["status", "last_message"])
                        send_m3u_update(
                            account.id,
                            "downloading",
                            100,
                            status="error",
                            error=error_msg,
                        )
                        return [], False

                except UnicodeDecodeError:
                    logger.error(f"Non-text content received. First 200 bytes: {temp_content[:200]!r}")
                    error_msg = f"Server provided non-text content from URL: {account.server_url}. Unable to process as M3U file."
                    logger.error(error_msg)
                    account.status = M3UAccount.Status.ERROR
                    account.last_message = error_msg
                    account.save(update_fields=["status", "last_message"])
                    send_m3u_update(
                        account.id,
                        "downloading",
                        100,
                        status="error",
                        error=error_msg,
                    )
                    return [], False

                # Content is valid, save it to file
                with open(file_path, "wb") as file:
                    file.write(temp_content)

                # Final update with 100% progress
                final_msg = f"Download complete. Size: {total_size/1024/1024:.2f} MB, Time: {time.time() - start_time:.1f}s"
                account.last_message = final_msg
                account.save(update_fields=["last_message"])
                send_m3u_update(account.id, "downloading", 100, message=final_msg)
            except requests.exceptions.HTTPError as e:
                # Handle HTTP errors specifically with more context.
                # Note: requests.Response is falsy for error status codes,
                # so compare against None instead of relying on truthiness.
                status_code = e.response.status_code if e.response is not None else "unknown"

                # Try to capture the error response content
                response_content = ""
                if e.response is not None:
                    try:
                        response_content = e.response.text[:500]  # Limit to the first 500 characters
                        logger.error(f"HTTP error response content: {response_content!r}")
                    except Exception as content_error:
                        logger.error(f"Could not read HTTP error response content: {content_error}")
                        response_content = "Could not read error response content"

                if status_code == 404:
                    error_msg = f"M3U file not found (404) at URL: {account.server_url}. Server message: {response_content}"
                elif status_code == 403:
                    error_msg = f"Access forbidden (403) to M3U file at URL: {account.server_url}. Server message: {response_content}"
                elif status_code == 401:
                    error_msg = f"Authentication required (401) for M3U file at URL: {account.server_url}. Server message: {response_content}"
                elif status_code == 500:
                    error_msg = f"Server error (500) while fetching M3U file from URL: {account.server_url}. Server message: {response_content}"
                else:
                    error_msg = f"HTTP error ({status_code}) while fetching M3U file from URL: {account.server_url}. Server message: {response_content}"

                logger.error(error_msg)
                account.status = M3UAccount.Status.ERROR
                account.last_message = error_msg
                account.save(update_fields=["status", "last_message"])
                send_m3u_update(
                    account.id,
                    "downloading",
                    100,
                    status="error",
                    error=error_msg,
                )
                return [], False
            except requests.exceptions.RequestException as e:
                # Handle other request errors (connection, timeout, etc.)
                if "timeout" in str(e).lower():
                    error_msg = f"Timeout while fetching M3U file from URL: {account.server_url}"
                elif "connection" in str(e).lower():
                    error_msg = f"Connection error while fetching M3U file from URL: {account.server_url}"
                else:
                    error_msg = f"Network error while fetching M3U file from URL: {account.server_url} - {str(e)}"

                logger.error(error_msg)
                account.status = M3UAccount.Status.ERROR
                account.last_message = error_msg
                account.save(update_fields=["status", "last_message"])
                send_m3u_update(
                    account.id,
                    "downloading",
                    100,
                    status="error",
                    error=error_msg,
                )
                return [], False
            except Exception as e:
                # Handle any other unexpected errors
                error_msg = f"Unexpected error while fetching M3U file from URL: {account.server_url} - {str(e)}"
                logger.error(error_msg)
                account.status = M3UAccount.Status.ERROR
                account.last_message = error_msg
                account.save(update_fields=["status", "last_message"])
                send_m3u_update(
                    account.id,
                    "downloading",
                    100,
                    status="error",
                    error=error_msg,
                )
                return [], False

        # Check that the file exists and is not empty (fallback check - should not happen with the new validation)
        if not os.path.exists(file_path) or os.path.getsize(file_path) == 0:
            error_msg = f"M3U file is unexpectedly missing or empty after validation: {file_path}"
            logger.error(error_msg)
            account.status = M3UAccount.Status.ERROR
            account.last_message = error_msg
            account.save(update_fields=["status", "last_message"])
            send_m3u_update(
                account.id, "downloading", 100, status="error", error=error_msg
            )
            return [], False  # Return empty list and False for success

        try:
            with open(file_path, "r", encoding="utf-8") as f:
                return f.readlines(), True
        except Exception as e:
            error_msg = f"Error reading M3U file: {str(e)}"
            logger.error(error_msg)
            account.status = M3UAccount.Status.ERROR
            account.last_message = error_msg
            account.save(update_fields=["status", "last_message"])
            send_m3u_update(
                account.id, "downloading", 100, status="error", error=error_msg
            )
            return [], False

    elif account.file_path:
        try:
            if account.file_path.endswith(".gz"):
                with gzip.open(account.file_path, "rt", encoding="utf-8") as f:
                    return f.readlines(), True

            elif account.file_path.endswith(".zip"):
                with zipfile.ZipFile(account.file_path, "r") as zip_file:
                    for name in zip_file.namelist():
                        if name.endswith(".m3u"):
                            with zip_file.open(name) as f:
                                return [
                                    line.decode("utf-8") for line in f.readlines()
                                ], True

                error_msg = (
                    f"No .m3u file found in ZIP archive: {account.file_path}"
                )
                logger.warning(error_msg)
                account.status = M3UAccount.Status.ERROR
                account.last_message = error_msg
                account.save(update_fields=["status", "last_message"])
                send_m3u_update(
                    account.id, "downloading", 100, status="error", error=error_msg
                )
                return [], False

            else:
                with open(account.file_path, "r", encoding="utf-8") as f:
                    return f.readlines(), True

        except (IOError, OSError, zipfile.BadZipFile, gzip.BadGzipFile) as e:
            error_msg = f"Error opening file {account.file_path}: {e}"
            logger.error(error_msg)
            account.status = M3UAccount.Status.ERROR
            account.last_message = error_msg
            account.save(update_fields=["status", "last_message"])
            send_m3u_update(
                account.id, "downloading", 100, status="error", error=error_msg
            )
            return [], False

    # Neither server_url nor an uploaded file is available
    error_msg = "No M3U source available (missing URL and file)"
    logger.error(error_msg)
    account.status = M3UAccount.Status.ERROR
    account.last_message = error_msg
    account.save(update_fields=["status", "last_message"])
    send_m3u_update(account.id, "downloading", 100, status="error", error=error_msg)
    return [], False
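

# Usage sketch (illustrative): callers should unpack the (lines, success)
# pair rather than truth-testing the list, since a legitimately empty
# playlist and a failed fetch both yield an empty list:
#
#     lines, ok = fetch_m3u_lines(account, use_cache=True)
#     if not ok:
#         return  # account.status / account.last_message already describe the failure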


def get_case_insensitive_attr(attributes, key, default=""):
    """Get attribute value using case-insensitive key lookup."""
    for attr_key, attr_value in attributes.items():
        if attr_key.lower() == key.lower():
            return attr_value
    return default
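

# Attribute keys in the wild vary in case ("TVG-ID" vs "tvg-id"), so lookups
# go through this helper rather than plain dict indexing. For example:
#
#     >>> get_case_insensitive_attr({"TVG-ID": "abc"}, "tvg-id")
#     'abc'
#     >>> get_case_insensitive_attr({"TVG-ID": "abc"}, "tvg-logo", "")
#     ''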


def parse_extinf_line(line: str) -> dict:
    """
    Parse an EXTINF line from an M3U file.
    This function removes the "#EXTINF:" prefix, then extracts all key="value" attributes,
    and treats everything after the last attribute as the display name.

    Returns a dictionary with:
      - 'attributes': a dict of attribute key/value pairs (e.g. tvg-id, tvg-logo, group-title)
      - 'display_name': the text after the attributes (the fallback display name)
      - 'name': the value from tvg-name (if present) or the display name otherwise.
    """
    if not line.startswith("#EXTINF:"):
        return None
    content = line[len("#EXTINF:"):].strip()

    # Single pass: extract all attributes AND track the last attribute position.
    attrs = {}
    last_attr_end = 0

    # A single regex handles both key="value" and key='value' patterns; the
    # backreference \2 requires the closing quote to match the opening one.
    for match in re.finditer(r'([^\s]+)=(["\'])(.*?)\2', content):
        key = match.group(1)
        value = match.group(3)
        attrs[key] = value
        last_attr_end = match.end()

    # Everything after the last attribute (skipping leading comma and whitespace) is the display name
    if last_attr_end > 0:
        remaining = content[last_attr_end:].strip()
        # Remove the leading comma if present
        if remaining.startswith(','):
            remaining = remaining[1:].strip()
        display_name = remaining
    else:
        # No attributes found, fall back to the old comma-split method
        parts = content.split(',', 1)
        if len(parts) == 2:
            display_name = parts[1].strip()
        else:
            display_name = content.strip()

    # Use the tvg-name attribute if available; otherwise try tvc-guide-title, then fall back to the display name.
    name = get_case_insensitive_attr(attrs, "tvg-name", None)
    if not name:
        name = get_case_insensitive_attr(attrs, "tvc-guide-title", None)
    if not name:
        name = display_name
    return {"attributes": attrs, "display_name": display_name, "name": name}


@shared_task
def refresh_m3u_accounts():
    """Queue background parse for all active M3UAccounts."""
    active_accounts = M3UAccount.objects.filter(is_active=True)
    count = 0
    for account in active_accounts:
        refresh_single_m3u_account.delay(account.id)
        count += 1

    msg = f"Queued M3U refresh for {count} active account(s)."
    logger.info(msg)
    return msg
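

# This task is normally driven by the Celery beat schedule, but it can also be
# queued manually (e.g. from a Django shell) with the standard Celery API:
#
#     refresh_m3u_accounts.delay()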


def check_field_lengths(streams_to_create):
    # Debug helper: print any string field longer than 255 characters.
    for stream in streams_to_create:
        for field, value in stream.__dict__.items():
            if isinstance(value, str) and len(value) > 255:
                print(f"{field} --- {value}")

    print("")
    print("")


@shared_task
def process_groups(account, groups, scan_start_time=None):
    """Process groups and update their relationships with the M3U account.

    Args:
        account: M3UAccount instance
        groups: Dict of group names to custom properties
        scan_start_time: Timestamp when the scan started (for consistent last_seen marking)
    """
    # Use scan_start_time if provided, otherwise the current time.
    # This ensures consistency with stream processing and cleanup logic.
    if scan_start_time is None:
        scan_start_time = timezone.now()

    existing_groups = {
        group.name: group
        for group in ChannelGroup.objects.filter(name__in=groups.keys())
    }
    logger.info(f"Currently {len(existing_groups)} existing groups")

    # Check whether we should auto-enable new groups based on account settings
    account_custom_props = account.custom_properties or {}
    auto_enable_new_groups_live = account_custom_props.get("auto_enable_new_groups_live", True)

    # Separate existing groups from groups that need to be created
    existing_group_objs = []
    groups_to_create = []

    for group_name, custom_props in groups.items():
        if group_name in existing_groups:
            existing_group_objs.append(existing_groups[group_name])
        else:
            groups_to_create.append(ChannelGroup(name=group_name))

    # Create new groups and fetch them back with IDs
    newly_created_group_objs = []
    if groups_to_create:
        logger.info(f"Creating {len(groups_to_create)} new groups for account {account.id}")
        newly_created_group_objs = list(ChannelGroup.bulk_create_and_fetch(groups_to_create))
        logger.debug(f"Successfully created {len(newly_created_group_objs)} new groups")

    # Combine all groups
    all_group_objs = existing_group_objs + newly_created_group_objs

    # Get the existing relationships for this account
    existing_relationships = {
        rel.channel_group.name: rel
        for rel in ChannelGroupM3UAccount.objects.filter(
            m3u_account=account,
            channel_group__name__in=groups.keys()
        ).select_related('channel_group')
    }

    relations_to_create = []
    relations_to_update = []

    for group in all_group_objs:
        custom_props = groups.get(group.name, {})

        if group.name in existing_relationships:
            # Update the existing relationship if xc_id has changed (preserve other custom properties)
            existing_rel = existing_relationships[group.name]

            # Get existing custom properties (now JSONB, no need to parse)
            existing_custom_props = existing_rel.custom_properties or {}

            # Get the new xc_id from the groups data
            new_xc_id = custom_props.get("xc_id")
            existing_xc_id = existing_custom_props.get("xc_id")

            # Only update if xc_id has changed
            if new_xc_id != existing_xc_id:
                # Merge the new xc_id with existing custom properties to preserve user settings
                updated_custom_props = existing_custom_props.copy()
                if new_xc_id is not None:
                    updated_custom_props["xc_id"] = new_xc_id
                elif "xc_id" in updated_custom_props:
                    # Remove xc_id if it's no longer provided (e.g., converting from XC to standard)
                    del updated_custom_props["xc_id"]

                existing_rel.custom_properties = updated_custom_props
                existing_rel.last_seen = scan_start_time
                existing_rel.is_stale = False
                relations_to_update.append(existing_rel)
                logger.debug(f"Updated xc_id for group '{group.name}' from '{existing_xc_id}' to '{new_xc_id}' - account {account.id}")
            else:
                # Update last_seen even if xc_id hasn't changed
                existing_rel.last_seen = scan_start_time
                existing_rel.is_stale = False
                relations_to_update.append(existing_rel)
                logger.debug(f"xc_id unchanged for group '{group.name}' - account {account.id}")
        else:
            # Create a new relationship - this group is new to this M3U account.
            # Use the auto_enable setting to determine whether it should start enabled.
            if not auto_enable_new_groups_live:
                logger.info(f"Group '{group.name}' is new to account {account.id} - creating relationship but DISABLED (auto_enable_new_groups_live=False)")

            relations_to_create.append(
                ChannelGroupM3UAccount(
                    channel_group=group,
                    m3u_account=account,
                    custom_properties=custom_props,
                    enabled=auto_enable_new_groups_live,
                    last_seen=scan_start_time,
                    is_stale=False,
                )
            )

    # Bulk create new relationships
    if relations_to_create:
        ChannelGroupM3UAccount.objects.bulk_create(relations_to_create, ignore_conflicts=True)
        logger.debug(f"Created {len(relations_to_create)} new group relationships for account {account.id}")

    # Bulk update existing relationships
    if relations_to_update:
        ChannelGroupM3UAccount.objects.bulk_update(relations_to_update, ['custom_properties', 'last_seen', 'is_stale'])
        logger.info(f"Updated {len(relations_to_update)} existing group relationships for account {account.id}")


def cleanup_stale_group_relationships(account, scan_start_time):
    """
    Remove group relationships that haven't been seen since the stale retention period.
    This follows the same logic as stream cleanup for consistency.
    """
    # Calculate the cutoff date for stale group relationships
    stale_cutoff = scan_start_time - timezone.timedelta(days=account.stale_stream_days)
    logger.info(
        f"Removing group relationships not seen since {stale_cutoff} for M3U account {account.id}"
    )

    # Find stale relationships
    stale_relationships = ChannelGroupM3UAccount.objects.filter(
        m3u_account=account,
        last_seen__lt=stale_cutoff
    ).select_related('channel_group')

    relations_to_delete = list(stale_relationships)
    deleted_count = len(relations_to_delete)

    if deleted_count > 0:
        logger.info(
            f"Found {deleted_count} stale group relationships for account {account.id}: "
            f"{[rel.channel_group.name for rel in relations_to_delete]}"
        )

        # Delete the stale relationships
        stale_relationships.delete()

        # Check whether any of the deleted relationships left groups with no remaining associations
        orphaned_group_ids = []
        for rel in relations_to_delete:
            group = rel.channel_group

            # Check if this group has any remaining M3U account relationships
            remaining_m3u_relationships = ChannelGroupM3UAccount.objects.filter(
                channel_group=group
            ).exists()

            # Check if this group has any direct channels (not through M3U accounts)
            has_direct_channels = group.related_channels().exists()

            # If there are no relationships and no direct channels, it's safe to delete
            if not remaining_m3u_relationships and not has_direct_channels:
                orphaned_group_ids.append(group.id)
                logger.debug(f"Group '{group.name}' has no remaining associations and will be deleted")

        # Delete truly orphaned groups
        if orphaned_group_ids:
            deleted_groups = list(ChannelGroup.objects.filter(id__in=orphaned_group_ids).values_list('name', flat=True))
            ChannelGroup.objects.filter(id__in=orphaned_group_ids).delete()
            logger.info(f"Deleted {len(orphaned_group_ids)} orphaned groups that had no remaining associations: {deleted_groups}")
    else:
        logger.debug(f"No stale group relationships found for account {account.id}")

    return deleted_count


def collect_xc_streams(account_id, enabled_groups):
    """Collect all XC streams in a single API call and filter by enabled groups."""
    account = M3UAccount.objects.get(id=account_id)
    all_streams = []

    # Create a mapping from category_id to group info for filtering
    enabled_category_ids = {}
    for group_name, props in enabled_groups.items():
        if "xc_id" in props:
            enabled_category_ids[str(props["xc_id"])] = {
                "name": group_name,
                "props": props,
            }

    try:
        with XCClient(
            account.server_url,
            account.username,
            account.password,
            account.get_user_agent(),
        ) as xc_client:

            # Fetch ALL live streams in a single API call (much more efficient)
            logger.info("Fetching ALL live streams from XC provider...")
            all_xc_streams = xc_client.get_all_live_streams()  # All streams, no category filter

            if not all_xc_streams:
                logger.warning("No live streams returned from XC provider")
                return []

            logger.info(f"Retrieved {len(all_xc_streams)} total live streams from provider")

            # Filter streams based on enabled categories
            filtered_count = 0
            for stream in all_xc_streams:
                # Get the category_id for this stream
                category_id = str(stream.get("category_id", ""))

                # Only include streams from enabled categories
                if category_id in enabled_category_ids:
                    group_info = enabled_category_ids[category_id]

                    # Convert the XC stream to our standard format with all properties preserved
                    stream_data = {
                        "name": stream["name"],
                        "url": xc_client.get_stream_url(stream["stream_id"]),
                        "attributes": {
                            "tvg-id": stream.get("epg_channel_id", ""),
                            "tvg-logo": stream.get("stream_icon", ""),
                            "group-title": group_info["name"],
                            # Preserve all XC stream properties as custom attributes
                            "stream_id": str(stream.get("stream_id", "")),
                            "category_id": category_id,
                            "stream_type": stream.get("stream_type", ""),
                            "added": stream.get("added", ""),
                            "is_adult": str(stream.get("is_adult", "0")),
                            "custom_sid": stream.get("custom_sid", ""),
                            # Include any other properties that might be present
                            **{k: str(v) for k, v in stream.items() if k not in [
                                "name", "stream_id", "epg_channel_id", "stream_icon",
                                "category_id", "stream_type", "added", "is_adult", "custom_sid"
                            ] and v is not None}
                        }
                    }
                    all_streams.append(stream_data)
                    filtered_count += 1

    except Exception as e:
        logger.error(f"Failed to fetch XC streams: {str(e)}")
        return []

    logger.info(f"Filtered {filtered_count} streams from {len(enabled_category_ids)} enabled categories")
    return all_streams
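

# Each element returned above mirrors the shape produced by the M3U parser, so
# downstream batch processing can treat XC and plain-M3U streams alike:
#
#     {
#         "name": "...",
#         "url": "...",   # built via xc_client.get_stream_url()
#         "attributes": {"tvg-id": "...", "tvg-logo": "...", "group-title": "...", ...},
#     }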


def process_xc_category_direct(account_id, batch, groups, hash_keys):
    from django.db import connections

    # Ensure clean database connections for threading
    connections.close_all()

    account = M3UAccount.objects.get(id=account_id)

    streams_to_create = []
    streams_to_update = []
    stream_hashes = {}
    existing_streams = {}  # Initialized up front so the cleanup in `finally` is always safe

    try:
        with XCClient(
            account.server_url,
            account.username,
            account.password,
            account.get_user_agent(),
        ) as xc_client:
            # Log the batch details to help with debugging
            logger.debug(f"Processing XC batch: {batch}")

            for group_name, props in batch.items():
                # Check that we have a valid xc_id for this group
                if "xc_id" not in props:
                    logger.error(
                        f"Missing xc_id for group {group_name} in batch {batch}"
                    )
                    continue

                # Get the actual group ID from the mapping
                group_id = groups.get(group_name)
                if not group_id:
                    logger.error(f"Group {group_name} not found in enabled groups")
                    continue

                try:
                    logger.debug(
                        f"Fetching streams for XC category: {group_name} (ID: {props['xc_id']})"
                    )
                    streams = xc_client.get_live_category_streams(props["xc_id"])

                    if not streams:
                        logger.warning(
                            f"No streams found for XC category {group_name} (ID: {props['xc_id']})"
                        )
                        continue

                    logger.debug(
                        f"Found {len(streams)} streams for category {group_name}"
                    )

                    for stream in streams:
                        name = stream["name"]
                        url = xc_client.get_stream_url(stream["stream_id"])
                        tvg_id = stream.get("epg_channel_id", "")
                        tvg_logo = stream.get("stream_icon", "")
                        group_title = group_name

                        stream_hash = Stream.generate_hash_key(
                            name, url, tvg_id, hash_keys, m3u_id=account_id, group=group_title
                        )
                        stream_props = {
                            "name": name,
                            "url": url,
                            "logo_url": tvg_logo,
                            "tvg_id": tvg_id,
                            "m3u_account": account,
                            "channel_group_id": int(group_id),
                            "stream_hash": stream_hash,
                            "custom_properties": stream,
                            "is_stale": False,
                        }

                        if stream_hash not in stream_hashes:
                            stream_hashes[stream_hash] = stream_props
                except Exception as e:
                    logger.error(
                        f"Error processing XC category {group_name} (ID: {props['xc_id']}): {str(e)}"
                    )
                    continue

        # Process all found streams
        existing_streams = {
            s.stream_hash: s
            for s in Stream.objects.filter(stream_hash__in=stream_hashes.keys()).select_related('m3u_account').only(
                'id', 'stream_hash', 'name', 'url', 'logo_url', 'tvg_id', 'custom_properties', 'last_seen', 'updated_at', 'm3u_account'
            )
        }

        for stream_hash, stream_props in stream_hashes.items():
            if stream_hash in existing_streams:
                obj = existing_streams[stream_hash]
                # Optimized field comparison for XC streams
                changed = (
                    obj.name != stream_props["name"] or
                    obj.url != stream_props["url"] or
                    obj.logo_url != stream_props["logo_url"] or
                    obj.tvg_id != stream_props["tvg_id"] or
                    obj.custom_properties != stream_props["custom_properties"]
                )

                if changed:
                    for key, value in stream_props.items():
                        setattr(obj, key, value)
                    obj.last_seen = timezone.now()
                    obj.updated_at = timezone.now()  # Update the timestamp only for changed streams
                    obj.is_stale = False
                    streams_to_update.append(obj)
                else:
                    # Always update last_seen, even if nothing else changed
                    obj.last_seen = timezone.now()
                    obj.is_stale = False
                    # Don't update updated_at for unchanged streams
                    streams_to_update.append(obj)

                # Remove from existing_streams since we've processed it
                del existing_streams[stream_hash]
            else:
                stream_props["last_seen"] = timezone.now()
                stream_props["updated_at"] = timezone.now()  # Set the initial updated_at for new streams
                stream_props["is_stale"] = False
                streams_to_create.append(Stream(**stream_props))

        try:
            with transaction.atomic():
                if streams_to_create:
                    Stream.objects.bulk_create(streams_to_create, ignore_conflicts=True)

                if streams_to_update:
                    # Simplified bulk update for better performance
                    Stream.objects.bulk_update(
                        streams_to_update,
                        ['name', 'url', 'logo_url', 'tvg_id', 'custom_properties', 'last_seen', 'updated_at', 'is_stale'],
                        batch_size=150  # Smaller batch size for XC processing
                    )

                # Update last_seen for any remaining existing streams that weren't processed
                if existing_streams:
                    Stream.objects.bulk_update(existing_streams.values(), ["last_seen"])
        except Exception as e:
            logger.error(f"Bulk operation failed for XC streams: {str(e)}")

        retval = f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated."

    except Exception as e:
        logger.error(f"XC category processing error: {str(e)}")
        retval = f"Error processing XC batch: {str(e)}"
    finally:
        # Clean up database connections for threading
        connections.close_all()

        # Aggressive garbage collection
        del streams_to_create, streams_to_update, stream_hashes, existing_streams
        gc.collect()

    return retval


def process_m3u_batch_direct(account_id, batch, groups, hash_keys):
    """Processes a batch of M3U streams using bulk operations with thread-safe DB connections."""
    from django.db import connections

    # Ensure clean database connections for threading
    connections.close_all()

    account = M3UAccount.objects.get(id=account_id)

    compiled_filters = [
        (
            re.compile(
                f.regex_pattern,
                (
                    re.IGNORECASE
                    if not (f.custom_properties or {}).get("case_sensitive", True)
                    else 0
                ),
            ),
            f,
        )
        for f in account.filters.order_by("order")
    ]

    streams_to_create = []
    streams_to_update = []
    stream_hashes = {}

    logger.debug(f"Processing batch of {len(batch)} for M3U account {account_id}")
    if compiled_filters:
        logger.debug(f"Using compiled filters: {[f[1].regex_pattern for f in compiled_filters]}")
    for stream_info in batch:
        try:
            name, url = stream_info["name"], stream_info["url"]

            # Validate URL length - maximum of 4096 characters
            if url and len(url) > 4096:
                logger.warning(f"Skipping stream '{name}': URL too long ({len(url)} characters, max 4096)")
                continue

            tvg_id, tvg_logo = get_case_insensitive_attr(
                stream_info["attributes"], "tvg-id", ""
            ), get_case_insensitive_attr(stream_info["attributes"], "tvg-logo", "")
            group_title = get_case_insensitive_attr(
                stream_info["attributes"], "group-title", "Default Group"
            )
            logger.debug(f"Processing stream: {name} - {url} in group {group_title}")
            include = True
            for pattern, stream_filter in compiled_filters:
                logger.trace(f"Checking filter pattern {pattern}")
                target = name
                if stream_filter.filter_type == "url":
                    target = url
                elif stream_filter.filter_type == "group":
                    target = group_title

                if pattern.search(target or ""):
                    logger.debug(
                        f"Stream {name} - {url} matches filter pattern {stream_filter.regex_pattern}"
                    )
                    include = not stream_filter.exclude
                    break

            if not include:
                logger.debug("Stream excluded by filter, skipping.")
                continue

            # Filter out disabled groups for this account
            if group_title not in groups:
                logger.debug(
                    f"Skipping stream in disabled or excluded group: {group_title}"
                )
                continue

            stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys, m3u_id=account_id, group=group_title)
            stream_props = {
                "name": name,
                "url": url,
                "logo_url": tvg_logo,
                "tvg_id": tvg_id,
                "m3u_account": account,
                "channel_group_id": int(groups.get(group_title)),
                "stream_hash": stream_hash,
                "custom_properties": stream_info["attributes"],
                "is_stale": False,
            }

            if stream_hash not in stream_hashes:
                stream_hashes[stream_hash] = stream_props
        except Exception as e:
            # `name` may be unbound if the failure happened early, so read it defensively
            logger.error(f"Failed to process stream {stream_info.get('name', 'unknown')}: {e}")
            logger.error(json.dumps(stream_info))

    existing_streams = {
        s.stream_hash: s
        for s in Stream.objects.filter(stream_hash__in=stream_hashes.keys()).select_related('m3u_account').only(
            'id', 'stream_hash', 'name', 'url', 'logo_url', 'tvg_id', 'custom_properties', 'last_seen', 'updated_at', 'm3u_account'
        )
    }

    for stream_hash, stream_props in stream_hashes.items():
        if stream_hash in existing_streams:
            obj = existing_streams[stream_hash]
            # Optimized field comparison
            changed = (
                obj.name != stream_props["name"] or
                obj.url != stream_props["url"] or
                obj.logo_url != stream_props["logo_url"] or
                obj.tvg_id != stream_props["tvg_id"] or
                obj.custom_properties != stream_props["custom_properties"]
            )

            # Always update last_seen
            obj.last_seen = timezone.now()

            if changed:
                # Only update the fields that changed and set updated_at
                obj.name = stream_props["name"]
                obj.url = stream_props["url"]
                obj.logo_url = stream_props["logo_url"]
                obj.tvg_id = stream_props["tvg_id"]
                obj.custom_properties = stream_props["custom_properties"]
                obj.updated_at = timezone.now()

            # Always mark as not stale since we saw it in this refresh
            obj.is_stale = False

            streams_to_update.append(obj)
        else:
            # New stream
            stream_props["last_seen"] = timezone.now()
            stream_props["updated_at"] = timezone.now()
            stream_props["is_stale"] = False
            streams_to_create.append(Stream(**stream_props))

    try:
        with transaction.atomic():
            if streams_to_create:
                Stream.objects.bulk_create(streams_to_create, ignore_conflicts=True)

            if streams_to_update:
                # Update all streams in a single bulk operation
                Stream.objects.bulk_update(
                    streams_to_update,
                    ['name', 'url', 'logo_url', 'tvg_id', 'custom_properties', 'last_seen', 'updated_at', 'is_stale'],
                    batch_size=200
                )
    except Exception as e:
        logger.error(f"Bulk operation failed: {str(e)}")

    retval = f"M3U account: {account_id}, Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated."

    # Aggressive garbage collection (currently disabled)
    # del streams_to_create, streams_to_update, stream_hashes, existing_streams
    # from core.utils import cleanup_memory
    # cleanup_memory(log_usage=True, force_collection=True)

    # Clean up database connections for threading
    connections.close_all()

    return retval
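

# Filter semantics, for reference: filters are evaluated in `order`, the first
# pattern that matches the stream's name/url/group wins, and `include` becomes
# `not filter.exclude` for that match. A stream that matches no filter is kept.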


def cleanup_streams(account_id, scan_start_time=None):
    # Resolve the cutoff at call time; a default of the `timezone.now`
    # callable itself would never be invoked and would break the
    # timedelta arithmetic below.
    if scan_start_time is None:
        scan_start_time = timezone.now()

    account = M3UAccount.objects.get(id=account_id, is_active=True)
    existing_groups = ChannelGroup.objects.filter(
        m3u_accounts__m3u_account=account,
        m3u_accounts__enabled=True,
    ).values_list("id", flat=True)
    logger.info(
        f"Found {len(existing_groups)} active groups for M3U account {account_id}"
    )

    # Calculate the cutoff date for stale streams
    stale_cutoff = scan_start_time - timezone.timedelta(days=account.stale_stream_days)
    logger.info(
        f"Removing streams not seen since {stale_cutoff} for M3U account {account_id}"
    )

    # Delete streams that are not in active groups
    streams_to_delete = Stream.objects.filter(m3u_account=account).exclude(
        channel_group__in=existing_groups
    )

    # Also delete streams that haven't been seen for longer than stale_stream_days
    stale_streams = Stream.objects.filter(
        m3u_account=account, last_seen__lt=stale_cutoff
    )

    deleted_count = streams_to_delete.count()
    stale_count = stale_streams.count()

    streams_to_delete.delete()
    stale_streams.delete()

    total_deleted = deleted_count + stale_count
    logger.info(
        f"Cleanup for M3U account {account_id} complete: {deleted_count} streams removed due to group filter, {stale_count} removed as stale"
    )

    # Return the total count of deleted streams
    return total_deleted
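

# Callers should pass the timestamp captured at the start of the scan so the
# stale cutoff stays consistent with last_seen stamping, e.g.:
#
#     scan_start_time = timezone.now()   # captured before any batches run
#     ...
#     cleanup_streams(account_id, scan_start_time)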


@shared_task
def refresh_m3u_groups(account_id, use_cache=False, full_refresh=False, scan_start_time=None):
    """Refresh M3U groups for an account.

    Args:
        account_id: ID of the M3U account
        use_cache: Whether to use the cached M3U file
        full_refresh: Whether this is part of a full refresh
        scan_start_time: Timestamp when the scan started (for consistent last_seen marking)
    """
    if not acquire_task_lock("refresh_m3u_account_groups", account_id):
        return f"Task already running for account_id={account_id}.", None

    try:
        account = M3UAccount.objects.get(id=account_id, is_active=True)
    except M3UAccount.DoesNotExist:
        release_task_lock("refresh_m3u_account_groups", account_id)
        return f"M3UAccount with ID={account_id} not found or inactive.", None

    extinf_data = []
    groups = {"Default Group": {}}

    if account.account_type == M3UAccount.Types.XC:
        # Log detailed information about the account
        logger.info(
            f"Processing XC account {account_id} with URL: {account.server_url}"
        )
        logger.debug(
            f"Username: {account.username}, Has password: {'Yes' if account.password else 'No'}"
        )

        # Validate required fields
        if not account.server_url:
            error_msg = "Missing server URL for Xtream Codes account"
            logger.error(error_msg)
            account.status = M3UAccount.Status.ERROR
            account.last_message = error_msg
            account.save(update_fields=["status", "last_message"])
            send_m3u_update(
                account_id, "processing_groups", 100, status="error", error=error_msg
            )
            release_task_lock("refresh_m3u_account_groups", account_id)
            return error_msg, None

        if not account.username or not account.password:
            error_msg = "Missing username or password for Xtream Codes account"
            logger.error(error_msg)
            account.status = M3UAccount.Status.ERROR
            account.last_message = error_msg
            account.save(update_fields=["status", "last_message"])
            send_m3u_update(
                account_id, "processing_groups", 100, status="error", error=error_msg
            )
            release_task_lock("refresh_m3u_account_groups", account_id)
            return error_msg, None

        try:
            # Ensure the server URL is properly formatted
            server_url = account.server_url.rstrip("/")
            if not (
                server_url.startswith("http://") or server_url.startswith("https://")
            ):
                server_url = f"http://{server_url}"

            # User agent handling
            try:
                logger.debug(f"Getting user agent for account {account.id}")

                # Start from a hardcoded user agent string to avoid any issues with object structure
                user_agent_string = (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                )

                try:
                    # Try to get the user agent directly from the database
                    if account.user_agent_id:
                        ua_obj = UserAgent.objects.get(id=account.user_agent_id)
                        if (
                            ua_obj
                            and hasattr(ua_obj, "user_agent")
                            and ua_obj.user_agent
                        ):
                            user_agent_string = ua_obj.user_agent
                            logger.debug(
                                f"Using user agent from account: {user_agent_string}"
                            )
                    else:
                        # Get the default user agent from CoreSettings
                        default_ua_id = CoreSettings.get_default_user_agent_id()
                        logger.debug(
                            f"Default user agent ID from settings: {default_ua_id}"
                        )
                        if default_ua_id:
                            ua_obj = UserAgent.objects.get(id=default_ua_id)
                            if (
                                ua_obj
                                and hasattr(ua_obj, "user_agent")
                                and ua_obj.user_agent
                            ):
                                user_agent_string = ua_obj.user_agent
                                logger.debug(
                                    f"Using default user agent: {user_agent_string}"
                                )
                except Exception as e:
                    logger.warning(
                        f"Error getting user agent, using fallback: {str(e)}"
                    )

                logger.debug(f"Final user agent string: {user_agent_string}")
            except Exception as e:
                user_agent_string = (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                )
                logger.warning(
                    f"Exception in user agent handling, using fallback: {str(e)}"
                )

            logger.info(
                f"Creating XCClient with URL: {account.server_url}, Username: {account.username}, User-Agent: {user_agent_string}"
            )

            # Create the XCClient with explicit error handling
            try:
                with XCClient(
                    account.server_url, account.username, account.password, user_agent_string
                ) as xc_client:
                    logger.info("XCClient instance created successfully")

                    # Authenticate with detailed error handling
                    try:
                        logger.debug(f"Authenticating with XC server {server_url}")
                        auth_result = xc_client.authenticate()
                        logger.debug(f"Authentication response: {auth_result}")

                        # Queue the async profile refresh task to run in the background.
                        # This prevents any delay in the main refresh process.
                        try:
                            logger.info(f"Queueing background profile refresh for account {account.name}")
                            refresh_account_profiles.delay(account.id)
                        except Exception as e:
                            logger.warning(f"Failed to queue profile refresh task: {str(e)}")
                            # Don't fail the main refresh if the profile refresh can't be queued

                    except Exception as e:
                        error_msg = f"Failed to authenticate with XC server: {str(e)}"
                        logger.error(error_msg)
                        account.status = M3UAccount.Status.ERROR
                        account.last_message = error_msg
                        account.save(update_fields=["status", "last_message"])
                        send_m3u_update(
                            account_id,
                            "processing_groups",
                            100,
                            status="error",
                            error=error_msg,
                        )
                        release_task_lock("refresh_m3u_account_groups", account_id)
                        return error_msg, None

                    # Get categories with detailed error handling
                    try:
                        logger.info("Getting live categories from XC server")
                        xc_categories = xc_client.get_live_categories()
                        logger.info(
                            f"Found {len(xc_categories)} categories: {xc_categories}"
                        )

                        # Validate the response
                        if not isinstance(xc_categories, list):
                            error_msg = (
                                f"Unexpected response from XC server: {xc_categories}"
                            )
                            logger.error(error_msg)
                            account.status = M3UAccount.Status.ERROR
                            account.last_message = error_msg
                            account.save(update_fields=["status", "last_message"])
                            send_m3u_update(
                                account_id,
                                "processing_groups",
                                100,
                                status="error",
                                error=error_msg,
                            )
                            release_task_lock("refresh_m3u_account_groups", account_id)
                            return error_msg, None

                        if len(xc_categories) == 0:
                            logger.warning("No categories found in XC server response")

                        for category in xc_categories:
                            cat_name = category.get("category_name", "Unknown Category")
                            cat_id = category.get("category_id", "0")
                            logger.info(f"Adding category: {cat_name} (ID: {cat_id})")
                            groups[cat_name] = {
                                "xc_id": cat_id,
                            }
                    except Exception as e:
                        error_msg = f"Failed to get categories from XC server: {str(e)}"
                        logger.error(error_msg)
                        account.status = M3UAccount.Status.ERROR
                        account.last_message = error_msg
                        account.save(update_fields=["status", "last_message"])
                        send_m3u_update(
                            account_id,
                            "processing_groups",
                            100,
                            status="error",
                            error=error_msg,
                        )
                        release_task_lock("refresh_m3u_account_groups", account_id)
                        return error_msg, None

            except Exception as e:
                error_msg = f"Failed to create XC Client: {str(e)}"
                logger.error(error_msg)
                account.status = M3UAccount.Status.ERROR
                account.last_message = error_msg
                account.save(update_fields=["status", "last_message"])
                send_m3u_update(
                    account_id,
                    "processing_groups",
                    100,
                    status="error",
                    error=error_msg,
                )
                release_task_lock("refresh_m3u_account_groups", account_id)
                return error_msg, None
        except Exception as e:
            error_msg = f"Unexpected error occurred in XC Client: {str(e)}"
            logger.error(error_msg)
            account.status = M3UAccount.Status.ERROR
            account.last_message = error_msg
            account.save(update_fields=["status", "last_message"])
            send_m3u_update(
                account_id, "processing_groups", 100, status="error", error=error_msg
            )
            release_task_lock("refresh_m3u_account_groups", account_id)
            return error_msg, None
    else:
        # Use the success flag from fetch_m3u_lines to decide whether to continue
        lines, success = fetch_m3u_lines(account, use_cache)
        if not success:
            # If the fetch failed, don't continue processing
            release_task_lock("refresh_m3u_account_groups", account_id)
            return f"Failed to fetch M3U data for account_id={account_id}.", None

        # Log basic file structure for debugging
        logger.debug(f"Processing {len(lines)} lines from M3U file")

        line_count = 0
        extinf_count = 0
        url_count = 0
        valid_stream_count = 0
        problematic_lines = []

        for line_index, line in enumerate(lines):
            line_count += 1
            line = line.strip()

            if line.startswith("#EXTINF"):
                extinf_count += 1
                parsed = parse_extinf_line(line)
                if parsed:
                    group_title_attr = get_case_insensitive_attr(
                        parsed["attributes"], "group-title", ""
                    )
                    if group_title_attr:
                        group_name = group_title_attr
                        # Log new groups as they're discovered
                        if group_name not in groups:
                            logger.debug(
                                f"Found new group for M3U account {account_id}: '{group_name}'"
                            )
                            groups[group_name] = {}

                    extinf_data.append(parsed)
                else:
                    # Log problematic EXTINF lines
                    logger.warning(
                        f"Failed to parse EXTINF at line {line_index + 1}: {line[:200]}"
                    )
                    problematic_lines.append((line_index + 1, line[:200]))

            elif extinf_data and (line.startswith("http") or line.startswith("rtsp") or line.startswith("rtp") or line.startswith("udp")):
                url_count += 1
                # Normalize UDP URLs only (e.g., remove the VLC-specific @ prefix)
                normalized_url = normalize_stream_url(line) if line.startswith("udp") else line
                # Associate the URL with the last EXTINF line
                extinf_data[-1]["url"] = normalized_url
                valid_stream_count += 1

                # Periodically log progress for large files
                if valid_stream_count % 1000 == 0:
                    logger.debug(
                        f"Processed {valid_stream_count} valid streams so far for M3U account: {account_id}"
                    )

        # Log summary statistics
        logger.info(
            f"M3U parsing complete - Lines: {line_count}, EXTINF: {extinf_count}, URLs: {url_count}, Valid streams: {valid_stream_count}"
        )

        if problematic_lines:
            logger.warning(
                f"Found {len(problematic_lines)} problematic lines during parsing"
            )
            for i, (line_num, content) in enumerate(
                problematic_lines[:10]
            ):  # Log at most 10 examples
                logger.warning(f"Problematic line #{i + 1} at line {line_num}: {content}")
            if len(problematic_lines) > 10:
                logger.warning(
                    f"... and {len(problematic_lines) - 10} more problematic lines"
                )

        # Log group statistics
        logger.info(
            f"Found {len(groups)} groups in M3U file: {', '.join(list(groups.keys())[:20])}"
            + ("..." if len(groups) > 20 else "")
        )

    # Cache the processed data
    cache_path = os.path.join(m3u_dir, f"{account_id}.json")
    with open(cache_path, "w", encoding="utf-8") as f:
        json.dump(
            {
                "extinf_data": extinf_data,
                "groups": groups,
            },
            f,
        )
    logger.debug(f"Cached parsed M3U data to {cache_path}")

    send_m3u_update(account_id, "processing_groups", 0)

    process_groups(account, groups, scan_start_time)

    release_task_lock("refresh_m3u_account_groups", account_id)

    if not full_refresh:
        # Use update() instead of save() to avoid triggering signals
        M3UAccount.objects.filter(id=account_id).update(
            status=M3UAccount.Status.PENDING_SETUP,
            last_message="M3U groups loaded. Please select groups or refresh M3U to complete setup.",
        )
        send_m3u_update(
            account_id,
            "processing_groups",
            100,
            status="pending_setup",
            message="M3U groups loaded. Please select groups or refresh M3U to complete setup.",
        )

    return extinf_data, groups
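

# Return contract: on success this task yields (extinf_data, groups); on any
# failure path it yields (error_message, None), so callers can distinguish
# the two cases by checking the second element.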


def delete_m3u_refresh_task_by_id(account_id):
    """
    Delete the periodic task associated with an M3U account ID.
    Can be called directly or from the post_delete signal.
    Returns True if a task was found and deleted, False otherwise.
    """
    try:
        task = None
        task_name = f"m3u_account-refresh-{account_id}"

        # Look for the task by name
        try:
            from django_celery_beat.models import PeriodicTask, IntervalSchedule

            task = PeriodicTask.objects.get(name=task_name)
            logger.debug(f"Found task by name: {task.id} for M3UAccount {account_id}")
        except PeriodicTask.DoesNotExist:
            logger.warning(f"No PeriodicTask found with name {task_name}")
            return False

        # Now delete the task and its interval
        if task:
            # Store the interval info before deleting the task
            interval_id = None
            if hasattr(task, "interval") and task.interval:
                interval_id = task.interval.id

                # Count how many TOTAL tasks use this interval (including this one)
                tasks_with_same_interval = PeriodicTask.objects.filter(
                    interval_id=interval_id
                ).count()
                logger.debug(
                    f"Interval {interval_id} is used by {tasks_with_same_interval} tasks total"
                )

            # Delete the task first
            task_id = task.id
            task.delete()
            logger.debug(f"Successfully deleted periodic task {task_id}")

            # Now check whether we should delete the interval.
            # We only delete it if this was the ONLY task using it.
            if interval_id and tasks_with_same_interval == 1:
                try:
                    interval = IntervalSchedule.objects.get(id=interval_id)
                    logger.debug(
                        f"Deleting interval schedule {interval_id} (not shared with other tasks)"
                    )
                    interval.delete()
                    logger.debug(f"Successfully deleted interval {interval_id}")
                except IntervalSchedule.DoesNotExist:
                    logger.warning(f"Interval {interval_id} no longer exists")
            elif interval_id:
                logger.debug(
                    f"Not deleting interval {interval_id} as it's shared with {tasks_with_same_interval - 1} other tasks"
                )

            return True
        return False
    except Exception as e:
        logger.error(
            f"Error deleting periodic task for M3UAccount {account_id}: {str(e)}",
            exc_info=True,
        )
        return False
|
|
|
|
|
|
@shared_task
def sync_auto_channels(account_id, scan_start_time=None):
    """
    Automatically create/update/delete channels to match streams in groups with auto_channel_sync enabled.
    Preserves existing channel UUIDs to maintain M3U link integrity.
    Called after M3U refresh completes successfully.
    """
    from apps.channels.models import (
        Channel,
        ChannelGroup,
        ChannelGroupM3UAccount,
        Stream,
        ChannelStream,
    )
    from apps.epg.models import EPGData
    from django.utils import timezone

    try:
        account = M3UAccount.objects.get(id=account_id)
        logger.info(f"Starting auto channel sync for M3U account {account.name}")

        # Always use scan_start_time as the cutoff for last_seen
        if scan_start_time is not None:
            if isinstance(scan_start_time, str):
                scan_start_time = timezone.datetime.fromisoformat(scan_start_time)
        else:
            scan_start_time = timezone.now()

        # Get groups with auto sync enabled for this account
        auto_sync_groups = ChannelGroupM3UAccount.objects.filter(
            m3u_account=account, enabled=True, auto_channel_sync=True
        ).select_related("channel_group")

        channels_created = 0
        channels_updated = 0
        channels_deleted = 0

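        # The per-group flow below runs in three phases: renumber existing
        # auto-created channels to match the configured sort order, then
        # create or update one channel per surviving stream, then delete
        # channels whose streams disappeared from the source.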
        for group_relation in auto_sync_groups:
            channel_group = group_relation.channel_group
            start_number = group_relation.auto_sync_channel_start or 1.0

            # Get force_dummy_epg, group_override, and regex patterns from the group's custom_properties
            group_custom_props = {}
            force_dummy_epg = False  # Backward compatibility: legacy option to disable EPG
            override_group_id = None
            name_regex_pattern = None
            name_replace_pattern = None
            name_match_regex = None
            channel_profile_ids = None
            channel_sort_order = None
            channel_sort_reverse = False
            stream_profile_id = None
            custom_logo_id = None
            custom_epg_id = None  # Newer option: select a specific EPG source (takes priority over force_dummy_epg)
            if group_relation.custom_properties:
                group_custom_props = group_relation.custom_properties
                force_dummy_epg = group_custom_props.get("force_dummy_epg", False)
                override_group_id = group_custom_props.get("group_override")
                name_regex_pattern = group_custom_props.get("name_regex_pattern")
                name_replace_pattern = group_custom_props.get("name_replace_pattern")
                name_match_regex = group_custom_props.get("name_match_regex")
                channel_profile_ids = group_custom_props.get("channel_profile_ids")
                custom_epg_id = group_custom_props.get("custom_epg_id")
                channel_sort_order = group_custom_props.get("channel_sort_order")
                channel_sort_reverse = group_custom_props.get(
                    "channel_sort_reverse", False
                )
                stream_profile_id = group_custom_props.get("stream_profile_id")
                custom_logo_id = group_custom_props.get("custom_logo_id")

            # Determine which group to use for created channels
            target_group = channel_group
            if override_group_id:
                try:
                    target_group = ChannelGroup.objects.get(id=override_group_id)
                    logger.info(
                        f"Using override group '{target_group.name}' instead of '{channel_group.name}' for auto-created channels"
                    )
                except ChannelGroup.DoesNotExist:
                    logger.warning(
                        f"Override group with ID {override_group_id} not found, using original group '{channel_group.name}'"
                    )

            logger.info(
                f"Processing auto sync for group: {channel_group.name} (start: {start_number})"
            )

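            # Illustrative shape of the custom_properties consulted above
            # (keys are optional; the values here are made up for the example):
            #   {"force_dummy_epg": False, "group_override": 12,
            #    "name_regex_pattern": r"^US\| ", "name_replace_pattern": "",
            #    "channel_sort_order": "name", "channel_sort_reverse": True}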
            # Get all current streams in this group for this M3U account, filtering out stale streams
            current_streams = Stream.objects.filter(
                m3u_account=account,
                channel_group=channel_group,
                last_seen__gte=scan_start_time,
            )

            # --- FILTER STREAMS BY NAME MATCH REGEX IF SPECIFIED ---
            if name_match_regex:
                try:
                    current_streams = current_streams.filter(
                        name__iregex=name_match_regex
                    )
                except re.error as e:
                    logger.warning(
                        f"Invalid name_match_regex '{name_match_regex}' for group '{channel_group.name}': {e}. Skipping name filter."
                    )

            # --- APPLY CHANNEL SORT ORDER ---
            streams_is_list = False  # Track whether we converted the queryset to a list
            if channel_sort_order and channel_sort_order != "":
                if channel_sort_order == "name":
                    # Use natural sorting for names to handle numbers correctly
                    current_streams = list(current_streams)
                    current_streams.sort(
                        key=lambda stream: natural_sort_key(stream.name),
                        reverse=channel_sort_reverse,
                    )
                    streams_is_list = True
                elif channel_sort_order == "tvg_id":
                    order_prefix = "-" if channel_sort_reverse else ""
                    current_streams = current_streams.order_by(f"{order_prefix}tvg_id")
                elif channel_sort_order == "updated_at":
                    order_prefix = "-" if channel_sort_reverse else ""
                    current_streams = current_streams.order_by(
                        f"{order_prefix}updated_at"
                    )
                else:
                    logger.warning(
                        f"Unknown channel_sort_order '{channel_sort_order}' for group '{channel_group.name}'. Using provider order."
                    )
                    order_prefix = "-" if channel_sort_reverse else ""
                    current_streams = current_streams.order_by(f"{order_prefix}id")
            else:
                # Provider order (default) - can still be reversed
                order_prefix = "-" if channel_sort_reverse else ""
                current_streams = current_streams.order_by(f"{order_prefix}id")

            # Get existing auto-created channels for this account (regardless of current group).
            # We find them by their stream associations rather than by group location.
            existing_channels = Channel.objects.filter(
                auto_created=True, auto_created_by=account
            ).select_related("logo", "epg_data")

            # Create a mapping of existing channels by their associated stream.
            # This approach finds channels even if they've been moved to different groups.
            existing_channel_map = {}
            for channel in existing_channels:
                # Get streams associated with this channel that belong to our M3U account and original group
                channel_streams = ChannelStream.objects.filter(
                    channel=channel,
                    stream__m3u_account=account,
                    stream__channel_group=channel_group,  # Match streams from the original group
                ).select_related("stream")

                # Map each of our M3U account's streams to this channel
                for channel_stream in channel_streams:
                    if channel_stream.stream:
                        existing_channel_map[channel_stream.stream.id] = channel

            # Track which streams we've processed
            processed_stream_ids = set()

            # Check if we have streams - handle both QuerySet and list cases
            has_streams = (
                len(current_streams) > 0
                if streams_is_list
                else current_streams.exists()
            )

            if not has_streams:
                logger.debug(f"No streams found in group {channel_group.name}")
                # Delete all existing auto channels if no streams remain
                channels_to_delete = list(existing_channel_map.values())
                if channels_to_delete:
                    deleted_count = len(channels_to_delete)
                    Channel.objects.filter(
                        id__in=[ch.id for ch in channels_to_delete]
                    ).delete()
                    channels_deleted += deleted_count
                    logger.debug(
                        f"Deleted {deleted_count} auto channels (no streams remaining)"
                    )
                continue

            # Prepare profiles to assign to new channels
            from apps.channels.models import ChannelProfile, ChannelProfileMembership

            if (
                channel_profile_ids
                and isinstance(channel_profile_ids, list)
                and len(channel_profile_ids) > 0
            ):
                # Convert all IDs to int (in case they're strings)
                try:
                    profile_ids = [int(pid) for pid in channel_profile_ids]
                except Exception:
                    profile_ids = []
                profiles_to_assign = list(
                    ChannelProfile.objects.filter(id__in=profile_ids)
                )
            else:
                profiles_to_assign = list(ChannelProfile.objects.all())

            # Get the stream profile to assign, if one is specified
            from core.models import StreamProfile

            stream_profile_to_assign = None
            if stream_profile_id:
                try:
                    stream_profile_to_assign = StreamProfile.objects.get(
                        id=int(stream_profile_id)
                    )
                    logger.info(
                        f"Will assign stream profile '{stream_profile_to_assign.name}' to auto-synced streams in group '{channel_group.name}'"
                    )
                except (StreamProfile.DoesNotExist, ValueError, TypeError):
                    logger.warning(
                        f"Stream profile with ID {stream_profile_id} not found for group '{channel_group.name}', streams will use the default profile"
                    )
                    stream_profile_to_assign = None

            # Process each current stream
            current_channel_number = start_number

            # Always renumber all existing channels to match the current sort order.
            # This ensures channels are always in the correct sequence.
            channels_to_renumber = []
            temp_channel_number = start_number

            # Get all channel numbers already in use by other channels (not auto-created by this account)
            used_numbers = set(
                Channel.objects.exclude(
                    auto_created=True, auto_created_by=account
                ).values_list("channel_number", flat=True)
            )

            for stream in current_streams:
                if stream.id in existing_channel_map:
                    channel = existing_channel_map[stream.id]

                    # Find the next available number starting from temp_channel_number
                    target_number = temp_channel_number
                    while target_number in used_numbers:
                        target_number += 1

                    # Add this number to used_numbers so we don't reuse it in this batch
                    used_numbers.add(target_number)

                    if channel.channel_number != target_number:
                        channel.channel_number = target_number
                        channels_to_renumber.append(channel)
                        logger.debug(
                            f"Will renumber channel '{channel.name}' to {target_number}"
                        )

                    temp_channel_number += 1.0
                    if temp_channel_number % 1 != 0:  # Has a decimal part
                        temp_channel_number = int(temp_channel_number) + 1.0

            # Bulk update channel numbers if any need renumbering
            if channels_to_renumber:
                Channel.objects.bulk_update(channels_to_renumber, ["channel_number"])
                logger.info(
                    f"Renumbered {len(channels_to_renumber)} channels to maintain sort order"
                )

            # Reset the channel number counter for processing new channels
            current_channel_number = start_number

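            # Per-stream pipeline: apply the optional regex rename, then either
            # update the existing auto-created channel in place or create a new
            # one, and finally reconcile logo, EPG, and profile assignments.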
            for stream in current_streams:
                processed_stream_ids.add(stream.id)
                try:
                    # Parse custom properties for additional info
                    stream_custom_props = stream.custom_properties or {}
                    tvc_guide_stationid = stream_custom_props.get("tvc-guide-stationid")

                    # --- REGEX FIND/REPLACE LOGIC ---
                    original_name = stream.name
                    new_name = original_name
                    if name_regex_pattern is not None:
                        # If the replacement is None, treat it as an empty string (remove the match)
                        replace = (
                            name_replace_pattern
                            if name_replace_pattern is not None
                            else ""
                        )
                        try:
                            # Convert $1, $2, etc. to \1, \2, etc. for consistency with M3U profiles
                            safe_replace_pattern = re.sub(r"\$(\d+)", r"\\\1", replace)
                            new_name = re.sub(
                                name_regex_pattern, safe_replace_pattern, original_name
                            )
                        except re.error as e:
                            logger.warning(
                                f"Regex error for group '{channel_group.name}': {e}. Using original name."
                            )
                            new_name = original_name

                    # Check if we already have a channel for this stream
                    existing_channel = existing_channel_map.get(stream.id)

                    if existing_channel:
                        # Update the existing channel if needed (channel number already handled above)
                        channel_updated = False

                        # Use new_name instead of stream.name
                        if existing_channel.name != new_name:
                            existing_channel.name = new_name
                            channel_updated = True

                        if existing_channel.tvg_id != stream.tvg_id:
                            existing_channel.tvg_id = stream.tvg_id
                            channel_updated = True

                        if existing_channel.tvc_guide_stationid != tvc_guide_stationid:
                            existing_channel.tvc_guide_stationid = tvc_guide_stationid
                            channel_updated = True

                        # Check if the channel group needs to be updated (in case an override was added/changed)
                        if existing_channel.channel_group != target_group:
                            # Capture the old group name before overwriting it, so the log is accurate
                            old_group_name = (
                                existing_channel.channel_group.name
                                if existing_channel.channel_group
                                else "None"
                            )
                            existing_channel.channel_group = target_group
                            channel_updated = True
                            logger.info(
                                f"Moved auto channel '{existing_channel.name}' from '{old_group_name}' to '{target_group.name}'"
                            )

                        # Handle logo updates
                        current_logo = None
                        if custom_logo_id:
                            # Use the custom logo specified in the group settings
                            from apps.channels.models import Logo

                            try:
                                current_logo = Logo.objects.get(id=custom_logo_id)
                            except Logo.DoesNotExist:
                                logger.warning(
                                    f"Custom logo with ID {custom_logo_id} not found for existing channel, falling back to stream logo"
                                )
                                # Fall back to the stream logo if the custom logo is missing
                                if stream.logo_url:
                                    current_logo, _ = Logo.objects.get_or_create(
                                        url=stream.logo_url,
                                        defaults={
                                            "name": stream.name or stream.tvg_id or "Unknown"
                                        },
                                    )
                        elif stream.logo_url:
                            # No custom logo configured, use the stream logo
                            from apps.channels.models import Logo

                            current_logo, _ = Logo.objects.get_or_create(
                                url=stream.logo_url,
                                defaults={
                                    "name": stream.name or stream.tvg_id or "Unknown"
                                },
                            )

                        if existing_channel.logo != current_logo:
                            existing_channel.logo = current_logo
                            channel_updated = True

                        # Handle EPG data updates
                        current_epg_data = None
                        if custom_epg_id:
                            # Use the custom EPG specified in the group settings (e.g., a dummy EPG)
                            from apps.epg.models import EPGSource

                            try:
                                epg_source = EPGSource.objects.get(id=custom_epg_id)
                                # For dummy EPGs, select the first (and typically only) EPGData entry from this source
                                if epg_source.source_type == "dummy":
                                    current_epg_data = EPGData.objects.filter(
                                        epg_source=epg_source
                                    ).first()
                                    if not current_epg_data:
                                        logger.warning(
                                            f"No EPGData found for dummy EPG source {epg_source.name} (ID: {custom_epg_id})"
                                        )
                                else:
                                    # For non-dummy sources, try to find existing EPGData by tvg_id
                                    if stream.tvg_id:
                                        current_epg_data = EPGData.objects.filter(
                                            tvg_id=stream.tvg_id,
                                            epg_source=epg_source,
                                        ).first()
                            except EPGSource.DoesNotExist:
                                logger.warning(
                                    f"Custom EPG source with ID {custom_epg_id} not found for existing channel, falling back to auto-match"
                                )
                                # Fall back to auto-matching by tvg_id
                                if stream.tvg_id and not force_dummy_epg:
                                    current_epg_data = EPGData.objects.filter(
                                        tvg_id=stream.tvg_id
                                    ).first()
                        elif stream.tvg_id and not force_dummy_epg:
                            # Auto-match EPG by tvg_id (original behavior)
                            current_epg_data = EPGData.objects.filter(
                                tvg_id=stream.tvg_id
                            ).first()
                        # If force_dummy_epg is True and no custom_epg_id is set, current_epg_data stays None

                        if existing_channel.epg_data != current_epg_data:
                            existing_channel.epg_data = current_epg_data
                            channel_updated = True

                        # Handle stream profile updates for the channel
                        if (
                            stream_profile_to_assign
                            and existing_channel.stream_profile != stream_profile_to_assign
                        ):
                            existing_channel.stream_profile = stream_profile_to_assign
                            channel_updated = True

                        if channel_updated:
                            existing_channel.save()
                            channels_updated += 1
                            logger.debug(
                                f"Updated auto channel: {existing_channel.channel_number} - {existing_channel.name}"
                            )

                        # Update channel profile memberships for existing channels
                        current_memberships = set(
                            ChannelProfileMembership.objects.filter(
                                channel=existing_channel, enabled=True
                            ).values_list("channel_profile_id", flat=True)
                        )

                        target_profile_ids = set(
                            profile.id for profile in profiles_to_assign
                        )

                        # Only update if the memberships have changed
                        if current_memberships != target_profile_ids:
                            # Disable all current memberships
                            ChannelProfileMembership.objects.filter(
                                channel=existing_channel
                            ).update(enabled=False)

                            # Enable/create memberships for the target profiles
                            for profile in profiles_to_assign:
                                membership, created = (
                                    ChannelProfileMembership.objects.get_or_create(
                                        channel_profile=profile,
                                        channel=existing_channel,
                                        defaults={"enabled": True},
                                    )
                                )
                                if not created and not membership.enabled:
                                    membership.enabled = True
                                    membership.save()

                            logger.debug(
                                f"Updated profile memberships for auto channel: {existing_channel.name}"
                            )

                    else:
                        # Create a new channel.
                        # Find the next available channel number.
                        target_number = current_channel_number
                        while target_number in used_numbers:
                            target_number += 1

                        # Add this number to used_numbers
                        used_numbers.add(target_number)

                        channel = Channel.objects.create(
                            channel_number=target_number,
                            name=new_name,
                            tvg_id=stream.tvg_id,
                            tvc_guide_stationid=tvc_guide_stationid,
                            channel_group=target_group,
                            user_level=0,
                            auto_created=True,
                            auto_created_by=account,
                        )

                        # Associate the stream with the channel
                        ChannelStream.objects.create(
                            channel=channel, stream=stream, order=0
                        )

                        # Assign the channel to the correct profiles
                        memberships = [
                            ChannelProfileMembership(
                                channel_profile=profile, channel=channel, enabled=True
                            )
                            for profile in profiles_to_assign
                        ]
                        if memberships:
                            ChannelProfileMembership.objects.bulk_create(memberships)

                        # Try to match EPG data
                        if custom_epg_id:
                            # Use the custom EPG specified in the group settings (e.g., a dummy EPG)
                            from apps.epg.models import EPGSource

                            try:
                                epg_source = EPGSource.objects.get(id=custom_epg_id)
                                # For dummy EPGs, select the first (and typically only) EPGData entry from this source
                                if epg_source.source_type == "dummy":
                                    epg_data = EPGData.objects.filter(
                                        epg_source=epg_source
                                    ).first()
                                    if epg_data:
                                        channel.epg_data = epg_data
                                        channel.save(update_fields=["epg_data"])
                                    else:
                                        logger.warning(
                                            f"No EPGData found for dummy EPG source {epg_source.name} (ID: {custom_epg_id})"
                                        )
                                else:
                                    # For non-dummy sources, try to find existing EPGData by tvg_id
                                    if stream.tvg_id:
                                        epg_data = EPGData.objects.filter(
                                            tvg_id=stream.tvg_id,
                                            epg_source=epg_source,
                                        ).first()
                                        if epg_data:
                                            channel.epg_data = epg_data
                                            channel.save(update_fields=["epg_data"])
                            except EPGSource.DoesNotExist:
                                logger.warning(
                                    f"Custom EPG source with ID {custom_epg_id} not found, falling back to auto-match"
                                )
                                # Fall back to auto-matching by tvg_id
                                if stream.tvg_id and not force_dummy_epg:
                                    epg_data = EPGData.objects.filter(
                                        tvg_id=stream.tvg_id
                                    ).first()
                                    if epg_data:
                                        channel.epg_data = epg_data
                                        channel.save(update_fields=["epg_data"])
                        elif stream.tvg_id and not force_dummy_epg:
                            # Auto-match EPG by tvg_id (original behavior)
                            epg_data = EPGData.objects.filter(
                                tvg_id=stream.tvg_id
                            ).first()
                            if epg_data:
                                channel.epg_data = epg_data
                                channel.save(update_fields=["epg_data"])
                        elif force_dummy_epg:
                            # Force dummy EPG with no custom EPG selected (set to None)
                            channel.epg_data = None
                            channel.save(update_fields=["epg_data"])

                        # Handle the logo
                        if custom_logo_id:
                            # Use the custom logo specified in the group settings
                            from apps.channels.models import Logo

                            try:
                                custom_logo = Logo.objects.get(id=custom_logo_id)
                                channel.logo = custom_logo
                                channel.save(update_fields=["logo"])
                            except Logo.DoesNotExist:
                                logger.warning(
                                    f"Custom logo with ID {custom_logo_id} not found, falling back to stream logo"
                                )
                                # Fall back to the stream logo if the custom logo is missing
                                if stream.logo_url:
                                    logo, _ = Logo.objects.get_or_create(
                                        url=stream.logo_url,
                                        defaults={
                                            "name": stream.name or stream.tvg_id or "Unknown"
                                        },
                                    )
                                    channel.logo = logo
                                    channel.save(update_fields=["logo"])
                        elif stream.logo_url:
                            from apps.channels.models import Logo

                            logo, _ = Logo.objects.get_or_create(
                                url=stream.logo_url,
                                defaults={
                                    "name": stream.name or stream.tvg_id or "Unknown"
                                },
                            )
                            channel.logo = logo
                            channel.save(update_fields=["logo"])

                        # Handle stream profile assignment
                        if stream_profile_to_assign:
                            channel.stream_profile = stream_profile_to_assign
                            channel.save(update_fields=["stream_profile"])

                        channels_created += 1
                        logger.debug(
                            f"Created auto channel: {channel.channel_number} - {channel.name}"
                        )

                        # Increment the channel number for the next iteration
                        current_channel_number += 1.0
                        if current_channel_number % 1 != 0:  # Has a decimal part
                            current_channel_number = int(current_channel_number) + 1.0

                except Exception as e:
                    logger.error(
                        f"Error processing auto channel for stream {stream.name}: {str(e)}"
                    )
                    continue

            # Delete channels for streams that no longer exist
            channels_to_delete = []
            for stream_id, channel in existing_channel_map.items():
                if stream_id not in processed_stream_ids:
                    channels_to_delete.append(channel)

            if channels_to_delete:
                deleted_count = len(channels_to_delete)
                Channel.objects.filter(
                    id__in=[ch.id for ch in channels_to_delete]
                ).delete()
                channels_deleted += deleted_count
                logger.debug(
                    f"Deleted {deleted_count} auto channels for removed streams"
                )

        # Additional cleanup: remove auto-created channels that no longer have any valid streams.
        # This handles the case where streams were deleted due to the stale retention policy.
        orphaned_channels = Channel.objects.filter(
            auto_created=True,
            auto_created_by=account,
        ).exclude(
            # Exclude channels that still have valid stream associations
            id__in=ChannelStream.objects.filter(
                stream__m3u_account=account,
                stream__isnull=False,
            ).values_list("channel_id", flat=True)
        )

        orphaned_count = orphaned_channels.count()
        if orphaned_count > 0:
            orphaned_channels.delete()
            channels_deleted += orphaned_count
            logger.info(
                f"Deleted {orphaned_count} orphaned auto channels with no valid streams"
            )

        logger.info(
            f"Auto channel sync complete for account {account.name}: {channels_created} created, {channels_updated} updated, {channels_deleted} deleted"
        )
        return f"Auto sync: {channels_created} channels created, {channels_updated} updated, {channels_deleted} deleted"

    except Exception as e:
        logger.error(f"Error in auto channel sync for account {account_id}: {str(e)}")
        return f"Auto sync error: {str(e)}"


def get_transformed_credentials(account, profile=None):
    """
    Get transformed credentials for XtreamCodes API calls.

    Args:
        account: M3UAccount instance
        profile: M3UAccountProfile instance (optional; if not provided, the primary profile is used)

    Returns:
        tuple: (transformed_url, transformed_username, transformed_password)
    """
    import re
    import urllib.parse

    # If no profile is provided, find the primary active profile
    if profile is None:
        try:
            from apps.m3u.models import M3UAccountProfile

            profile = M3UAccountProfile.objects.filter(
                m3u_account=account,
                is_active=True,
            ).first()
            if profile:
                logger.debug(f"Using primary profile '{profile.name}' for URL transformation")
            else:
                logger.debug(f"No active profiles found for account {account.name}, using base credentials")
        except Exception as e:
            logger.warning(f"Could not get primary profile for account {account.name}: {e}")
            profile = None

    base_url = account.server_url
    base_username = account.username
    base_password = account.password

    # Build a complete URL with credentials (similar to how IPTV URLs are structured).
    # Format: http://server.com:port/live/username/password/1234.ts
    if base_url and base_username and base_password:
        # Remove the trailing slash from the server URL if present
        clean_server_url = base_url.rstrip("/")

        # Build the complete URL with embedded credentials
        complete_url = f"{clean_server_url}/live/{base_username}/{base_password}/1234.ts"
        logger.debug(f"Built complete URL: {complete_url}")

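        # Illustrative example (hypothetical patterns, not from any real profile):
        # with search_pattern=r"main_user" and replace_pattern="backup_user",
        # the transformation below would turn
        #   http://host:8080/live/main_user/pass/1234.ts
        # into
        #   http://host:8080/live/backup_user/pass/1234.ts
        # and the username extracted from the path becomes "backup_user".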
        # Apply profile-specific transformations if a profile is provided
        if profile and profile.search_pattern and profile.replace_pattern:
            try:
                # Handle backreferences in the replacement pattern
                safe_replace_pattern = re.sub(r"\$(\d+)", r"\\\1", profile.replace_pattern)

                # Apply the transformation to the complete URL
                transformed_complete_url = re.sub(profile.search_pattern, safe_replace_pattern, complete_url)
                logger.info(f"Transformed complete URL: {complete_url} -> {transformed_complete_url}")

                # Extract the components from the transformed URL.
                # Pattern: http://server.com:port/live/username/password/1234.ts
                parsed_url = urllib.parse.urlparse(transformed_complete_url)
                path_parts = [part for part in parsed_url.path.split("/") if part]

                # Need at least ['live', username, password] to index path_parts[2] safely
                if len(path_parts) >= 3:
                    # Extract the username and password from the path
                    transformed_username = path_parts[1]
                    transformed_password = path_parts[2]

                    # Rebuild the server URL without the username/password path
                    transformed_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                    if parsed_url.port:
                        transformed_url = f"{parsed_url.scheme}://{parsed_url.hostname}:{parsed_url.port}"

                    logger.debug("Extracted transformed credentials:")
                    logger.debug(f"  Server URL: {transformed_url}")
                    logger.debug(f"  Username: {transformed_username}")
                    logger.debug(f"  Password: {transformed_password}")

                    return transformed_url, transformed_username, transformed_password
                else:
                    logger.warning(f"Could not extract credentials from transformed URL: {transformed_complete_url}")
                    return base_url, base_username, base_password

            except Exception as e:
                logger.error(f"Error transforming URL for profile {profile.name if profile else 'unknown'}: {e}")
                return base_url, base_username, base_password
        else:
            # No profile or no transformation patterns
            return base_url, base_username, base_password
    else:
        logger.warning(f"Missing credentials for account {account.name}")
        return base_url, base_username, base_password


@shared_task
def refresh_account_profiles(account_id):
    """Refresh account information for all active profiles of an XC account.

    This task runs asynchronously in the background after the account refresh completes.
    It adds rate-limiting delays between profile authentications to prevent provider bans.
    """
    from django.conf import settings
    import time

    try:
        account = M3UAccount.objects.get(id=account_id, is_active=True)

        if account.account_type != M3UAccount.Types.XC:
            logger.debug(f"Account {account_id} is not XC type, skipping profile refresh")
            return f"Account {account_id} is not an XtreamCodes account"

        from apps.m3u.models import M3UAccountProfile

        profiles = M3UAccountProfile.objects.filter(
            m3u_account=account,
            is_active=True,
        )

        if not profiles.exists():
            logger.info(f"No active profiles found for account {account.name}")
            return f"No active profiles for account {account_id}"

        # Get the user agent for this account
        try:
            user_agent_string = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            if account.user_agent_id:
                from core.models import UserAgent

                ua_obj = UserAgent.objects.get(id=account.user_agent_id)
                if ua_obj and hasattr(ua_obj, "user_agent") and ua_obj.user_agent:
                    user_agent_string = ua_obj.user_agent
        except Exception as e:
            logger.warning(f"Error getting user agent, using fallback: {str(e)}")
        logger.debug(f"Using user agent for profile refresh: {user_agent_string}")

        # Get the rate-limiting delay from settings
        profile_delay = getattr(settings, "XC_PROFILE_REFRESH_DELAY", 2.5)

        profiles_updated = 0
        profiles_failed = 0

        logger.info(f"Starting background refresh for {profiles.count()} profiles of account {account.name}")

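        # XC_PROFILE_REFRESH_DELAY is read via getattr, so it is optional: set
        # it in Django settings to tune the pause between profile logins below
        # (it defaults to 2.5 seconds when unset).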
        for idx, profile in enumerate(profiles):
            try:
                # Add a delay between profiles to prevent rate limiting (except for the first profile)
                if idx > 0:
                    logger.info(f"Waiting {profile_delay}s before refreshing next profile to avoid rate limiting")
                    time.sleep(profile_delay)

                # Get the transformed credentials for this specific profile
                profile_url, profile_username, profile_password = get_transformed_credentials(account, profile)

                # Create a separate XC client for this profile's credentials
                with XCClient(
                    profile_url,
                    profile_username,
                    profile_password,
                    user_agent_string,
                ) as profile_client:
                    # Authenticate with this profile's credentials
                    if profile_client.authenticate():
                        # Get the account information specific to this profile's credentials
                        profile_account_info = profile_client.get_account_info()

                        # Merge with existing custom_properties if they exist
                        existing_props = profile.custom_properties or {}
                        existing_props.update(profile_account_info)
                        profile.custom_properties = existing_props
                        profile.save(update_fields=["custom_properties"])

                        profiles_updated += 1
                        logger.info(f"Updated account information for profile '{profile.name}' ({profiles_updated}/{profiles.count()})")
                    else:
                        profiles_failed += 1
                        logger.warning(f"Failed to authenticate profile '{profile.name}' with transformed credentials")

            except Exception as profile_error:
                profiles_failed += 1
                logger.error(f"Failed to update account information for profile '{profile.name}': {str(profile_error)}")
                # Continue with the other profiles even if one fails

        result_msg = f"Profile refresh complete for account {account.name}: {profiles_updated} updated, {profiles_failed} failed"
        logger.info(result_msg)
        return result_msg

    except M3UAccount.DoesNotExist:
        error_msg = f"Account {account_id} not found"
        logger.error(error_msg)
        return error_msg
    except Exception as e:
        error_msg = f"Error refreshing profiles for account {account_id}: {str(e)}"
        logger.error(error_msg)
        return error_msg


@shared_task
def refresh_account_info(profile_id):
    """Refresh only the account information for a specific M3U profile."""
    if not acquire_task_lock("refresh_account_info", profile_id):
        return f"Account info refresh task already running for profile_id={profile_id}."

    try:
        from apps.m3u.models import M3UAccountProfile
        import re

        profile = M3UAccountProfile.objects.get(id=profile_id)
        account = profile.m3u_account

        if account.account_type != M3UAccount.Types.XC:
            release_task_lock("refresh_account_info", profile_id)
            return f"Profile {profile_id} belongs to account {account.id} which is not an XtreamCodes account."

        # Get transformed credentials using the helper function
        transformed_url, transformed_username, transformed_password = get_transformed_credentials(account, profile)

        # Initialize the XtreamCodes client with the extracted/transformed credentials
        client = XCClient(
            transformed_url,
            transformed_username,
            transformed_password,
            account.get_user_agent(),
        )

        # Authenticate and get the account info
        auth_result = client.authenticate()
        if not auth_result:
            error_msg = f"Authentication failed for profile {profile.name} ({profile_id})"
            logger.error(error_msg)

            # Send an error notification to the frontend via websocket
            send_websocket_update(
                "updates",
                "update",
                {
                    "type": "account_info_refresh_error",
                    "profile_id": profile_id,
                    "profile_name": profile.name,
                    "error": "Authentication failed with the provided credentials",
                    "message": f"Failed to authenticate profile '{profile.name}'. Please check the credentials.",
                },
            )

            release_task_lock("refresh_account_info", profile_id)
            return error_msg

        # Get the account information
        account_info = client.get_account_info()

        # Update only this specific profile with the new account info
        if not profile.custom_properties:
            profile.custom_properties = {}
        profile.custom_properties.update(account_info)
        profile.save()

        # Send a success notification to the frontend via websocket
        send_websocket_update(
            "updates",
            "update",
            {
                "type": "account_info_refresh_success",
                "profile_id": profile_id,
                "profile_name": profile.name,
                "message": f"Account information successfully refreshed for profile '{profile.name}'",
            },
        )

        release_task_lock("refresh_account_info", profile_id)
        return f"Account info refresh completed for profile {profile_id} ({profile.name})."

    except M3UAccountProfile.DoesNotExist:
        error_msg = f"Profile {profile_id} not found"
        logger.error(error_msg)

        send_websocket_update(
            "updates",
            "update",
            {
                "type": "account_refresh_error",
                "profile_id": profile_id,
                "error": "Profile not found",
                "message": f"Profile {profile_id} not found",
            },
        )

        release_task_lock("refresh_account_info", profile_id)
        return error_msg
    except Exception as e:
        error_msg = f"Error refreshing account info for profile {profile_id}: {str(e)}"
        logger.error(error_msg)

        send_websocket_update(
            "updates",
            "update",
            {
                "type": "account_refresh_error",
                "profile_id": profile_id,
                "error": str(e),
                "message": f"Failed to refresh account info: {str(e)}",
            },
        )

        release_task_lock("refresh_account_info", profile_id)
        return error_msg


@shared_task
def refresh_single_m3u_account(account_id):
    """Splits M3U processing into chunks and dispatches them as parallel tasks."""
    if not acquire_task_lock("refresh_single_m3u_account", account_id):
        return f"Task already running for account_id={account_id}."

    # Record the start time
    refresh_start_timestamp = timezone.now()  # For the cleanup function
    start_time = time.time()  # For tracking elapsed time as a float
    streams_created = 0
    streams_updated = 0
    streams_deleted = 0

    try:
        account = M3UAccount.objects.get(id=account_id, is_active=True)
        if not account.is_active:
            logger.debug(f"Account {account_id} is not active, skipping.")
            release_task_lock("refresh_single_m3u_account", account_id)
            return

        # Set status to fetching
        account.status = M3UAccount.Status.FETCHING
        account.save(update_fields=["status"])

        filters = list(account.filters.all())

        # Check if VOD is enabled for this account
        vod_enabled = False
        if account.custom_properties:
            custom_props = account.custom_properties or {}
            vod_enabled = custom_props.get("enable_vod", False)

    except M3UAccount.DoesNotExist:
        # The M3U account doesn't exist, so delete the periodic task if it exists
        logger.warning(
            f"M3U account with ID {account_id} not found, but task was triggered. Cleaning up orphaned task."
        )

        # Call the helper function to delete the task
        if delete_m3u_refresh_task_by_id(account_id):
            logger.info(
                f"Successfully cleaned up orphaned task for M3U account {account_id}"
            )
        else:
            logger.debug(f"No orphaned task found for M3U account {account_id}")

        release_task_lock("refresh_single_m3u_account", account_id)
        return f"M3UAccount with ID={account_id} not found or inactive, task cleaned up"

    # Fetch M3U lines and handle potential issues
    extinf_data = []
    groups = None

    cache_path = os.path.join(m3u_dir, f"{account_id}.json")
    if os.path.exists(cache_path):
        try:
            with open(cache_path, "r") as file:
                data = json.load(file)

            extinf_data = data["extinf_data"]
            groups = data["groups"]
        except json.JSONDecodeError as e:
            # Handle a corrupted JSON file
            logger.error(
                f"Error parsing cached M3U data for account {account_id}: {str(e)}"
            )

            # Back up the corrupted file for potential analysis
            backup_path = f"{cache_path}.corrupted"
            try:
                os.rename(cache_path, backup_path)
                logger.info(f"Renamed corrupted cache file to {backup_path}")
            except OSError as rename_err:
                logger.warning(
                    f"Failed to rename corrupted cache file: {str(rename_err)}"
                )

            # Reset the data to empty structures
            extinf_data = []
            groups = None
        except Exception as e:
            logger.error(f"Unexpected error reading cached M3U data: {str(e)}")
            extinf_data = []
            groups = None

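    # As used here, the cache file holds two keys: "extinf_data" (the parsed
    # stream entries) and "groups" (the group metadata); refresh_m3u_groups
    # below returns the same pair when no usable cache exists.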
    if not extinf_data:
        try:
            logger.info(f"Calling refresh_m3u_groups for account {account_id}")
            result = refresh_m3u_groups(account_id, full_refresh=True, scan_start_time=refresh_start_timestamp)
            logger.trace(f"refresh_m3u_groups result: {result}")

            # Check for a completely empty result or missing groups
            if not result or result[1] is None:
                logger.error(
                    f"Failed to refresh M3U groups for account {account_id}: {result}"
                )
                release_task_lock("refresh_single_m3u_account", account_id)
                return "Failed to update m3u account - download failed or other error"

            extinf_data, groups = result

            # XC accounts can have empty extinf_data but valid groups
            try:
                account = M3UAccount.objects.get(id=account_id)
                is_xc_account = account.account_type == M3UAccount.Types.XC
            except M3UAccount.DoesNotExist:
                is_xc_account = False

            # For XC accounts, empty extinf_data is normal at this stage
            if not extinf_data and not is_xc_account:
                logger.error(f"No streams found for non-XC account {account_id}")
                account.status = M3UAccount.Status.ERROR
                account.last_message = "No streams found in M3U source"
                account.save(update_fields=["status", "last_message"])
                send_m3u_update(
                    account_id, "parsing", 100, status="error", error="No streams found"
                )
        except Exception as e:
            logger.error(f"Exception in refresh_m3u_groups: {str(e)}", exc_info=True)
            account.status = M3UAccount.Status.ERROR
            account.last_message = f"Error refreshing M3U groups: {str(e)}"
            account.save(update_fields=["status", "last_message"])
            send_m3u_update(
                account_id,
                "parsing",
                100,
                status="error",
                error=f"Error refreshing M3U groups: {str(e)}",
            )
            release_task_lock("refresh_single_m3u_account", account_id)
            return "Failed to update m3u account"

    # Only proceed with parsing if we actually have data and no errors were encountered.
    # Get the account type to handle XC accounts differently.
    try:
        is_xc_account = account.account_type == M3UAccount.Types.XC
    except Exception:
        is_xc_account = False

    # Modified validation logic for the different account types
    if (not groups) or (not is_xc_account and not extinf_data):
        logger.error(f"No data to process for account {account_id}")
        account.status = M3UAccount.Status.ERROR
        account.last_message = "No data available for processing"
        account.save(update_fields=["status", "last_message"])
        send_m3u_update(
            account_id,
            "parsing",
            100,
            status="error",
            error="No data available for processing",
        )
        release_task_lock("refresh_single_m3u_account", account_id)
        return "Failed to update m3u account, no data available"

    hash_keys = CoreSettings.get_m3u_hash_key().split(",")

    existing_groups = {
        group.name: group.id
        for group in ChannelGroup.objects.filter(
            m3u_accounts__m3u_account=account,  # Filter by the M3UAccount
            m3u_accounts__enabled=True,  # Filter by the enabled flag in the join table
        )
    }

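    # hash_keys is a comma-separated list of attribute names configured in
    # CoreSettings; it is handed to the batch workers below, which presumably
    # use it to build each stream's identity hash.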
    try:
        # Set status to parsing
        account.status = M3UAccount.Status.PARSING
        account.save(update_fields=["status"])

        # Commit any pending transactions before threading
        from django.db import transaction

        transaction.commit()

        # Initialize the stream counters
        streams_created = 0
        streams_updated = 0

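        # Note: STADNARD (sic) is kept as-is; it matches the spelling of the
        # Types enum member this code references on the M3UAccount model.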
        if account.account_type == M3UAccount.Types.STADNARD:
            logger.debug(
                f"Processing Standard account ({account_id}) with groups: {existing_groups}"
            )
            # Break the data into batches and process them with threading - use the global batch size
            batches = [
                extinf_data[i : i + BATCH_SIZE]
                for i in range(0, len(extinf_data), BATCH_SIZE)
            ]

            logger.info(f"Processing {len(extinf_data)} streams in {len(batches)} thread batches")

            # Use 2 threads for optimal database connection handling
            max_workers = min(2, len(batches))
            logger.debug(f"Using {max_workers} threads for processing")

            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Submit batch processing tasks using direct functions (now thread-safe)
                future_to_batch = {
                    executor.submit(process_m3u_batch_direct, account_id, batch, existing_groups, hash_keys): i
                    for i, batch in enumerate(batches)
                }

                completed_batches = 0
                total_batches = len(batches)

                # Process completed batches as they finish
                for future in as_completed(future_to_batch):
                    batch_idx = future_to_batch[future]
                    try:
                        result = future.result()
                        completed_batches += 1

                        # Extract the stream counts from the result
                        if isinstance(result, str):
                            try:
                                created_match = re.search(r"(\d+) created", result)
                                updated_match = re.search(r"(\d+) updated", result)
                                if created_match and updated_match:
                                    created_count = int(created_match.group(1))
                                    updated_count = int(updated_match.group(1))
                                    streams_created += created_count
                                    streams_updated += updated_count
                            except (AttributeError, ValueError):
                                pass

                        # Send a progress update
                        progress = int((completed_batches / total_batches) * 100)
                        current_elapsed = time.time() - start_time

                        if progress > 0:
                            estimated_total = (current_elapsed / progress) * 100
                            time_remaining = max(0, estimated_total - current_elapsed)
                        else:
                            time_remaining = 0

                        send_m3u_update(
                            account_id,
                            "parsing",
                            progress,
                            elapsed_time=current_elapsed,
                            time_remaining=time_remaining,
                            streams_processed=streams_created + streams_updated,
                        )

                        logger.debug(f"Thread batch {completed_batches}/{total_batches} completed")

                    except Exception as e:
                        logger.error(f"Error in thread batch {batch_idx}: {str(e)}")
                        completed_batches += 1  # Still count it to avoid hanging

            logger.info(f"Thread-based processing completed for account {account_id}")
        else:
            # For XC accounts, get the groups with their custom properties containing the xc_id
            logger.debug(f"Processing XC account with groups: {existing_groups}")

            # Get the ChannelGroupM3UAccount entries with their custom_properties
            channel_group_relationships = ChannelGroupM3UAccount.objects.filter(
                m3u_account=account, enabled=True
            ).select_related("channel_group")

            filtered_groups = {}
            for rel in channel_group_relationships:
                group_name = rel.channel_group.name
                group_id = rel.channel_group.id

                # Load the custom properties with the xc_id
                custom_props = rel.custom_properties or {}
                if "xc_id" in custom_props:
                    filtered_groups[group_name] = {
                        "xc_id": custom_props["xc_id"],
                        "channel_group_id": group_id,
                    }
                    logger.debug(
                        f"Added group {group_name} with xc_id {custom_props['xc_id']}"
                    )
                else:
                    logger.warning(
                        f"No xc_id found in custom properties for group {group_name}"
                    )

            logger.info(
                f"Filtered {len(filtered_groups)} groups for processing: {filtered_groups}"
            )

            # Collect all XC streams in a single API call and filter them by enabled categories
            logger.info("Fetching all XC streams from provider and filtering by enabled categories...")
            all_xc_streams = collect_xc_streams(account_id, filtered_groups)

            if not all_xc_streams:
                logger.warning("No streams collected from XC groups")
            else:
                # Now batch by stream count (like standard M3U processing)
                batches = [
                    all_xc_streams[i : i + BATCH_SIZE]
                    for i in range(0, len(all_xc_streams), BATCH_SIZE)
                ]

                logger.info(f"Processing {len(all_xc_streams)} XC streams in {len(batches)} batches")

                # Use threading for XC stream processing - now with consistent batch sizes
                max_workers = min(4, len(batches))
                logger.debug(f"Using {max_workers} threads for XC stream processing")

                with ThreadPoolExecutor(max_workers=max_workers) as executor:
                    # Submit stream batch processing tasks (reuse the standard M3U processing)
                    future_to_batch = {
                        executor.submit(process_m3u_batch_direct, account_id, batch, existing_groups, hash_keys): i
                        for i, batch in enumerate(batches)
                    }

                    completed_batches = 0
                    total_batches = len(batches)

                    # Process completed batches as they finish
                    for future in as_completed(future_to_batch):
                        batch_idx = future_to_batch[future]
                        try:
                            result = future.result()
                            completed_batches += 1

                            # Extract the stream counts from the result
                            if isinstance(result, str):
                                try:
                                    created_match = re.search(r"(\d+) created", result)
                                    updated_match = re.search(r"(\d+) updated", result)
                                    if created_match and updated_match:
                                        created_count = int(created_match.group(1))
                                        updated_count = int(updated_match.group(1))
                                        streams_created += created_count
                                        streams_updated += updated_count
                                except (AttributeError, ValueError):
                                    pass

                            # Send a progress update
                            progress = int((completed_batches / total_batches) * 100)
                            current_elapsed = time.time() - start_time

                            if progress > 0:
                                estimated_total = (current_elapsed / progress) * 100
                                time_remaining = max(0, estimated_total - current_elapsed)
                            else:
                                time_remaining = 0

                            send_m3u_update(
                                account_id,
                                "parsing",
                                progress,
                                elapsed_time=current_elapsed,
                                time_remaining=time_remaining,
                                streams_processed=streams_created + streams_updated,
                            )

                            logger.debug(f"XC thread batch {completed_batches}/{total_batches} completed")

                        except Exception as e:
                            logger.error(f"Error in XC thread batch {batch_idx}: {str(e)}")
                            completed_batches += 1  # Still count it to avoid hanging

                logger.info(f"XC thread-based processing completed for account {account_id}")

        # Ensure all database transactions are committed before cleanup
        logger.info(
            "All thread processing completed, ensuring DB transactions are committed before cleanup"
        )
        # Force a simple DB query to ensure the connection is in sync
        Stream.objects.filter(
            id=-1
        ).exists()  # This will never find anything but ensures DB sync

        # Mark streams that weren't seen in this refresh as stale (pending deletion)
        stale_stream_count = Stream.objects.filter(
            m3u_account=account,
            last_seen__lt=refresh_start_timestamp,
        ).update(is_stale=True)
        logger.info(f"Marked {stale_stream_count} streams as stale for account {account_id}")

        # Mark group relationships that weren't seen in this refresh as stale (pending deletion)
        stale_group_count = ChannelGroupM3UAccount.objects.filter(
            m3u_account=account,
            last_seen__lt=refresh_start_timestamp,
        ).update(is_stale=True)
        logger.info(f"Marked {stale_group_count} group relationships as stale for account {account_id}")

        # Now run the cleanup
        streams_deleted = cleanup_streams(account_id, refresh_start_timestamp)

        # Clean up stale group relationships (follows the same retention policy as streams)
        cleanup_stale_group_relationships(account, refresh_start_timestamp)

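        # cleanup_streams and cleanup_stale_group_relationships are defined
        # elsewhere in this module; per the comments above they are expected to
        # remove records flagged is_stale according to the account's retention
        # policy, with cleanup_streams returning the number of streams deleted.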
        # Run auto channel sync after a successful refresh
        auto_sync_message = ""
        try:
            sync_result = sync_auto_channels(
                account_id, scan_start_time=str(refresh_start_timestamp)
            )
            logger.info(
                f"Auto channel sync result for account {account_id}: {sync_result}"
            )
            if sync_result and "created" in sync_result:
                auto_sync_message = f" {sync_result}."
        except Exception as e:
            logger.error(
                f"Error running auto channel sync for account {account_id}: {str(e)}"
            )

        # Calculate the elapsed time
        elapsed_time = time.time() - start_time

        # Calculate the total number of streams processed
        streams_processed = streams_created + streams_updated

        # Set status to success and update the timestamp BEFORE sending the final update
        account.status = M3UAccount.Status.SUCCESS
        account.last_message = (
            f"Processing completed in {elapsed_time:.1f} seconds. "
            f"Streams: {streams_created} created, {streams_updated} updated, {streams_deleted} removed. "
            f"Total processed: {streams_processed}.{auto_sync_message}"
        )
        account.updated_at = timezone.now()
        account.save(update_fields=["status", "last_message", "updated_at"])

        # Log a system event for the M3U refresh
        log_system_event(
            event_type="m3u_refresh",
            account_name=account.name,
            elapsed_time=round(elapsed_time, 2),
            streams_created=streams_created,
            streams_updated=streams_updated,
            streams_deleted=streams_deleted,
            total_processed=streams_processed,
        )

        # Send the final update with complete metrics and explicitly include the success status
        send_m3u_update(
            account_id,
            "parsing",
            100,
            status="success",  # Explicitly set status to success
            elapsed_time=elapsed_time,
            time_remaining=0,
            streams_processed=streams_processed,
            streams_created=streams_created,
            streams_updated=streams_updated,
            streams_deleted=streams_deleted,
            message=account.last_message,
        )

        # Trigger a VOD refresh if enabled and the account is XtreamCodes type
        if vod_enabled and account.account_type == M3UAccount.Types.XC:
            logger.info(f"VOD is enabled for account {account_id}, triggering VOD refresh")
            try:
                from apps.vod.tasks import refresh_vod_content

                refresh_vod_content.delay(account_id)
                logger.info(f"VOD refresh task queued for account {account_id}")
            except Exception as e:
                logger.error(f"Failed to queue VOD refresh for account {account_id}: {str(e)}")

    except Exception as e:
        logger.error(f"Error processing M3U for account {account_id}: {str(e)}")
        account.status = M3UAccount.Status.ERROR
        account.last_message = f"Error processing M3U: {str(e)}"
        account.save(update_fields=["status", "last_message"])
        # Release the lock before re-raising; otherwise a failed run would
        # leave this account permanently locked against future refreshes
        release_task_lock("refresh_single_m3u_account", account_id)
        raise  # Re-raise the exception for Celery to handle

    release_task_lock("refresh_single_m3u_account", account_id)

    # Aggressive garbage collection.
    # Only delete the variables if they exist.
    if "existing_groups" in locals():
        del existing_groups
    if "extinf_data" in locals():
        del extinf_data
    if "groups" in locals():
        del groups
    if "batches" in locals():
        del batches

    from core.utils import cleanup_memory

    cleanup_memory(log_usage=True, force_collection=True)

    # Clean up the cache file since we've fully processed it
    if os.path.exists(cache_path):
        os.remove(cache_path)

    return "Dispatched jobs complete."


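# send_m3u_update pushes refresh progress to the frontend over the "updates"
# websocket channel; callers attach arbitrary extra fields via kwargs
# (e.g. status="error", error="No streams found", streams_processed=N).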
def send_m3u_update(account_id, action, progress, **kwargs):
    # Start with the base data dictionary
    data = {
        "progress": progress,
        "type": "m3u_refresh",
        "account": account_id,
        "action": action,
    }

    # Add the status and message if they are not already in kwargs
    try:
        account = M3UAccount.objects.get(id=account_id)
        if account:
            if "status" not in kwargs:
                data["status"] = account.status
            if "message" not in kwargs and account.last_message:
                data["message"] = account.last_message
    except Exception:
        pass  # If the account can't be retrieved, continue without these fields

    # Add the additional key-value pairs from kwargs
    data.update(kwargs)
    send_websocket_update("updates", "update", data, collect_garbage=False)

    # Explicitly clear the data reference to help garbage collection
    data = None