diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py
index cbb7bd94..b3de8567 100644
--- a/apps/m3u/tasks.py
+++ b/apps/m3u/tasks.py
@@ -206,8 +206,9 @@ def refresh_single_m3u_account(account_id):
async_to_sync(channel_layer.group_send)(
"updates",
{
- "type": "m3u_refresh",
- "message": {"success": True, "message": "M3U refresh completed successfully"}
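+                # The consumer's update() handler forwards this event as-is; data["type"] identifies the kind of update for the frontend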
+ "type": "update",
+ "data": {"success": True, "type": "m3u_refresh", "message": "M3U refresh completed successfully"}
},
)
return f"Account {account_id} => Created {created_count}, updated {updated_count}, excluded {excluded_count} Streams."
diff --git a/apps/proxy/tasks.py b/apps/proxy/tasks.py
new file mode 100644
index 00000000..37a1f8f9
--- /dev/null
+++ b/apps/proxy/tasks.py
@@ -0,0 +1,54 @@
+# apps/proxy/tasks.py
+from celery import shared_task
+from channels.layers import get_channel_layer
+from asgiref.sync import async_to_sync
+import redis
+import json
+import logging
+import re
+from core.utils import redis_client
+from apps.proxy.ts_proxy.channel_status import ChannelStatus
+
+logger = logging.getLogger(__name__)
+
+# Store the last known value to compare with new data
+last_known_data = {}
+
+@shared_task
+def fetch_channel_stats():
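+    """Collect basic stats for every active proxy channel from Redis and
+    broadcast them to the "updates" channel layer group as a "channel_stats" update."""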
+ try:
+ # Basic info for all channels
+ channel_pattern = "ts_proxy:channel:*:metadata"
+ all_channels = []
+
+ # Extract channel IDs from keys
+ cursor = 0
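+        # Redis SCAN is cursor-based; keep iterating until it returns cursor 0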
+ while True:
+ cursor, keys = redis_client.scan(cursor, match=channel_pattern)
+ for key in keys:
+ channel_id_match = re.search(r"ts_proxy:channel:(.*):metadata", key.decode('utf-8'))
+ if channel_id_match:
+ ch_id = channel_id_match.group(1)
+ channel_info = ChannelStatus.get_basic_channel_info(ch_id)
+ if channel_info:
+ all_channels.append(channel_info)
+
+ if cursor == 0:
+ break
+
+ except Exception as e:
+        logger.error(f"Error in fetch_channel_stats: {e}", exc_info=True)
+        # Nothing to broadcast if stats collection failed
+        return
+
+ channel_layer = get_channel_layer()
+ async_to_sync(channel_layer.group_send)(
+ "updates",
+ {
+ "type": "update",
+ "data": {"success": True, "type": "channel_stats", "stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})}
+ },
+ )
diff --git a/apps/proxy/ts_proxy/channel_status.py b/apps/proxy/ts_proxy/channel_status.py
index 3c36b8cd..c9c896e0 100644
--- a/apps/proxy/ts_proxy/channel_status.py
+++ b/apps/proxy/ts_proxy/channel_status.py
@@ -6,7 +6,7 @@ from . import proxy_server
logger = logging.getLogger("ts_proxy")
class ChannelStatus:
-
+
def get_detailed_channel_info(channel_id):
# Get channel metadata
metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
@@ -231,7 +231,8 @@ class ChannelStatus:
# Efficient way - just retrieve the essentials
client_info = {
'client_id': client_id_str,
- 'user_agent': proxy_server.redis_client.hget(client_key, 'user_agent')
+ 'user_agent': proxy_server.redis_client.hget(client_key, 'user_agent'),
+                        'ip_address': (proxy_server.redis_client.hget(client_key, 'ip_address') or b'unknown').decode('utf-8'),
}
if client_info['user_agent']:
@@ -251,4 +252,3 @@ class ChannelStatus:
info['clients'] = clients
return info
-
diff --git a/apps/proxy/ts_proxy/client_manager.py b/apps/proxy/ts_proxy/client_manager.py
index 7285e7fe..178b7a9d 100644
--- a/apps/proxy/ts_proxy/client_manager.py
+++ b/apps/proxy/ts_proxy/client_manager.py
@@ -135,7 +135,7 @@ class ClientManager:
except Exception as e:
logger.error(f"Error notifying owner of client activity: {e}")
- def add_client(self, client_id, user_agent=None):
+ def add_client(self, client_id, client_ip, user_agent=None):
"""Add a client with duplicate prevention"""
if client_id in self._registered_clients:
logger.debug(f"Client {client_id} already registered, skipping")
@@ -150,6 +150,7 @@ class ClientManager:
current_time = str(time.time())
client_data = {
"user_agent": user_agent or "unknown",
+            "ip_address": client_ip or "unknown",
"connected_at": current_time,
"last_active": current_time,
"worker_id": self.worker_id or "unknown"
@@ -285,4 +286,4 @@ class ClientManager:
# Refresh TTL on the set itself
self.redis_client.expire(self.client_set_key, self.client_ttl)
except Exception as e:
- logger.error(f"Error refreshing client TTL: {e}")
\ No newline at end of file
+ logger.error(f"Error refreshing client TTL: {e}")
diff --git a/apps/proxy/ts_proxy/server.py b/apps/proxy/ts_proxy/server.py
index 5dc4a6b5..72bd93dd 100644
--- a/apps/proxy/ts_proxy/server.py
+++ b/apps/proxy/ts_proxy/server.py
@@ -190,16 +190,16 @@ class ProxyServer:
metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
if self.redis_client.exists(metadata_key):
self.redis_client.hset(metadata_key, mapping={
- "state": "stopping",
+ "state": "stopping",
"state_changed_at": str(time.time())
})
-
+
# If we have local resources for this channel, clean them up
if channel_id in self.stream_buffers or channel_id in self.client_managers:
# Use existing stop_channel method
logger.info(f"Stopping local resources for channel {channel_id}")
self.stop_channel(channel_id)
-
+
# Acknowledge stop by publishing a response
stop_response = {
"event": "channel_stopped",
@@ -215,14 +215,14 @@ class ProxyServer:
client_id = data.get("client_id")
if client_id and channel_id:
logger.info(f"Received request to stop client {client_id} on channel {channel_id}")
-
+
# Both remove from client manager AND set a key for the generator to detect
if channel_id in self.client_managers:
client_manager = self.client_managers[channel_id]
if client_id in client_manager.clients:
client_manager.remove_client(client_id)
logger.info(f"Removed client {client_id} from client manager")
-
+
# Set a Redis key for the generator to detect
if self.redis_client:
stop_key = f"ts_proxy:channel:{channel_id}:client:{client_id}:stop"
@@ -497,13 +497,13 @@ class ProxyServer:
"""Stop a channel with proper ownership handling"""
try:
logger.info(f"Stopping channel {channel_id}")
-
+
# First set a stopping key that clients will check
if self.redis_client:
stop_key = f"ts_proxy:channel:{channel_id}:stopping"
# Set with 60 second TTL - enough time for clients to notice
self.redis_client.setex(stop_key, 10, "true")
-
+
# Only stop the actual stream manager if we're the owner
if self.am_i_owner(channel_id):
logger.info(f"This worker ({self.worker_id}) is the owner - closing provider connection")
@@ -592,17 +592,17 @@ class ProxyServer:
try:
# Refresh channel registry
self.refresh_channel_registry()
-
+
# Create a unified list of all channels we have locally
all_local_channels = set(self.stream_managers.keys()) | set(self.client_managers.keys())
-
+
# Single loop through all channels - process each exactly once
for channel_id in list(all_local_channels):
if self.am_i_owner(channel_id):
# === OWNER CHANNEL HANDLING ===
# Extend ownership lease
self.extend_ownership(channel_id)
-
+
# Get channel state from metadata hash
channel_state = "unknown"
if self.redis_client:
@@ -701,7 +701,7 @@ class ProxyServer:
self.redis_client.delete(f"ts_proxy:channel:{channel_id}:last_client_disconnect_time")
else:
- # === NON-OWNER CHANNEL HANDLING ===
+ # === NON-OWNER CHANNEL HANDLING ===
# For channels we don't own, check if they've been stopped/cleaned up in Redis
if self.redis_client:
# Method 1: Check for stopping key
@@ -710,21 +710,21 @@ class ProxyServer:
logger.debug(f"Non-owner cleanup: Channel {channel_id} has stopping flag in Redis, cleaning up local resources")
self._cleanup_local_resources(channel_id)
continue
-
+
# Method 2: Check if owner still exists
owner_key = f"ts_proxy:channel:{channel_id}:owner"
if not self.redis_client.exists(owner_key):
logger.debug(f"Non-owner cleanup: Channel {channel_id} has no owner in Redis, cleaning up local resources")
self._cleanup_local_resources(channel_id)
continue
-
+
# Method 3: Check if metadata still exists
metadata_key = f"ts_proxy:channel:{channel_id}:metadata"
if not self.redis_client.exists(metadata_key):
logger.debug(f"Non-owner cleanup: Channel {channel_id} has no metadata in Redis, cleaning up local resources")
self._cleanup_local_resources(channel_id)
continue
-
+
# Check for local client count - if zero, clean up our local resources
if self.client_managers[channel_id].get_client_count() == 0:
# We're not the owner, and we have no local clients - clean up our resources
@@ -831,6 +831,7 @@ class ProxyServer:
self.redis_client.hset(metadata_key, "last_active", str(time.time()))
self.redis_client.expire(metadata_key, 30) # Reset TTL on metadata hash
logger.debug(f"Refreshed metadata TTL for channel {channel_id}")
+
def update_channel_state(self, channel_id, new_state, additional_fields=None):
"""Update channel state with proper history tracking and logging"""
if not self.redis_client:
@@ -887,7 +888,7 @@ class ProxyServer:
if channel_id in self.client_managers:
del self.client_managers[channel_id]
logger.info(f"Non-owner cleanup: Removed client manager for channel {channel_id}")
-
+
return True
except Exception as e:
logger.error(f"Error cleaning up local resources: {e}", exc_info=True)
diff --git a/apps/proxy/ts_proxy/views.py b/apps/proxy/ts_proxy/views.py
index 32496c55..f7424f38 100644
--- a/apps/proxy/ts_proxy/views.py
+++ b/apps/proxy/ts_proxy/views.py
@@ -21,6 +21,14 @@ from rest_framework.permissions import IsAuthenticated
logger = logging.getLogger("ts_proxy")
+def get_client_ip(request):
+ x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
+ if x_forwarded_for:
+        ip = x_forwarded_for.split(',')[0].strip()
+ else:
+ ip = request.META.get('REMOTE_ADDR')
+ return ip
+
@api_view(['GET'])
def stream_ts(request, channel_id):
"""Stream TS data to client with immediate response and keep-alive packets during initialization"""
@@ -31,6 +39,7 @@ def stream_ts(request, channel_id):
try:
# Generate a unique client ID
client_id = f"client_{int(time.time() * 1000)}_{random.randint(1000, 9999)}"
+ client_ip = get_client_ip(request)
logger.info(f"[{client_id}] Requested stream for channel {channel_id}")
# Extract client user agent early
@@ -156,7 +165,7 @@ def stream_ts(request, channel_id):
# Register client
buffer = proxy_server.stream_buffers[channel_id]
client_manager = proxy_server.client_managers[channel_id]
- client_manager.add_client(client_id, client_user_agent)
+ client_manager.add_client(client_id, client_ip, client_user_agent)
logger.info(f"[{client_id}] Client registered with channel {channel_id}")
# Define a single generate function
diff --git a/dispatcharr/consumers.py b/dispatcharr/consumers.py
index 9c56605d..356422d7 100644
--- a/dispatcharr/consumers.py
+++ b/dispatcharr/consumers.py
@@ -14,5 +14,5 @@ class MyWebSocketConsumer(AsyncWebsocketConsumer):
data = json.loads(text_data)
print("Received:", data)
- async def m3u_refresh(self, event):
+ async def update(self, event):
await self.send(text_data=json.dumps(event))
diff --git a/dispatcharr/settings.py b/dispatcharr/settings.py
index 5c3ee116..5cd21169 100644
--- a/dispatcharr/settings.py
+++ b/dispatcharr/settings.py
@@ -1,6 +1,7 @@
import os
from pathlib import Path
from datetime import timedelta
+from celery.schedules import crontab
BASE_DIR = Path(__file__).resolve().parent.parent
@@ -150,6 +151,14 @@ AUTH_USER_MODEL = 'accounts.User'
CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
CELERY_RESULT_BACKEND = CELERY_BROKER_URL
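+# Push ts_proxy channel stats to the "updates" websocket group every 2 seconds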
+CELERY_BEAT_SCHEDULE = {
+ 'fetch-channel-statuses': {
+ 'task': 'apps.proxy.tasks.fetch_channel_stats',
+ 'schedule': 2.0,
+ },
+}
+
MEDIA_ROOT = BASE_DIR / 'media'
MEDIA_URL = '/media/'
diff --git a/docker/uwsgi.dev.ini b/docker/uwsgi.dev.ini
index 535897a5..93ef9fa0 100644
--- a/docker/uwsgi.dev.ini
+++ b/docker/uwsgi.dev.ini
@@ -3,6 +3,7 @@
; exec-before = python manage.py migrate --noinput
attach-daemon = celery -A dispatcharr worker -l info
+attach-daemon = celery -A dispatcharr beat -l info
attach-daemon = redis-server
attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application
attach-daemon = cd /app/frontend && npm run dev
@@ -17,6 +18,7 @@ socket = /app/uwsgi.sock
chmod-socket = 777
vacuum = true
die-on-term = true
+static-map = /static=/app/static
# Worker management (Optimize for I/O bound tasks)
workers = 4
diff --git a/docker/uwsgi.ini b/docker/uwsgi.ini
index ace423af..adb5de33 100644
--- a/docker/uwsgi.ini
+++ b/docker/uwsgi.ini
@@ -2,7 +2,8 @@
; exec-before = python manage.py collectstatic --noinput
; exec-before = python manage.py migrate --noinput
-attach-daemon = celery -A dispatcharr worker -l info
+attach-daemon = celery -A dispatcharr worker -l error
+attach-daemon = celery -A dispatcharr beat -l error
attach-daemon = redis-server
attach-daemon = daphne -b 0.0.0.0 -p 8001 dispatcharr.asgi:application
@@ -16,6 +17,7 @@ socket = /app/uwsgi.sock
chmod-socket = 777
vacuum = true
die-on-term = true
+static-map = /static=/app/static
# Worker management (Optimize for I/O bound tasks)
workers = 4
diff --git a/frontend/src/App.jsx b/frontend/src/App.jsx
index 25dcb8aa..ff8ae3b6 100644
--- a/frontend/src/App.jsx
+++ b/frontend/src/App.jsx
@@ -12,6 +12,7 @@ import Channels from './pages/Channels';
import M3U from './pages/M3U';
import EPG from './pages/EPG';
import Guide from './pages/Guide';
+import Stats from './pages/Stats';
import Settings from './pages/Settings';
import StreamProfiles from './pages/StreamProfiles';
import useAuthStore from './store/auth';
@@ -122,6 +123,7 @@ const App = () => {
element={