Merge remote-tracking branch 'origin/dev' into xtream

This commit is contained in:
dekzter 2025-04-30 15:10:59 -04:00
commit 3ea8c05466
28 changed files with 806 additions and 205 deletions

View file

@ -44,28 +44,17 @@ jobs:
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Increment Build Number
if: steps.check_actor.outputs.is_bot != 'true'
id: increment_build
- name: Generate timestamp for build
id: timestamp
run: |
python scripts/increment_build.py
BUILD=$(python -c "import version; print(version.__build__)")
echo "build=${BUILD}" >> $GITHUB_OUTPUT
- name: Commit Build Number Update
if: steps.check_actor.outputs.is_bot != 'true'
run: |
git add version.py
git commit -m "Increment build number to ${{ steps.increment_build.outputs.build }} [skip ci]"
git push
TIMESTAMP=$(date -u +'%Y%m%d%H%M%S')
echo "timestamp=${TIMESTAMP}" >> $GITHUB_OUTPUT
- name: Extract version info
id: version
run: |
VERSION=$(python -c "import version; print(version.__version__)")
BUILD=$(python -c "import version; print(version.__build__)")
echo "version=${VERSION}" >> $GITHUB_OUTPUT
echo "build=${BUILD}" >> $GITHUB_OUTPUT
echo "sha_short=${GITHUB_SHA::7}" >> $GITHUB_OUTPUT
- name: Set repository and image metadata
@ -108,9 +97,10 @@ jobs:
platforms: linux/amd64 # Fast build - amd64 only
tags: |
ghcr.io/${{ steps.meta.outputs.repo_owner }}/${{ steps.meta.outputs.repo_name }}:${{ steps.meta.outputs.branch_tag }}
ghcr.io/${{ steps.meta.outputs.repo_owner }}/${{ steps.meta.outputs.repo_name }}:${{ steps.version.outputs.version }}-${{ steps.version.outputs.build }}
ghcr.io/${{ steps.meta.outputs.repo_owner }}/${{ steps.meta.outputs.repo_name }}:${{ steps.version.outputs.version }}-${{ steps.timestamp.outputs.timestamp }}
ghcr.io/${{ steps.meta.outputs.repo_owner }}/${{ steps.meta.outputs.repo_name }}:${{ steps.version.outputs.sha_short }}
build-args: |
BRANCH=${{ github.ref_name }}
REPO_URL=https://github.com/${{ github.repository }}
TIMESTAMP=${{ steps.timestamp.outputs.timestamp }}
file: ./docker/Dockerfile

View file

@ -541,6 +541,71 @@ class ChannelViewSet(viewsets.ModelViewSet):
except Exception as e:
return Response({"error": str(e)}, status=400)
@swagger_auto_schema(
method='post',
operation_description="Associate multiple channels with EPG data without triggering a full refresh",
request_body=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'associations': openapi.Schema(
type=openapi.TYPE_ARRAY,
items=openapi.Schema(
type=openapi.TYPE_OBJECT,
properties={
'channel_id': openapi.Schema(type=openapi.TYPE_INTEGER),
'epg_data_id': openapi.Schema(type=openapi.TYPE_INTEGER)
}
)
)
}
),
responses={200: "EPG data linked for multiple channels"}
)
@action(detail=False, methods=['post'], url_path='batch-set-epg')
def batch_set_epg(self, request):
"""Efficiently associate multiple channels with EPG data at once."""
associations = request.data.get('associations', [])
channels_updated = 0
programs_refreshed = 0
unique_epg_ids = set()
for assoc in associations:
channel_id = assoc.get('channel_id')
epg_data_id = assoc.get('epg_data_id')
if not channel_id:
continue
try:
# Get the channel
channel = Channel.objects.get(id=channel_id)
# Set the EPG data
channel.epg_data_id = epg_data_id
channel.save(update_fields=['epg_data'])
channels_updated += 1
# Track unique EPG data IDs
if epg_data_id:
unique_epg_ids.add(epg_data_id)
except Channel.DoesNotExist:
logger.error(f"Channel with ID {channel_id} not found")
except Exception as e:
logger.error(f"Error setting EPG data for channel {channel_id}: {str(e)}")
# Trigger program refresh for unique EPG data IDs
from apps.epg.tasks import parse_programs_for_tvg_id
for epg_id in unique_epg_ids:
parse_programs_for_tvg_id.delay(epg_id)
programs_refreshed += 1
return Response({
'success': True,
'channels_updated': channels_updated,
'programs_refreshed': programs_refreshed
})
# ─────────────────────────────────────────────────────────
# 4) Bulk Delete Streams
# ─────────────────────────────────────────────────────────
@ -629,14 +694,35 @@ class LogoViewSet(viewsets.ModelViewSet):
if logo_url.startswith("/data"): # Local file
if not os.path.exists(logo_url):
raise Http404("Image not found")
mimetype = mimetypes.guess_type(logo_url)
return FileResponse(open(logo_url, "rb"), content_type=mimetype)
# Get proper mime type (first item of the tuple)
content_type, _ = mimetypes.guess_type(logo_url)
if not content_type:
content_type = 'image/jpeg' # Default to a common image type
# Use context manager and set Content-Disposition to inline
response = StreamingHttpResponse(open(logo_url, "rb"), content_type=content_type)
response['Content-Disposition'] = 'inline; filename="{}"'.format(os.path.basename(logo_url))
return response
else: # Remote image
try:
remote_response = requests.get(logo_url, stream=True)
if remote_response.status_code == 200:
return StreamingHttpResponse(remote_response.iter_content(chunk_size=8192), content_type=remote_response.headers['Content-Type'])
# Try to get content type from response headers first
content_type = remote_response.headers.get('Content-Type')
# If no content type in headers or it's empty, guess based on URL
if not content_type:
content_type, _ = mimetypes.guess_type(logo_url)
# If still no content type, default to common image type
if not content_type:
content_type = 'image/jpeg'
response = StreamingHttpResponse(remote_response.iter_content(chunk_size=8192), content_type=content_type)
response['Content-Disposition'] = 'inline; filename="{}"'.format(os.path.basename(logo_url))
return response
raise Http404("Remote image not found")
except requests.RequestException:
raise Http404("Error fetching remote image")
@ -681,6 +767,7 @@ class BulkUpdateChannelMembershipAPIView(APIView):
updates = serializer.validated_data['channels']
channel_ids = [entry['channel_id'] for entry in updates]
memberships = ChannelProfileMembership.objects.filter(
channel_profile=channel_profile,
channel_id__in=channel_ids

View file

@ -407,6 +407,53 @@ class Channel(models.Model):
if current_count > 0:
redis_client.decr(profile_connections_key)
def update_stream_profile(self, new_profile_id):
"""
Updates the profile for the current stream and adjusts connection counts.
Args:
new_profile_id: The ID of the new stream profile to use
Returns:
bool: True if successful, False otherwise
"""
redis_client = RedisClient.get_client()
# Get current stream ID
stream_id_bytes = redis_client.get(f"channel_stream:{self.id}")
if not stream_id_bytes:
logger.debug("No active stream found for channel")
return False
stream_id = int(stream_id_bytes)
# Get current profile ID
current_profile_id_bytes = redis_client.get(f"stream_profile:{stream_id}")
if not current_profile_id_bytes:
logger.debug("No profile found for current stream")
return False
current_profile_id = int(current_profile_id_bytes)
# Don't do anything if the profile is already set to the requested one
if current_profile_id == new_profile_id:
return True
# Decrement connection count for old profile
old_profile_connections_key = f"profile_connections:{current_profile_id}"
old_count = int(redis_client.get(old_profile_connections_key) or 0)
if old_count > 0:
redis_client.decr(old_profile_connections_key)
# Update the profile mapping
redis_client.set(f"stream_profile:{stream_id}", new_profile_id)
# Increment connection count for new profile
new_profile_connections_key = f"profile_connections:{new_profile_id}"
redis_client.incr(new_profile_connections_key)
logger.info(f"Updated stream {stream_id} profile from {current_profile_id} to {new_profile_id}")
return True
class ChannelProfile(models.Model):
name = models.CharField(max_length=100, unique=True)

View file

@ -75,21 +75,42 @@ def match_epg_channels():
matched_channels = []
channels_to_update = []
channels_json = [{
"id": channel.id,
"name": channel.name,
"tvg_id": channel.tvg_id,
"fallback_name": channel.tvg_id.strip() if channel.tvg_id else channel.name,
"norm_chan": normalize_name(channel.tvg_id.strip() if channel.tvg_id else channel.name)
} for channel in Channel.objects.all() if not channel.epg_data]
# Get channels that don't have EPG data assigned
channels_without_epg = Channel.objects.filter(epg_data__isnull=True)
logger.info(f"Found {channels_without_epg.count()} channels without EPG data")
epg_json = [{
'id': epg.id,
'tvg_id': epg.tvg_id,
'name': epg.name,
'norm_name': normalize_name(epg.name),
'epg_source_id': epg.epg_source.id,
} for epg in EPGData.objects.all()]
channels_json = []
for channel in channels_without_epg:
# Normalize TVG ID - strip whitespace and convert to lowercase
normalized_tvg_id = channel.tvg_id.strip().lower() if channel.tvg_id else ""
if normalized_tvg_id:
logger.info(f"Processing channel {channel.id} '{channel.name}' with TVG ID='{normalized_tvg_id}'")
channels_json.append({
"id": channel.id,
"name": channel.name,
"tvg_id": normalized_tvg_id, # Use normalized TVG ID
"original_tvg_id": channel.tvg_id, # Keep original for reference
"fallback_name": normalized_tvg_id if normalized_tvg_id else channel.name,
"norm_chan": normalize_name(normalized_tvg_id if normalized_tvg_id else channel.name)
})
# Similarly normalize EPG data TVG IDs
epg_json = []
for epg in EPGData.objects.all():
normalized_tvg_id = epg.tvg_id.strip().lower() if epg.tvg_id else ""
epg_json.append({
'id': epg.id,
'tvg_id': normalized_tvg_id, # Use normalized TVG ID
'original_tvg_id': epg.tvg_id, # Keep original for reference
'name': epg.name,
'norm_name': normalize_name(epg.name),
'epg_source_id': epg.epg_source.id if epg.epg_source else None,
})
# Log available EPG data TVG IDs for debugging
unique_epg_tvg_ids = set(e['tvg_id'] for e in epg_json if e['tvg_id'])
logger.info(f"Available EPG TVG IDs: {', '.join(sorted(unique_epg_tvg_ids))}")
payload = {
"channels": channels_json,
@ -159,12 +180,25 @@ def match_epg_channels():
logger.info("Finished EPG matching logic.")
# Send update with additional information for refreshing UI
channel_layer = get_channel_layer()
associations = [
{"channel_id": chan["id"], "epg_data_id": chan["epg_data_id"]}
for chan in channels_to_update_dicts
]
async_to_sync(channel_layer.group_send)(
'updates',
{
'type': 'update',
"data": {"success": True, "type": "epg_match"}
"data": {
"success": True,
"type": "epg_match",
"refresh_channels": True, # Flag to tell frontend to refresh channels
"matches_count": total_matched,
"message": f"EPG matching complete: {total_matched} channel(s) matched",
"associations": associations # Add the associations data
}
}
)

View file

@ -140,10 +140,6 @@ class EPGGridAPIView(APIView):
}
dummy_programs.append(dummy_program)
# Also update the channel to use this dummy tvg_id
channel.tvg_id = dummy_tvg_id
channel.save(update_fields=['tvg_id'])
except Exception as e:
logger.error(f"Error creating dummy programs for channel {channel.name} (ID: {channel.id}): {str(e)}")

View file

@ -2,6 +2,7 @@ from django.db import models
from django.utils import timezone
from django_celery_beat.models import PeriodicTask
from django.conf import settings
import os
class EPGSource(models.Model):
SOURCE_TYPE_CHOICES = [

View file

@ -6,6 +6,7 @@ from .redis_keys import RedisKeys
from .constants import TS_PACKET_SIZE, ChannelMetadataField
from redis.exceptions import ConnectionError, TimeoutError
from .utils import get_logger
from django.db import DatabaseError # Add import for error handling
logger = get_logger()
@ -45,6 +46,42 @@ class ChannelStatus:
'buffer_index': int(buffer_index_value.decode('utf-8')) if buffer_index_value else 0,
}
# Add stream ID and name information
stream_id_bytes = metadata.get(ChannelMetadataField.STREAM_ID.encode('utf-8'))
if stream_id_bytes:
try:
stream_id = int(stream_id_bytes.decode('utf-8'))
info['stream_id'] = stream_id
# Look up stream name from database
try:
from apps.channels.models import Stream
stream = Stream.objects.filter(id=stream_id).first()
if stream:
info['stream_name'] = stream.name
except (ImportError, DatabaseError) as e:
logger.warning(f"Failed to get stream name for ID {stream_id}: {e}")
except ValueError:
logger.warning(f"Invalid stream_id format in Redis: {stream_id_bytes}")
# Add M3U profile information
m3u_profile_id_bytes = metadata.get(ChannelMetadataField.M3U_PROFILE.encode('utf-8'))
if m3u_profile_id_bytes:
try:
m3u_profile_id = int(m3u_profile_id_bytes.decode('utf-8'))
info['m3u_profile_id'] = m3u_profile_id
# Look up M3U profile name from database
try:
from apps.m3u.models import M3UAccountProfile
m3u_profile = M3UAccountProfile.objects.filter(id=m3u_profile_id).first()
if m3u_profile:
info['m3u_profile_name'] = m3u_profile.name
except (ImportError, DatabaseError) as e:
logger.warning(f"Failed to get M3U profile name for ID {m3u_profile_id}: {e}")
except ValueError:
logger.warning(f"Invalid m3u_profile_id format in Redis: {m3u_profile_id_bytes}")
# Add timing information
state_changed_field = ChannelMetadataField.STATE_CHANGED_AT.encode('utf-8')
if state_changed_field in metadata:
@ -285,6 +322,24 @@ class ChannelStatus:
'uptime': uptime
}
# Add stream ID and name information
stream_id_bytes = metadata.get(ChannelMetadataField.STREAM_ID.encode('utf-8'))
if stream_id_bytes:
try:
stream_id = int(stream_id_bytes.decode('utf-8'))
info['stream_id'] = stream_id
# Look up stream name from database
try:
from apps.channels.models import Stream
stream = Stream.objects.filter(id=stream_id).first()
if stream:
info['stream_name'] = stream.name
except (ImportError, DatabaseError) as e:
logger.warning(f"Failed to get stream name for ID {stream_id}: {e}")
except ValueError:
logger.warning(f"Invalid stream_id format in Redis: {stream_id_bytes}")
# Add data throughput information to basic info
total_bytes_bytes = proxy_server.redis_client.hget(metadata_key, ChannelMetadataField.TOTAL_BYTES.encode('utf-8'))
if total_bytes_bytes:
@ -341,6 +396,24 @@ class ChannelStatus:
# Add clients to info
info['clients'] = clients
# Add M3U profile information
m3u_profile_id_bytes = metadata.get(ChannelMetadataField.M3U_PROFILE.encode('utf-8'))
if m3u_profile_id_bytes:
try:
m3u_profile_id = int(m3u_profile_id_bytes.decode('utf-8'))
info['m3u_profile_id'] = m3u_profile_id
# Look up M3U profile name from database
try:
from apps.m3u.models import M3UAccountProfile
m3u_profile = M3UAccountProfile.objects.filter(id=m3u_profile_id).first()
if m3u_profile:
info['m3u_profile_name'] = m3u_profile.name
except (ImportError, DatabaseError) as e:
logger.warning(f"Failed to get M3U profile name for ID {m3u_profile_id}: {e}")
except ValueError:
logger.warning(f"Invalid m3u_profile_id format in Redis: {m3u_profile_id_bytes}")
return info
except Exception as e:
logger.error(f"Error getting channel info: {e}")

View file

@ -15,6 +15,7 @@ import time
import sys
import os
import json
import gevent # Add gevent import
from typing import Dict, Optional, Set
from apps.proxy.config import TSConfig as Config
from apps.channels.models import Channel, Stream
@ -209,7 +210,7 @@ class ProxyServer:
if shutdown_delay > 0:
logger.info(f"Waiting {shutdown_delay}s before stopping channel...")
time.sleep(shutdown_delay)
gevent.sleep(shutdown_delay) # REPLACE: time.sleep(shutdown_delay)
# Re-check client count before stopping
total = self.redis_client.scard(client_set_key) or 0
@ -336,7 +337,7 @@ class ProxyServer:
final_delay = delay + jitter
logger.error(f"Error in event listener: {e}. Retrying in {final_delay:.1f}s (attempt {retry_count})")
time.sleep(final_delay)
gevent.sleep(final_delay) # REPLACE: time.sleep(final_delay)
# Try to clean up the old connection
try:
@ -350,7 +351,7 @@ class ProxyServer:
except Exception as e:
logger.error(f"Error in event listener: {e}")
# Add a short delay to prevent rapid retries on persistent errors
time.sleep(5)
gevent.sleep(5) # REPLACE: time.sleep(5)
thread = threading.Thread(target=event_listener, daemon=True)
thread.name = "redis-event-listener"
@ -1000,7 +1001,7 @@ class ProxyServer:
except Exception as e:
logger.error(f"Error in cleanup thread: {e}", exc_info=True)
time.sleep(ConfigHelper.cleanup_check_interval())
gevent.sleep(ConfigHelper.cleanup_check_interval()) # REPLACE: time.sleep(ConfigHelper.cleanup_check_interval())
thread = threading.Thread(target=cleanup_task, daemon=True)
thread.name = "ts-proxy-cleanup"

View file

@ -7,7 +7,7 @@ import logging
import time
import json
from django.shortcuts import get_object_or_404
from apps.channels.models import Channel
from apps.channels.models import Channel, Stream
from apps.proxy.config import TSConfig as Config
from ..server import ProxyServer
from ..redis_keys import RedisKeys
@ -58,7 +58,7 @@ class ChannelService:
# Verify the stream_id was set
stream_id_value = proxy_server.redis_client.hget(metadata_key, ChannelMetadataField.STREAM_ID)
if stream_id_value:
logger.info(f"Verified stream_id {stream_id_value.decode('utf-8')} is now set in Redis")
logger.debug(f"Verified stream_id {stream_id_value.decode('utf-8')} is now set in Redis")
else:
logger.error(f"Failed to set stream_id {stream_id} in Redis before initialization")
@ -82,7 +82,7 @@ class ChannelService:
return success
@staticmethod
def change_stream_url(channel_id, new_url=None, user_agent=None, target_stream_id=None):
def change_stream_url(channel_id, new_url=None, user_agent=None, target_stream_id=None, m3u_profile_id=None):
"""
Change the URL of an existing stream.
@ -91,6 +91,7 @@ class ChannelService:
new_url: New stream URL (optional if target_stream_id is provided)
user_agent: Optional user agent to update
target_stream_id: Optional target stream ID to switch to
m3u_profile_id: Optional M3U profile ID to update
Returns:
dict: Result information including success status and diagnostics
@ -109,6 +110,10 @@ class ChannelService:
new_url = stream_info['url']
user_agent = stream_info['user_agent']
stream_id = target_stream_id
# Extract M3U profile ID from stream info if available
if 'm3u_profile_id' in stream_info:
m3u_profile_id = stream_info['m3u_profile_id']
logger.info(f"Found M3U profile ID {m3u_profile_id} for stream ID {stream_id}")
elif target_stream_id:
# If we have both URL and target_stream_id, use the target_stream_id
stream_id = target_stream_id
@ -163,7 +168,7 @@ class ChannelService:
# Update metadata in Redis regardless of ownership
if proxy_server.redis_client:
try:
ChannelService._update_channel_metadata(channel_id, new_url, user_agent, stream_id)
ChannelService._update_channel_metadata(channel_id, new_url, user_agent, stream_id, m3u_profile_id)
result['metadata_updated'] = True
except Exception as e:
logger.error(f"Error updating Redis metadata: {e}", exc_info=True)
@ -176,7 +181,7 @@ class ChannelService:
old_url = manager.url
# Update the stream
success = manager.update_url(new_url)
success = manager.update_url(new_url, stream_id)
logger.info(f"Stream URL changed from {old_url} to {new_url}, result: {success}")
result.update({
@ -188,7 +193,7 @@ class ChannelService:
# If we're not the owner, publish an event for the owner to pick up
logger.info(f"Not the owner, requesting URL change via Redis PubSub")
if proxy_server.redis_client:
ChannelService._publish_stream_switch_event(channel_id, new_url, user_agent, stream_id)
ChannelService._publish_stream_switch_event(channel_id, new_url, user_agent, stream_id, m3u_profile_id)
result.update({
'direct_update': False,
'event_published': True,
@ -413,7 +418,7 @@ class ChannelService:
# Helper methods for Redis operations
@staticmethod
def _update_channel_metadata(channel_id, url, user_agent=None, stream_id=None):
def _update_channel_metadata(channel_id, url, user_agent=None, stream_id=None, m3u_profile_id=None):
"""Update channel metadata in Redis"""
proxy_server = ProxyServer.get_instance()
@ -432,7 +437,11 @@ class ChannelService:
metadata[ChannelMetadataField.USER_AGENT] = user_agent
if stream_id:
metadata[ChannelMetadataField.STREAM_ID] = str(stream_id)
logger.info(f"Updating stream ID to {stream_id} in Redis for channel {channel_id}")
if m3u_profile_id:
metadata[ChannelMetadataField.M3U_PROFILE] = str(m3u_profile_id)
# Also update the stream switch time field
metadata[ChannelMetadataField.STREAM_SWITCH_TIME] = str(time.time())
# Use the appropriate method based on the key type
if key_type == 'hash':
@ -448,11 +457,11 @@ class ChannelService:
switch_key = RedisKeys.switch_request(channel_id)
proxy_server.redis_client.setex(switch_key, 30, url) # 30 second TTL
logger.info(f"Updated metadata for channel {channel_id} in Redis")
logger.debug(f"Updated metadata for channel {channel_id} in Redis")
return True
@staticmethod
def _publish_stream_switch_event(channel_id, new_url, user_agent=None, stream_id=None):
def _publish_stream_switch_event(channel_id, new_url, user_agent=None, stream_id=None, m3u_profile_id=None):
"""Publish a stream switch event to Redis pubsub"""
proxy_server = ProxyServer.get_instance()
@ -465,6 +474,7 @@ class ChannelService:
"url": new_url,
"user_agent": user_agent,
"stream_id": stream_id,
"m3u_profile_id": m3u_profile_id,
"requester": proxy_server.worker_id,
"timestamp": time.time()
}

View file

@ -11,6 +11,7 @@ from .redis_keys import RedisKeys
from .config_helper import ConfigHelper
from .constants import TS_PACKET_SIZE
from .utils import get_logger
import gevent.event
logger = get_logger()
@ -46,6 +47,7 @@ class StreamBuffer:
# Track timers for proper cleanup
self.stopping = False
self.fill_timers = []
self.chunk_available = gevent.event.Event()
def add_chunk(self, chunk):
"""Add data with optimized Redis storage and TS packet alignment"""
@ -96,6 +98,9 @@ class StreamBuffer:
if writes_done > 0:
logger.debug(f"Added {writes_done} chunks ({self.target_chunk_size} bytes each) to Redis for channel {self.channel_id} at index {self.index}")
self.chunk_available.set() # Signal that new data is available
self.chunk_available.clear() # Reset for next notification
return True
except Exception as e:

View file

@ -6,6 +6,7 @@ This module handles generating and delivering video streams to clients.
import time
import logging
import threading
import gevent # Add this import at the top of your file
from apps.proxy.config import TSConfig as Config
from .server import ProxyServer
from .utils import create_ts_packet, get_logger
@ -135,7 +136,7 @@ class StreamGenerator:
return False
# Wait a bit before checking again
time.sleep(0.1)
gevent.sleep(0.1)
# Timed out waiting
logger.warning(f"[{self.client_id}] Timed out waiting for initialization")
@ -199,16 +200,16 @@ class StreamGenerator:
self.bytes_sent += len(keepalive_packet)
self.last_yield_time = time.time()
self.consecutive_empty = 0 # Reset consecutive counter but keep total empty_reads
time.sleep(Config.KEEPALIVE_INTERVAL)
gevent.sleep(Config.KEEPALIVE_INTERVAL) # Replace time.sleep
else:
# Standard wait with backoff
sleep_time = min(0.1 * self.consecutive_empty, 1.0)
time.sleep(sleep_time)
gevent.sleep(sleep_time) # Replace time.sleep
# Log empty reads periodically
if self.empty_reads % 50 == 0:
stream_status = "healthy" if (self.stream_manager and self.stream_manager.healthy) else "unknown"
logger.debug(f"[{self.client_id}] Waiting for chunks beyond {self.local_index} (buffer at {self.buffer.index}, stream: {stream_status})")
logger.debug(f"[{self.client_id}] Waiting for chunks beyond {self.local_index} for channel: {self.channel_id} (buffer at {self.buffer.index}, stream: {stream_status})")
# Check for ghost clients
if self._is_ghost_client(self.local_index):
@ -277,7 +278,7 @@ class StreamGenerator:
yield chunk
self.bytes_sent += len(chunk)
self.chunks_sent += 1
logger.debug(f"[{self.client_id}] Sent chunk {self.chunks_sent} ({len(chunk)} bytes) to client")
logger.debug(f"[{self.client_id}] Sent chunk {self.chunks_sent} ({len(chunk)} bytes) for channel {self.channel_id} to client")
current_time = time.time()
@ -380,14 +381,15 @@ class StreamGenerator:
client_count = proxy_server.client_managers[self.channel_id].get_total_client_count()
# Only the last client or owner should release the stream
if client_count <= 1 and proxy_server.am_i_owner(self.channel_id):
from apps.channels.models import Stream
from apps.channels.models import Channel
try:
stream = Stream.objects.get(pk=stream_id)
stream.release_stream()
# Get the channel by UUID
channel = Channel.objects.get(uuid=self.channel_id)
channel.release_stream()
stream_released = True
logger.debug(f"[{self.client_id}] Released stream {stream_id} for channel {self.channel_id}")
logger.debug(f"[{self.client_id}] Released stream for channel {self.channel_id}")
except Exception as e:
logger.error(f"[{self.client_id}] Error releasing stream {stream_id}: {e}")
logger.error(f"[{self.client_id}] Error releasing stream for channel {self.channel_id}: {e}")
except Exception as e:
logger.error(f"[{self.client_id}] Error checking stream data for release: {e}")
@ -415,7 +417,7 @@ class StreamGenerator:
# Use the config setting instead of hardcoded value
shutdown_delay = getattr(Config, 'CHANNEL_SHUTDOWN_DELAY', 5)
logger.info(f"Waiting {shutdown_delay}s before checking if channel should be stopped")
time.sleep(shutdown_delay)
gevent.sleep(shutdown_delay) # Replace time.sleep
# After delay, check global client count
if self.channel_id in proxy_server.client_managers:
@ -426,9 +428,7 @@ class StreamGenerator:
else:
logger.info(f"Not shutting down channel {self.channel_id}, {total} clients still connected")
shutdown_thread = threading.Thread(target=delayed_shutdown)
shutdown_thread.daemon = True
shutdown_thread.start()
gevent.spawn(delayed_shutdown)
def create_stream_generator(channel_id, client_id, client_ip, client_user_agent, channel_initializing=False):
"""

View file

@ -6,6 +6,7 @@ import time
import socket
import requests
import subprocess
import gevent # Add this import
from typing import Optional, List
from django.shortcuts import get_object_or_404
from apps.proxy.config import TSConfig as Config
@ -157,7 +158,7 @@ class StreamManager:
url_failed = False
if self.url_switching:
logger.debug("Skipping connection attempt during URL switch")
time.sleep(0.1)
gevent.sleep(0.1) # REPLACE time.sleep(0.1)
continue
# Connection retry loop for current URL
while self.running and self.retry_count < self.max_retries and not url_failed:
@ -205,7 +206,7 @@ class StreamManager:
# Wait with exponential backoff before retrying
timeout = min(.25 * self.retry_count, 3) # Cap at 3 seconds
logger.info(f"Reconnecting in {timeout} seconds... (attempt {self.retry_count}/{self.max_retries})")
time.sleep(timeout)
gevent.sleep(timeout) # REPLACE time.sleep(timeout)
except Exception as e:
logger.error(f"Connection error: {e}", exc_info=True)
@ -218,7 +219,7 @@ class StreamManager:
# Wait with exponential backoff before retrying
timeout = min(.25 * self.retry_count, 3) # Cap at 3 seconds
logger.info(f"Reconnecting in {timeout} seconds after error... (attempt {self.retry_count}/{self.max_retries})")
time.sleep(timeout)
gevent.sleep(timeout) # REPLACE time.sleep(timeout)
# If URL failed and we're still running, try switching to another stream
if url_failed and self.running:
@ -425,7 +426,7 @@ class StreamManager:
else:
if not self.running:
break
time.sleep(0.1)
gevent.sleep(0.1) # REPLACE time.sleep(0.1)
else:
# Handle direct HTTP connection
chunk_count = 0
@ -502,15 +503,6 @@ class StreamManager:
owner_key = RedisKeys.channel_owner(self.channel_id)
current_owner = self.buffer.redis_client.get(owner_key)
if current_owner and current_owner.decode('utf-8') == self.worker_id:
try:
from apps.channels.models import Stream
stream = Stream.objects.get(pk=self.current_stream_id)
stream.release_stream()
logger.info(f"Released stream {self.current_stream_id} for channel {self.channel_id}")
except Exception as e:
logger.error(f"Error releasing stream {self.current_stream_id}: {e}")
# Cancel all buffer check timers
for timer in list(self._buffer_check_timers):
try:
@ -544,7 +536,7 @@ class StreamManager:
# Set running to false to ensure thread exits
self.running = False
def update_url(self, new_url):
def update_url(self, new_url, stream_id=None):
"""Update stream URL and reconnect with proper cleanup for both HTTP and transcode sessions"""
if new_url == self.url:
logger.info(f"URL unchanged: {new_url}")
@ -552,6 +544,28 @@ class StreamManager:
logger.info(f"Switching stream URL from {self.url} to {new_url}")
# Import both models for proper resource management
from apps.channels.models import Stream, Channel
# Update stream profile if we're switching streams
if self.current_stream_id and stream_id and self.current_stream_id != stream_id:
try:
# Get the channel by UUID
channel = Channel.objects.get(uuid=self.channel_id)
# Get stream to find its profile
new_stream = Stream.objects.get(pk=stream_id)
# Use the new method to update the profile and manage connection counts
if new_stream.m3u_account_id:
success = channel.update_stream_profile(new_stream.m3u_account_id)
if success:
logger.debug(f"Updated stream profile for channel {self.channel_id} to use profile from stream {stream_id}")
else:
logger.warning(f"Failed to update stream profile for channel {self.channel_id}")
except Exception as e:
logger.error(f"Error updating stream profile for channel {self.channel_id}: {e}")
# CRITICAL: Set a flag to prevent immediate reconnection with old URL
self.url_switching = True
@ -568,6 +582,14 @@ class StreamManager:
self.url = new_url
self.connected = False
# Update stream ID if provided
if stream_id:
old_stream_id = self.current_stream_id
self.current_stream_id = stream_id
# Add stream ID to tried streams for proper tracking
self.tried_stream_ids.add(stream_id)
logger.info(f"Updated stream ID from {old_stream_id} to {stream_id} for channel {self.buffer.channel_id}")
# Reset retry counter to allow immediate reconnect
self.retry_count = 0
@ -653,7 +675,7 @@ class StreamManager:
except Exception as e:
logger.error(f"Error in health monitor: {e}")
time.sleep(self.health_check_interval)
gevent.sleep(self.health_check_interval) # REPLACE time.sleep(self.health_check_interval)
def _attempt_reconnect(self):
"""Attempt to reconnect to the current stream"""
@ -697,6 +719,29 @@ class StreamManager:
logger.error(f"Error in reconnect attempt: {e}", exc_info=True)
return False
def _attempt_health_recovery(self):
"""Attempt to recover stream health by switching to another stream"""
try:
logger.info(f"Attempting health recovery for channel {self.channel_id}")
# Don't try to switch if we're already in the process of switching URLs
if self.url_switching:
logger.info("URL switching already in progress, skipping health recovery")
return
# Try to switch to next stream
switch_result = self._try_next_stream()
if switch_result:
logger.info(f"Health recovery successful - switched to new stream for channel {self.channel_id}")
return True
else:
logger.warning(f"Health recovery failed - no alternative streams available for channel {self.channel_id}")
return False
except Exception as e:
logger.error(f"Error in health recovery attempt: {e}", exc_info=True)
return False
def _close_connection(self):
"""Close HTTP connection resources"""
# Close response if it exists
@ -1005,7 +1050,7 @@ class StreamManager:
logger.info(f"Stream metadata updated for channel {self.channel_id} to stream ID {stream_id}")
# IMPORTANT: Just update the URL, don't stop the channel or release resources
switch_result = self.update_url(new_url)
switch_result = self.update_url(new_url, stream_id)
if not switch_result:
logger.error(f"Failed to update URL for stream ID {stream_id}")
return False
@ -1015,4 +1060,4 @@ class StreamManager:
except Exception as e:
logger.error(f"Error trying next stream for channel {self.channel_id}: {e}", exc_info=True)
return False
return False

View file

@ -193,7 +193,8 @@ def stream_ts(request, channel_id):
break
else:
logger.warning(f"[{client_id}] Alternate stream #{alt['stream_id']} failed validation: {message}")
# Release stream lock before redirecting
channel.release_stream()
# Final decision based on validation results
if is_valid:
logger.info(f"[{client_id}] Redirecting to validated URL: {final_url} ({message})")
@ -261,7 +262,6 @@ def stream_ts(request, channel_id):
logger.info(f"[{client_id}] Successfully initialized channel {channel_id}")
channel_initializing = True
logger.info(f"[{client_id}] Channel {channel_id} initialization started")
# Register client - can do this regardless of initialization state
# Create local resources if needed
@ -351,6 +351,7 @@ def change_stream(request, channel_id):
# Use the info from the stream
new_url = stream_info['url']
user_agent = stream_info['user_agent']
m3u_profile_id = stream_info.get('m3u_profile_id')
# Stream ID will be passed to change_stream_url later
elif not new_url:
return JsonResponse({'error': 'Either url or stream_id must be provided'}, status=400)
@ -359,7 +360,7 @@ def change_stream(request, channel_id):
# Use the service layer instead of direct implementation
# Pass stream_id to ensure proper connection tracking
result = ChannelService.change_stream_url(channel_id, new_url, user_agent, stream_id)
result = ChannelService.change_stream_url(channel_id, new_url, user_agent, stream_id, m3u_profile_id)
# Get the stream manager before updating URL
stream_manager = proxy_server.stream_managers.get(channel_id)

View file

@ -95,8 +95,8 @@ def environment(request):
@api_view(['GET'])
def version(request):
# Import version information
from version import __version__, __build__
from version import __version__, __timestamp__
return Response({
'version': __version__,
'build': __build__,
'timestamp': __timestamp__,
})

View file

@ -5,6 +5,8 @@ ARG BRANCH=main
# This will be overridden by the GitHub Actions workflow
# when building the Docker image for production.
ARG REPO_URL=https://github.com/Dispatcharr/Dispatcharr
# Add timestamp argument
ARG TIMESTAMP
ENV PATH="/dispatcharrpy/bin:$PATH" \
VIRTUAL_ENV=/dispatcharrpy \
@ -26,8 +28,16 @@ RUN apt-get update && \
virtualenv /dispatcharrpy && \
git clone -b ${BRANCH} ${REPO_URL} /app && \
cd /app && \
rm -rf .git && \
cd /app && \
rm -rf .git
# Update version.py with build timestamp if provided
RUN if [ -n "$TIMESTAMP" ]; then \
echo "Updating timestamp to ${TIMESTAMP} in version.py" && \
sed -i "s|__timestamp__ = None.*|__timestamp__ = '${TIMESTAMP}' # Set during CI/CD build process|" /app/version.py && \
cat /app/version.py; \
fi
RUN cd /app && \
pip install --no-cache-dir -r requirements.txt
# Use a dedicated Node.js stage for frontend building

View file

@ -3,11 +3,9 @@ docker build --build-arg BRANCH=dev -t dispatcharr/dispatcharr:dev -f Dockerfile
# Get version information
VERSION=$(python -c "import sys; sys.path.append('..'); import version; print(version.__version__)")
BUILD=$(python -c "import sys; sys.path.append('..'); import version; print(version.__build__)")
# Build with version tags
# Build with version tag
docker build --build-arg BRANCH=dev \
-t dispatcharr/dispatcharr:dev \
-t dispatcharr/dispatcharr:${VERSION}-${BUILD} \
-t dispatcharr/dispatcharr:${VERSION} \
-f Dockerfile ..
.

View file

@ -7,6 +7,7 @@ server {
proxy_connect_timeout 75;
proxy_send_timeout 300;
proxy_read_timeout 300;
client_max_body_size 128M; # Allow file uploads up to 128MB
# Serve Django via uWSGI
location / {
@ -84,13 +85,14 @@ server {
# Route TS proxy requests to the dedicated instance
location /proxy/ {
proxy_pass http://127.0.0.1:5656;
include uwsgi_params;
uwsgi_pass unix:/app/uwsgi.sock;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
proxy_read_timeout 300s;
proxy_send_timeout 300s;
client_max_body_size 0;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

View file

@ -28,6 +28,7 @@ static-map = /static=/app/static
workers = 4
threads = 4
enable-threads = true
thread-stacksize=512
# Optimize for streaming
http = 0.0.0.0:5656

View file

@ -14,7 +14,7 @@ import useEPGsStore from './store/epgs';
import { Box, Button, Stack } from '@mantine/core';
import API from './api';
export const WebsocketContext = createContext([false, () => {}, null]);
export const WebsocketContext = createContext([false, () => { }, null]);
export const WebsocketProvider = ({ children }) => {
const [isReady, setIsReady] = useState(false);
@ -121,11 +121,17 @@ export const WebsocketProvider = ({ children }) => {
case 'epg_match':
notifications.show({
message: 'EPG match is complete!',
message: event.data.message || 'EPG match is complete!',
color: 'green.5',
});
// fetchChannels();
fetchEPGData();
// Check if we have associations data and use the more efficient batch API
if (event.data.associations && event.data.associations.length > 0) {
API.batchSetEPG(event.data.associations);
} else {
// Fall back to legacy full refresh method
API.requeryChannels();
}
break;
case 'm3u_profile_test':

View file

@ -337,6 +337,11 @@ export default class API {
payload.stream_profile_id = null;
}
// Handle logo_id properly (0 means "no logo")
if (payload.logo_id === '0' || payload.logo_id === 0) {
payload.logo_id = null;
}
// Ensure tvg_id is included properly (not as empty string)
if (payload.tvg_id === '') {
payload.tvg_id = null;
@ -1134,7 +1139,7 @@ export default class API {
return response;
} catch (e) {
errorNotification('Failed to create channle profile', e);
errorNotification('Failed to create channel profile', e);
}
}
@ -1281,4 +1286,31 @@ export default class API {
throw e;
}
}
static async batchSetEPG(associations) {
try {
const response = await request(
`${host}/api/channels/channels/batch-set-epg/`,
{
method: 'POST',
body: { associations },
}
);
// If successful, requery channels to update UI
if (response.success) {
notifications.show({
title: 'EPG Association',
message: `Updated ${response.channels_updated} channels, refreshing ${response.programs_refreshed} EPG sources.`,
color: 'blue',
});
this.requeryChannels();
}
return response;
} catch (e) {
errorNotification('Failed to update channel EPGs', e);
}
}
}

View file

@ -67,7 +67,7 @@ const Sidebar = ({ collapsed, toggleDrawer, drawerWidth, miniDrawerWidth }) => {
const environment = useSettingsStore((s) => s.environment);
const isAuthenticated = useAuthStore((s) => s.isAuthenticated);
const publicIPRef = useRef(null);
const [appVersion, setAppVersion] = useState({ version: '', build: '' });
const [appVersion, setAppVersion] = useState({ version: '', timestamp: null });
// Fetch environment settings including version on component mount
useEffect(() => {
@ -89,7 +89,7 @@ const Sidebar = ({ collapsed, toggleDrawer, drawerWidth, miniDrawerWidth }) => {
const versionData = await API.getVersion();
setAppVersion({
version: versionData.version || '',
build: versionData.build || '',
timestamp: versionData.timestamp || null,
});
} catch (error) {
console.error('Failed to fetch version information:', error);
@ -266,7 +266,7 @@ const Sidebar = ({ collapsed, toggleDrawer, drawerWidth, miniDrawerWidth }) => {
{!collapsed && (
<Text size="xs" style={{ padding: '0 16px 16px' }} c="dimmed">
v{appVersion?.version || '0.0.0'}
{appVersion?.build !== '0' ? `-${appVersion?.build}` : ''}
{appVersion?.timestamp ? `-${appVersion.timestamp}` : ''}
</Text>
)}
</AppShell.Navbar>

View file

@ -1,6 +1,6 @@
import { Box, Flex } from '@mantine/core';
import CustomTableHeader from './CustomTableHeader';
import { useCallback, useState } from 'react';
import { useCallback, useState, useRef } from 'react';
import { flexRender } from '@tanstack/react-table';
import table from '../../../helpers/table';
import CustomTableBody from './CustomTableBody';
@ -11,7 +11,6 @@ const CustomTable = ({ table }) => {
className="divTable table-striped"
style={{
width: '100%',
// height: '100%', // ONLY required when using virtual tables
display: 'flex',
flexDirection: 'column',
}}

View file

@ -73,7 +73,7 @@ const CustomTableBody = ({
const renderTableBodyRow = (row, index, style = {}) => {
return (
<Box style={style}>
<Box style={style} key={`row-${row.id}`}>
<Box
key={`tr-${row.id}`}
className={`tr ${index % 2 == 0 ? 'tr-even' : 'tr-odd'}`}

View file

@ -7,7 +7,7 @@ import {
getCoreRowModel,
flexRender,
} from '@tanstack/react-table';
import { useCallback, useMemo, useState } from 'react';
import { useCallback, useMemo, useState, useEffect } from 'react';
import { ChevronDown, ChevronRight } from 'lucide-react';
const useTable = ({
@ -21,6 +21,64 @@ const useTable = ({
}) => {
const [selectedTableIds, setSelectedTableIds] = useState([]);
const [expandedRowIds, setExpandedRowIds] = useState([]);
const [lastClickedId, setLastClickedId] = useState(null);
const [isShiftKeyDown, setIsShiftKeyDown] = useState(false);
// Event handlers for shift key detection with improved handling
const handleKeyDown = useCallback((e) => {
if (e.key === 'Shift') {
setIsShiftKeyDown(true);
// Apply the class to disable text selection immediately
document.body.classList.add('shift-key-active');
// Set a style attribute directly on body for extra assurance
document.body.style.userSelect = 'none';
document.body.style.webkitUserSelect = 'none';
document.body.style.msUserSelect = 'none';
document.body.style.cursor = 'pointer';
}
}, []);
const handleKeyUp = useCallback((e) => {
if (e.key === 'Shift') {
setIsShiftKeyDown(false);
// Remove the class when shift is released
document.body.classList.remove('shift-key-active');
// Reset the style attributes
document.body.style.removeProperty('user-select');
document.body.style.removeProperty('-webkit-user-select');
document.body.style.removeProperty('-ms-user-select');
document.body.style.removeProperty('cursor');
}
}, []);
// Add global event listeners for shift key detection with improved cleanup
useEffect(() => {
window.addEventListener('keydown', handleKeyDown);
window.addEventListener('keyup', handleKeyUp);
// Also detect blur/focus events to handle cases where shift is held and window loses focus
window.addEventListener('blur', () => {
setIsShiftKeyDown(false);
document.body.classList.remove('shift-key-active');
document.body.style.removeProperty('user-select');
document.body.style.removeProperty('-webkit-user-select');
document.body.style.removeProperty('-ms-user-select');
document.body.style.removeProperty('cursor');
});
return () => {
window.removeEventListener('keydown', handleKeyDown);
window.removeEventListener('keyup', handleKeyUp);
window.removeEventListener('blur', () => {
setIsShiftKeyDown(false);
document.body.classList.remove('shift-key-active');
document.body.style.removeProperty('user-select');
document.body.style.removeProperty('-webkit-user-select');
document.body.style.removeProperty('-ms-user-select');
document.body.style.removeProperty('cursor');
});
};
}, [handleKeyDown, handleKeyUp]);
const rowCount = allRowIds.length;
@ -77,6 +135,34 @@ const useTable = ({
updateSelectedTableIds([row.original.id]);
};
// Handle the shift+click selection
const handleShiftSelect = (rowId, isShiftKey) => {
if (!isShiftKey || lastClickedId === null) {
// Normal selection behavior
setLastClickedId(rowId);
return false; // Return false to indicate we're not handling it
}
// Handle shift-click range selection
const currentIndex = allRowIds.indexOf(rowId);
const lastIndex = allRowIds.indexOf(lastClickedId);
if (currentIndex === -1 || lastIndex === -1) return false;
// Determine range
const startIndex = Math.min(currentIndex, lastIndex);
const endIndex = Math.max(currentIndex, lastIndex);
const rangeIds = allRowIds.slice(startIndex, endIndex + 1);
// Preserve existing selections outside the range
const idsOutsideRange = selectedTableIds.filter(id => !rangeIds.includes(id));
const newSelection = [...new Set([...rangeIds, ...idsOutsideRange])];
updateSelectedTableIds(newSelection);
setLastClickedId(rowId);
return true; // Return true to indicate we've handled it
};
const renderBodyCell = ({ row, cell }) => {
if (bodyCellRenderFns[cell.column.id]) {
return bodyCellRenderFns[cell.column.id]({ row, cell });
@ -91,13 +177,22 @@ const useTable = ({
size="xs"
checked={selectedTableIdsSet.has(row.original.id)}
onChange={(e) => {
const newSet = new Set(selectedTableIds);
if (e.target.checked) {
newSet.add(row.original.id);
} else {
newSet.delete(row.original.id);
const rowId = row.original.id;
// Get shift key state from the event
const isShiftKey = e.nativeEvent.shiftKey;
// Try to handle with shift-select logic first
if (!handleShiftSelect(rowId, isShiftKey)) {
// If not handled by shift-select, do regular toggle
const newSet = new Set(selectedTableIds);
if (e.target.checked) {
newSet.add(rowId);
} else {
newSet.delete(rowId);
}
updateSelectedTableIds([...newSet]);
}
updateSelectedTableIds([...newSet]);
}}
/>
</Center>
@ -137,8 +232,9 @@ const useTable = ({
expandedRowIds,
expandedRowRenderer,
setSelectedTableIds,
isShiftKeyDown, // Include shift key state in the table instance
}),
[selectedTableIdsSet, expandedRowIds, allRowIds]
[selectedTableIdsSet, expandedRowIds, allRowIds, isShiftKeyDown]
);
return {

View file

@ -1,94 +1,155 @@
* {
/* box-sizing: border-box; */
}
/* box-sizing: border-box; */
}
html {
font-family: sans-serif;
/* font-size: 14px; */
}
html {
font-family: sans-serif;
/* font-size: 14px; */
}
.divTable {
/* border: 1px solid lightgray; */
/* width: fit-content; */
/* display: flex;
.divTable {
/* border: 1px solid lightgray; */
/* width: fit-content; */
/* display: flex;
flex-direction: column; */
}
}
.tr {
display: flex;
}
.tr {
display: flex;
}
.table-striped .tbody .tr:hover {
background-color: rgb(68,68,68);
}
.table-striped .tbody .tr:hover {
background-color: rgb(68, 68, 68);
}
.tr {
/* width: fit-content;
.tr {
/* width: fit-content;
width: 100%; */
/* height: 30px; */
}
/* height: 30px; */
}
.th,
.td {
/* box-shadow: inset 0 0 0 1px lightgray; */
/* padding: 0.25rem; */
padding-left: 4px;
padding-right: 4px;
}
.th,
.td {
/* box-shadow: inset 0 0 0 1px lightgray; */
/* padding: 0.25rem; */
padding-left: 4px;
padding-right: 4px;
}
.th {
/* padding: 2px 4px; */
position: relative;
font-weight: bold;
text-align: center;
/* height: 30px; */
}
.th {
/* padding: 2px 4px; */
position: relative;
font-weight: bold;
text-align: center;
/* height: 30px; */
}
.td {
height: 28px;
border-bottom: solid 1px rgb(68,68,68);
}
.td {
height: 28px;
border-bottom: solid 1px rgb(68, 68, 68);
}
.resizer {
position: absolute;
top: 0;
height: 100%;
width: 5px;
background: rgba(0, 0, 0, 0.5);
cursor: col-resize;
user-select: none;
touch-action: none;
}
.resizer.ltr {
right: 0;
}
.resizer.rtl {
left: 0;
}
.resizer.isResizing {
background: blue;
opacity: 1;
}
@media (hover: hover) {
.resizer {
position: absolute;
top: 0;
height: 100%;
width: 5px;
background: rgba(0, 0, 0, 0.5);
cursor: col-resize;
user-select: none;
touch-action: none;
opacity: 0;
}
.resizer.ltr {
right: 0;
}
.resizer.rtl {
left: 0;
}
.resizer.isResizing {
background: blue;
*:hover>.resizer {
opacity: 1;
}
}
@media (hover: hover) {
.resizer {
opacity: 0;
}
/* .table-striped .tbody .tr:nth-child(odd), */
.table-striped .tbody .tr-odd {
background-color: #18181b;
}
*:hover > .resizer {
opacity: 1;
}
}
/* .table-striped .tbody .tr:nth-child(even), */
.table-striped .tbody .tr-even {
background-color: #27272A;
}
/* .table-striped .tbody .tr:nth-child(odd), */
.table-striped .tbody .tr-odd {
background-color: #18181b;
}
/* Prevent text selection when shift key is pressed */
.shift-key-active {
cursor: pointer !important;
}
/* .table-striped .tbody .tr:nth-child(even), */
.table-striped .tbody .tr-even {
background-color: #27272A;
}
.shift-key-active *,
.shift-key-active .tr,
.shift-key-active .td,
.shift-key-active .tbody {
user-select: none !important;
-webkit-user-select: none !important;
-moz-user-select: none !important;
-ms-user-select: none !important;
}
/* Always allow text selection in editable elements */
.shift-key-active input,
.shift-key-active textarea,
.shift-key-active [contenteditable="true"],
.shift-key-active .table-input-header input {
user-select: text !important;
-webkit-user-select: text !important;
-moz-user-select: text !important;
-ms-user-select: text !important;
cursor: text !important;
}
/* Improve specificity and ensure text selection is disabled when shift is pressed */
.shift-key-active,
.shift-key-active * {
user-select: none !important;
-webkit-user-select: none !important;
-moz-user-select: none !important;
-ms-user-select: none !important;
cursor: pointer !important;
}
/* Add a visual indicator when shift is pressed */
.shift-key-active .tbody .tr {
transition: background-color 0.1s;
}
.shift-key-active .tbody .tr:hover {
background-color: rgba(68, 68, 68, 0.7) !important;
}
/* Always allow text selection in inputs even when shift is pressed */
.shift-key-active input,
.shift-key-active textarea,
.shift-key-active [contenteditable="true"],
.shift-key-active select,
.shift-key-active .mantine-Select-input,
.shift-key-active .mantine-MultiSelect-input,
.shift-key-active .table-input-header input {
user-select: text !important;
-webkit-user-select: text !important;
-moz-user-select: text !important;
-ms-user-select: text !important;
cursor: text !important;
}

View file

@ -34,6 +34,7 @@ import duration from 'dayjs/plugin/duration';
import relativeTime from 'dayjs/plugin/relativeTime';
import { Sparkline } from '@mantine/charts';
import useStreamProfilesStore from '../store/streamProfiles';
import usePlaylistsStore from '../store/playlists'; // Add this import
import { useLocation } from 'react-router-dom';
import { notifications } from '@mantine/notifications';
@ -82,12 +83,39 @@ const ChannelCard = ({ channel, clients, stopClient, stopChannel, logos, channel
const [availableStreams, setAvailableStreams] = useState([]);
const [isLoadingStreams, setIsLoadingStreams] = useState(false);
const [activeStreamId, setActiveStreamId] = useState(null);
const [currentM3UProfile, setCurrentM3UProfile] = useState(null); // Add state for current M3U profile
// Get M3U account data from the playlists store
const m3uAccounts = usePlaylistsStore((s) => s.playlists);
// Create a map of M3U account IDs to names for quick lookup
const m3uAccountsMap = useMemo(() => {
const map = {};
if (m3uAccounts && Array.isArray(m3uAccounts)) {
m3uAccounts.forEach(account => {
if (account.id) {
map[account.id] = account.name;
}
});
}
return map;
}, [m3uAccounts]);
// Safety check - if channel doesn't have required data, don't render
if (!channel || !channel.channel_id) {
return null;
}
// Update M3U profile information when channel data changes
useEffect(() => {
// If the channel data includes M3U profile information, update our state
if (channel.m3u_profile || channel.m3u_profile_name) {
setCurrentM3UProfile({
name: channel.m3u_profile?.name || channel.m3u_profile_name || 'Default M3U'
});
}
}, [channel.m3u_profile, channel.m3u_profile_name, channel.stream_id]);
// Fetch available streams for this channel
useEffect(() => {
const fetchStreams = async () => {
@ -110,6 +138,11 @@ const ChannelCard = ({ channel, clients, stopClient, stopChannel, logos, channel
if (matchingStream) {
setActiveStreamId(matchingStream.id.toString());
// If the stream has M3U profile info, save it
if (matchingStream.m3u_profile) {
setCurrentM3UProfile(matchingStream.m3u_profile);
}
}
}
}
@ -138,6 +171,14 @@ const ChannelCard = ({ channel, clients, stopClient, stopChannel, logos, channel
// Update the local active stream ID immediately
setActiveStreamId(streamId);
// Update M3U profile information if available in the response
if (response && response.m3u_profile) {
setCurrentM3UProfile(response.m3u_profile);
} else if (selectedStream && selectedStream.m3u_profile) {
// Fallback to the profile from the selected stream
setCurrentM3UProfile(selectedStream.m3u_profile);
}
// Show detailed notification with stream name
notifications.show({
title: 'Stream switching',
@ -152,6 +193,12 @@ const ChannelCard = ({ channel, clients, stopClient, stopChannel, logos, channel
if (channelId) {
const updatedStreamData = await API.getChannelStreams(channelId);
console.log("Channel streams after switch:", updatedStreamData);
// Update current stream information with fresh data
const updatedStream = updatedStreamData.find(s => s.id.toString() === streamId);
if (updatedStream && updatedStream.m3u_profile) {
setCurrentM3UProfile(updatedStream.m3u_profile);
}
}
} catch (error) {
console.error("Error checking streams after switch:", error);
@ -269,7 +316,14 @@ const ChannelCard = ({ channel, clients, stopClient, stopChannel, logos, channel
</Center>
</Box>
),
renderDetailPanel: ({ row }) => <Box>{row.original.user_agent}</Box>,
renderDetailPanel: ({ row }) => (
<Box p="xs">
<Group spacing="xs" align="flex-start">
<Text size="xs" fw={500} color="dimmed">User Agent:</Text>
<Text size="xs">{row.original.user_agent || "Unknown"}</Text>
</Group>
</Box>
),
mantineExpandButtonProps: ({ row, table }) => ({
size: 'xs',
style: {
@ -305,11 +359,26 @@ const ChannelCard = ({ channel, clients, stopClient, stopChannel, logos, channel
const avgBitrate = channel.avg_bitrate || '0 Kbps';
const streamProfileName = channel.stream_profile?.name || 'Unknown Profile';
// Use currentM3UProfile if available, otherwise fall back to channel data
const m3uProfileName = currentM3UProfile?.name ||
channel.m3u_profile?.name ||
channel.m3u_profile_name ||
'Unknown M3U Profile';
// Create select options for available streams
const streamOptions = availableStreams.map(stream => ({
value: stream.id.toString(),
label: `${stream.name || `Stream #${stream.id}`}`, // Make sure stream name is clear
}));
const streamOptions = availableStreams.map(stream => {
// Get account name from our mapping if it exists
const accountName = stream.m3u_account && m3uAccountsMap[stream.m3u_account]
? m3uAccountsMap[stream.m3u_account]
: stream.m3u_account
? `M3U #${stream.m3u_account}`
: 'Unknown M3U';
return {
value: stream.id.toString(),
label: `${stream.name || `Stream #${stream.id}`} [${accountName}]`,
};
});
return (
<Card
@ -325,12 +394,23 @@ const ChannelCard = ({ channel, clients, stopClient, stopChannel, logos, channel
>
<Stack style={{ position: 'relative' }}>
<Group justify="space-between">
<img
src={logoUrl || logo}
width="100"
style={{ maxHeight: '50px', objectFit: 'contain' }}
alt="channel logo"
/>
<Box style={{
width: '100px',
height: '50px',
display: 'flex',
alignItems: 'center',
justifyContent: 'center'
}}>
<img
src={logoUrl || logo}
style={{
maxWidth: '100%',
maxHeight: '100%',
objectFit: 'contain'
}}
alt="channel logo"
/>
</Box>
<Group>
<Box>
@ -360,9 +440,21 @@ const ChannelCard = ({ channel, clients, stopClient, stopChannel, logos, channel
<Text fw={500}>{channelName}</Text>
</Group>
<Tooltip label="Active Stream Profile">
<Group gap={5}>
<Video size="18" />
{streamProfileName}
</Group>
</Tooltip>
</Flex>
{/* Display M3U profile information */}
<Flex justify="flex-end" align="center" mt={-8}>
<Group gap={5}>
<Video size="18" />
{streamProfileName}
<HardDriveUpload size="18" />
<Tooltip label="Current M3U Profile">
<Text size="xs">{m3uProfileName}</Text>
</Tooltip>
</Group>
</Flex>

View file

@ -34,7 +34,7 @@ def process_data(input_data):
channels = input_data["channels"]
epg_data = input_data["epg_data"]
region_code = input_data["region_code"]
region_code = input_data.get("region_code", None)
epg_embeddings = None
if any(row["norm_name"] for row in epg_data):
@ -47,6 +47,21 @@ def process_data(input_data):
matched_channels = []
for chan in channels:
normalized_tvg_id = chan.get("tvg_id", "")
fallback_name = chan["tvg_id"].strip() if chan["tvg_id"] else chan["name"]
# Exact TVG ID match (direct match)
epg_by_tvg_id = next((epg for epg in epg_data if epg["tvg_id"] == normalized_tvg_id), None)
if normalized_tvg_id and epg_by_tvg_id:
chan["epg_data_id"] = epg_by_tvg_id["id"]
channels_to_update.append(chan)
# Add to matched_channels list so it's counted in the total
matched_channels.append((chan['id'], fallback_name, epg_by_tvg_id["tvg_id"]))
eprint(f"Channel {chan['id']} '{fallback_name}' => EPG found by tvg_id={epg_by_tvg_id['tvg_id']}")
continue
# If channel has a tvg_id that doesn't exist in EPGData, do direct check.
# I don't THINK this should happen now that we assign EPG on channel creation.
if chan["tvg_id"]:
@ -59,7 +74,6 @@ def process_data(input_data):
continue
# C) Perform name-based fuzzy matching
fallback_name = chan["tvg_id"].strip() if chan["tvg_id"] else chan["name"]
if not chan["norm_chan"]:
eprint(f"Channel {chan['id']} '{chan['name']}' => empty after normalization, skipping")
continue

View file

@ -2,4 +2,4 @@
Dispatcharr version information.
"""
__version__ = '0.3.3' # Follow semantic versioning (MAJOR.MINOR.PATCH)
__build__ = '27' # Auto-incremented on builds
__timestamp__ = None # Set during CI/CD build process