mirror of
https://github.com/Dispatcharr/Dispatcharr.git
synced 2026-01-23 10:45:27 +00:00
This works to stream multiple clients.
This commit is contained in:
parent
352b473f27
commit
16618d64d9
2 changed files with 41 additions and 10 deletions
|
|
@ -54,47 +54,78 @@ def initialize_stream(request, channel_id):
|
|||
@csrf_exempt
|
||||
@require_http_methods(["GET"])
|
||||
def stream_ts(request, channel_id):
|
||||
"""Handle TS stream requests"""
|
||||
"""Handle TS stream requests with improved multi-client support"""
|
||||
if channel_id not in proxy_server.stream_managers:
|
||||
return JsonResponse({'error': 'Channel not found'}, status=404)
|
||||
|
||||
def generate():
|
||||
client_id = threading.get_ident()
|
||||
# Use a truly unique client ID (timestamp + random component)
|
||||
import random
|
||||
client_id = int(time.time() * 1000) + random.randint(1, 1000)
|
||||
|
||||
try:
|
||||
buffer = proxy_server.stream_buffers[channel_id]
|
||||
client_manager = proxy_server.client_managers[channel_id]
|
||||
|
||||
# Record this client
|
||||
client_manager.add_client(client_id)
|
||||
last_index = buffer.index
|
||||
|
||||
# Each client starts at current buffer position
|
||||
with buffer.lock:
|
||||
last_index = buffer.index
|
||||
|
||||
logger.info(f"New client {client_id} connected to channel {channel_id} (starting at index {last_index})")
|
||||
|
||||
# Yield some initial headers or empty data to establish connection
|
||||
yield b''
|
||||
|
||||
# Stream indefinitely
|
||||
while True:
|
||||
new_chunks = False
|
||||
|
||||
# Minimize lock time - only lock briefly to check and grab data
|
||||
with buffer.lock:
|
||||
if buffer.index > last_index:
|
||||
chunks_behind = buffer.index - last_index
|
||||
# Calculate start position in circular buffer
|
||||
start_pos = max(0, len(buffer.buffer) - chunks_behind)
|
||||
|
||||
for i in range(start_pos, len(buffer.buffer)):
|
||||
yield buffer.buffer[i]
|
||||
# Get chunks to send (make a copy to avoid long lock)
|
||||
chunks_to_send = [buffer.buffer[i] for i in range(start_pos, len(buffer.buffer))]
|
||||
last_index = buffer.index
|
||||
new_chunks = True
|
||||
|
||||
threading.Event().wait(0.1) # Short sleep between checks
|
||||
if new_chunks:
|
||||
# Send all collected chunks outside the lock
|
||||
for chunk in chunks_to_send:
|
||||
yield chunk
|
||||
else:
|
||||
# Shorter sleep to be more responsive
|
||||
time.sleep(0.05)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Streaming error for channel {channel_id}: {e}")
|
||||
logger.error(f"Streaming error for client {client_id}, channel {channel_id}: {e}")
|
||||
finally:
|
||||
try:
|
||||
if channel_id in proxy_server.client_managers:
|
||||
remaining = proxy_server.client_managers[channel_id].remove_client(client_id)
|
||||
logger.info(f"Client {client_id} disconnected from channel {channel_id} ({remaining} clients remaining)")
|
||||
|
||||
# Keep channel active with at least one client
|
||||
if remaining == 0:
|
||||
logger.info(f"No clients remaining, stopping channel {channel_id}")
|
||||
proxy_server.stop_channel(channel_id)
|
||||
except Exception as e:
|
||||
logger.error(f"Error during client cleanup: {e}")
|
||||
|
||||
return StreamingHttpResponse(
|
||||
|
||||
# Create response with appropriate streaming settings
|
||||
response = StreamingHttpResponse(
|
||||
generate(),
|
||||
content_type='video/MP2T'
|
||||
)
|
||||
response['Cache-Control'] = 'no-cache, no-store'
|
||||
response['X-Accel-Buffering'] = 'no' # Disable nginx buffering
|
||||
return response
|
||||
|
||||
@csrf_exempt
|
||||
@require_http_methods(["POST"])
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ master = true
|
|||
module = dispatcharr.wsgi:application
|
||||
processes = 1
|
||||
workers = 1
|
||||
threads = 1
|
||||
threads = 10
|
||||
enable-threads = true
|
||||
|
||||
# Listen on a dedicated port
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue