initial xtreamcodes support

This commit is contained in:
dekzter 2025-04-27 10:32:29 -04:00
parent 9c6e19fb3b
commit 3054cf2ae9
9 changed files with 305 additions and 67 deletions

View file

@ -0,0 +1,23 @@
# Generated by Django 5.1.6 on 2025-04-27 14:12
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('dispatcharr_channels', '0017_alter_channelgroup_name'),
]
operations = [
migrations.AddField(
model_name='channelgroupm3uaccount',
name='custom_properties',
field=models.TextField(blank=True, null=True),
),
migrations.AlterField(
model_name='channel',
name='channel_number',
field=models.IntegerField(db_index=True),
),
]

View file

@ -441,6 +441,7 @@ class ChannelGroupM3UAccount(models.Model):
on_delete=models.CASCADE,
related_name='channel_group'
)
custom_properties = models.TextField(null=True, blank=True)
enabled = models.BooleanField(default=True)
class Meta:

View file

@ -0,0 +1,28 @@
# Generated by Django 5.1.6 on 2025-04-27 12:56
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('m3u', '0007_remove_m3uaccount_uploaded_file_m3uaccount_file_path'),
]
operations = [
migrations.AddField(
model_name='m3uaccount',
name='account_type',
field=models.CharField(choices=[('STD', 'Standard'), ('XC', 'Xtream Codes')], default='STD'),
),
migrations.AddField(
model_name='m3uaccount',
name='password',
field=models.CharField(blank=True, max_length=255, null=True),
),
migrations.AddField(
model_name='m3uaccount',
name='username',
field=models.CharField(blank=True, max_length=255, null=True),
),
]

View file

@ -10,6 +10,10 @@ from core.models import CoreSettings, UserAgent
CUSTOM_M3U_ACCOUNT_NAME="custom"
class M3UAccount(models.Model):
class Types(models.TextChoices):
STADNARD = "STD", "Standard"
XC = "XC", "Xtream Codes"
"""Represents an M3U Account for IPTV streams."""
name = models.CharField(
max_length=255,
@ -69,6 +73,9 @@ class M3UAccount(models.Model):
blank=True,
related_name='m3u_accounts'
)
account_type = models.CharField(choices=Types.choices, default=Types.STADNARD)
username = models.CharField(max_length=255, null=True, blank=True)
password = models.CharField(max_length=255, null=True, blank=True)
custom_properties = models.TextField(null=True, blank=True)
refresh_interval = models.IntegerField(default=24)
refresh_task = models.ForeignKey(

View file

@ -66,8 +66,14 @@ class M3UAccountSerializer(serializers.ModelSerializer):
fields = [
'id', 'name', 'server_url', 'file_path', 'server_group',
'max_streams', 'is_active', 'created_at', 'updated_at', 'filters', 'user_agent', 'profiles', 'locked',
'channel_groups', 'refresh_interval'
'channel_groups', 'refresh_interval', 'custom_properties', 'account_type', 'username', 'password'
]
extra_kwargs = {
'password': {
'required': False,
'allow_blank': True,
},
}
def update(self, instance, validated_data):
# Pop out channel group memberships so we can handle them manually

View file

@ -21,6 +21,7 @@ import json
from core.utils import RedisClient, acquire_task_lock, release_task_lock
from core.models import CoreSettings
from asgiref.sync import async_to_sync
from core.xtream_codes import Client as XCClient
logger = logging.getLogger(__name__)
@ -172,32 +173,33 @@ def check_field_lengths(streams_to_create):
print("")
@shared_task
def process_groups(account, group_names):
existing_groups = {group.name: group for group in ChannelGroup.objects.filter(name__in=group_names)}
def process_groups(account, groups):
existing_groups = {group.name: group for group in ChannelGroup.objects.filter(name__in=groups.keys())}
logger.info(f"Currently {len(existing_groups)} existing groups")
groups = []
group_objs = []
groups_to_create = []
for group_name in group_names:
for group_name, custom_props in groups.items():
logger.info(f"Handling group: {group_name}")
if group_name in existing_groups:
groups.append(existing_groups[group_name])
else:
if group_name not in existing_groups:
groups_to_create.append(ChannelGroup(
name=group_name,
))
else:
group_objs.append(existing_groups[group_name])
if groups_to_create:
logger.info(f"Creating {len(groups_to_create)} groups")
created = ChannelGroup.bulk_create_and_fetch(groups_to_create)
logger.info(f"Created {len(created)} groups")
groups.extend(created)
group_objs.extend(created)
relations = []
for group in groups:
for group in group_objs:
relations.append(ChannelGroupM3UAccount(
channel_group=group,
m3u_account=account,
custom_properties=json.dumps(groups[group.name]),
))
ChannelGroupM3UAccount.objects.bulk_create(
@ -205,6 +207,78 @@ def process_groups(account, group_names):
ignore_conflicts=True
)
@shared_task
def process_xc_category(account_id, batch, groups, hash_keys):
account = M3UAccount.objects.get(id=account_id)
streams_to_create = []
streams_to_update = []
stream_hashes = {}
xc_client = XCClient(account.server_url, account.username, account.password)
for group_name, props in batch.items():
streams = xc_client.get_live_category_streams(props['xc_id'])
for stream in streams:
name = stream["name"]
url = xc_client.get_stream_url(stream["stream_id"])
tvg_id = stream["epg_channel_id"]
tvg_logo = stream["stream_icon"]
group_title = group_name
stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys)
stream_props = {
"name": name,
"url": url,
"logo_url": tvg_logo,
"tvg_id": tvg_id,
"m3u_account": account,
"channel_group_id": int(groups.get(group_title)),
"stream_hash": stream_hash,
"custom_properties": json.dumps(stream),
}
if stream_hash not in stream_hashes:
stream_hashes[stream_hash] = stream_props
existing_streams = {s.stream_hash: s for s in Stream.objects.filter(stream_hash__in=stream_hashes.keys())}
for stream_hash, stream_props in stream_hashes.items():
if stream_hash in existing_streams:
obj = existing_streams[stream_hash]
existing_attr = {field.name: getattr(obj, field.name) for field in Stream._meta.fields if field != 'channel_group_id'}
changed = any(existing_attr[key] != value for key, value in stream_props.items() if key != 'channel_group_id')
if changed:
for key, value in stream_props.items():
setattr(obj, key, value)
obj.last_seen = timezone.now()
streams_to_update.append(obj)
del existing_streams[stream_hash]
else:
existing_streams[stream_hash] = obj
else:
stream_props["last_seen"] = timezone.now()
streams_to_create.append(Stream(**stream_props))
try:
with transaction.atomic():
if streams_to_create:
Stream.objects.bulk_create(streams_to_create, ignore_conflicts=True)
if streams_to_update:
Stream.objects.bulk_update(streams_to_update, { key for key in stream_props.keys() if key not in ["m3u_account", "stream_hash"] and key not in hash_keys})
# if len(existing_streams.keys()) > 0:
# Stream.objects.bulk_update(existing_streams.values(), ["last_seen"])
except Exception as e:
logger.error(f"Bulk create failed: {str(e)}")
retval = f"Batch processed: {len(streams_to_create)} created, {len(streams_to_update)} updated."
# Aggressive garbage collection
del streams_to_create, streams_to_update, stream_hashes, existing_streams
gc.collect()
return retval
@shared_task
def process_m3u_batch(account_id, batch, groups, hash_keys):
"""Processes a batch of M3U streams using bulk operations."""
@ -227,23 +301,7 @@ def process_m3u_batch(account_id, batch, groups, hash_keys):
logger.debug(f"Skipping stream in disabled group: {group_title}")
continue
# if any(url.lower().endswith(ext) for ext in SKIP_EXTS) or len(url) > 2000:
# continue
# if _matches_filters(name, group_title, account.filters.all()):
# continue
# if any(compiled_pattern.search(current_info['name']) for ftype, compiled_pattern in compiled_filters if ftype == 'name'):
# excluded_count += 1
# current_info = None
# continue
stream_hash = Stream.generate_hash_key(name, url, tvg_id, hash_keys)
# if redis_client.exists(f"m3u_refresh:{stream_hash}"):
# # duplicate already processed by another batch
# continue
# redis_client.set(f"m3u_refresh:{stream_hash}", "true")
stream_props = {
"name": name,
"url": url,
@ -332,24 +390,38 @@ def refresh_m3u_groups(account_id, use_cache=False, full_refresh=False):
return f"M3UAccount with ID={account_id} not found or inactive.", None
extinf_data = []
groups = set(["Default Group"])
groups = {"Default Group": {}}
for line in fetch_m3u_lines(account, use_cache):
line = line.strip()
if line.startswith("#EXTINF"):
parsed = parse_extinf_line(line)
if parsed:
if "group-title" in parsed["attributes"]:
groups.add(parsed["attributes"]["group-title"])
xc_client = None
if account.account_type == M3UAccount.Types.XC:
xc_client = XCClient(account.server_url, account.username, account.password)
try:
xc_client.authenticate()
except Exception as e:
release_task_lock('refresh_m3u_account_groups', account_id)
return f"M3UAccount with ID={account_id} failed to authenticate with XC server.", None
extinf_data.append(parsed)
elif extinf_data and line.startswith("http"):
# Associate URL with the last EXTINF line
extinf_data[-1]["url"] = line
xc_categories = xc_client.get_live_categories()
for category in xc_categories:
groups[category["category_name"]] = {
"xc_id": category["category_id"],
}
else:
for line in fetch_m3u_lines(account, use_cache):
line = line.strip()
if line.startswith("#EXTINF"):
parsed = parse_extinf_line(line)
if parsed:
if "group-title" in parsed["attributes"]:
groups[parsed["attributes"]["group-title"]] = {}
extinf_data.append(parsed)
elif extinf_data and line.startswith("http"):
# Associate URL with the last EXTINF line
extinf_data[-1]["url"] = line
send_m3u_update(account_id, "processing_groups", 0)
groups = list(groups)
cache_path = os.path.join(m3u_dir, f"{account_id}.json")
with open(cache_path, 'w', encoding='utf-8') as f:
json.dump({
@ -412,7 +484,7 @@ def refresh_single_m3u_account(account_id):
if not extinf_data:
try:
extinf_data, groups = refresh_m3u_groups(account_id, full_refresh=True)
if not extinf_data or not groups:
if not groups:
release_task_lock('refresh_single_m3u_account', account_id)
return "Failed to update m3u account, task may already be running"
except:
@ -426,9 +498,17 @@ def refresh_single_m3u_account(account_id):
m3u_account__enabled=True # Filter by the enabled flag in the join table
)}
# Break into batches and process in parallel
batches = [extinf_data[i:i + BATCH_SIZE] for i in range(0, len(extinf_data), BATCH_SIZE)]
task_group = group(process_m3u_batch.s(account_id, batch, existing_groups, hash_keys) for batch in batches)
if account.account_type == M3UAccount.Types.STADNARD:
# Break into batches and process in parallel
batches = [extinf_data[i:i + BATCH_SIZE] for i in range(0, len(extinf_data), BATCH_SIZE)]
task_group = group(process_m3u_batch.s(account_id, batch, existing_groups, hash_keys) for batch in batches)
else:
filtered_groups = [(k, v) for k, v in groups.items() if k in existing_groups]
batches = [
dict(filtered_groups[i:i + 2])
for i in range(0, len(filtered_groups), 2)
]
task_group = group(process_xc_category.s(account_id, batch, existing_groups, hash_keys) for batch in batches)
total_batches = len(batches)
completed_batches = 0

26
core/xtream_codes.py Normal file
View file

@ -0,0 +1,26 @@
import requests
class Client:
host = ""
username = ""
password = ""
def __init__(self, host, username, password):
self.host = host
self.username = username
self.password = password
def authenticate(self):
response = requests.get(f"{self.host}/player_api.php?username={self.username}&password={self.password}")
return response.json()
def get_live_categories(self):
response = requests.get(f"{self.host}/player_api.php?username={self.username}&password={self.password}&action=get_live_categories")
return response.json()
def get_live_category_streams(self, category_id):
response = requests.get(f"{self.host}/player_api.php?username={self.username}&password={self.password}&action=get_live_streams&category_id={category_id}")
return response.json()
def get_stream_url(self, stream_id):
return f"{self.host}/{self.username}/{self.password}/{stream_id}"

View file

@ -650,6 +650,10 @@ export default class API {
}
static async addPlaylist(values) {
if (values.custom_properties) {
values.custom_properties = JSON.stringify(values.custom_properties);
}
try {
let body = null;
if (values.file) {
@ -717,6 +721,10 @@ export default class API {
static async updatePlaylist(values) {
const { id, ...payload } = values;
if (payload.custom_properties) {
payload.custom_properties = JSON.stringify(payload.custom_properties);
}
try {
let body = null;
if (payload.file) {
@ -735,6 +743,7 @@ export default class API {
body = { ...payload };
delete body.file;
}
console.log(body);
const response = await request(`${host}/api/m3u/accounts/${id}/`, {
method: 'PATCH',
@ -1241,10 +1250,13 @@ export default class API {
static async switchStream(channelId, streamId) {
try {
const response = await request(`${host}/proxy/ts/change_stream/${channelId}`, {
method: 'POST',
body: { stream_id: streamId },
});
const response = await request(
`${host}/proxy/ts/change_stream/${channelId}`,
{
method: 'POST',
body: { stream_id: streamId },
}
);
return response;
} catch (e) {
@ -1255,10 +1267,13 @@ export default class API {
static async nextStream(channelId, streamId) {
try {
const response = await request(`${host}/proxy/ts/next_stream/${channelId}`, {
method: 'POST',
body: { stream_id: streamId },
});
const response = await request(
`${host}/proxy/ts/next_stream/${channelId}`,
{
method: 'POST',
body: { stream_id: streamId },
}
);
return response;
} catch (e) {

View file

@ -18,6 +18,8 @@ import {
Stack,
Group,
Switch,
Box,
PasswordInput,
} from '@mantine/core';
import M3UGroupFilter from './M3UGroupFilter';
import useChannelsStore from '../../store/channels';
@ -35,13 +37,7 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
const [profileModalOpen, setProfileModalOpen] = useState(false);
const [groupFilterModalOpen, setGroupFilterModalOpen] = useState(false);
const [loadingText, setLoadingText] = useState('');
const handleFileChange = (file) => {
console.log(file);
if (file) {
setFile(file);
}
};
const [showCredentialFields, setShowCredentialFields] = useState(false);
const form = useForm({
mode: 'uncontrolled',
@ -52,6 +48,9 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
is_active: true,
max_streams: 0,
refresh_interval: 24,
is_xc: false,
username: '',
password: '',
},
validate: {
@ -63,6 +62,7 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
useEffect(() => {
if (playlist) {
const customProperties = JSON.parse(playlist.custom_properties || '{}');
form.setValues({
name: playlist.name,
server_url: playlist.server_url,
@ -70,14 +70,43 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
user_agent: playlist.user_agent ? `${playlist.user_agent}` : '0',
is_active: playlist.is_active,
refresh_interval: playlist.refresh_interval,
is_xc: playlist.account_type == 'XC',
username: customProperties.username ?? '',
password: '',
});
if (customProperties.is_xc) {
setShowCredentialFields(true);
} else {
setShowCredentialFields(false);
}
} else {
form.reset();
}
}, [playlist]);
useEffect(() => {
if (form.values.is_xc) {
setShowCredentialFields(true);
}
}, [form.values.is_xc]);
const onSubmit = async () => {
const values = form.getValues();
const { ...values } = form.getValues();
if (values.is_xc && values.password == '') {
// If account XC and no password input, assuming no password change
// from previously stored value.
delete values.password;
}
if (values.is_xc) {
values.account_type = 'XC';
} else {
values.account_type = 'STD';
}
delete values.is_xc;
if (values.user_agent == '0') {
values.user_agent = null;
@ -150,7 +179,6 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
{...form.getInputProps('name')}
key={form.key('name')}
/>
<TextInput
fullWidth
id="server_url"
@ -159,14 +187,38 @@ const M3U = ({ playlist = null, isOpen, onClose, playlistCreated = false }) => {
{...form.getInputProps('server_url')}
key={form.key('server_url')}
/>
<FileInput
id="file"
label="Upload files"
placeholder="Upload files"
// value={formik.file}
onChange={handleFileChange}
<Switch
id="is_xc"
name="is_xc"
label="Is XC?"
{...form.getInputProps('is_xc', { type: 'checkbox' })}
/>
{form.getValues().is_xc && (
<Box>
<TextInput
id="username"
name="username"
label="Username"
{...form.getInputProps('username')}
/>
<PasswordInput
id="password"
name="password"
label="Password"
{...form.getInputProps('password')}
/>
</Box>
)}
{!form.getValues().is_xc && (
<FileInput
id="file"
label="Upload files"
placeholder="Upload files"
onChange={setFile}
/>
)}
</Stack>
<Divider size="sm" orientation="vertical" />