This commit is contained in:
SergeantPanda 2025-03-02 14:56:41 -06:00
commit 94f7f5e630
29 changed files with 3464 additions and 330 deletions

View file

@ -1,4 +1,4 @@
# Generated by Django 5.1.6 on 2025-03-02 00:01
# Generated by Django 5.1.6 on 2025-03-02 13:52
import django.contrib.auth.models
import django.contrib.auth.validators

View file

@ -9,6 +9,8 @@ from django.shortcuts import get_object_or_404
from .models import Stream, Channel, ChannelGroup
from .serializers import StreamSerializer, ChannelSerializer, ChannelGroupSerializer
from .tasks import match_epg_channels
# ─────────────────────────────────────────────────────────
# 1) Stream API (CRUD)
@ -30,6 +32,7 @@ class StreamViewSet(viewsets.ModelViewSet):
qs = qs.filter(channels__isnull=True)
return qs
# ─────────────────────────────────────────────────────────
# 2) Channel Group Management (CRUD)
# ─────────────────────────────────────────────────────────
@ -38,6 +41,7 @@ class ChannelGroupViewSet(viewsets.ModelViewSet):
serializer_class = ChannelGroupSerializer
permission_classes = [IsAuthenticated]
# ─────────────────────────────────────────────────────────
# 3) Channel Management (CRUD)
# ─────────────────────────────────────────────────────────
@ -131,6 +135,7 @@ class ChannelViewSet(viewsets.ModelViewSet):
'tvg_id': stream.tvg_id,
'channel_group_id': channel_group.id,
'logo_url': stream.logo_url,
'streams': [stream_id]
}
serializer = self.get_serializer(data=channel_data)
serializer.is_valid(raise_exception=True)
@ -178,6 +183,7 @@ class ChannelViewSet(viewsets.ModelViewSet):
# Gather current used numbers once.
used_numbers = set(Channel.objects.all().values_list('channel_number', flat=True))
next_number = 1
def get_auto_number():
nonlocal next_number
while next_number in used_numbers:
@ -221,6 +227,7 @@ class ChannelViewSet(viewsets.ModelViewSet):
"tvg_id": stream.tvg_id,
"channel_group_id": channel_group.id,
"logo_url": stream.logo_url,
"streams": [stream_id],
}
serializer = self.get_serializer(data=channel_data)
if serializer.is_valid():
@ -236,6 +243,20 @@ class ChannelViewSet(viewsets.ModelViewSet):
return Response(response_data, status=status.HTTP_201_CREATED)
# ─────────────────────────────────────────────────────────
# 6) EPG Fuzzy Matching
# ─────────────────────────────────────────────────────────
@swagger_auto_schema(
method='post',
operation_description="Kick off a Celery task that tries to fuzzy-match channels with EPG data.",
responses={202: "EPG matching task initiated"}
)
@action(detail=False, methods=['post'], url_path='match-epg')
def match_epg(self, request):
match_epg_channels.delay()
return Response({"message": "EPG matching task initiated."}, status=status.HTTP_202_ACCEPTED)
# ─────────────────────────────────────────────────────────
# 4) Bulk Delete Streams
# ─────────────────────────────────────────────────────────
@ -262,6 +283,7 @@ class BulkDeleteStreamsAPIView(APIView):
Stream.objects.filter(id__in=stream_ids).delete()
return Response({"message": "Streams deleted successfully!"}, status=status.HTTP_204_NO_CONTENT)
# ─────────────────────────────────────────────────────────
# 5) Bulk Delete Channels
# ─────────────────────────────────────────────────────────

View file

@ -1,4 +1,4 @@
# Generated by Django 5.1.6 on 2025-03-02 00:01
# Generated by Django 5.1.6 on 2025-03-02 13:52
import django.db.models.deletion
from django.db import migrations, models
@ -21,6 +21,20 @@ class Migration(migrations.Migration):
('name', models.CharField(max_length=100, unique=True)),
],
),
migrations.CreateModel(
name='Channel',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('channel_number', models.IntegerField()),
('channel_name', models.CharField(max_length=255)),
('logo_url', models.URLField(blank=True, max_length=2000, null=True)),
('logo_file', models.ImageField(blank=True, null=True, upload_to='logos/')),
('tvg_id', models.CharField(blank=True, max_length=255, null=True)),
('tvg_name', models.CharField(blank=True, max_length=255, null=True)),
('stream_profile', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='channels', to='core.streamprofile')),
('channel_group', models.ForeignKey(blank=True, help_text='Channel group this channel belongs to.', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='channels', to='channels.channelgroup')),
],
),
migrations.CreateModel(
name='Stream',
fields=[
@ -44,18 +58,20 @@ class Migration(migrations.Migration):
},
),
migrations.CreateModel(
name='Channel',
name='ChannelStream',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('channel_number', models.IntegerField()),
('channel_name', models.CharField(max_length=255)),
('logo_url', models.URLField(blank=True, max_length=2000, null=True)),
('logo_file', models.ImageField(blank=True, null=True, upload_to='logos/')),
('tvg_id', models.CharField(blank=True, max_length=255, null=True)),
('tvg_name', models.CharField(blank=True, max_length=255, null=True)),
('stream_profile', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='channels', to='core.streamprofile')),
('channel_group', models.ForeignKey(blank=True, help_text='Channel group this channel belongs to.', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='channels', to='channels.channelgroup')),
('streams', models.ManyToManyField(blank=True, related_name='channels', to='channels.stream')),
('order', models.PositiveIntegerField(default=0)),
('channel', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='channels.channel')),
('stream', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='channels.stream')),
],
options={
'ordering': ['order'],
},
),
migrations.AddField(
model_name='channel',
name='streams',
field=models.ManyToManyField(blank=True, related_name='channels', through='channels.ChannelStream', to='channels.stream'),
),
]

View file

@ -61,6 +61,7 @@ class Channel(models.Model):
streams = models.ManyToManyField(
Stream,
blank=True,
through='ChannelStream',
related_name='channels'
)
@ -84,7 +85,7 @@ class Channel(models.Model):
related_name='channels'
)
def clean(self):
# Enforce unique channel_number within a given group
existing = Channel.objects.filter(
@ -109,3 +110,11 @@ class ChannelGroup(models.Model):
def __str__(self):
return self.name
class ChannelStream(models.Model):
channel = models.ForeignKey(Channel, on_delete=models.CASCADE)
stream = models.ForeignKey(Stream, on_delete=models.CASCADE)
order = models.PositiveIntegerField(default=0) # Ordering field
class Meta:
ordering = ['order'] # Ensure streams are retrieved in order

View file

@ -1,5 +1,5 @@
from rest_framework import serializers
from .models import Stream, Channel, ChannelGroup
from .models import Stream, Channel, ChannelGroup, ChannelStream
from core.models import StreamProfile
#
@ -73,8 +73,10 @@ class ChannelSerializer(serializers.ModelSerializer):
required=False
)
# Possibly show streams inline, or just by ID
# streams = StreamSerializer(many=True, read_only=True)
streams = serializers.ListField(
child=serializers.IntegerField(), write_only=True
)
stream_ids = serializers.SerializerMethodField()
class Meta:
model = Channel
@ -89,5 +91,39 @@ class ChannelSerializer(serializers.ModelSerializer):
'tvg_id',
'tvg_name',
'streams',
'stream_ids',
'stream_profile_id',
]
def get_stream_ids(self, obj):
"""Retrieve ordered stream IDs for GET requests."""
return list(obj.streams.all().order_by('channelstream__order').values_list('id', flat=True))
def create(self, validated_data):
stream_ids = validated_data.pop('streams', [])
channel = Channel.objects.create(**validated_data)
# Add streams in the specified order
for index, stream_id in enumerate(stream_ids):
ChannelStream.objects.create(channel=channel, stream_id=stream_id, order=index)
return channel
def update(self, instance, validated_data):
print("Validated Data:", validated_data)
stream_ids = validated_data.get('streams', None)
print(f'stream ids: {stream_ids}')
# Update basic fields
instance.name = validated_data.get('channel_name', instance.channel_name)
instance.save()
if stream_ids is not None:
# Clear existing relationships
instance.channelstream_set.all().delete()
# Add new streams in order
for index, stream_id in enumerate(stream_ids):
ChannelStream.objects.create(channel=instance, stream_id=stream_id, order=index)
return instance

207
apps/channels/tasks.py Normal file
View file

@ -0,0 +1,207 @@
# apps/channels/tasks.py
import logging
import re
from celery import shared_task
from rapidfuzz import fuzz
from sentence_transformers import SentenceTransformer, util
from django.db import transaction
from apps.channels.models import Channel
from apps.epg.models import EPGData
from core.models import CoreSettings # to retrieve "preferred-region" setting
logger = logging.getLogger(__name__)
# Load the model once at module level
SENTENCE_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
st_model = SentenceTransformer(SENTENCE_MODEL_NAME)
# Threshold constants
BEST_FUZZY_THRESHOLD = 70
LOWER_FUZZY_THRESHOLD = 40
EMBED_SIM_THRESHOLD = 0.65
# Common extraneous words
COMMON_EXTRANEOUS_WORDS = [
"tv", "channel", "network", "television",
"east", "west", "hd", "uhd", "us", "usa", "not", "24/7",
"1080p", "720p", "540p", "480p",
"arabic", "latino", "film", "movie", "movies"
]
def normalize_channel_name(name: str) -> str:
"""
A more aggressive normalization that:
- Lowercases
- Removes bracketed/parenthesized text
- Removes punctuation
- Strips extraneous words
- Collapses extra spaces
"""
if not name:
return ""
# Lowercase
norm = name.lower()
# Remove bracketed text
norm = re.sub(r"\[.*?\]", "", norm)
norm = re.sub(r"\(.*?\)", "", norm)
# Remove punctuation except word chars/spaces
norm = re.sub(r"[^\w\s]", "", norm)
# Remove extraneous tokens
tokens = norm.split()
tokens = [t for t in tokens if t not in COMMON_EXTRANEOUS_WORDS]
# Rejoin
norm = " ".join(tokens).strip()
return norm
@shared_task
def match_epg_channels():
"""
Goes through all Channels and tries to find a matching EPGData row by:
1) If channel.tvg_id is valid in EPGData, skip
2) If channel has a tvg_id but not found in EPGData, attempt direct EPGData lookup
3) Otherwise do name-based fuzzy ratio pass:
- add region-based bonus if region code is found in the EPG row
- if fuzzy >= BEST_FUZZY_THRESHOLD => accept
- if fuzzy in [LOWER_FUZZY_THRESHOLD..BEST_FUZZY_THRESHOLD) => do embedding check
- else skip
4) Log summary
"""
logger.info("Starting EPG matching logic...")
# Try to get user's preferred region from CoreSettings
try:
region_obj = CoreSettings.objects.get(key="preferred-region")
region_code = region_obj.value.strip().lower() # e.g. "us"
except CoreSettings.DoesNotExist:
region_code = None
# 1) Gather EPG rows
all_epg = list(EPGData.objects.all())
epg_rows = []
for e in all_epg:
epg_rows.append({
"epg_id": e.id,
"tvg_id": e.tvg_id or "", # e.g. "Fox News.us"
"raw_name": e.channel_name,
"norm_name": normalize_channel_name(e.channel_name),
})
# 2) Pre-encode embeddings if possible
epg_embeddings = None
if any(row["norm_name"] for row in epg_rows):
epg_embeddings = st_model.encode(
[row["norm_name"] for row in epg_rows],
convert_to_tensor=True
)
matched_channels = []
with transaction.atomic():
for chan in Channel.objects.all():
# A) Skip if channel.tvg_id is valid
if chan.tvg_id and EPGData.objects.filter(tvg_id=chan.tvg_id).exists():
continue
# B) If channel has a tvg_id but not in EPG, do direct lookup
if chan.tvg_id:
epg_match = EPGData.objects.filter(tvg_id=chan.tvg_id).first()
if epg_match:
logger.info(
f"Channel {chan.id} '{chan.channel_name}' => found EPG by tvg_id={chan.tvg_id}"
)
continue
# C) No valid tvg_id => name-based matching
fallback_name = chan.tvg_name.strip() if chan.tvg_name else chan.channel_name
norm_chan = normalize_channel_name(fallback_name)
if not norm_chan:
logger.info(
f"Channel {chan.id} '{chan.channel_name}' => empty after normalization, skipping"
)
continue
best_score = 0
best_epg = None
for row in epg_rows:
if not row["norm_name"]:
continue
# Base fuzzy ratio
base_score = fuzz.ratio(norm_chan, row["norm_name"])
# If we have a region_code, add a small bonus if the epg row has that region
# e.g. tvg_id or raw_name might contain ".us" or "us"
bonus = 0
if region_code:
# example: if region_code is "us" and row["tvg_id"] ends with ".us"
# or row["raw_name"] has "us" in it, etc.
# We'll do a naive check:
combined_text = row["tvg_id"].lower() + " " + row["raw_name"].lower()
if region_code in combined_text:
bonus = 15 # pick a small bonus
score = base_score + bonus
if score > best_score:
best_score = score
best_epg = row
if not best_epg:
logger.info(f"Channel {chan.id} '{fallback_name}' => no EPG match at all.")
continue
# E) Decide acceptance
if best_score >= BEST_FUZZY_THRESHOLD:
# Accept
chan.tvg_id = best_epg["tvg_id"]
chan.save()
matched_channels.append((chan.id, fallback_name, best_epg["tvg_id"]))
logger.info(
f"Channel {chan.id} '{fallback_name}' => matched tvg_id={best_epg['tvg_id']} (score={best_score})"
)
elif best_score >= LOWER_FUZZY_THRESHOLD and epg_embeddings is not None:
# borderline => do embedding
chan_embedding = st_model.encode(norm_chan, convert_to_tensor=True)
sim_scores = util.cos_sim(chan_embedding, epg_embeddings)[0]
top_index = int(sim_scores.argmax())
top_value = float(sim_scores[top_index])
if top_value >= EMBED_SIM_THRESHOLD:
matched_epg = epg_rows[top_index]
chan.tvg_id = matched_epg["tvg_id"]
chan.save()
matched_channels.append((chan.id, fallback_name, matched_epg["tvg_id"]))
logger.info(
f"Channel {chan.id} '{fallback_name}' => matched EPG tvg_id={matched_epg['tvg_id']} "
f"(fuzzy={best_score}, cos-sim={top_value:.2f})"
)
else:
logger.info(
f"Channel {chan.id} '{fallback_name}' => fuzzy={best_score}, "
f"cos-sim={top_value:.2f} < {EMBED_SIM_THRESHOLD}, skipping"
)
else:
# no match
logger.info(
f"Channel {chan.id} '{fallback_name}' => fuzzy={best_score} < {LOWER_FUZZY_THRESHOLD}, skipping"
)
# Final summary
total_matched = len(matched_channels)
if total_matched:
logger.info(f"Match Summary: {total_matched} channel(s) matched.")
for (cid, cname, tvg) in matched_channels:
logger.info(f" - Channel ID={cid}, Name='{cname}' => tvg_id='{tvg}'")
else:
logger.info("No new channels were matched.")
logger.info("Finished EPG matching logic.")
return f"Done. Matched {total_matched} channel(s)."

View file

@ -1,4 +1,4 @@
# Generated by Django 5.1.6 on 2025-03-02 00:01
# Generated by Django 5.1.6 on 2025-03-02 13:52
from django.db import migrations, models

View file

@ -1,4 +1,4 @@
# Generated by Django 5.1.6 on 2025-03-02 00:01
# Generated by Django 5.1.6 on 2025-03-02 13:52
import django.db.models.deletion
from django.db import migrations, models

View file

@ -1,4 +1,4 @@
# Generated by Django 5.1.6 on 2025-03-02 00:01
# Generated by Django 5.1.6 on 2025-03-02 13:52
from django.db import migrations, models

View file

@ -1,4 +1,4 @@
# Generated by Django 5.1.6 on 2025-03-02 00:01
# Generated by Django 5.1.6 on 2025-03-02 13:52
import django.db.models.deletion
from django.db import migrations, models

1227
apps/proxy/hls_proxy Normal file

File diff suppressed because it is too large Load diff

323
apps/proxy/ts_proxy Normal file
View file

@ -0,0 +1,323 @@
"""
Transport Stream (TS) Proxy Server
Handles live TS stream proxying with support for:
- Stream switching
- Buffer management
- Multiple client connections
- Connection state tracking
"""
from flask import Flask, Response, request, jsonify
import requests
import threading
import logging
from collections import deque
import time
import os
from typing import Optional, Set, Deque, Dict
# Configuration
class Config:
CHUNK_SIZE: int = 8192 # Buffer chunk size (bytes)
BUFFER_SIZE: int = 1000 # Number of chunks to keep in memory
RECONNECT_DELAY: int = 5 # Seconds between reconnection attempts
CLIENT_POLL_INTERVAL: float = 0.1 # Seconds between client buffer checks
MAX_RETRIES: int = 3 # Maximum connection retry attempts
DEFAULT_USER_AGENT: str = 'VLC/3.0.20 LibVLC/3.0.20' # Default user agent
class StreamManager:
"""Manages TS stream state and connection handling"""
def __init__(self, initial_url: str, channel_id: str, user_agent: Optional[str] = None):
self.current_url: str = initial_url
self.channel_id: str = channel_id
self.user_agent: str = user_agent or Config.DEFAULT_USER_AGENT
self.url_changed: threading.Event = threading.Event()
self.running: bool = True
self.session: requests.Session = self._create_session()
self.connected: bool = False
self.retry_count: int = 0
logging.info(f"Initialized stream manager for channel {channel_id}")
def _create_session(self) -> requests.Session:
"""Create and configure requests session"""
session = requests.Session()
session.headers.update({
'User-Agent': self.user_agent,
'Connection': 'keep-alive'
})
return session
def update_url(self, new_url: str) -> bool:
"""Update stream URL and signal connection change"""
if new_url != self.current_url:
logging.info(f"Stream switch initiated: {self.current_url} -> {new_url}")
self.current_url = new_url
self.connected = False
self.url_changed.set()
return True
return False
def should_retry(self) -> bool:
"""Check if connection retry is allowed"""
return self.retry_count < Config.MAX_RETRIES
def stop(self) -> None:
"""Clean shutdown of stream manager"""
self.running = False
if self.session:
self.session.close()
class StreamBuffer:
"""Manages stream data buffering"""
def __init__(self):
self.buffer: Deque[bytes] = deque(maxlen=Config.BUFFER_SIZE)
self.lock: threading.Lock = threading.Lock()
self.index: int = 0
class ClientManager:
"""Manages active client connections"""
def __init__(self):
self.active_clients: Set[int] = set()
self.lock: threading.Lock = threading.Lock()
def add_client(self, client_id: int) -> None:
"""Add new client connection"""
with self.lock:
self.active_clients.add(client_id)
logging.info(f"New client connected: {client_id} (total: {len(self.active_clients)})")
def remove_client(self, client_id: int) -> int:
"""Remove client and return remaining count"""
with self.lock:
self.active_clients.remove(client_id)
remaining = len(self.active_clients)
logging.info(f"Client disconnected: {client_id} (remaining: {remaining})")
return remaining
class StreamFetcher:
"""Handles stream data fetching"""
def __init__(self, manager: StreamManager, buffer: StreamBuffer):
self.manager = manager
self.buffer = buffer
def fetch_loop(self) -> None:
"""Main fetch loop for stream data"""
while self.manager.running:
try:
if not self._handle_connection():
continue
with self.manager.session.get(self.manager.current_url, stream=True) as response:
if response.status_code == 200:
self._handle_successful_connection()
self._process_stream(response)
except requests.exceptions.RequestException as e:
self._handle_connection_error(e)
def _handle_connection(self) -> bool:
"""Handle connection state and retries"""
if not self.manager.connected:
if not self.manager.should_retry():
logging.error(f"Failed to connect after {Config.MAX_RETRIES} attempts")
return False
if not self.manager.running:
return False
self.manager.retry_count += 1
logging.info(f"Connecting to stream: {self.manager.current_url} "
f"(attempt {self.manager.retry_count}/{Config.MAX_RETRIES})")
return True
def _handle_successful_connection(self) -> None:
"""Handle successful stream connection"""
if not self.manager.connected:
logging.info("Stream connected successfully")
self.manager.connected = True
self.manager.retry_count = 0
def _process_stream(self, response: requests.Response) -> None:
"""Process incoming stream data"""
for chunk in response.iter_content(chunk_size=Config.CHUNK_SIZE):
if not self.manager.running:
logging.info("Stream fetch stopped - shutting down")
return
if chunk:
if self.manager.url_changed.is_set():
logging.info("Stream switch in progress, closing connection")
self.manager.url_changed.clear()
break
with self.buffer.lock:
self.buffer.buffer.append(chunk)
self.buffer.index += 1
def _handle_connection_error(self, error: Exception) -> None:
"""Handle stream connection errors"""
logging.error(f"Stream connection error: {error}")
self.manager.connected = False
if not self.manager.running:
return
logging.info(f"Attempting to reconnect in {Config.RECONNECT_DELAY} seconds...")
if not wait_for_running(self.manager, Config.RECONNECT_DELAY):
return
def wait_for_running(manager: StreamManager, delay: float) -> bool:
"""Wait while checking manager running state"""
start = time.time()
while time.time() - start < delay:
if not manager.running:
return False
threading.Event().wait(0.1)
return True
class ProxyServer:
"""Manages TS proxy server instance"""
def __init__(self, user_agent: Optional[str] = None):
self.app = Flask(__name__)
self.stream_managers: Dict[str, StreamManager] = {}
self.stream_buffers: Dict[str, StreamBuffer] = {}
self.client_managers: Dict[str, ClientManager] = {}
self.fetch_threads: Dict[str, threading.Thread] = {}
self.user_agent: str = user_agent or Config.DEFAULT_USER_AGENT
self._setup_routes()
def _setup_routes(self) -> None:
"""Configure Flask routes"""
self.app.route('/stream/<channel_id>')(self.stream_endpoint)
self.app.route('/change_stream/<channel_id>', methods=['POST'])(self.change_stream)
def initialize_channel(self, url: str, channel_id: str) -> None:
"""Initialize a new channel stream"""
if channel_id in self.stream_managers:
self.stop_channel(channel_id)
self.stream_managers[channel_id] = StreamManager(
url,
channel_id,
user_agent=self.user_agent
)
self.stream_buffers[channel_id] = StreamBuffer()
self.client_managers[channel_id] = ClientManager()
fetcher = StreamFetcher(
self.stream_managers[channel_id],
self.stream_buffers[channel_id]
)
self.fetch_threads[channel_id] = threading.Thread(
target=fetcher.fetch_loop,
name=f"StreamFetcher-{channel_id}",
daemon=True
)
self.fetch_threads[channel_id].start()
logging.info(f"Initialized channel {channel_id} with URL {url}")
def stop_channel(self, channel_id: str) -> None:
"""Stop and cleanup a channel"""
if channel_id in self.stream_managers:
self.stream_managers[channel_id].stop()
if channel_id in self.fetch_threads:
self.fetch_threads[channel_id].join(timeout=5)
self._cleanup_channel(channel_id)
def _cleanup_channel(self, channel_id: str) -> None:
"""Remove channel resources"""
for collection in [self.stream_managers, self.stream_buffers,
self.client_managers, self.fetch_threads]:
collection.pop(channel_id, None)
def stream_endpoint(self, channel_id: str):
"""Stream endpoint that serves TS data to clients"""
if channel_id not in self.stream_managers:
return Response('Channel not found', status=404)
def generate():
client_id = threading.get_ident()
buffer = self.stream_buffers[channel_id]
client_manager = self.client_managers[channel_id]
client_manager.add_client(client_id)
last_index = buffer.index
try:
while True:
with buffer.lock:
if buffer.index > last_index:
chunks_behind = buffer.index - last_index
start_pos = max(0, len(buffer.buffer) - chunks_behind)
for i in range(start_pos, len(buffer.buffer)):
yield buffer.buffer[i]
last_index = buffer.index
threading.Event().wait(Config.CLIENT_POLL_INTERVAL)
except GeneratorExit:
remaining = client_manager.remove_client(client_id)
if remaining == 0:
logging.info(f"No clients remaining for channel {channel_id}")
self.stop_channel(channel_id)
return Response(generate(), content_type='video/mp2t')
def change_stream(self, channel_id: str):
"""Handle stream URL changes"""
if channel_id not in self.stream_managers:
return jsonify({'error': 'Channel not found'}), 404
new_url = request.json.get('url')
if not new_url:
return jsonify({'error': 'No URL provided'}), 400
manager = self.stream_managers[channel_id]
if manager.update_url(new_url):
return jsonify({
'message': 'Stream URL updated',
'channel': channel_id,
'url': new_url
})
return jsonify({
'message': 'URL unchanged',
'channel': channel_id,
'url': new_url
})
def run(self, host: str = '0.0.0.0', port: int = 5000) -> None:
"""Start the proxy server"""
self.app.run(host=host, port=port, threaded=True)
def shutdown(self) -> None:
"""Stop all channels and cleanup"""
for channel_id in list(self.stream_managers.keys()):
self.stop_channel(channel_id)
def main():
"""Initialize and start the proxy server"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logging.getLogger('werkzeug').setLevel(logging.DEBUG)
proxy_server = ProxyServer()
initial_url = os.getenv('STREAM_URL', 'http://example.com/stream.ts')
proxy_server.initialize_channel(initial_url, "default_channel")
try:
proxy_server.run()
finally:
proxy_server.shutdown()
if __name__ == '__main__':
main()

View file

@ -1,4 +1,4 @@
# Generated by Django 5.1.6 on 2025-03-02 00:01
# Generated by Django 5.1.6 on 2025-03-02 13:52
from django.db import migrations, models

View file

@ -32,10 +32,16 @@ def settings_view(request):
def stream_view(request, stream_id):
"""
Streams the first available stream for the given channel.
It uses the channels assigned StreamProfile.
It uses the channels assigned StreamProfile with a fallback to core default
A persistent Redis lock is used to prevent concurrent streaming on the same channel.
Priority:
- iterate through all streams
- iterate through each stream's m3u profile
"""
try:
redis_host = getattr(settings, "REDIS_HOST", "localhost")
redis_client = redis.Redis(host=settings.REDIS_HOST, port=6379, db=0)
# Retrieve the channel by the provided stream_id.
channel = Channel.objects.get(channel_number=stream_id)
logger.debug("Channel retrieved: ID=%s, Name=%s", channel.id, channel.channel_name)
@ -45,46 +51,78 @@ def stream_view(request, stream_id):
logger.error("No streams found for channel ID=%s", channel.id)
return HttpResponseServerError("No stream found for this channel.")
# Get the first available stream.
stream = channel.streams.first()
logger.debug("Using stream: ID=%s, Name=%s", stream.id, stream.name)
# Retrieve the M3U account associated with the stream.
m3u_account = stream.m3u_account
logger.debug("Using M3U account ID=%s, Name=%s", m3u_account.id, m3u_account.name)
# Use the custom URL if available; otherwise, use the standard URL.
input_url = stream.custom_url or stream.url
logger.debug("Input URL: %s", input_url)
# Determine which profile we can use.
m3u_profiles = m3u_account.profiles.all()
default_profile = next((obj for obj in m3u_profiles if obj.is_default), None)
profiles = [obj for obj in m3u_profiles if not obj.is_default]
active_profile = None
# -- Loop through profiles and pick the first active one --
for profile in [default_profile] + profiles:
logger.debug(f'Checking profile {profile.name}...')
if not profile.is_active:
logger.debug('Profile is not active, skipping.')
lock_key = None
persistent_lock = None
# iterate through channel's streams
for stream in channel.streams.all().order_by('channelstream__order'):
logger.debug(f"Checking stream: ID={stream.id}, Name={stream.name}")
# Retrieve the M3U account associated with the stream.
m3u_account = stream.m3u_account
logger.debug(f"Using M3U account ID={m3u_account.id}, Name={m3u_account.name}")
# Use the custom URL if available; otherwise, use the standard URL.
input_url = stream.custom_url or stream.url
logger.debug(f"Input URL: {input_url}")
# Determine which profile we can use.
m3u_profiles = m3u_account.profiles.all()
default_profile = next((obj for obj in m3u_profiles if obj.is_default), None)
profiles = [obj for obj in m3u_profiles if not obj.is_default]
# -- Loop through profiles and pick the first active one --
for profile in [default_profile] + profiles:
logger.debug(f'Checking profile {profile.name}...')
if not profile.is_active:
logger.debug('Profile is not active, skipping.')
continue
# Acquire the persistent Redis lock, indexed by 0 through max_streams available in the profile
stream_index = 0
while True:
stream_index += 1
if stream_index > profile.max_streams:
# @TODO: we are bailing here if no profile was found, but we need to end up supporting looping through
# all available channel streams
logger.debug(f"Profile is using all available streams.")
break
lock_key = f"lock:{channel.id}:{stream.id}:{profile.id}:{stream_index}"
persistent_lock = PersistentLock(redis_client, lock_key, lock_timeout=120)
if not persistent_lock.acquire():
logger.error(f"Could not acquire persistent lock for profile {profile.id} index {stream_index}, currently in use.")
continue
break
if persistent_lock.has_lock:
break
if persistent_lock.has_lock == False:
logger.debug(f'Unable to get lock for profile {profile.id}:{profile.name}. Skipping...')
continue
# *** DISABLE FAKE LOCKS: Ignore current_viewers/max_streams check ***
logger.debug(f"Using M3U profile ID={profile.id} (ignoring viewer count limits)")
active_profile = M3UAccountProfile.objects.get(id=profile.id)
# Prepare the pattern replacement.
logger.debug("Executing the following pattern replacement:")
logger.debug(f" search: {profile.search_pattern}")
safe_replace_pattern = re.sub(r'\$(\d+)', r'\\\1', profile.replace_pattern)
logger.debug(f" replace: {profile.replace_pattern}")
logger.debug(f" safe replace: {safe_replace_pattern}")
stream_url = re.sub(profile.search_pattern, safe_replace_pattern, input_url)
logger.debug(f"Generated stream url: {stream_url}")
break
if active_profile is None:
logger.exception("No available profiles for the stream")
return HttpResponseServerError("No available profiles for the stream")
if persistent_lock.has_lock == False:
logger.debug(f"Unable to find any available streams or stream profiles.")
return HttpResponseServerError("Resource busy, please try again later.")
# *** DISABLE FAKE LOCKS: Ignore current_viewers/max_streams check ***
logger.debug(f"Using stream {stream.id}{stream.name}, M3U profile {profile.id}{profile.name}, stream index {stream_index}")
active_profile = M3UAccountProfile.objects.get(id=profile.id)
# Prepare the pattern replacement.
logger.debug("Executing the following pattern replacement:")
logger.debug(f" search: {active_profile.search_pattern}")
safe_replace_pattern = re.sub(r'\$(\d+)', r'\\\1', active_profile.replace_pattern)
logger.debug(f" replace: {active_profile.replace_pattern}")
logger.debug(f" safe replace: {safe_replace_pattern}")
stream_url = re.sub(active_profile.search_pattern, safe_replace_pattern, input_url)
logger.debug(f"Generated stream url: {stream_url}")
# Get the stream profile set on the channel.
stream_profile = channel.stream_profile
@ -106,19 +144,9 @@ def stream_view(request, stream_id):
cmd = [stream_profile.command] + parameters.split()
logger.debug("Executing command: %s", cmd)
# Acquire the persistent Redis lock.
redis_host = getattr(settings, "REDIS_HOST", "localhost")
redis_client = redis.Redis(host=settings.REDIS_HOST, port=6379, db=0)
lock_key = f"lock:channel:{channel.id}"
persistent_lock = PersistentLock(redis_client, lock_key, lock_timeout=120)
if not persistent_lock.acquire():
logger.error("Could not acquire persistent lock for channel %s", channel.id)
return HttpResponseServerError("Resource busy, please try again later.")
try:
# Start the streaming process.
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=8192)
except Exception as e:
persistent_lock.release() # Ensure the lock is released on error.
logger.exception("Error starting stream for channel ID=%s", stream_id)
@ -137,6 +165,7 @@ def stream_view(request, stream_id):
yield chunk
finally:
try:
proc.terminate()
logger.debug("Streaming process terminated for stream ID=%s", s.id)
except Exception as e:
@ -144,6 +173,7 @@ def stream_view(request, stream_id):
persistent_lock.release()
logger.debug("Persistent lock released for channel ID=%s", channel.id)
return StreamingHttpResponse(
stream_generator(process, stream, persistent_lock),
content_type="video/MP2T"

View file

@ -5,7 +5,7 @@ import redis
class PersistentLock:
"""
A persistent, auto-expiring lock that uses Redis.
Usage:
1. Instantiate with a Redis client, a unique lock key (e.g. "lock:account:123"),
and an optional timeout (in seconds).
@ -16,7 +16,7 @@ class PersistentLock:
def __init__(self, redis_client: redis.Redis, lock_key: str, lock_timeout: int = 120):
"""
Initialize the lock.
:param redis_client: An instance of redis.Redis.
:param lock_key: The unique key for the lock.
:param lock_timeout: Time-to-live for the lock in seconds.
@ -25,6 +25,10 @@ class PersistentLock:
self.lock_key = lock_key
self.lock_timeout = lock_timeout
self.lock_token = None
self.has_lock = False
def has_lock(self) -> bool:
return self.has_lock
def acquire(self) -> bool:
"""
@ -33,6 +37,9 @@ class PersistentLock:
self.lock_token = str(uuid.uuid4())
# Set the lock with NX (only if not exists) and EX (expire time)
result = self.redis_client.set(self.lock_key, self.lock_token, nx=True, ex=self.lock_timeout)
if result is not None:
self.has_lock = True
return result is not None
def refresh(self) -> bool:
@ -43,6 +50,7 @@ class PersistentLock:
current_value = self.redis_client.get(self.lock_key)
if current_value and current_value.decode("utf-8") == self.lock_token:
self.redis_client.expire(self.lock_key, self.lock_timeout)
self.has_lock = False
return True
return False

View file

@ -1,28 +1,34 @@
FROM alpine
FROM python:3.13-slim
ENV PATH="/dispatcharrpy/bin:$PATH" \
VIRTUAL_ENV=/dispatcharrpy \
DJANGO_SETTINGS_MODULE=dispatcharr.settings \
PYTHONUNBUFFERED=1
RUN apk add \
python3 \
python3-dev \
gcc \
musl-dev \
linux-headers \
py3-pip \
RUN apt-get update && \
apt-get install -y \
curl \
ffmpeg \
streamlink \
vlc \
libpq-dev \
gcc \
py3-virtualenv \
uwsgi \
uwsgi-python \
nodejs \
npm \
git \
gpg \
libpq-dev \
lsb-release \
python3-virtualenv \
streamlink
RUN \
curl -sL https://deb.nodesource.com/setup_23.x -o /tmp/nodesource_setup.sh && \
bash /tmp/nodesource_setup.sh && \
curl -fsSL https://packages.redis.io/gpg | gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg && \
chmod 644 /usr/share/keyrings/redis-archive-keyring.gpg && \
echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | tee /etc/apt/sources.list.d/redis.list && \
apt-get update && \
apt-get install -y redis
RUN apt-get update && \
apt-get install -y \
nodejs \
redis
RUN \
@ -30,24 +36,26 @@ RUN \
virtualenv /dispatcharrpy && \
git clone https://github.com/Dispatcharr/Dispatcharr /app && \
cd /app && \
/dispatcharrpy/bin/pip install --no-cache-dir -r requirements.txt && \
pip install --no-cache-dir -r requirements.txt && \
cd /app/frontend && \
npm install && \
npm run build && \
find . -maxdepth 1 ! -name '.' ! -name 'build' -exec rm -rf '{}' \; && \
cd /app && \
python manage.py collectstatic --noinput || true
# Cleanup
RUN \
apk del \
nodejs \
npm \
git \
python manage.py collectstatic --noinput || true && \
apt-get remove -y \
gcc \
musl-dev \
python3-dev \
linux-headers
git \
gpg \
libpq-dev \
lsb-release \
nodejs && \
apt-get clean && \
apt-get autoremove -y && \
rm -rf \
/tmp/* \
/var/lib/apt/lists/* \
/var/tmp/*
WORKDIR /app

54
docker/Dockerfile.alpine Normal file
View file

@ -0,0 +1,54 @@
FROM alpine
ENV PATH="/dispatcharrpy/bin:$PATH" \
VIRTUAL_ENV=/dispatcharrpy \
DJANGO_SETTINGS_MODULE=dispatcharr.settings \
PYTHONUNBUFFERED=1
RUN apk add \
python3 \
python3-dev \
gcc \
musl-dev \
linux-headers \
py3-pip \
ffmpeg \
streamlink \
vlc \
libpq-dev \
gcc \
py3-virtualenv \
uwsgi \
uwsgi-python \
nodejs \
npm \
git \
redis
RUN \
mkdir /data && \
virtualenv /dispatcharrpy && \
git clone https://github.com/Dispatcharr/Dispatcharr /app && \
cd /app && \
/dispatcharrpy/bin/pip install --no-cache-dir -r requirements.txt && \
cd /app/frontend && \
npm install && \
npm run build && \
find . -maxdepth 1 ! -name '.' ! -name 'build' -exec rm -rf '{}' \; && \
cd /app && \
python manage.py collectstatic --noinput || true
# Cleanup
RUN \
apk del \
nodejs \
npm \
git \
gcc \
musl-dev \
python3-dev \
linux-headers
WORKDIR /app
CMD ["/app/docker/entrypoint.aio.sh"]

View file

@ -1,20 +1,105 @@
#!/bin/sh
#!/bin/bash
# Check the value of DISPATCHARR_ENV and run the corresponding program
case "$DISPATCHARR_ENV" in
"dev")
echo "DISPATCHARR_ENV is set to 'dev'. Running Development Program..."
apk add nodejs npm
cd /app/frontend && npm install
cd /app
exec /usr/sbin/uwsgi --ini uwsgi.dev.ini
;;
"aio")
echo "DISPATCHARR_ENV is set to 'aio'. Running All-in-One Program..."
exec /usr/sbin/uwsgi --ini uwsgi.aio.ini
;;
*)
echo "DISPATCHARR_ENV is not set or has an unexpected value. Running standalone..."
exec /usr/sbin/uwsgi --ini uwsgi.ini
;;
esac
# Run Django migrations and collect static files
python manage.py collectstatic --noinput
python manage.py migrate --noinput
# Function to clean up only running processes
cleanup() {
echo "🔥 Cleanup triggered! Stopping services..."
for pid in "${pids[@]}"; do
if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
echo "⛔ Stopping process (PID: $pid)..."
kill -TERM "$pid" 2>/dev/null
else
echo "✅ Process (PID: $pid) already stopped."
fi
done
wait
}
# Catch termination signals (CTRL+C, Docker Stop, etc.)
trap cleanup TERM INT
# Initialize an array to store PIDs
pids=()
GUNICORN_PORT=9191
# If running in development mode, install and start frontend
if [ "$DISPATCHARR_ENV" = "dev" ]; then
echo "🚀 Development Mode - Setting up Frontend..."
GUNICORN_PORT=5656
# Install Node.js
apt-get update && apt-get install -y nodejs
# Install frontend dependencies
cd /app/frontend && npm install
cd /app
# Start React development server
echo "🚀 Starting React Dev Server..."
cd /app/frontend
PORT=9191 ./node_modules/pm2/bin/pm2 --name test start npm -- start
./node_modules/pm2/bin/pm2 logs &
react_pid=$!
echo "✅ React started with PID $react_pid"
pids+=("$react_pid")
cd /app
fi
# If running in `dev` or `aio`, start Redis and Celery
if [ "$DISPATCHARR_ENV" = "dev" ] || [ "$DISPATCHARR_ENV" = "aio" ]; then
echo "🚀 Running Redis and Celery for '$DISPATCHARR_ENV'..."
# Start Redis
echo "🚀 Starting Redis..."
redis-server --daemonize no &
sleep 1 # Give Redis time to start
redis_pid=$(pgrep -x redis-server)
if [ -n "$redis_pid" ]; then
echo "✅ Redis started with PID $redis_pid"
pids+=("$redis_pid")
else
echo "❌ Redis failed to start!"
fi
# Start Celery
echo "🚀 Starting Celery..."
celery -A dispatcharr worker -l info &
celery_pid=$!
echo "✅ Celery started with PID $celery_pid"
pids+=("$celery_pid")
fi
# Always start Gunicorn
echo "🚀 Starting Gunicorn..."
gunicorn --workers=4 --worker-class=gevent --timeout=300 --bind 0.0.0.0:${GUNICORN_PORT} dispatcharr.wsgi:application &
gunicorn_pid=$!
echo "✅ Gunicorn started with PID $gunicorn_pid"
pids+=("$gunicorn_pid")
# Log PIDs
echo "📝 Process PIDs: ${pids[*]}"
# Wait for at least one process to exit and log the process that exited first
if [ ${#pids[@]} -gt 0 ]; then
echo "⏳ Waiting for processes to exit..."
ps -aux | grep -E 'redis-server|celery|gunicorn|npm'
wait -n "${pids[@]}"
echo "🚨 One of the processes exited! Checking which one..."
for pid in "${pids[@]}"; do
if ! kill -0 "$pid" 2>/dev/null; then
process_name=$(ps -p "$pid" -o comm=)
echo "❌ Process $process_name (PID: $pid) has exited!"
fi
done
else
echo "❌ No processes started. Exiting."
exit 1
fi
# Cleanup and stop remaining processes
cleanup

File diff suppressed because it is too large Load diff

View file

@ -17,6 +17,7 @@
"material-react-table": "^3.2.0",
"mpegts.js": "^1.4.2",
"planby": "^1.1.7",
"pm2": "^5.4.3",
"prettier": "^3.5.2",
"react": "18.2.0",
"react-dom": "18.2.0",

View file

@ -10,7 +10,7 @@
<meta name="theme-color" content="#000000" />
<meta
name="description"
content="Web site created using create-react-app"
content="IPTV Master Control"
/>
<link rel="apple-touch-icon" href="%PUBLIC_URL%/logo192.png" />
<!--
@ -42,7 +42,7 @@
<title>React App</title>
<title>Dispatcharr</title>
</head>
<body>
<noscript>You need to enable JavaScript to run this app.</noscript>

View file

@ -1,3 +1,4 @@
// src/api.js (updated)
import useAuthStore from './store/auth';
import useChannelsStore from './store/channels';
import useUserAgentsStore from './store/userAgents';
@ -7,18 +8,17 @@ import useStreamsStore from './store/streams';
import useStreamProfilesStore from './store/streamProfiles';
import useSettingsStore from './store/settings';
// const axios = Axios.create({
// withCredentials: true,
// });
// If needed, you can set a base host or keep it empty if relative requests
const host = '';
export const getAuthToken = async () => {
const token = await useAuthStore.getState().getToken(); // Assuming token is stored in Zustand store
return token;
};
export default class API {
/**
* A static method so we can do: await API.getAuthToken()
*/
static async getAuthToken() {
return await useAuthStore.getState().getToken();
}
static async login(username, password) {
const response = await fetch(`${host}/api/accounts/token/`, {
method: 'POST',
@ -31,11 +31,11 @@ export default class API {
return await response.json();
}
static async refreshToken(refreshToken) {
static async refreshToken(refresh) {
const response = await fetch(`${host}/api/accounts/token/refresh/`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ refresh: refreshToken }),
body: JSON.stringify({ refresh }),
});
const retval = await response.json();
@ -54,7 +54,7 @@ export default class API {
const response = await fetch(`${host}/api/channels/channels/`, {
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
},
});
@ -66,7 +66,7 @@ export default class API {
const response = await fetch(`${host}/api/channels/groups/`, {
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
},
});
@ -78,7 +78,7 @@ export default class API {
const response = await fetch(`${host}/api/channels/groups/`, {
method: 'POST',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(values),
@ -97,7 +97,7 @@ export default class API {
const response = await fetch(`${host}/api/channels/groups/${id}/`, {
method: 'PUT',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(payload),
@ -114,6 +114,7 @@ export default class API {
static async addChannel(channel) {
let body = null;
if (channel.logo_file) {
// Must send FormData for file upload
body = new FormData();
for (const prop in channel) {
body.append(prop, channel[prop]);
@ -127,7 +128,7 @@ export default class API {
const response = await fetch(`${host}/api/channels/channels/`, {
method: 'POST',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
...(channel.logo_file
? {}
: {
@ -149,7 +150,7 @@ export default class API {
const response = await fetch(`${host}/api/channels/channels/${id}/`, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
});
@ -162,7 +163,7 @@ export default class API {
const response = await fetch(`${host}/api/channels/channels/bulk-delete/`, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({ channel_ids }),
@ -176,7 +177,7 @@ export default class API {
const response = await fetch(`${host}/api/channels/channels/${id}/`, {
method: 'PUT',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(payload),
@ -195,26 +196,22 @@ export default class API {
const response = await fetch(`${host}/api/channels/channels/assign/`, {
method: 'POST',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({ channel_order: channelIds }),
});
// The backend returns something like { "message": "Channels have been auto-assigned!" }
if (!response.ok) {
// If you want to handle errors gracefully:
const text = await response.text();
throw new Error(`Assign channels failed: ${response.status} => ${text}`);
}
// Usually it has a { message: "..."} or similar
const retval = await response.json();
// If you want to automatically refresh the channel list in Zustand:
// Optionally refresh the channel list in Zustand
await useChannelsStore.getState().fetchChannels();
// Return the entire JSON result (so the caller can see the "message")
return retval;
}
@ -222,7 +219,7 @@ export default class API {
const response = await fetch(`${host}/api/channels/channels/from-stream/`, {
method: 'POST',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(values),
@ -242,7 +239,7 @@ export default class API {
{
method: 'POST',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(values),
@ -261,7 +258,7 @@ export default class API {
const response = await fetch(`${host}/api/channels/streams/`, {
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
},
});
@ -273,7 +270,7 @@ export default class API {
const response = await fetch(`${host}/api/channels/streams/`, {
method: 'POST',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(values),
@ -292,7 +289,7 @@ export default class API {
const response = await fetch(`${host}/api/channels/streams/${id}/`, {
method: 'PUT',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(payload),
@ -310,7 +307,7 @@ export default class API {
const response = await fetch(`${host}/api/channels/streams/${id}/`, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
});
@ -322,7 +319,7 @@ export default class API {
const response = await fetch(`${host}/api/channels/streams/bulk-delete/`, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({ stream_ids: ids }),
@ -335,7 +332,7 @@ export default class API {
const response = await fetch(`${host}/api/core/useragents/`, {
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
},
});
@ -347,7 +344,7 @@ export default class API {
const response = await fetch(`${host}/api/core/useragents/`, {
method: 'POST',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(values),
@ -366,7 +363,7 @@ export default class API {
const response = await fetch(`${host}/api/core/useragents/${id}/`, {
method: 'PUT',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(payload),
@ -384,7 +381,7 @@ export default class API {
const response = await fetch(`${host}/api/core/useragents/${id}/`, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
});
@ -395,7 +392,7 @@ export default class API {
static async getPlaylist(id) {
const response = await fetch(`${host}/api/m3u/accounts/${id}/`, {
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
});
@ -407,7 +404,7 @@ export default class API {
static async getPlaylists() {
const response = await fetch(`${host}/api/m3u/accounts/`, {
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
});
@ -420,7 +417,7 @@ export default class API {
const response = await fetch(`${host}/api/m3u/accounts/`, {
method: 'POST',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(values),
@ -438,7 +435,7 @@ export default class API {
const response = await fetch(`${host}/api/m3u/refresh/${id}/`, {
method: 'POST',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
});
@ -451,7 +448,7 @@ export default class API {
const response = await fetch(`${host}/api/m3u/refresh/`, {
method: 'POST',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
});
@ -464,7 +461,7 @@ export default class API {
const response = await fetch(`${host}/api/m3u/accounts/${id}/`, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
});
@ -477,7 +474,7 @@ export default class API {
const response = await fetch(`${host}/api/m3u/accounts/${id}/`, {
method: 'PUT',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(payload),
@ -494,7 +491,7 @@ export default class API {
static async getEPGs() {
const response = await fetch(`${host}/api/epg/sources/`, {
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
});
@ -503,18 +500,8 @@ export default class API {
return retval;
}
static async refreshPlaylist(id) {
const response = await fetch(`${host}/api/m3u/refresh/${id}/`, {
method: 'POST',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
'Content-Type': 'application/json',
},
});
const retval = await response.json();
return retval;
}
// Notice there's a duplicated "refreshPlaylist" method above;
// you might want to rename or remove one if it's not needed.
static async addEPG(values) {
let body = null;
@ -532,7 +519,7 @@ export default class API {
const response = await fetch(`${host}/api/epg/sources/`, {
method: 'POST',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
...(values.epg_file
? {}
: {
@ -554,7 +541,7 @@ export default class API {
const response = await fetch(`${host}/api/epg/sources/${id}/`, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
});
@ -566,7 +553,7 @@ export default class API {
const response = await fetch(`${host}/api/epg/import/`, {
method: 'POST',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({ id }),
@ -579,7 +566,7 @@ export default class API {
static async getStreamProfiles() {
const response = await fetch(`${host}/api/core/streamprofiles/`, {
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
});
@ -592,7 +579,7 @@ export default class API {
const response = await fetch(`${host}/api/core/streamprofiles/`, {
method: 'POST',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(values),
@ -610,7 +597,7 @@ export default class API {
const response = await fetch(`${host}/api/core/streamprofiles/${id}/`, {
method: 'PUT',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(payload),
@ -628,7 +615,7 @@ export default class API {
const response = await fetch(`${host}/api/core/streamprofiles/${id}/`, {
method: 'DELETE',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
});
@ -639,7 +626,7 @@ export default class API {
static async getGrid() {
const response = await fetch(`${host}/api/epg/grid/`, {
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
});
@ -654,7 +641,7 @@ export default class API {
{
method: 'POST',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(values),
@ -663,7 +650,7 @@ export default class API {
const retval = await response.json();
if (retval.id) {
// Fetch m3u account to update it with its new playlists
// Refresh the playlist
const playlist = await API.getPlaylist(accountId);
usePlaylistsStore
.getState()
@ -679,7 +666,7 @@ export default class API {
{
method: 'DELETE',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
}
@ -696,7 +683,7 @@ export default class API {
{
method: 'PUT',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(payload),
@ -711,7 +698,7 @@ export default class API {
const response = await fetch(`${host}/api/core/settings/`, {
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
},
});
@ -724,7 +711,7 @@ export default class API {
const response = await fetch(`${host}/api/core/settings/${id}/`, {
method: 'PUT',
headers: {
Authorization: `Bearer ${await getAuthToken()}`,
Authorization: `Bearer ${await API.getAuthToken()}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(payload),

View file

@ -69,7 +69,7 @@ const Channel = ({ channel = null, isOpen, onClose }) => {
channel_name: '',
channel_number: '',
channel_group_id: '',
stream_profile_id: '',
stream_profile_id: '0',
tvg_id: '',
tvg_name: '',
},
@ -79,6 +79,10 @@ const Channel = ({ channel = null, isOpen, onClose }) => {
channel_group_id: Yup.string().required('Channel group is required'),
}),
onSubmit: async (values, { setSubmitting, resetForm }) => {
if (values.stream_profile_id == '0') {
values.stream_profile_id = null;
}
console.log(values);
if (channel?.id) {
await API.updateChannel({
@ -109,21 +113,18 @@ const Channel = ({ channel = null, isOpen, onClose }) => {
channel_name: channel.channel_name,
channel_number: channel.channel_number,
channel_group_id: channel.channel_group?.id,
stream_profile_id: channel.stream_profile_id,
stream_profile_id: channel.stream_profile_id || '0',
tvg_id: channel.tvg_id,
tvg_name: channel.tvg_name,
});
console.log('channel streams');
console.log(channel.streams);
console.log(channel);
const filteredStreams = streams
.filter((stream) => channel.streams.includes(stream.id))
.filter((stream) => channel.stream_ids.includes(stream.id))
.sort(
(a, b) =>
channel.streams.indexOf(a.id) - channel.streams.indexOf(b.id)
channel.stream_ids.indexOf(a.id) - channel.stream_ids.indexOf(b.id)
);
console.log('filtered streams');
console.log(filteredStreams);
setChannelStreams(filteredStreams);
} else {
formik.resetForm();
@ -334,7 +335,6 @@ const Channel = ({ channel = null, isOpen, onClose }) => {
labelId="stream-profile-label"
id="stream_profile_id"
name="stream_profile_id"
label="Stream Profile (optional)"
value={formik.values.stream_profile_id}
onChange={formik.handleChange}
onBlur={formik.handleBlur}
@ -345,6 +345,9 @@ const Channel = ({ channel = null, isOpen, onClose }) => {
// helperText={formik.touched.channel_group_id && formik.errors.stream_profile_id}
variant="standard"
>
<MenuItem value="0" selected>
<em>Use Default</em>
</MenuItem>
{streamProfiles.map((option, index) => (
<MenuItem key={index} value={option.id}>
{option.profile_name}
@ -401,7 +404,7 @@ const Channel = ({ channel = null, isOpen, onClose }) => {
helperText={formik.touched.tvg_id && formik.errors.tvg_id}
variant="standard"
/>
<TextField
fullWidth
id="logo_url"
@ -415,7 +418,6 @@ const Channel = ({ channel = null, isOpen, onClose }) => {
helperText="If you have a direct image URL, set it here."
/>
<Box mt={2} mb={2}>
{/* File upload input */}
<Stack
@ -486,4 +488,4 @@ const Channel = ({ channel = null, isOpen, onClose }) => {
);
};
export default Channel;
export default Channel;

View file

@ -24,13 +24,14 @@ import {
SwapVert as SwapVertIcon,
LiveTv as LiveTvIcon,
ContentCopy,
Tv as TvIcon, // <-- ADD THIS IMPORT
} from '@mui/icons-material';
import API from '../../api';
import ChannelForm from '../forms/Channel';
import { TableHelper } from '../../helpers';
import utils from '../../utils';
import logo from '../../images/logo.png';
import useVideoStore from '../../store/useVideoStore'; // NEW import
import useVideoStore from '../../store/useVideoStore';
const ChannelsTable = () => {
const [channel, setChannel] = useState(null);
@ -116,6 +117,7 @@ const ChannelsTable = () => {
4,
selected.map((chan) => () => deleteChannel(chan.original.id))
);
// If you have a real bulk-delete endpoint, call it here:
// await API.deleteChannels(selected.map((sel) => sel.id));
setIsLoading(false);
};
@ -144,6 +146,32 @@ const ChannelsTable = () => {
}
};
// ─────────────────────────────────────────────────────────
// The new "Match EPG" button logic
// ─────────────────────────────────────────────────────────
const matchEpg = async () => {
try {
// Hit our new endpoint that triggers the fuzzy matching Celery task
const resp = await fetch('/api/channels/channels/match-epg/', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${await API.getAuthToken()}`,
},
});
if (resp.ok) {
setSnackbarMessage('EPG matching task started!');
} else {
const text = await resp.text();
setSnackbarMessage(`Failed to start EPG matching: ${text}`);
}
} catch (err) {
setSnackbarMessage(`Error: ${err.message}`);
}
setSnackbarOpen(true);
};
const closeChannelForm = () => {
setChannel(null);
setChannelModalOpen(false);
@ -294,6 +322,18 @@ const ChannelsTable = () => {
</IconButton>
</Tooltip>
{/* Our brand-new button for EPG matching */}
<Tooltip title="Auto-match EPG with fuzzy logic">
<IconButton
size="small"
color="success"
variant="contained"
onClick={matchEpg}
>
<TvIcon fontSize="small" />
</IconButton>
</Tooltip>
<ButtonGroup sx={{ marginLeft: 1 }}>
<Button variant="contained" size="small" onClick={copyHDHRUrl}>
HDHR URL

View file

@ -1,66 +1,89 @@
import React, { useEffect, useState } from 'react';
import React, { useEffect } from 'react';
import {
Grid2,
Grid as Grid2,
Box,
Container,
Typography,
TextField,
Button,
FormControl,
Select,
MenuItem,
CircularProgress,
InputLabel,
Button,
} from '@mui/material';
import { useFormik } from 'formik';
import * as Yup from 'yup';
import API from '../api';
import useSettingsStore from '../store/settings';
import useUserAgentsStore from '../store/userAgents';
import useStreamProfilesStore from '../store/streamProfiles';
import { useFormik } from 'formik';
import * as Yup from 'yup';
import API from '../api';
const SettingsPage = () => {
const { settings } = useSettingsStore();
const { userAgents } = useUserAgentsStore();
const { profiles: streamProfiles } = useStreamProfilesStore();
// Add your region choices here:
const regionChoices = [
{ value: 'us', label: 'US' },
{ value: 'uk', label: 'UK' },
{ value: 'nl', label: 'NL' },
{ value: 'de', label: 'DE' },
// Add more if needed
];
const formik = useFormik({
initialValues: {
'default-user-agent': '',
'default-stream-profile': '',
'preferred-region': '',
},
validationSchema: Yup.object({
'default-user-agent': Yup.string().required('User-Agent is required'),
'default-stream-profile': Yup.string().required(
'Stream Profile is required'
),
// The region is optional or required as you prefer
// 'preferred-region': Yup.string().required('Region is required'),
}),
onSubmit: async (values, { setSubmitting, resetForm }) => {
const changedSettings = {};
for (const setting in values) {
if (values[setting] != settings[setting].value) {
changedSettings[setting] = values[setting];
for (const settingKey in values) {
// If the user changed the settings value from whats in the DB:
if (String(values[settingKey]) !== String(settings[settingKey].value)) {
changedSettings[settingKey] = values[settingKey];
}
}
console.log(changedSettings);
for (const updated in changedSettings) {
// Update each changed setting in the backend
for (const updatedKey in changedSettings) {
await API.updateSetting({
...settings[updated],
value: values[updated],
...settings[updatedKey],
value: changedSettings[updatedKey],
});
}
setSubmitting(false);
// Dont necessarily resetForm, in case the user wants to see new values
},
});
// Initialize form values once settings / userAgents / profiles are loaded
useEffect(() => {
formik.setValues(
Object.values(settings).reduce((acc, setting) => {
acc[setting.key] = parseInt(setting.value) || setting.value;
// If the settings value is numeric, parse it
// Otherwise, just store as string
const possibleNumber = parseInt(setting.value, 10);
acc[setting.key] = isNaN(possibleNumber)
? setting.value
: possibleNumber;
return acc;
}, {})
);
}, [settings, streamProfiles, userAgents]);
// eslint-disable-next-line
}, [settings, userAgents, streamProfiles]);
return (
<Container maxWidth="md">
@ -68,65 +91,90 @@ const SettingsPage = () => {
<Typography variant="h4" gutterBottom>
Settings
</Typography>
<form onSubmit={formik.handleSubmit}>
<Grid2 container spacing={3}>
<FormControl variant="standard" fullWidth>
<InputLabel id="user-agent-label">Default User-Agent</InputLabel>
<Select
labelId="user-agent-label"
id={settings['default-user-agent'].id}
name={settings['default-user-agent'].key}
label={settings['default-user-agent'].name}
value={formik.values['default-user-agent']}
onChange={formik.handleChange}
onBlur={formik.handleBlur}
error={
formik.touched['default-user-agent'] &&
Boolean(formik.errors['default-user-agent'])
}
helperText={
formik.touched['default-user-agent'] &&
formik.errors['default-user-agent']
}
variant="standard"
>
{userAgents.map((option, index) => (
<MenuItem key={index} value={option.id}>
{option.user_agent_name}
</MenuItem>
))}
</Select>
</FormControl>
{/* Default User-Agent */}
<Grid2 xs={12}>
<FormControl variant="standard" fullWidth>
<InputLabel id="user-agent-label">Default User-Agent</InputLabel>
<Select
labelId="user-agent-label"
id={settings['default-user-agent']?.id}
name={settings['default-user-agent']?.key}
label={settings['default-user-agent']?.name}
value={formik.values['default-user-agent'] || ''}
onChange={formik.handleChange}
onBlur={formik.handleBlur}
error={
formik.touched['default-user-agent'] &&
Boolean(formik.errors['default-user-agent'])
}
variant="standard"
>
{userAgents.map((option) => (
<MenuItem key={option.id} value={option.id}>
{option.user_agent_name}
</MenuItem>
))}
</Select>
</FormControl>
</Grid2>
<FormControl variant="standard" fullWidth>
<InputLabel id="stream-profile-label">
Default Stream Profile
</InputLabel>
<Select
labelId="stream-profile-label"
id={settings['default-stream-profile'].id}
name={settings['default-stream-profile'].key}
label={settings['default-stream-profile'].name}
value={formik.values['default-stream-profile']}
onChange={formik.handleChange}
onBlur={formik.handleBlur}
error={
formik.touched['default-stream-profile'] &&
Boolean(formik.errors['default-stream-profile'])
}
helperText={
formik.touched['default-stream-profile'] &&
formik.errors['default-stream-profile']
}
variant="standard"
>
{streamProfiles.map((option, index) => (
<MenuItem key={index} value={option.id}>
{option.profile_name}
</MenuItem>
))}
</Select>
</FormControl>
{/* Default Stream Profile */}
<Grid2 xs={12}>
<FormControl variant="standard" fullWidth>
<InputLabel id="stream-profile-label">
Default Stream Profile
</InputLabel>
<Select
labelId="stream-profile-label"
id={settings['default-stream-profile']?.id}
name={settings['default-stream-profile']?.key}
label={settings['default-stream-profile']?.name}
value={formik.values['default-stream-profile'] || ''}
onChange={formik.handleChange}
onBlur={formik.handleBlur}
error={
formik.touched['default-stream-profile'] &&
Boolean(formik.errors['default-stream-profile'])
}
variant="standard"
>
{streamProfiles.map((profile) => (
<MenuItem key={profile.id} value={profile.id}>
{profile.profile_name}
</MenuItem>
))}
</Select>
</FormControl>
</Grid2>
{/* Preferred Region */}
<Grid2 xs={12}>
{/* Only render if you do indeed have "preferred-region" in the DB */}
{settings['preferred-region'] && (
<FormControl variant="standard" fullWidth>
<InputLabel id="region-label">Preferred Region</InputLabel>
<Select
labelId="region-label"
id={settings['preferred-region'].id}
name={settings['preferred-region'].key}
label={settings['preferred-region'].name}
value={formik.values['preferred-region'] || ''}
onChange={formik.handleChange}
onBlur={formik.handleBlur}
variant="standard"
>
{regionChoices.map((r) => (
<MenuItem key={r.value} value={r.value}>
{r.label}
</MenuItem>
))}
</Select>
</FormControl>
)}
</Grid2>
</Grid2>
<Box mt={4} display="flex" justifyContent="flex-end">

View file

@ -14,3 +14,6 @@ yt-dlp
gevent==24.11.1
django-cors-headers
djangorestframework-simplejwt
m3u8
rapidfuzz==3.12.1
sentence-transformers==3.4.1

View file

@ -1,19 +0,0 @@
[uwsgi]
exec-pre-app = python manage.py collectstatic --noinput
exec-pre-app = python manage.py migrate --noinput
http-socket = [::]:9191
buffer-size = 32768
enable-threads
plugin = python3
module = dispatcharr.wsgi:application
static-map = /static=staticfiles
thunder-lock
disable-write-exception
virtualenv = /dispatcharrpy
max-fd = 10000
attach-daemon = celery -A dispatcharr worker -l info
attach-daemon = redis-server

View file

@ -1,20 +0,0 @@
[uwsgi]
exec-pre-app = python manage.py collectstatic --noinput
exec-pre-app = python manage.py migrate --noinput
http-socket = [::]:5656
buffer-size = 32768
enable-threads
plugin = python3
module = dispatcharr.wsgi:application
static-map = /static=staticfiles
thunder-lock
disable-write-exception
virtualenv = /dispatcharrpy
max-fd = 10000
attach-daemon = celery -A dispatcharr worker -l info
attach-daemon = redis-server
attach-daemon = cd /app/frontend && npm run start

View file

@ -1,16 +0,0 @@
[uwsgi]
exec-pre-app = python manage.py collectstatic --noinput
exec-pre-app = python manage.py migrate --noinput
http-socket = [::]:9191
buffer-size = 32768
enable-threads
plugin = python3
module = dispatcharr.wsgi:application
static-map = /static=staticfiles
thunder-lock
disable-write-exception
virtualenv = /dispatcharrpy
max-fd = 10000