diff --git a/apps/channels/migrations/0018_channelgroupm3uaccount_custom_properties_and_more.py b/apps/channels/migrations/0018_channelgroupm3uaccount_custom_properties_and_more.py
new file mode 100644
index 00000000..7d2dafb4
--- /dev/null
+++ b/apps/channels/migrations/0018_channelgroupm3uaccount_custom_properties_and_more.py
@@ -0,0 +1,23 @@
+# Generated by Django 5.1.6 on 2025-04-27 14:12
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('dispatcharr_channels', '0017_alter_channelgroup_name'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='channelgroupm3uaccount',
+            name='custom_properties',
+            field=models.TextField(blank=True, null=True),
+        ),
+        migrations.AlterField(
+            model_name='channel',
+            name='channel_number',
+            field=models.IntegerField(db_index=True),
+        ),
+    ]
diff --git a/apps/channels/models.py b/apps/channels/models.py
index 0b66c468..13172e36
--- a/apps/channels/models.py
+++ b/apps/channels/models.py
@@ -441,6 +441,7 @@ class ChannelGroupM3UAccount(models.Model):
         on_delete=models.CASCADE,
         related_name='channel_group'
     )
+    custom_properties = models.TextField(null=True, blank=True)
     enabled = models.BooleanField(default=True)
 
     class Meta:
diff --git a/apps/m3u/migrations/0008_m3uaccount_account_type_m3uaccount_password_and_more.py b/apps/m3u/migrations/0008_m3uaccount_account_type_m3uaccount_password_and_more.py
new file mode 100644
index 00000000..02d2937f
--- /dev/null
+++ b/apps/m3u/migrations/0008_m3uaccount_account_type_m3uaccount_password_and_more.py
@@ -0,0 +1,28 @@
+# Generated by Django 5.1.6 on 2025-04-27 12:56
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('m3u', '0007_remove_m3uaccount_uploaded_file_m3uaccount_file_path'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='m3uaccount',
+            name='account_type',
+            field=models.CharField(choices=[('STD', 'Standard'), ('XC', 'Xtream Codes')], default='STD'),
+        ),
+        migrations.AddField(
+            model_name='m3uaccount',
+            name='password',
+            field=models.CharField(blank=True, max_length=255, null=True),
+        ),
+        migrations.AddField(
+            model_name='m3uaccount',
+            name='username',
+            field=models.CharField(blank=True, max_length=255, null=True),
+        ),
+    ]
diff --git a/apps/m3u/models.py b/apps/m3u/models.py
index 25a332c6..99ead627
--- a/apps/m3u/models.py
+++ b/apps/m3u/models.py
@@ -10,6 +10,10 @@ from core.models import CoreSettings, UserAgent
 CUSTOM_M3U_ACCOUNT_NAME="custom"
 
 class M3UAccount(models.Model):
     """Represents an M3U Account for IPTV streams."""
+    class Types(models.TextChoices):
+        STANDARD = "STD", "Standard"
+        XC = "XC", "Xtream Codes"
+
     name = models.CharField(
         max_length=255,
@@ -69,6 +73,9 @@ class M3UAccount(models.Model):
         blank=True,
         related_name='m3u_accounts'
     )
+    account_type = models.CharField(choices=Types.choices, default=Types.STANDARD)
+    username = models.CharField(max_length=255, null=True, blank=True)
+    password = models.CharField(max_length=255, null=True, blank=True)
     custom_properties = models.TextField(null=True, blank=True)
     refresh_interval = models.IntegerField(default=24)
     refresh_task = models.ForeignKey(
diff --git a/apps/m3u/serializers.py b/apps/m3u/serializers.py
index d79b0117..dd9b0e7a
--- a/apps/m3u/serializers.py
+++ b/apps/m3u/serializers.py
@@ -66,8 +66,14 @@ class M3UAccountSerializer(serializers.ModelSerializer):
         fields = [
             'id', 'name', 'server_url', 'file_path', 'server_group', 'max_streams', 'is_active', 'created_at',
             'updated_at', 'filters', 'user_agent', 'profiles', 'locked',
-            'channel_groups', 'refresh_interval'
+            'channel_groups', 'refresh_interval', 'custom_properties', 'account_type', 'username', 'password'
         ]
+        extra_kwargs = {
+            'password': {
+                'required': False,
+                'allow_blank': True,
+            },
+        }
 
     def update(self, instance, validated_data):
         # Pop out channel group memberships so we can handle them manually
diff --git a/apps/m3u/tasks.py b/apps/m3u/tasks.py
index beacaaa2..978f9763
--- a/apps/m3u/tasks.py
+++ b/apps/m3u/tasks.py
@@ -21,6 +21,7 @@ import json
 from core.utils import RedisClient, acquire_task_lock, release_task_lock
 from core.models import CoreSettings
 from asgiref.sync import async_to_sync
+from core.xtream_codes import Client as XCClient
 
 logger = logging.getLogger(__name__)
 
@@ -172,32 +173,33 @@ def check_field_lengths(streams_to_create):
     print("")
 
 @shared_task
-def process_groups(account, group_names):
-    existing_groups = {group.name: group for group in ChannelGroup.objects.filter(name__in=group_names)}
+def process_groups(account, groups):
+    existing_groups = {group.name: group for group in ChannelGroup.objects.filter(name__in=groups.keys())}
     logger.info(f"Currently {len(existing_groups)} existing groups")
 
-    groups = []
+    group_objs = []
     groups_to_create = []
-    for group_name in group_names:
+    for group_name, custom_props in groups.items():
         logger.info(f"Handling group: {group_name}")
-        if group_name in existing_groups:
-            groups.append(existing_groups[group_name])
-        else:
+        if group_name not in existing_groups:
             groups_to_create.append(ChannelGroup(
                 name=group_name,
             ))
+        else:
+            group_objs.append(existing_groups[group_name])
 
     if groups_to_create:
         logger.info(f"Creating {len(groups_to_create)} groups")
         created = ChannelGroup.bulk_create_and_fetch(groups_to_create)
         logger.info(f"Created {len(created)} groups")
-        groups.extend(created)
+        group_objs.extend(created)
 
     relations = []
-    for group in groups:
+    for group in group_objs:
         relations.append(ChannelGroupM3UAccount(
             channel_group=group,
             m3u_account=account,
+            custom_properties=json.dumps(groups[group.name]),
         ))
 
     ChannelGroupM3UAccount.objects.bulk_create(
@@ -205,6 +207,78 @@
         ignore_conflicts=True
     )
 
+@shared_task
+def process_xc_category(account_id, batch, groups, hash_keys):
+    account = M3UAccount.objects.get(id=account_id)
+
+    streams_to_create = []
+    streams_to_update = []
+    stream_hashes = {}
+
+    xc_client = XCClient(account.server_url, account.username, account.password)
+    for group_name, props in batch.items():
+        streams = xc_client.get_live_category_streams(props['xc_id'])
+        for stream in streams:
+            name = stream["name"]
+            url = xc_client.get_stream_url(stream["stream_id"])
+            tvg_id = stream["epg_channel_id"]
+            tvg_logo = stream["stream_icon"]
+            group_title = group_name
+
+            stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys)
+            stream_props = {
+                "name": name,
+                "url": url,
+                "logo_url": tvg_logo,
+                "tvg_id": tvg_id,
+                "m3u_account": account,
+                "channel_group_id": int(groups.get(group_title)),
+                "stream_hash": stream_hash,
+                "custom_properties": json.dumps(stream),
+            }
+
+            if stream_hash not in stream_hashes:
+                stream_hashes[stream_hash] = stream_props
+
+    existing_streams = {s.stream_hash: s for s in Stream.objects.filter(stream_hash__in=stream_hashes.keys())}
+
+    for stream_hash, stream_props in stream_hashes.items():
+        if stream_hash in existing_streams:
+            obj = existing_streams[stream_hash]
+            existing_attr = {field.name: getattr(obj, field.name) for field in Stream._meta.fields if field != 'channel_group_id'}
+            changed = any(existing_attr[key] != value for key, value in stream_props.items() if key != 'channel_group_id')
+
+            if changed:
+                for key, value in stream_props.items():
+                    setattr(obj, key, value)
+                obj.last_seen = timezone.now()
+                streams_to_update.append(obj)
+                del existing_streams[stream_hash]
+            else:
+                existing_streams[stream_hash] = obj
+        else:
+            stream_props["last_seen"] = timezone.now()
+            streams_to_create.append(Stream(**stream_props))
+
+    try:
+        with transaction.atomic():
+            if streams_to_create:
+                Stream.objects.bulk_create(streams_to_create, ignore_conflicts=True)
+            if streams_to_update:
+                Stream.objects.bulk_update(streams_to_update, { key for key in stream_props.keys() if key not in ["m3u_account", "stream_hash"] and key not in hash_keys})
+            # if len(existing_streams.keys()) > 0:
+            #     Stream.objects.bulk_update(existing_streams.values(), ["last_seen"])
+    except Exception as e:
+        logger.error(f"Bulk create failed: {str(e)}")
+
+    retval = f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated."
+
+    # Aggressive garbage collection
+    del streams_to_create, streams_to_update, stream_hashes, existing_streams
+    gc.collect()
+
+    return retval
+
 @shared_task
 def process_m3u_batch(account_id, batch, groups, hash_keys):
     """Processes a batch of M3U streams using bulk operations."""
@@ -227,23 +301,7 @@ def process_m3u_batch(account_id, batch, groups, hash_keys):
             logger.debug(f"Skipping stream in disabled group: {group_title}")
             continue
 
-        # if any(url.lower().endswith(ext) for ext in SKIP_EXTS) or len(url) > 2000:
-        #     continue
-
-        # if _matches_filters(name, group_title, account.filters.all()):
-        #     continue
-
-        # if any(compiled_pattern.search(current_info['name']) for ftype, compiled_pattern in compiled_filters if ftype == 'name'):
-        #     excluded_count += 1
-        #     current_info = None
-        #     continue
-
         stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys)
-        # if redis_client.exists(f"m3u_refresh:{stream_hash}"):
-        #     # duplicate already processed by another batch
-        #     continue
-
-        # redis_client.set(f"m3u_refresh:{stream_hash}", "true")
         stream_props = {
             "name": name,
             "url": url,
@@ -332,24 +390,38 @@ def refresh_m3u_groups(account_id, use_cache=False, full_refresh=False):
         return f"M3UAccount with ID={account_id} not found or inactive.", None
 
     extinf_data = []
-    groups = set(["Default Group"])
+    groups = {"Default Group": {}}
 
-    for line in fetch_m3u_lines(account, use_cache):
-        line = line.strip()
-        if line.startswith("#EXTINF"):
-            parsed = parse_extinf_line(line)
-            if parsed:
-                if "group-title" in parsed["attributes"]:
-                    groups.add(parsed["attributes"]["group-title"])
+    xc_client = None
+    if account.account_type == M3UAccount.Types.XC:
+        xc_client = XCClient(account.server_url, account.username, account.password)
+        try:
+            xc_client.authenticate()
+        except Exception as e:
+            release_task_lock('refresh_m3u_account_groups', account_id)
+            return f"M3UAccount with ID={account_id} failed to authenticate with XC server.", None
 
-                extinf_data.append(parsed)
-        elif extinf_data and line.startswith("http"):
-            # Associate URL with the last EXTINF line
-            extinf_data[-1]["url"] = line
+        xc_categories = xc_client.get_live_categories()
+        for category in xc_categories:
+            groups[category["category_name"]] = {
+                "xc_id": category["category_id"],
+            }
+    else:
+        for line in fetch_m3u_lines(account, use_cache):
+            line = line.strip()
+            if line.startswith("#EXTINF"):
+                parsed = parse_extinf_line(line)
+                if parsed:
"group-title" in parsed["attributes"]: + groups[parsed["attributes"]["group-title"]] = {} + + extinf_data.append(parsed) + elif extinf_data and line.startswith("http"): + # Associate URL with the last EXTINF line + extinf_data[-1]["url"] = line send_m3u_update(account_id, "processing_groups", 0) - groups = list(groups) cache_path = os.path.join(m3u_dir, f"{account_id}.json") with open(cache_path, 'w', encoding='utf-8') as f: json.dump({ @@ -412,7 +484,7 @@ def refresh_single_m3u_account(account_id): if not extinf_data: try: extinf_data, groups = refresh_m3u_groups(account_id, full_refresh=True) - if not extinf_data or not groups: + if not groups: release_task_lock('refresh_single_m3u_account', account_id) return "Failed to update m3u account, task may already be running" except: @@ -426,9 +498,17 @@ def refresh_single_m3u_account(account_id): m3u_account__enabled=True # Filter by the enabled flag in the join table )} - # Break into batches and process in parallel - batches = [extinf_data[i:i + BATCH_SIZE] for i in range(0, len(extinf_data), BATCH_SIZE)] - task_group = group(process_m3u_batch.s(account_id, batch, existing_groups, hash_keys) for batch in batches) + if account.account_type == M3UAccount.Types.STADNARD: + # Break into batches and process in parallel + batches = [extinf_data[i:i + BATCH_SIZE] for i in range(0, len(extinf_data), BATCH_SIZE)] + task_group = group(process_m3u_batch.s(account_id, batch, existing_groups, hash_keys) for batch in batches) + else: + filtered_groups = [(k, v) for k, v in groups.items() if k in existing_groups] + batches = [ + dict(filtered_groups[i:i + 2]) + for i in range(0, len(filtered_groups), 2) + ] + task_group = group(process_xc_category.s(account_id, batch, existing_groups, hash_keys) for batch in batches) total_batches = len(batches) completed_batches = 0 diff --git a/core/xtream_codes.py b/core/xtream_codes.py new file mode 100644 index 00000000..e79ec5e9 --- /dev/null +++ b/core/xtream_codes.py @@ -0,0 +1,26 @@ +import requests + +class Client: + host = "" + username = "" + password = "" + + def __init__(self, host, username, password): + self.host = host + self.username = username + self.password = password + + def authenticate(self): + response = requests.get(f"{self.host}/player_api.php?username={self.username}&password={self.password}") + return response.json() + + def get_live_categories(self): + response = requests.get(f"{self.host}/player_api.php?username={self.username}&password={self.password}&action=get_live_categories") + return response.json() + + def get_live_category_streams(self, category_id): + response = requests.get(f"{self.host}/player_api.php?username={self.username}&password={self.password}&action=get_live_streams&category_id={category_id}") + return response.json() + + def get_stream_url(self, stream_id): + return f"{self.host}/{self.username}/{self.password}/{stream_id}" diff --git a/frontend/src/api.js b/frontend/src/api.js index 38aac846..2c7f3a05 100644 --- a/frontend/src/api.js +++ b/frontend/src/api.js @@ -650,6 +650,10 @@ export default class API { } static async addPlaylist(values) { + if (values.custom_properties) { + values.custom_properties = JSON.stringify(values.custom_properties); + } + try { let body = null; if (values.file) { @@ -717,6 +721,10 @@ export default class API { static async updatePlaylist(values) { const { id, ...payload } = values; + if (payload.custom_properties) { + payload.custom_properties = JSON.stringify(payload.custom_properties); + } + try { let body = null; if (payload.file) { @@ 
@@ -735,6 +743,7 @@ export default class API {
       body = { ...payload };
       delete body.file;
     }
+    console.log(body);
 
     const response = await request(`${host}/api/m3u/accounts/${id}/`, {
       method: 'PATCH',
@@ -1241,10 +1250,13 @@ export default class API {
 
   static async switchStream(channelId, streamId) {
     try {
-      const response = await request(`${host}/proxy/ts/change_stream/${channelId}`, {
-        method: 'POST',
-        body: { stream_id: streamId },
-      });
+      const response = await request(
+        `${host}/proxy/ts/change_stream/${channelId}`,
+        {
+          method: 'POST',
+          body: { stream_id: streamId },
+        }
+      );
 
       return response;
     } catch (e) {
@@ -1255,10 +1267,13 @@ export default class API {
 
   static async nextStream(channelId, streamId) {
     try {
-      const response = await request(`${host}/proxy/ts/next_stream/${channelId}`, {
-        method: 'POST',
-        body: { stream_id: streamId },
-      });
+      const response = await request(
+        `${host}/proxy/ts/next_stream/${channelId}`,
+        {
+          method: 'POST',
+          body: { stream_id: streamId },
+        }
+      );
 
       return response;
     } catch (e) {
diff --git a/frontend/src/components/forms/M3U.jsx b/frontend/src/components/forms/M3U.jsx
index bd9cf3c8..2d0f29ba 100644
--- a/frontend/src/components/forms/M3U.jsx
+++ b/frontend/src/components/forms/M3U.jsx
@@ -18,6 +18,8 @@ import {
   Stack,
   Group,
   Switch,
+  Box,
+  PasswordInput,
 } from '@mantine/core';
 import M3UGroupFilter from './M3UGroupFilter';
 import useChannelsStore from '../../store/channels';
@@ -35,13 +37,7 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
   const [profileModalOpen, setProfileModalOpen] = useState(false);
   const [groupFilterModalOpen, setGroupFilterModalOpen] = useState(false);
   const [loadingText, setLoadingText] = useState('');
-
-  const handleFileChange = (file) => {
-    console.log(file);
-    if (file) {
-      setFile(file);
-    }
-  };
+  const [showCredentialFields, setShowCredentialFields] = useState(false);
 
   const form = useForm({
     mode: 'uncontrolled',
@@ -52,6 +48,9 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
      is_active: true,
      max_streams: 0,
      refresh_interval: 24,
+      is_xc: false,
+      username: '',
+      password: '',
     },
 
     validate: {
@@ -63,6 +62,7 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
 
   useEffect(() => {
     if (playlist) {
+      const customProperties = JSON.parse(playlist.custom_properties || '{}');
       form.setValues({
         name: playlist.name,
         server_url: playlist.server_url,
@@ -70,14 +70,43 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
         user_agent: playlist.user_agent ? `${playlist.user_agent}` : '0',
         is_active: playlist.is_active,
         refresh_interval: playlist.refresh_interval,
+        is_xc: playlist.account_type == 'XC',
+        username: customProperties.username ?? '',
+        password: '',
       });
+
+      if (customProperties.is_xc) {
+        setShowCredentialFields(true);
+      } else {
+        setShowCredentialFields(false);
+      }
     } else {
       form.reset();
     }
   }, [playlist]);
 
+  useEffect(() => {
+    if (form.values.is_xc) {
+      setShowCredentialFields(true);
+    }
+  }, [form.values.is_xc]);
+
   const onSubmit = async () => {
-    const values = form.getValues();
+    const { ...values } = form.getValues();
+
+    if (values.is_xc && values.password == '') {
+      // If the account is XC and no password was entered, assume the stored
+      // password is unchanged.
+      delete values.password;
+    }
+
+    if (values.is_xc) {
+      values.account_type = 'XC';
+    } else {
+      values.account_type = 'STD';
+    }
+
+    delete values.is_xc;
 
     if (values.user_agent == '0') {
       values.user_agent = null;
@@ -150,7 +179,6 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
            {...form.getInputProps('name')}
            key={form.key('name')}
          />
-
          {
            {...form.getInputProps('server_url')}
            key={form.key('server_url')}
          />
-
-
+
+          {form.getValues().is_xc && (
+
+
+
+          )}
+
+          {!form.getValues().is_xc && (
+          )}
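
A minimal usage sketch of the new core/xtream_codes.py client, mirroring how refresh_m3u_groups and process_xc_category drive it in the tasks above. The host and credentials below are placeholders; the dictionary keys are the ones the tasks actually consume (category_name, category_id, name, stream_id, epg_channel_id, stream_icon).

from core.xtream_codes import Client

# Placeholder server and credentials; in the Celery tasks these come from an M3UAccount row.
client = Client("http://xc.example.com:8080", "demo_user", "demo_pass")

# player_api.php with no action returns account/server info and doubles as an auth check.
client.authenticate()

# Category names become ChannelGroup names; category_id is kept per group as "xc_id".
groups = {
    cat["category_name"]: {"xc_id": cat["category_id"]}
    for cat in client.get_live_categories()
}

# Each category's streams expose the fields that process_xc_category maps onto Stream rows.
for group_name, props in groups.items():
    for stream in client.get_live_category_streams(props["xc_id"]):
        print(
            group_name,
            stream["name"],
            stream["epg_channel_id"],
            stream["stream_icon"],
            client.get_stream_url(stream["stream_id"]),
        )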