Dispatcharr/apps/proxy/tasks.py
2025-08-05 21:24:41 -05:00

72 lines
2.3 KiB
Python

# yourapp/tasks.py
from celery import shared_task
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
import redis
import json
import logging
import re
import gc # Add import for garbage collection
from core.utils import RedisClient
from apps.proxy.ts_proxy.channel_status import ChannelStatus
from core.utils import send_websocket_update
from apps.proxy.vod_proxy.connection_manager import get_connection_manager
logger = logging.getLogger(__name__)
# Store the last known value to compare with new data
last_known_data = {}
@shared_task
def fetch_channel_stats():
redis_client = RedisClient.get_client()
try:
# Basic info for all channels
channel_pattern = "ts_proxy:channel:*:metadata"
all_channels = []
# Extract channel IDs from keys
cursor = 0
while True:
cursor, keys = redis_client.scan(cursor, match=channel_pattern)
for key in keys:
channel_id_match = re.search(r"ts_proxy:channel:(.*):metadata", key.decode('utf-8'))
if channel_id_match:
ch_id = channel_id_match.group(1)
channel_info = ChannelStatus.get_basic_channel_info(ch_id)
if channel_info:
all_channels.append(channel_info)
if cursor == 0:
break
except Exception as e:
logger.error(f"Error in channel_status: {e}", exc_info=True)
return
# return JsonResponse({'error': str(e)}, status=500)
send_websocket_update(
"updates",
"update",
{
"success": True,
"type": "channel_stats",
"stats": json.dumps({'channels': all_channels, 'count': len(all_channels)})
},
collect_garbage=True
)
# Explicitly clean up large data structures
all_channels = None
gc.collect()
@shared_task
def cleanup_vod_connections():
"""Clean up stale VOD connections"""
try:
connection_manager = get_connection_manager()
connection_manager.cleanup_stale_connections(max_age_seconds=3600) # 1 hour
logger.info("VOD connection cleanup completed")
except Exception as e:
logger.error(f"Error in VOD connection cleanup: {e}", exc_info=True)