Mirror of https://github.com/Dispatcharr/Dispatcharr.git (synced 2026-01-23 02:35:14 +00:00)
Convert M3U accounts to the same status-update format used by EPG sources. Rename the DB field last_error to last_message so it can carry more general messaging, not just errors. Add a migration so updated_at is no longer bumped every time the M3U changes; it is now only set when the M3U is successfully updated.
This commit is contained in:
parent 5dac5858f2
commit b713b516b4
15 changed files with 398 additions and 169 deletions
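In short, EPG sources and M3U accounts now share one status model: a status field that moves through fetching/parsing/error/success, a free-form last_message used for both success summaries and error details, and an updated_at that is only touched on a successful refresh. A minimal sketch of that pattern, assuming a model shaped like the EPGSource changes in this diff (do_refresh is a placeholder for the real fetch/parse work):

from django.utils import timezone

def refresh(source):
    source.status = source.STATUS_FETCHING
    source.save(update_fields=['status'])
    try:
        do_refresh(source)  # placeholder for the actual fetch/parse logic
    except Exception as e:
        source.status = source.STATUS_ERROR
        source.last_message = f"Error refreshing: {e}"
        source.save(update_fields=['status', 'last_message'])
        return
    source.status = source.STATUS_SUCCESS
    source.last_message = "Refresh completed successfully"
    source.updated_at = timezone.now()  # only bumped on success
    source.save(update_fields=['status', 'last_message', 'updated_at'])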
@@ -18,7 +18,9 @@ logger = logging.getLogger(__name__)
 # 1) EPG Source API (CRUD)
 # ─────────────────────────────
 class EPGSourceViewSet(viewsets.ModelViewSet):
-    """Handles CRUD operations for EPG sources"""
+    """
+    API endpoint that allows EPG sources to be viewed or edited.
+    """
     queryset = EPGSource.objects.all()
     serializer_class = EPGSourceSerializer
     permission_classes = [IsAuthenticated]

@@ -50,6 +52,19 @@ class EPGSourceViewSet(viewsets.ModelViewSet):
         return Response(serializer.data, status=status.HTTP_201_CREATED)

+    def partial_update(self, request, *args, **kwargs):
+        """Handle PATCH requests with special handling for is_active toggle"""
+        # Check if this is just an is_active toggle
+        if len(request.data) == 1 and 'is_active' in request.data:
+            instance = self.get_object()
+            instance.is_active = request.data['is_active']
+            instance.save(update_fields=['is_active'])
+            serializer = self.get_serializer(instance)
+            return Response(serializer.data)
+
+        # Otherwise, handle as normal update
+        return super().partial_update(request, *args, **kwargs)
+
 # ─────────────────────────────
 # 2) Program API (CRUD)
 # ─────────────────────────────
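With partial_update in place, the active flag can be flipped with a one-field PATCH instead of a full update. A rough sketch of such a request, assuming a locally running instance and token auth (host, port, source id, and the auth header are placeholders; the endpoint path matches the frontend calls later in this diff):

import requests

resp = requests.patch(
    "http://localhost:9191/api/epg/sources/3/",   # placeholder host and source id
    json={"is_active": False},                    # single-field body takes the fast path
    headers={"Authorization": "Bearer <token>"},  # placeholder credentials
)
print(resp.status_code, resp.json().get("is_active"))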
apps/epg/migrations/0011_update_epgsource_fields.py (new file, 42 lines)

@@ -0,0 +1,42 @@
+# Generated by Django 5.1.6 on 2025-05-04 21:43
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('epg', '0010_merge_20250503_2147'),
+    ]
+
+    operations = [
+        # Change updated_at field
+        migrations.AlterField(
+            model_name='epgsource',
+            name='updated_at',
+            field=models.DateTimeField(blank=True, help_text='Time when this source was last successfully refreshed', null=True),
+        ),
+
+        # Add new last_message field
+        migrations.AddField(
+            model_name='epgsource',
+            name='last_message',
+            field=models.TextField(blank=True, help_text='Last status message, including success results or error information', null=True),
+        ),
+
+        # Copy data from last_error to last_message
+        migrations.RunPython(
+            code=lambda apps, schema_editor: apps.get_model('epg', 'EPGSource').objects.all().update(
+                last_message=models.F('last_error')
+            ),
+            reverse_code=lambda apps, schema_editor: apps.get_model('epg', 'EPGSource').objects.all().update(
+                last_error=models.F('last_message')
+            ),
+        ),
+
+        # Remove the old field
+        migrations.RemoveField(
+            model_name='epgsource',
+            name='last_error',
+        ),
+    ]
@@ -9,12 +9,18 @@ class EPGSource(models.Model):
         ('xmltv', 'XMLTV URL'),
         ('schedules_direct', 'Schedules Direct API'),
     ]
+
+    STATUS_IDLE = 'idle'
+    STATUS_FETCHING = 'fetching'
+    STATUS_PARSING = 'parsing'
+    STATUS_ERROR = 'error'
+    STATUS_SUCCESS = 'success'
     STATUS_CHOICES = [
-        ('idle', 'Idle'),
-        ('fetching', 'Fetching'),
-        ('parsing', 'Parsing'),
-        ('error', 'Error'),
-        ('success', 'Success'),
+        (STATUS_IDLE, 'Idle'),
+        (STATUS_FETCHING, 'Fetching'),
+        (STATUS_PARSING, 'Parsing'),
+        (STATUS_ERROR, 'Error'),
+        (STATUS_SUCCESS, 'Success'),
     ]

     name = models.CharField(max_length=255, unique=True)

@@ -27,15 +33,23 @@ class EPGSource(models.Model):
     refresh_task = models.ForeignKey(
         PeriodicTask, on_delete=models.SET_NULL, null=True, blank=True
     )
-    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='idle')
-    last_error = models.TextField(blank=True, null=True)
+    status = models.CharField(
+        max_length=20,
+        choices=STATUS_CHOICES,
+        default=STATUS_IDLE
+    )
+    last_message = models.TextField(
+        null=True,
+        blank=True,
+        help_text="Last status message, including success results or error information"
+    )
     created_at = models.DateTimeField(
         auto_now_add=True,
         help_text="Time when this source was created"
     )
     updated_at = models.DateTimeField(
-        auto_now=True,
-        help_text="Time when this source was last updated"
+        null=True, blank=True,
+        help_text="Time when this source was last successfully refreshed"
     )

     def __str__(self):

@@ -56,6 +70,15 @@ class EPGSource(models.Model):

         return cache

+    def save(self, *args, **kwargs):
+        # Prevent auto_now behavior by handling updated_at manually
+        if 'update_fields' in kwargs and 'updated_at' not in kwargs['update_fields']:
+            # Don't modify updated_at for regular updates
+            kwargs.setdefault('update_fields', [])
+            if 'updated_at' in kwargs['update_fields']:
+                kwargs['update_fields'].remove('updated_at')
+        super().save(*args, **kwargs)
+
 class EPGData(models.Model):
     # Removed the Channel foreign key. We now just store the original tvg_id
     # and a name (which might simply be the tvg_id if no real channel exists).
@@ -18,14 +18,14 @@ class EPGSourceSerializer(serializers.ModelSerializer):
             'file_path',
             'refresh_interval',
             'status',
-            'last_error',
+            'last_message',
             'created_at',
             'updated_at',
             'epg_data_ids'
         ]

     def get_epg_data_ids(self, obj):
-        return list(obj.epgs.values_list('id', flat=True))
+        return list(obj.epgs.values_list('id', flat=True))

 class ProgramDataSerializer(serializers.ModelSerializer):
     class Meta:
@@ -99,8 +99,8 @@ def refresh_epg_data(source_id):
         try:
             source = EPGSource.objects.get(id=source_id)
             source.status = 'error'
-            source.last_error = f"Error refreshing EPG data: {str(e)}"
-            source.save(update_fields=['status', 'last_error'])
+            source.last_message = f"Error refreshing EPG data: {str(e)}"
+            source.save(update_fields=['status', 'last_message'])
             send_epg_update(source_id, "refresh", 100, status="error", error=str(e))
         except Exception as inner_e:
             logger.error(f"Error updating source status: {inner_e}")

@@ -127,8 +127,8 @@ def fetch_xmltv(source):
         if not source.url:
             # Update source status for missing URL
             source.status = 'error'
-            source.last_error = "No URL provided and no valid local file exists"
-            source.save(update_fields=['status', 'last_error'])
+            source.last_message = "No URL provided and no valid local file exists"
+            source.save(update_fields=['status', 'last_message'])
             send_epg_update(source.id, "downloading", 100, status="error", error="No URL provided and no valid local file exists")
             return False

@@ -167,8 +167,8 @@ def fetch_xmltv(source):
             logger.error(f"EPG URL not found (404): {source.url}")
             # Update status to error in the database
             source.status = 'error'
-            source.last_error = f"EPG source '{source.name}' returned 404 error - will retry on next scheduled run"
-            source.save(update_fields=['status', 'last_error'])
+            source.last_message = f"EPG source '{source.name}' returned 404 error - will retry on next scheduled run"
+            source.save(update_fields=['status', 'last_message'])

             # Notify users through the WebSocket about the EPG fetch failure
             channel_layer = get_channel_layer()

@@ -197,8 +197,8 @@ def fetch_xmltv(source):

             # Update status to error in the database
             source.status = 'error'
-            source.last_error = user_message
-            source.save(update_fields=['status', 'last_error'])
+            source.last_message = user_message
+            source.save(update_fields=['status', 'last_message'])

             # Notify users through the WebSocket
             channel_layer = get_channel_layer()

@@ -297,8 +297,8 @@ def fetch_xmltv(source):

         # Update source status to error with the error message
         source.status = 'error'
-        source.last_error = user_message
-        source.save(update_fields=['status', 'last_error'])
+        source.last_message = user_message
+        source.save(update_fields=['status', 'last_message'])

         # Notify users through the WebSocket about the EPG fetch failure
         channel_layer = get_channel_layer()

@@ -329,8 +329,8 @@ def fetch_xmltv(source):

         # Update source status
         source.status = 'error'
-        source.last_error = user_message
-        source.save(update_fields=['status', 'last_error'])
+        source.last_message = user_message
+        source.save(update_fields=['status', 'last_message'])

         # Send notifications
         channel_layer = get_channel_layer()

@@ -356,8 +356,8 @@ def fetch_xmltv(source):

         # Update source status for general exceptions too
         source.status = 'error'
-        source.last_error = f"Error: {error_message}"
-        source.save(update_fields=['status', 'last_error'])
+        source.last_message = f"Error: {error_message}"
+        source.save(update_fields=['status', 'last_message'])

         # Ensure we update the download progress to 100 with error status
         send_epg_update(source.id, "downloading", 100, status="error", error=f"Error: {error_message}")

@@ -390,8 +390,8 @@ def parse_channels_only(source):
                 logger.error(f"Failed to fetch EPG data from URL: {source.url}")
                 # Update status to error
                 source.status = 'error'
-                source.last_error = f"Failed to fetch EPG data from URL"
-                source.save(update_fields=['status', 'last_error'])
+                source.last_message = f"Failed to fetch EPG data from URL"
+                source.save(update_fields=['status', 'last_message'])
                 # Send error notification
                 send_epg_update(source.id, "parsing_channels", 100, status="error", error="Failed to fetch EPG data")
                 return False

@@ -401,16 +401,16 @@ def parse_channels_only(source):
                 logger.error(f"Failed to fetch EPG data, file still missing at: {new_path}")
                 # Update status to error
                 source.status = 'error'
-                source.last_error = f"Failed to fetch EPG data, file missing after download"
-                source.save(update_fields=['status', 'last_error'])
+                source.last_message = f"Failed to fetch EPG data, file missing after download"
+                source.save(update_fields=['status', 'last_message'])
                 send_epg_update(source.id, "parsing_channels", 100, status="error", error="File not found after download")
                 return False
         else:
             logger.error(f"No URL provided for EPG source {source.name}, cannot fetch new data")
             # Update status to error
             source.status = 'error'
-            source.last_error = f"No URL provided, cannot fetch EPG data"
-            source.save(update_fields=['status', 'last_error'])
+            source.last_message = f"No URL provided, cannot fetch EPG data"
+            source.save(update_fields=['status', 'last_message'])
             send_epg_update(source.id, "parsing_channels", 100, status="error", error="No URL provided")
             return False

@@ -495,16 +495,16 @@ def parse_channels_only(source):
         logger.error(f"EPG file not found at: {file_path}")
         # Update status to error
         source.status = 'error'
-        source.last_error = f"EPG file not found: {file_path}"
-        source.save(update_fields=['status', 'last_error'])
+        source.last_message = f"EPG file not found: {file_path}"
+        source.save(update_fields=['status', 'last_message'])
         send_epg_update(source.id, "parsing_channels", 100, status="error", error="File not found")
         return False
     except Exception as e:
         logger.error(f"Error reading EPG file {file_path}: {e}", exc_info=True)
         # Update status to error
         source.status = 'error'
-        source.last_error = f"Error parsing EPG file: {str(e)}"
-        source.save(update_fields=['status', 'last_error'])
+        source.last_message = f"Error parsing EPG file: {str(e)}"
+        source.save(update_fields=['status', 'last_message'])
         send_epg_update(source.id, "parsing_channels", 100, status="error", error=str(e))
         return False

@@ -709,6 +709,10 @@ def parse_programs_for_source(epg_source, tvg_id=None):
         logger.info(f"Parsing programs for {total_entries} EPG entries from source: {epg_source.name}")

         failed_entries = []
+        program_count = 0
+        channel_count = 0
+        updated_count = 0
+
         for epg in epg_entries:
             if epg.tvg_id:
                 try:

@@ -723,22 +727,31 @@ def parse_programs_for_source(epg_source, tvg_id=None):
                     logger.error(f"Error parsing programs for tvg_id={epg.tvg_id}: {e}", exc_info=True)
                     failed_entries.append(f"{epg.tvg_id}: {str(e)}")

-        # If any entries failed, mark the source as error but continue
+        # If there were failures, include them in the message but continue
         if failed_entries:
-            epg_source.status = 'error'
-            epg_source.last_error = f"Failed to parse some entries: {', '.join(failed_entries[:5])}" + (
-                "..." if len(failed_entries) > 5 else "")
-            epg_source.save(update_fields=['status', 'last_error'])
-            send_epg_update(epg_source.id, "parsing_programs", 100, status="error",
-                            error=f"Failed to parse {len(failed_entries)} of {total_entries} entries")
-            return False
+            epg_source.status = EPGSource.STATUS_SUCCESS  # Still mark as success if some processed
+            error_summary = f"Failed to parse {len(failed_entries)} of {total_entries} entries"
+            stats_summary = f"Processed {program_count} programs across {channel_count} channels. Updated: {updated_count}."
+            epg_source.last_message = f"{stats_summary} Warning: {error_summary}"
+            epg_source.updated_at = timezone.now()
+            epg_source.save(update_fields=['status', 'last_message', 'updated_at'])

-        # All successful
-        epg_source.status = 'success'
-        epg_source.save(update_fields=['status'])
+            # Send completion notification with mixed status
+            send_epg_update(epg_source.id, "parsing_programs", 100,
+                            status="success",
+                            message=epg_source.last_message)
+            return True

-        # Send completion notification with updated status
-        send_epg_update(epg_source.id, "parsing_programs", 100, status="success")
+        # If all successful, set a comprehensive success message
+        epg_source.status = EPGSource.STATUS_SUCCESS
+        epg_source.last_message = f"Successfully processed {program_count} programs across {channel_count} channels. Updated: {updated_count}."
+        epg_source.updated_at = timezone.now()
+        epg_source.save(update_fields=['status', 'last_message', 'updated_at'])
+
+        # Send completion notification with status
+        send_epg_update(epg_source.id, "parsing_programs", 100,
+                        status="success",
+                        message=epg_source.last_message)

         logger.info(f"Completed parsing all programs for source: {epg_source.name}")
         return True

@@ -746,10 +759,12 @@ def parse_programs_for_source(epg_source, tvg_id=None):
     except Exception as e:
         logger.error(f"Error in parse_programs_for_source: {e}", exc_info=True)
         # Update status to error
-        epg_source.status = 'error'
-        epg_source.last_error = f"Error parsing programs: {str(e)}"
-        epg_source.save(update_fields=['status', 'last_error'])
-        send_epg_update(epg_source.id, "parsing_programs", 100, status="error", error=str(e))
+        epg_source.status = EPGSource.STATUS_ERROR
+        epg_source.last_message = f"Error parsing programs: {str(e)}"
+        epg_source.save(update_fields=['status', 'last_message'])
+        send_epg_update(epg_source.id, "parsing_programs", 100,
+                        status="error",
+                        message=epg_source.last_message)
         return False
@@ -94,6 +94,19 @@ class M3UAccountViewSet(viewsets.ModelViewSet):
         # After the instance is created, return the response
         return response

+    def partial_update(self, request, *args, **kwargs):
+        """Handle PATCH requests with special handling for is_active toggle"""
+        # Check if this is just an is_active toggle
+        if len(request.data) == 1 and 'is_active' in request.data:
+            instance = self.get_object()
+            instance.is_active = request.data['is_active']
+            instance.save(update_fields=['is_active'])
+            serializer = self.get_serializer(instance)
+            return Response(serializer.data)
+
+        # Otherwise, handle as normal update
+        return super().partial_update(request, *args, **kwargs)
+
 class M3UFilterViewSet(viewsets.ModelViewSet):
     """Handles CRUD operations for M3U filters"""
     queryset = M3UFilter.objects.all()
@@ -0,0 +1,28 @@
+# Generated by Django 5.1.6 on 2025-05-04 21:43
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('m3u', '0009_m3uaccount_account_type_m3uaccount_password_and_more'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='m3uaccount',
+            name='last_message',
+            field=models.TextField(blank=True, null=True, help_text="Last status message, including success results or error information"),
+        ),
+        migrations.AddField(
+            model_name='m3uaccount',
+            name='status',
+            field=models.CharField(choices=[('idle', 'Idle'), ('fetching', 'Fetching'), ('parsing', 'Parsing'), ('error', 'Error'), ('success', 'Success')], default='idle', max_length=20),
+        ),
+        migrations.AlterField(
+            model_name='m3uaccount',
+            name='updated_at',
+            field=models.DateTimeField(blank=True, help_text='Time when this account was last successfully refreshed', null=True),
+        ),
+    ]
@@ -14,6 +14,13 @@ class M3UAccount(models.Model):
         STADNARD = "STD", "Standard"
         XC = "XC", "Xtream Codes"

+    class Status(models.TextChoices):
+        IDLE = "idle", "Idle"
+        FETCHING = "fetching", "Fetching"
+        PARSING = "parsing", "Parsing"
+        ERROR = "error", "Error"
+        SUCCESS = "success", "Success"
+
     """Represents an M3U Account for IPTV streams."""
     name = models.CharField(
         max_length=255,

@@ -51,8 +58,18 @@ class M3UAccount(models.Model):
         help_text="Time when this account was created"
     )
     updated_at = models.DateTimeField(
-        auto_now=True,
-        help_text="Time when this account was last updated"
+        null=True, blank=True,
+        help_text="Time when this account was last successfully refreshed"
     )
+    status = models.CharField(
+        max_length=20,
+        choices=Status.choices,
+        default=Status.IDLE
+    )
+    last_message = models.TextField(
+        null=True,
+        blank=True,
+        help_text="Last status message, including success results or error information"
+    )
     user_agent = models.ForeignKey(
         'core.UserAgent',

@@ -119,6 +136,15 @@ class M3UAccount(models.Model):

         return user_agent

+    def save(self, *args, **kwargs):
+        # Prevent auto_now behavior by handling updated_at manually
+        if 'update_fields' in kwargs and 'updated_at' not in kwargs['update_fields']:
+            # Don't modify updated_at for regular updates
+            kwargs.setdefault('update_fields', [])
+            if 'updated_at' in kwargs['update_fields']:
+                kwargs['update_fields'].remove('updated_at')
+        super().save(*args, **kwargs)
+
     # def get_channel_groups(self):
     #     return ChannelGroup.objects.filter(m3u_account__m3u_account=self)
@@ -67,6 +67,7 @@ class M3UAccountSerializer(serializers.ModelSerializer):
             'id', 'name', 'server_url', 'file_path', 'server_group',
             'max_streams', 'is_active', 'created_at', 'updated_at', 'filters', 'user_agent', 'profiles', 'locked',
             'channel_groups', 'refresh_interval', 'custom_properties', 'account_type', 'username', 'password', 'stale_stream_days',
+            'status', 'last_message',
         ]
         extra_kwargs = {
             'password': {
@@ -513,9 +513,11 @@ def refresh_single_m3u_account(account_id):
     if not acquire_task_lock('refresh_single_m3u_account', account_id):
         return f"Task already running for account_id={account_id}."

     # redis_client = RedisClient.get_client()
     # Record start time
     start_time = time.time()
+    streams_created = 0
+    streams_updated = 0
+    streams_deleted = 0

     try:
         account = M3UAccount.objects.get(id=account_id, is_active=True)

@@ -523,6 +525,10 @@ def refresh_single_m3u_account(account_id):
             logger.info(f"Account {account_id} is not active, skipping.")
             return

+        # Set status to fetching
+        account.status = M3UAccount.Status.FETCHING
+        account.save(update_fields=['status'])
+
         filters = list(account.filters.all())
     except M3UAccount.DoesNotExist:
         release_task_lock('refresh_single_m3u_account', account_id)
@@ -558,105 +564,134 @@ def refresh_single_m3u_account(account_id):
m3u_account__enabled=True # Filter by the enabled flag in the join table
)}

if account.account_type == M3UAccount.Types.STADNARD:
# Break into batches and process in parallel
batches = [extinf_data[i:i + BATCH_SIZE] for i in range(0, len(extinf_data), BATCH_SIZE)]
task_group = group(process_m3u_batch.s(account_id, batch, existing_groups, hash_keys) for batch in batches)
else:
filtered_groups = [(k, v) for k, v in groups.items() if k in existing_groups]
batches = [
dict(filtered_groups[i:i + 2])
for i in range(0, len(filtered_groups), 2)
]
task_group = group(process_xc_category.s(account_id, batch, existing_groups, hash_keys) for batch in batches)
try:
# Set status to parsing
account.status = M3UAccount.Status.PARSING
account.save(update_fields=['status'])

total_batches = len(batches)
completed_batches = 0
streams_processed = 0 # Track total streams processed
logger.debug(f"Dispatched {len(batches)} parallel tasks for account_id={account_id}.")
if account.account_type == M3UAccount.Types.STADNARD:
# Break into batches and process in parallel
batches = [extinf_data[i:i + BATCH_SIZE] for i in range(0, len(extinf_data), BATCH_SIZE)]
task_group = group(process_m3u_batch.s(account_id, batch, existing_groups, hash_keys) for batch in batches)
else:
filtered_groups = [(k, v) for k, v in groups.items() if k in existing_groups]
batches = [
dict(filtered_groups[i:i + 2])
for i in range(0, len(filtered_groups), 2)
]
task_group = group(process_xc_category.s(account_id, batch, existing_groups, hash_keys) for batch in batches)

# result = task_group.apply_async()
result = task_group.apply_async()
total_batches = len(batches)
completed_batches = 0
streams_processed = 0 # Track total streams processed
logger.debug(f"Dispatched {len(batches)} parallel tasks for account_id={account_id}.")

# Wait for all tasks to complete and collect their result IDs
completed_task_ids = set()
while completed_batches < total_batches:
for async_result in result:
if async_result.ready() and async_result.id not in completed_task_ids: # If the task has completed and we haven't counted it
task_result = async_result.result # The result of the task
logger.debug(f"Task completed with result: {task_result}")
# result = task_group.apply_async()
result = task_group.apply_async()

# Extract stream counts from result string if available
if isinstance(task_result, str):
try:
created_match = re.search(r"(\d+) created", task_result)
updated_match = re.search(r"(\d+) updated", task_result)
# Wait for all tasks to complete and collect their result IDs
completed_task_ids = set()
while completed_batches < total_batches:
for async_result in result:
if async_result.ready() and async_result.id not in completed_task_ids: # If the task has completed and we haven't counted it
task_result = async_result.result # The result of the task
logger.debug(f"Task completed with result: {task_result}")

if created_match and updated_match:
created_count = int(created_match.group(1))
updated_count = int(updated_match.group(1))
streams_processed += created_count + updated_count
except (AttributeError, ValueError):
pass
# Extract stream counts from result string if available
if isinstance(task_result, str):
try:
created_match = re.search(r"(\d+) created", task_result)
updated_match = re.search(r"(\d+) updated", task_result)

completed_batches += 1
completed_task_ids.add(async_result.id) # Mark this task as processed
if created_match and updated_match:
created_count = int(created_match.group(1))
updated_count = int(updated_match.group(1))
streams_processed += created_count + updated_count
streams_created += created_count
streams_updated += updated_count
except (AttributeError, ValueError):
pass

# Calculate progress
progress = int((completed_batches / total_batches) * 100)
completed_batches += 1
completed_task_ids.add(async_result.id) # Mark this task as processed

# Calculate elapsed time and estimated remaining time
current_elapsed = time.time() - start_time
if progress > 0:
estimated_total = (current_elapsed / progress) * 100
time_remaining = max(0, estimated_total - current_elapsed)
# Calculate progress
progress = int((completed_batches / total_batches) * 100)

# Calculate elapsed time and estimated remaining time
current_elapsed = time.time() - start_time
if progress > 0:
estimated_total = (current_elapsed / progress) * 100
time_remaining = max(0, estimated_total - current_elapsed)
else:
time_remaining = 0

# Send progress update via Channels
# Don't send 100% because we want to clean up after
if progress == 100:
progress = 99

send_m3u_update(
account_id,
"parsing",
progress,
elapsed_time=current_elapsed,
time_remaining=time_remaining,
streams_processed=streams_processed
)

# Optionally remove completed task from the group to prevent processing it again
result.remove(async_result)
else:
time_remaining = 0
logger.debug(f"Task is still running.")

# Send progress update via Channels
# Don't send 100% because we want to clean up after
if progress == 100:
progress = 99
# Ensure all database transactions are committed before cleanup
logger.info(f"All {total_batches} tasks completed, ensuring DB transactions are committed before cleanup")
# Force a simple DB query to ensure connection sync
Stream.objects.filter(id=-1).exists() # This will never find anything but ensures DB sync

send_m3u_update(
account_id,
"parsing",
progress,
elapsed_time=current_elapsed,
time_remaining=time_remaining,
streams_processed=streams_processed
)
# Now run cleanup
cleanup_streams(account_id)
# Send final update with complete metrics
elapsed_time = time.time() - start_time
send_m3u_update(
account_id,
"parsing",
100,
elapsed_time=elapsed_time,
time_remaining=0,
streams_processed=streams_processed,
streams_created=streams_created,
streams_updated=streams_updated,
streams_deleted=streams_deleted,
message=account.last_message
)

# Optionally remove completed task from the group to prevent processing it again
result.remove(async_result)
else:
logger.debug(f"Task is still running.")
end_time = time.time()

# Ensure all database transactions are committed before cleanup
logger.info(f"All {total_batches} tasks completed, ensuring DB transactions are committed before cleanup")
# Force a simple DB query to ensure connection sync
Stream.objects.filter(id=-1).exists() # This will never find anything but ensures DB sync
# Calculate elapsed time
elapsed_time = end_time - start_time

# Now run cleanup
cleanup_streams(account_id)
# Send final update with complete metrics
elapsed_time = time.time() - start_time
send_m3u_update(
account_id,
"parsing",
100,
elapsed_time=elapsed_time,
time_remaining=0,
streams_processed=streams_processed
)
# Set status to success and update timestamp
account.status = M3UAccount.Status.SUCCESS
account.last_message = (
f"Processing completed in {elapsed_time:.1f} seconds. "
f"Streams: {streams_created} created, {streams_updated} updated, {streams_deleted} removed. "
f"Total processed: {streams_processed}."
)
account.updated_at = timezone.now()
account.save(update_fields=['status', 'last_message', 'updated_at'])

end_time = time.time()
print(f"Function took {elapsed_time} seconds to execute.")

# Calculate elapsed time
elapsed_time = end_time - start_time
account.save(update_fields=['updated_at'])
except Exception as e:
logger.error(f"Error processing M3U for account {account_id}: {str(e)}")
account.status = M3UAccount.Status.ERROR
account.last_message = f"Error processing M3U: {str(e)}"
account.save(update_fields=['status', 'last_message'])
raise # Re-raise the exception for Celery to handle

print(f"Function took {elapsed_time} seconds to execute.")
release_task_lock('refresh_single_m3u_account', account_id)

# Aggressive garbage collection
del existing_groups, extinf_data, groups, batches
@@ -666,16 +701,6 @@ def refresh_single_m3u_account(account_id):
     if os.path.exists(cache_path):
         os.remove(cache_path)

-    release_task_lock('refresh_single_m3u_account', account_id)
-
-    # cursor = 0
-    # while True:
-    #     cursor, keys = redis_client.scan(cursor, match=f"m3u_refresh:*", count=BATCH_SIZE)
-    #     if keys:
-    #         redis_client.delete(*keys) # Delete the matching keys
-    #     if cursor == 0:
-    #         break
-
     return f"Dispatched jobs complete."

 def send_m3u_update(account_id, action, progress, **kwargs):

@@ -687,6 +712,17 @@ def send_m3u_update(account_id, action, progress, **kwargs):
         "action": action,
     }

+    # Add the status and message if not already in kwargs
+    try:
+        account = M3UAccount.objects.get(id=account_id)
+        if account:
+            if "status" not in kwargs:
+                data["status"] = account.status
+            if "message" not in kwargs and account.last_message:
+                data["message"] = account.last_message
+    except:
+        pass  # If account can't be retrieved, continue without these fields
+
     # Add the additional key-value pairs from kwargs
     data.update(kwargs)

@@ -698,4 +734,4 @@ def send_m3u_update(account_id, action, progress, **kwargs):
             'type': 'update',
             'data': data
         }
-    )
+    )
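For reference, a rough sketch of the kind of payload send_m3u_update() now emits over the channel layer: the account's stored status and last_message are merged in unless the caller passed its own, and extra keyword arguments ride along unchanged. All values below are made up, and the progress key is only assumed from the function signature:

data = {
    "progress": 42,                    # assumed from the positional argument
    "action": "parsing",
    "status": "parsing",               # filled from account.status
    "message": "Processing streams",   # filled from account.last_message
    "elapsed_time": 12.3,              # extra kwargs pass straight through
    "streams_processed": 1500,
}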
@@ -261,7 +261,7 @@ export const WebsocketProvider = ({ children }) => {
         epgsState.updateEPG({
           ...epg,
           status: 'error',
-          last_error: parsedEvent.data.message
+          last_message: parsedEvent.data.message
         });
       }
     }
@@ -728,14 +728,22 @@ export default class API {
     }
   }

-  static async updatePlaylist(values) {
+  static async updatePlaylist(values, isToggle = false) {
     const { id, ...payload } = values;

     if (payload.custom_properties) {
       payload.custom_properties = JSON.stringify(payload.custom_properties);
     }

     try {
+      // If this is just toggling the active state, make a simpler request
+      if (isToggle && 'is_active' in payload && Object.keys(payload).length === 1) {
+        const response = await request(`${host}/api/m3u/accounts/${id}/`, {
+          method: 'PATCH',
+          body: { is_active: payload.is_active },
+        });
+
+        usePlaylistsStore.getState().updatePlaylist(response);
+        return response;
+      }
+
+      // Original implementation for full updates
       let body = null;
       if (payload.file) {
         delete payload.server_url;

@@ -817,10 +825,22 @@ export default class API {
     }
   }

-  static async updateEPG(values) {
+  static async updateEPG(values, isToggle = false) {
     const { id, ...payload } = values;

     try {
+      // If this is just toggling the active state, make a simpler request
+      if (isToggle && 'is_active' in payload && Object.keys(payload).length === 1) {
+        const response = await request(`${host}/api/epg/sources/${id}/`, {
+          method: 'PATCH',
+          body: { is_active: payload.is_active },
+        });
+
+        useEPGsStore.getState().updateEPG(response);
+        return response;
+      }
+
+      // Original implementation for full updates
       let body = null;
       if (payload.files) {
         body = new FormData();
@@ -53,10 +53,15 @@ const EPGsTable = () => {
   const theme = useMantineTheme();

   const toggleActive = async (epg) => {
-    await API.updateEPG({
-      ...epg,
-      is_active: !epg.is_active,
-    });
+    try {
+      // Send only the is_active field to trigger our special handling
+      await API.updateEPG({
+        id: epg.id,
+        is_active: !epg.is_active,
+      }, true); // Add a new parameter to indicate this is just a toggle
+    } catch (error) {
+      console.error('Error toggling active state:', error);
+    }
   };

   const buildProgressDisplay = (data) => {

@@ -154,7 +159,7 @@ const EPGsTable = () => {
     },
     {
       header: 'Status Message',
-      accessorKey: 'last_error',
+      accessorKey: 'last_message',
       size: 250,
       minSize: 150,
       enableSorting: false,

@@ -162,11 +167,11 @@ const EPGsTable = () => {
         const data = row.original;

         // Show error message when status is error
-        if (data.status === 'error' && data.last_error) {
+        if (data.status === 'error' && data.last_message) {
           return (
-            <Tooltip label={data.last_error} multiline width={300}>
+            <Tooltip label={data.last_message} multiline width={300}>
               <Text c="dimmed" size="xs" lineClamp={2} style={{ color: theme.colors.red[6] }}>
-                {data.last_error}
+                {data.last_message}
               </Text>
             </Tooltip>
           );
@@ -207,10 +207,15 @@ const M3UTable = () => {
   };

   const toggleActive = async (playlist) => {
-    await API.updatePlaylist({
-      ...playlist,
-      is_active: !playlist.is_active,
-    });
+    try {
+      // Send only the is_active field to trigger our special handling
+      await API.updatePlaylist({
+        id: playlist.id,
+        is_active: !playlist.is_active,
+      }, true); // Add a new parameter to indicate this is just a toggle
+    } catch (error) {
+      console.error('Error toggling active state:', error);
+    }
   };

   const columns = useMemo(
@@ -98,7 +98,7 @@ const useEPGsStore = create((set) => ({
         [data.source]: {
           ...state.epgs[data.source],
           status: sourceStatus,
-          last_error: data.status === 'error' ? (data.error || 'Unknown error') : state.epgs[data.source]?.last_error
+          last_message: data.status === 'error' ? (data.error || 'Unknown error') : state.epgs[data.source]?.last_message
         }
       };