From adf84294eb6dc6ef5f71ab65aca9f1b2bbab4d3e Mon Sep 17 00:00:00 2001 From: dekzter Date: Sat, 11 Oct 2025 10:40:34 -0400 Subject: [PATCH] committing what I've got for fuse fs --- docker/dfs.py | 420 +++++++++++++++++++++++++++ docker/docker-compose.dev.yml | 13 + docker/media-fs.py | 523 ++++++++++++++++++++++++++++++++++ docker/streamfs.py | 220 ++++++++++++++ docker/virtfs.py | 429 ++++++++++++++++++++++++++++ 5 files changed, 1605 insertions(+) create mode 100644 docker/dfs.py create mode 100644 docker/media-fs.py create mode 100644 docker/streamfs.py create mode 100644 docker/virtfs.py diff --git a/docker/dfs.py b/docker/dfs.py new file mode 100644 index 00000000..b62fe5d7 --- /dev/null +++ b/docker/dfs.py @@ -0,0 +1,420 @@ +#!/usr/bin/env python3 +""" +StreamFS — a Python FUSE virtual filesystem that exposes video-on-demand URLs +(from mapping files) as regular read-only files with full byte-range support. + +Design highlights +----------------- +- Mapping directory (default: /mnt/mappings) contains *text files*. Each file's + content is a single URL (source URL). The filename becomes the exposed file. +- When a client opens a file, StreamFS issues a HEAD request to the source URL. + The server returns a *unique* session URL in the `X-Session-URL` header. That + session URL is used for **all subsequent GETs** for that file handle. +- File size is taken from `Content-Length` (returned by HEAD). +- `read()` uses HTTP Range requests against the session URL so that scrubbing + (random seeks) works seamlessly. +- Read-only filesystem. Multiple simultaneous clients are supported; each open + gets its own session URL and connection pool. + +Requirements +------------ +- Linux with FUSE (libfuse) installed. +- Python 3.8+ +- `fusepy` and `requests` packages. + + pip install fusepy requests + +Mount example +------------- + sudo ./streamfs.py /mnt/streamfs --maps /mnt/mappings -f -d + +Unmount +------- + sudo umount /mnt/streamfs + +Notes +----- +- The mapping directory is reloaded automatically on directory listing calls, + so adding/removing mapping files becomes visible to clients. +- Optional small read-ahead cache per open handle is included for sequential + reads; it is conservative and safe to disable with `--no-readahead`. +- TLS verification can be disabled (e.g., for lab networks) via `--insecure`. 
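+
+Mapping file example
+--------------------
+A mapping file is plain text whose entire content is the source URL. For
+instance (name and URL are illustrative), a file /mnt/mappings/BigBuckBunny.mp4
+containing:
+
+    https://vod.example.com/api/vod/bigbuckbunny
+
+is exposed as the read-only file /mnt/streamfs/BigBuckBunny.mp4, sized from
+Content-Length and readable at arbitrary offsets.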
+ +""" +import errno +import logging +import os +import stat +import sys +import time +import threading +import argparse +from typing import Dict, Tuple, Optional + +import requests +from fuse import FUSE, Operations, LoggingMixIn + +# -------------------------- Helpers & Data Classes -------------------------- # + +class HTTPError(IOError): + pass + + +def clamp(n: int, low: int, high: int) -> int: + return max(low, min(high, n)) + + +class OpenHandle: + """Tracks state for one open() file handle.""" + + def __init__(self, logical_path: str, source_url: str, session: requests.Session, + session_url: str, size: int, readahead: bool, verify_tls: bool): + self.path = logical_path + self.source_url = source_url + self.session = session + self.session_url = session_url + self.size = size + self.verify_tls = verify_tls + + # Simple read-ahead cache: maps (start,end) -> bytes + self.readahead = readahead + self.cache_lock = threading.Lock() + self.cache: Dict[Tuple[int, int], bytes] = {} + self.max_cache_bytes = 4 * 1024 * 1024 # 4 MiB per handle cap + self.cache_bytes = 0 + + def _add_cache(self, start: int, data: bytes): + if not self.readahead: + return + with self.cache_lock: + end = start + len(data) + key = (start, end) + # If adding exceeds cap, clear cache (simple strategy). + if self.cache_bytes + len(data) > self.max_cache_bytes: + self.cache.clear() + self.cache_bytes = 0 + self.cache[key] = data + self.cache_bytes += len(data) + + def _get_from_cache(self, start: int, size: int) -> Optional[bytes]: + if not self.readahead or size <= 0: + return None + with self.cache_lock: + for (s, e), blob in list(self.cache.items()): + if start >= s and (start + size) <= e: + off = start - s + return blob[off:off + size] + return None + + def ranged_get(self, start: int, size: int) -> bytes: + if size <= 0: + return b"" + # Clamp against EOF + end_inclusive = clamp(start + size - 1, 0, self.size - 1) + if end_inclusive < start: + return b"" + + # Cache lookup + cached = self._get_from_cache(start, end_inclusive - start + 1) + if cached is not None: + return cached + + headers = {"Range": f"bytes={start}-{end_inclusive}"} + resp = self.session.get(self.session_url, headers=headers, stream=False, + timeout=(5, 30), verify=self.verify_tls) + if resp.status_code not in (200, 206): + raise HTTPError(f"Unexpected GET status {resp.status_code} for {self.path}") + data = resp.content + # Cache the full returned chunk + self._add_cache(start, data) + return data + + def close(self): + try: + self.session.close() + except Exception: + pass + + +# ------------------------------ Filesystem Core ----------------------------- # + +class StreamFS(LoggingMixIn, Operations): + def __init__(self, mappings_dir: str, verify_tls: bool = True, readahead: bool = True): + super().__init__() + self.mappings_dir = os.path.abspath(mappings_dir) + self.verify_tls = verify_tls + self.readahead = readahead + + self._log = logging.getLogger("StreamFS") + self._log.info("Using mappings dir: %s", self.mappings_dir) + + # name -> source URL string (loaded from files) + self.mappings: Dict[str, str] = {} + self._mappings_mtime = 0.0 + self._mappings_lock = threading.Lock() + + # name -> (size, etag/last-modified) + self.meta_cache: Dict[str, Tuple[int, Optional[str]]] = {} + self._meta_lock = threading.Lock() + + # fh -> OpenHandle + self._fh_lock = threading.Lock() + self._next_fh = 3 + self._open_handles: Dict[int, OpenHandle] = {} + + if not os.path.isdir(self.mappings_dir): + raise RuntimeError(f"Mappings directory does not 
exist: {self.mappings_dir}")
+        self._reload_mappings(force=True)
+
+    # --------------------------- Mapping management -------------------------- #
+
+    def _reload_mappings(self, force: bool = False):
+        try:
+            mtime = os.stat(self.mappings_dir).st_mtime
+        except FileNotFoundError:
+            mtime = time.time()
+        if not force and mtime <= self._mappings_mtime:
+            return
+
+        with self._mappings_lock:
+            new_map: Dict[str, str] = {}
+            for entry in os.listdir(self.mappings_dir):
+                full = os.path.join(self.mappings_dir, entry)
+                if not os.path.isfile(full):
+                    continue
+                try:
+                    with open(full, "r", encoding="utf-8") as f:
+                        url = f.read().strip()
+                    if url:
+                        new_map[entry] = url
+                except Exception as e:
+                    self._log.warning("Skipping mapping %s: %s", entry, e)
+            self.mappings = new_map
+            self._mappings_mtime = mtime
+            self._log.info("Loaded %d mappings", len(self.mappings))
+
+    # ------------------------------- Utilities ------------------------------- #
+
+    def _source_for_path(self, path: str) -> Optional[str]:
+        name = path.lstrip("/")
+        return self.mappings.get(name)
+
+    def _head_fetch_meta(self, source_url: str) -> Tuple[int, Optional[str], str, requests.Session]:
+        """Return (size, etag_or_lastmod, session_url, session).
+
+        Issues a header-only GET (streamed; the body is never read) rather
+        than a true HEAD, since the upstream hands out the X-Session-URL
+        header on GET. Falls back to the final redirected URL when that
+        header is missing.
+        """
+        s = requests.Session()
+        resp = s.get(source_url, allow_redirects=True, stream=True,
+                     timeout=(5, 15), verify=self.verify_tls)
+        if resp.status_code >= 400:
+            s.close()
+            raise HTTPError(f"Metadata GET failed {resp.status_code} for {source_url}")
+        size_hdr = resp.headers.get("Content-Length")
+        if not size_hdr:
+            s.close()
+            raise HTTPError("No Content-Length on metadata response")
+        try:
+            size = int(size_hdr)
+        except ValueError:
+            s.close()
+            raise HTTPError("Invalid Content-Length")
+
+        session_url = resp.headers.get("X-Session-URL")
+        if not session_url:
+            # Fallback: use final URL if header not present
+            session_url = str(resp.url)
+        etag = resp.headers.get("ETag") or resp.headers.get("Last-Modified")
+        resp.close()  # headers only; release the connection without reading the body
+        # Keep this session for the handle that will use it.
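+        # The caller owns the returned requests.Session: getattr() closes it
+        # immediately after reading metadata, while open() hands it to the
+        # OpenHandle, which reuses it for ranged GETs and closes it on release.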
+ return size, etag, session_url, s + + # ------------------------------- FUSE ops -------------------------------- # + + def getattr(self, path, fh=None): + self._reload_mappings() + if path == "/": + mode = stat.S_IFDIR | 0o555 + now = int(time.time()) + return { + "st_mode": mode, + "st_nlink": 2, + "st_ctime": now, + "st_mtime": now, + "st_atime": now, + } + + src = self._source_for_path(path) + if not src: + raise OSError(errno.ENOENT, "No such file", path) + + # Find or refresh meta + name = path.lstrip("/") + size: Optional[int] = None + with self._meta_lock: + meta = self.meta_cache.get(name) + if meta: + size = meta[0] + + if size is None: + try: + size, etag, _session_url, sess = self._head_fetch_meta(src) + sess.close() + with self._meta_lock: + self.meta_cache[name] = (size, etag) + except Exception as e: + self._log.error("HEAD meta fetch failed for %s: %s", name, e) + raise OSError(errno.EIO, str(e)) + + mode = stat.S_IFREG | 0o444 + now = int(time.time()) + return { + "st_mode": mode, + "st_nlink": 1, + "st_size": size, + "st_ctime": now, + "st_mtime": now, + "st_atime": now, + } + + def readdir(self, path, fh): + self._reload_mappings() + if path != "/": + raise OSError(errno.ENOENT, "No such directory", path) + yield from [".", "..", *sorted(self.mappings.keys())] + + def open(self, path, flags): + # Read-only enforcement + if flags & (os.O_WRONLY | os.O_RDWR): + raise OSError(errno.EACCES, "Read-only filesystem") + + src = self._source_for_path(path) + if not src: + raise OSError(errno.ENOENT, "No such file", path) + + # HEAD to obtain size and session URL + try: + size, etag, session_url, session = self._head_fetch_meta(src) + except Exception as e: + self._log.error("open() HEAD failed for %s: %s", path, e) + raise OSError(errno.EIO, str(e)) + + # Update meta cache + with self._meta_lock: + self.meta_cache[path.lstrip("/")] = (size, etag) + + with self._fh_lock: + fh = self._next_fh + self._next_fh += 1 + self._open_handles[fh] = OpenHandle( + logical_path=path, + source_url=src, + session=session, + session_url=session_url, + size=size, + readahead=self.readahead, + verify_tls=self.verify_tls, + ) + self._log.debug("Opened %s (fh=%d) session_url=%s size=%d", path, fh, session_url, size) + return fh + + def read(self, path, size, offset, fh): + with self._fh_lock: + handle = self._open_handles.get(fh) + if not handle: + raise OSError(errno.EBADF, "Invalid file handle") + try: + return handle.ranged_get(offset, size) + except HTTPError as e: + self._log.error("HTTP read error for %s: %s", path, e) + raise OSError(errno.EIO, str(e)) + except requests.RequestException as e: + self._log.error("Network error for %s: %s", path, e) + raise OSError(errno.EIO, str(e)) + + # VFS is read-only; deny write ops explicitly + def write(self, path, data, offset, fh): + raise OSError(errno.EROFS, "Read-only filesystem") + + def truncate(self, path, length, fh=None): + raise OSError(errno.EROFS, "Read-only filesystem") + + def unlink(self, path): + raise OSError(errno.EROFS, "Read-only filesystem") + + def rename(self, old, new): + raise OSError(errno.EROFS, "Read-only filesystem") + + def mkdir(self, path, mode): + raise OSError(errno.EROFS, "Read-only filesystem") + + def rmdir(self, path): + raise OSError(errno.EROFS, "Read-only filesystem") + + def chmod(self, path, mode): + raise OSError(errno.EROFS, "Read-only filesystem") + + def chown(self, path, uid, gid): + raise OSError(errno.EROFS, "Read-only filesystem") + + def utimens(self, path, times=None): + # Ignore timestamp changes to 
avoid errors in some clients + return 0 + + def release(self, path, fh): + with self._fh_lock: + handle = self._open_handles.pop(fh, None) + if handle: + handle.close() + return 0 + + def flush(self, path, fh): + # No writeback + return 0 + + +# ---------------------------------- Main ----------------------------------- # + +def parse_args(argv=None): + p = argparse.ArgumentParser(description="VOD virtual filesystem over HTTP with Range support") + p.add_argument("mountpoint", help="Mount point for the FUSE filesystem") + p.add_argument("--maps", dest="maps", default="/mnt/mappings", + help="Directory of mapping files (default: /mnt/mappings)") + p.add_argument("-f", "--foreground", action="store_true", help="Run in foreground") + p.add_argument("-d", "--debug", action="store_true", help="Enable debug logging") + p.add_argument("--insecure", action="store_true", help="Disable TLS verification") + p.add_argument("--no-readahead", action="store_true", help="Disable read-ahead cache") + return p.parse_args(argv) + + +def main(argv=None): + args = parse_args(argv) + + log_level = logging.DEBUG if args.debug else logging.INFO + logging.basicConfig(level=log_level, format="[%(levelname)s] %(name)s: %(message)s") + + fs = StreamFS( + mappings_dir=args.maps, + verify_tls=not args.insecure, + readahead=not args.no_readahead, + ) + + # Mount options: allow other users to read, default_permissions for kernel checks + fuse_opts = { + "foreground": args.foreground, + "allow_other": True, + "ro": True, + "default_permissions": True, + } + + # Convert dict to FUSE option list + mount_opts = [] + for k, v in fuse_opts.items(): + if isinstance(v, bool): + if v: + mount_opts.append(k) + else: + mount_opts.append(f"{k}={v}") + + FUSE(fs, args.mountpoint, nothreads=True, **{opt: True for opt in mount_opts}) + + +if __name__ == "__main__": + main() diff --git a/docker/docker-compose.dev.yml b/docker/docker-compose.dev.yml index 00394d55..39bdfe0c 100644 --- a/docker/docker-compose.dev.yml +++ b/docker/docker-compose.dev.yml @@ -37,5 +37,18 @@ services: ports: - 8081:8081 + dispatcharr-fs: + image: dispatcharr/fuse + container_name: dispatcharr-fs + cap_add: + - SYS_ADMIN + devices: + - /dev/fuse:/dev/fuse + security_opt: + - apparmor:unconfined + volumes: + - /appdata/dispatcharr/vod:/mnt:shared + restart: unless-stopped + volumes: dispatcharr_dev_pgadmin: diff --git a/docker/media-fs.py b/docker/media-fs.py new file mode 100644 index 00000000..abe91227 --- /dev/null +++ b/docker/media-fs.py @@ -0,0 +1,523 @@ +#!/usr/bin/env python3 +import errno +import os +import stat +import time +import argparse +import threading +from typing import Dict, Optional, Tuple + +import requests +from requests.adapters import HTTPAdapter +from fuse import FUSE, Operations, FuseOSError + +# Notes: +# - Requires: pip install fusepy requests +# - Linux: install FUSE (e.g., sudo apt-get install fuse3) and ensure user can mount +# - Mount with: sudo python3 url_stream_fs.py --source /path/to/url-files --mount /mnt/streamfs -o allow_other +# +# Behavior: +# - Mirrors the directory structure under --source. +# - Each regular file is treated as a "URL file": first non-empty, non-comment line is the media URL. +# - getattr: +# - For files, returns a read-only regular file with size derived from HEAD Content-Length. +# - Size is cached with TTL to avoid excessive HEADs. +# - open: +# - Performs HEAD against the file's URL, reads Content-Length and X-Session-URL (if present). 
+# - Stores a per-open handle with "session_url" for subsequent reads. +# - read: +# - Issues GET with Range: bytes=offset-end to the per-handle session_url (or original URL if no session header). +# - Returns exactly the bytes requested by FUSE (or fewer near EOF). +# - release: +# - Closes the per-open requests.Session and discards handle state. + + +DEFAULT_TTL_SECONDS = 60 # cache TTL for HEAD-derived file size (getattr) +DEFAULT_TIMEOUT = (5, 30) # (connect, read) timeouts for requests +USER_AGENT = "URLStreamFS/1.1 (+https://github.com/)" + +# Read-only stat size strategy to avoid prematurely creating sessions: +# - "zero": always report 0 (never HEAD on getattr) +# - "cached": report last-known size if we've opened before (no HEAD on getattr) +# - "head": perform HEAD on getattr (may create a session early) +DEFAULT_STAT_MODE = "cached" + +# If a GET returns one of these, we will refresh the session and retry once +SESSION_RETRY_STATUS = {401, 403, 404, 410, 412, 429, 500, 502, 503, 504} + + +def is_probably_url(s: str) -> bool: + s = s.strip().lower() + return s.startswith("http://") or s.startswith("https://") + + +def read_url_from_file(local_path: str) -> str: + # Reads first non-empty, non-comment line (comments start with '#') + with open(local_path, "r", encoding="utf-8", errors="replace") as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + if not is_probably_url(line): + raise FuseOSError(errno.EINVAL) + return line + raise FuseOSError(errno.EINVAL) + + +class URLStreamFS(Operations): + def __init__( + self, + source_root: str, + extra_headers: Optional[Dict[str, str]] = None, + stat_ttl: int = DEFAULT_TTL_SECONDS, + timeout: Tuple[int, int] = DEFAULT_TIMEOUT, + only_ext: Optional[Tuple[str, ...]] = None, + stat_mode: str = DEFAULT_STAT_MODE, + ): + self.source_root = os.path.abspath(source_root) + self.extra_headers = extra_headers or {} + self.stat_ttl = stat_ttl + self.timeout = timeout + self.only_ext = tuple(e.lower() for e in only_ext) if only_ext else None + self.stat_mode = stat_mode # zero|cached|head + + # Cache for HEAD-derived sizes keyed by absolute path + self._size_cache: Dict[str, Dict] = {} + self._size_lock = threading.RLock() + + # Handle table for per-open sessions + self._fh_lock = threading.RLock() + self._next_fh = 3 # arbitrary start + self._handles: Dict[int, Dict] = {} + + # Helpers + + def _full_path(self, path: str) -> str: + assert path.startswith("/") + return os.path.join(self.source_root, path.lstrip("/")) + + def _is_supported_file(self, full: str) -> bool: + if not os.path.isfile(full): + return False + if self.only_ext: + _, ext = os.path.splitext(full) + return ext.lower() in self.only_ext + return True + + def _requests_session(self) -> requests.Session: + s = requests.Session() + s.headers.update( + { + "User-Agent": USER_AGENT, + "Connection": "keep-alive", + **self.extra_headers, + } + ) + # A small connection pool helps concurrent readers + adapter = HTTPAdapter(pool_connections=8, pool_maxsize=8, max_retries=0) + s.mount("http://", adapter) + s.mount("https://", adapter) + return s + + def _head(self, session: requests.Session, url: str) -> requests.Response: + # Allow redirects; server returns Content-Length and possibly X-Session-URL + resp = session.head(url, allow_redirects=True, timeout=self.timeout) + if resp.status_code >= 400: + raise FuseOSError(errno.EIO) + return resp + + def _update_size_cache(self, full: str, size: Optional[int]) -> None: + with self._size_lock: + 
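# Entries carry the mapping file's mtime; _get_size_cached treats an
+            # entry as stale once the local file changes or the TTL lapses.
+            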
self._size_cache[full] = { + "ts": time.time(), + "local_mtime": os.path.getmtime(full), + "size": size, + } + + def _get_size_cached(self, full: str, url: str) -> Optional[int]: + # Returns size from cache or refreshes via HEAD (using a short-lived session not tied to 'open') + now = time.time() + local_mtime = os.path.getmtime(full) + with self._size_lock: + entry = self._size_cache.get(full) + if ( + entry + and entry["local_mtime"] == local_mtime + and (now - entry["ts"]) <= self.stat_ttl + and entry.get("size") is not None + ): + return entry["size"] + + # Avoid creating server-side viewing sessions during getattr unless explicitly requested + if self.stat_mode != "head": + return None + + # Refresh with HEAD (may create/advance a session server-side) + sess = self._requests_session() + try: + resp = self._head(sess, url) + size = None + cl = resp.headers.get("Content-Length") + if cl is not None: + try: + size = int(cl) + except ValueError: + size = None + finally: + sess.close() + + with self._size_lock: + self._size_cache[full] = { + "ts": now, + "local_mtime": local_mtime, + "size": size, + } + return size + + def _ensure_session(self, handle: Dict) -> None: + # Lazily start or refresh a session for this open handle + if handle.get("session_url"): + return + sess: requests.Session = handle["sess"] + orig_url: str = handle["orig_url"] + + head = self._head(sess, orig_url) + session_url = head.headers.get("X-Session-URL") or orig_url + + size = None + cl = head.headers.get("Content-Length") + if cl is not None: + try: + size = int(cl) + except ValueError: + size = None + + handle["session_url"] = session_url + handle["size"] = size + # Save for getattr(stat_mode=cached) + self._update_size_cache(handle["full"], size) + + def access(self, path, mode): + full = self._full_path(path) + if not os.path.exists(full): + raise FuseOSError(errno.ENOENT) + # Read-only FS + if mode & (os.W_OK | os.X_OK): + # Allow execute traversal on directories, deny writes + if os.path.isdir(full): + return 0 + if mode & os.W_OK: + raise FuseOSError(errno.EROFS) + return 0 + + def getattr(self, path, fh=None): + full = self._full_path(path) + if not os.path.exists(full): + raise FuseOSError(errno.ENOENT) + + st = os.lstat(full) + + # Directory: mirror local attrs + if stat.S_ISDIR(st.st_mode): + return dict( + st_mode=(stat.S_IFDIR | 0o555), # read+execute + st_nlink=2, + st_size=0, + st_ctime=st.st_ctime, + st_mtime=st.st_mtime, + st_atime=st.st_atime, + ) + + # Files: expose as read-only regular files and override size from remote (per stat_mode) + if self._is_supported_file(full): + mode = stat.S_IFREG | 0o444 + try: + url = read_url_from_file(full) + except FuseOSError: + url = None + + size = 0 + if url: + if self.stat_mode == "zero": + size = 0 + elif self.stat_mode == "cached": + cached = self._get_size_cached(full, url) # will not network-call in cached mode + size = cached if cached is not None else 0 + else: # head + probed = self._get_size_cached(full, url) # performs HEAD when stale + size = probed if probed is not None else 0 + + return dict( + st_mode=mode, + st_nlink=1, + st_size=size, + st_ctime=st.st_ctime, + st_mtime=st.st_mtime, + st_atime=st.st_atime, + ) + + # Non-supported regular files: show as 0444 with local size + return dict( + st_mode=(stat.S_IFREG | 0o444), + st_nlink=1, + st_size=st.st_size, + st_ctime=st.st_ctime, + st_mtime=st.st_mtime, + st_atime=st.st_atime, + ) + + def readdir(self, path, fh): + full = self._full_path(path) + if not os.path.isdir(full): + raise 
FuseOSError(errno.ENOTDIR) + + entries = [".", ".."] + try: + for name in os.listdir(full): + entries.append(name) + except PermissionError: + raise FuseOSError(errno.EACCES) + + for e in entries: + yield e + + # Open returns a new file handle; session is created lazily on first read + def open(self, path, flags): + full = self._full_path(path) + if not os.path.exists(full): + raise FuseOSError(errno.ENOENT) + if not self._is_supported_file(full): + if flags & (os.O_WRONLY | os.O_RDWR): + raise FuseOSError(errno.EROFS) + raise FuseOSError(errno.EPERM) + + if flags & (os.O_WRONLY | os.O_RDWR): + raise FuseOSError(errno.EROFS) + + url = read_url_from_file(full) + + sess = self._requests_session() + + with self._fh_lock: + fh = self._next_fh + self._next_fh += 1 + self._handles[fh] = { + "path": path, + "full": full, + "orig_url": url, + "session_url": None, # defer creating session until first read + "size": None, + "sess": sess, + "lock": threading.RLock(), + } + return fh + + def _perform_range_get(self, sess: requests.Session, url: str, start: int, size: int) -> requests.Response: + end = start + size - 1 if size > 0 else None + headers = {} + if size > 0: + headers["Range"] = f"bytes={start}-{end}" + return sess.get( + url, + headers=headers, + stream=True, + allow_redirects=True, + timeout=self.timeout, + ) + + def read(self, path, size, offset, fh): + with self._fh_lock: + handle = self._handles.get(fh) + if handle is None: + raise FuseOSError(errno.EBADF) + + sess: requests.Session = handle["sess"] + + # Lazily create or refresh session before first read + try: + with handle["lock"]: + if not handle.get("session_url"): + self._ensure_session(handle) + session_url = handle["session_url"] + except Exception: + raise + + # Attempt GET; if it indicates session is invalid, refresh once and retry + def fetch_once(target_url: str): + return self._perform_range_get(sess, target_url, max(0, offset), size) + + try: + with handle["lock"]: + resp = fetch_once(session_url) + + # If server sends a new X-Session-URL on GET, update our handle + new_session_url = resp.headers.get("X-Session-URL") + if new_session_url and new_session_url != session_url: + handle["session_url"] = new_session_url + session_url = new_session_url + + if resp.status_code in SESSION_RETRY_STATUS: + # Refresh session and retry once + self._ensure_session(handle) + session_url = handle["session_url"] + resp.close() + resp = fetch_once(session_url) + + if resp.status_code not in (200, 206): + if resp.status_code == 416: + return b"" + raise FuseOSError(errno.EIO) + + data = b"" + start = max(0, offset) + + if resp.status_code == 200 and start > 0: + # Fallback: server ignored range; skip then slice + for chunk in resp.iter_content(chunk_size=1024 * 1024): + if not chunk: + break + if start >= len(chunk): + start -= len(chunk) + continue + take = min(len(chunk) - start, size - len(data)) + data += chunk[start : start + take] + start = 0 + if len(data) >= size: + break + else: + for chunk in resp.iter_content(chunk_size=1024 * 256): + if not chunk: + break + need = size - len(data) + if need <= 0: + break + if len(chunk) > need: + data += chunk[:need] + break + data += chunk + + return data + except requests.Timeout: + raise FuseOSError(errno.ETIMEDOUT) + except requests.RequestException: + raise FuseOSError(errno.EIO) + + def release(self, path, fh): + with self._fh_lock: + handle = self._handles.pop(fh, None) + if handle: + try: + handle["sess"].close() + except Exception: + pass + return 0 + + # Read-only FS + def unlink(self, 
path):
+        raise FuseOSError(errno.EROFS)
+
+    def rename(self, old, new):
+        raise FuseOSError(errno.EROFS)
+
+    def mkdir(self, path, mode):
+        raise FuseOSError(errno.EROFS)
+
+    def rmdir(self, path):
+        raise FuseOSError(errno.EROFS)
+
+    def chmod(self, path, mode):
+        raise FuseOSError(errno.EROFS)
+
+    def chown(self, path, uid, gid):
+        raise FuseOSError(errno.EROFS)
+
+    def truncate(self, path, length, fh=None):
+        raise FuseOSError(errno.EROFS)
+
+    def utimens(self, path, times=None):
+        # Ignore updates
+        return 0
+
+    def statfs(self, path):
+        # Provide some sane defaults; not critical for media servers
+        block_size = 4096
+        total = 1 << 30  # 1 GiB virtual
+        free = total // 2
+        return dict(
+            f_bsize=block_size,
+            f_frsize=block_size,
+            f_blocks=total // block_size,
+            f_bfree=free // block_size,
+            f_bavail=free // block_size,
+            f_files=1000000,
+            f_ffree=999999,
+            f_namemax=255,
+        )
+
+
+def parse_headers(header_list: Optional[list]) -> Dict[str, str]:
+    headers = {}
+    if not header_list:
+        return headers
+    for h in header_list:
+        if ":" not in h:
+            continue
+        k, v = h.split(":", 1)
+        headers[k.strip()] = v.strip()
+    return headers
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Mount URL-streaming FUSE filesystem")
+    parser.add_argument("--source", required=True, help="Local directory with URL files")
+    parser.add_argument("--mount", required=True, help="Mount point")
+    parser.add_argument(
+        "--ext",
+        action="append",
+        help="Only treat files with this extension as URL files (e.g., --ext .url). Repeatable.",
+    )
+    parser.add_argument(
+        "--header",
+        action="append",
+        help="Extra HTTP header to send to the API (e.g., --header 'Authorization: Bearer ...'). Repeatable.",
+    )
+    parser.add_argument("--stat-ttl", type=int, default=DEFAULT_TTL_SECONDS, help="Seconds to cache remote size")
+    parser.add_argument(
+        "--stat-mode",
+        choices=["zero", "cached", "head"],
+        default=DEFAULT_STAT_MODE,
+        help="How getattr reports size: zero (no HEAD), cached (no HEAD until opened), or head (HEAD on getattr).",
+    )
+    parser.add_argument("--fg", action="store_true", help="Run in foreground")
+    parser.add_argument("-o", dest="fuseopts", default="", help="Additional FUSE options (e.g., allow_other)")
+    args = parser.parse_args()
+
+    if not os.path.isdir(args.source):
+        raise SystemExit(f"Source directory not found: {args.source}")
+    if not os.path.isdir(args.mount):
+        raise SystemExit(f"Mount point must exist and be a directory: {args.mount}")
+
+    headers = parse_headers(args.header)
+    only_ext = tuple(args.ext) if args.ext else None
+
+    fs = URLStreamFS(
+        source_root=args.source,
+        extra_headers=headers,
+        stat_ttl=args.stat_ttl,
+        only_ext=only_ext,
+        stat_mode=args.stat_mode,
+    )
+
+    # Prepare FUSE options; only pass allow_other when it was actually
+    # requested via -o, since fusepy turns keyword args directly into -o
+    # mount options and "allow_other=False" would be rejected by libfuse.
+    fuse_kwargs = {
+        "foreground": args.fg,
+        "nothreads": False,
+        "fsname": "urlstreamfs",
+        "ro": True,
+        "debug": args.fg,
+    }
+    if "allow_other" in (args.fuseopts or ""):
+        fuse_kwargs["allow_other"] = True
+
+    FUSE(fs, args.mount, **fuse_kwargs)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/docker/streamfs.py b/docker/streamfs.py
new file mode 100644
index 00000000..b7da0e44
--- /dev/null
+++ b/docker/streamfs.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python3
+import os
+import stat
+import time
+import errno
+import logging
+import requests
+import subprocess
+import json
+
+from fuse import FUSE, Operations, FuseOSError
+
+logging.basicConfig(level=logging.DEBUG, format="%(asctime)s 
[%(levelname)s] %(message)s") + +MAPPINGS_DIR = "/mnt/mappings" # folder with files whose content is the URL to the stream +MOUNT_POINT = "/mnt/streams" # mount point for the FUSE filesystem + +CACHE_TTL = 300 # seconds for cache validity + +class StreamFS(Operations): + def __init__(self, mappings_dir): + self.mappings_dir = mappings_dir + self._reload_mappings() + self.size_cache = {} + self.metadata_cache = {} + + def _reload_mappings(self): + try: + self.files = os.listdir(self.mappings_dir) + except FileNotFoundError: + self.files = [] + + def _get_url_for_file(self, filename): + try: + with open(os.path.join(self.mappings_dir, filename), "r") as f: + return f.read().strip() + except Exception as e: + logging.error(f"Failed to read URL for {filename}: {e}") + return None + + def _http_head_size(self, url): + now = time.time() + if url in self.size_cache: + ts, size = self.size_cache[url] + if now - ts < CACHE_TTL: + return size + + try: + r = requests.head(url, timeout=5) + if r.status_code in (200, 206): + size = int(r.headers.get('Content-Length', '0')) + self.size_cache[url] = (now, size) + logging.debug(f"Fetched size {size} for {url}") + return size + else: + logging.warning(f"HEAD request returned status {r.status_code} for {url}") + except Exception as e: + logging.error(f"HEAD request failed for {url}: {e}") + return 0 + + def _get_metadata(self, url): + now = time.time() + if url in self.metadata_cache: + ts, metadata_str = self.metadata_cache[url] + if now - ts < CACHE_TTL: + return metadata_str + + # Use ffprobe to get video metadata + # Note: ffprobe expects a URL or a local file path + try: + cmd = [ + "ffprobe", "-v", "error", "-show_entries", + "format=duration:stream=codec_name,width,height", + "-of", "json", url + ] + logging.debug(f"Running ffprobe for {url}") + proc = subprocess.run(cmd, capture_output=True, text=True, timeout=10) + if proc.returncode != 0: + logging.error(f"ffprobe error for {url}: {proc.stderr.strip()}") + metadata_str = "ffprobe error: " + proc.stderr.strip() + else: + info = json.loads(proc.stdout) + format_info = info.get("format", {}) + streams = info.get("streams", []) + + duration = format_info.get("duration", "unknown") + duration = float(duration) if duration != "unknown" else None + + # Gather codecs and resolution from first video stream + video_stream = None + for s in streams: + if s.get("codec_type") == "video": + video_stream = s + break + + codec = video_stream.get("codec_name", "unknown") if video_stream else "unknown" + width = video_stream.get("width", "unknown") if video_stream else "unknown" + height = video_stream.get("height", "unknown") if video_stream else "unknown" + + metadata_str = ( + f"Duration: {duration:.2f} sec\n" if duration else "Duration: unknown\n" + ) + metadata_str += f"Video codec: {codec}\n" + metadata_str += f"Resolution: {width}x{height}\n" + + self.metadata_cache[url] = (now, metadata_str) + return metadata_str + except Exception as e: + logging.error(f"Failed to run ffprobe for {url}: {e}") + return "Metadata unavailable\n" + + # -- FUSE operations -- + + def readdir(self, path, fh): + logging.debug(f"readdir called for {path}") + self._reload_mappings() + yield '.' + yield '..' 
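+        # _reload_mappings() above means files added to the mappings
+        # directory appear in listings without remounting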
+
+        for filename in self.files:
+            # yield video file
+            yield filename
+
+    def getattr(self, path, fh=None):
+        logging.debug(f"getattr called for {path}")
+
+        if path == "/" or path == "":
+            return dict(st_mode=(stat.S_IFDIR | 0o755), st_nlink=2)
+
+        # Normal video files
+        filename = path.lstrip("/")
+        if filename not in self.files:
+            raise FuseOSError(errno.ENOENT)
+
+        url = self._get_url_for_file(filename)
+        if not url:
+            raise FuseOSError(errno.ENOENT)
+
+        size = self._http_head_size(url)
+
+        # Get stat of mapping file for ownership and timestamps
+        full_path = os.path.join(self.mappings_dir, filename)
+        try:
+            st = os.lstat(full_path)
+            uid = st.st_uid
+            gid = st.st_gid
+            atime = st.st_atime
+            mtime = st.st_mtime
+            ctime = st.st_ctime
+        except Exception:
+            uid = os.getuid()
+            gid = os.getgid()
+            atime = mtime = ctime = time.time()
+
+        return dict(
+            st_mode=(stat.S_IFREG | 0o444),
+            st_nlink=1,
+            st_size=size,
+            st_uid=uid,
+            st_gid=gid,
+            st_atime=atime,
+            st_mtime=mtime,
+            st_ctime=ctime,
+        )
+
+    def getxattr(self, path, name, position=0):
+        logging.debug(f"getxattr called for {path} name={name}")
+        # Extended attributes are not supported; report ENOTSUP
+        raise FuseOSError(errno.ENOTSUP)
+
+    def open(self, path, flags):
+        filename = path.lstrip("/")
+        if filename.endswith(".info"):
+            # Strip the ".info" suffix explicitly; str.rstrip(".info") would
+            # remove any trailing run of the characters . i n f o and mangle
+            # names like "movie.mkv".
+            filename = filename[: -len(".info")]
+        logging.debug(f"Requested to open {filename}")
+        logging.debug(f"Current files: {json.dumps(self.files)}")
+        if filename not in self.files:
+            raise FuseOSError(errno.ENOENT)
+        logging.debug(f"open called for file: {filename}")
+        return 0
+
+    def read(self, path, size, offset, fh):
+        logging.debug(f"read called for path={path}, size={size}, offset={offset}")
+
+        if path.endswith(".info"):
+            base_filename = path[1:-5]
+            url = self._get_url_for_file(base_filename)
+            if not url:
+                return b""
+            metadata_str = self._get_metadata(url)
+            data = metadata_str.encode("utf-8")
+            return data[offset:offset + size]
+
+        filename = path.lstrip("/")
+        url = self._get_url_for_file(filename)
+        if not url:
+            return b""
+
+        headers = {"Range": f"bytes={offset}-{offset + size - 1}"}
+        try:
+            with requests.get(url, headers=headers, stream=True, timeout=15) as r:
+                r.raise_for_status()
+                data = bytearray()
+                for chunk in r.iter_content(chunk_size=8192):
+                    if not chunk:
+                        break
+                    data.extend(chunk)
+                    if len(data) >= size:
+                        break
+                # Make sure to return exactly requested size
+                return bytes(data[:size])
+        except Exception as e:
+            logging.error(f"Error reading {url} range {offset}-{offset + size - 1}: {e}")
+            return b""
+
+if __name__ == "__main__":
+    os.makedirs(MAPPINGS_DIR, exist_ok=True)
+    os.makedirs(MOUNT_POINT, exist_ok=True)
+
+    logging.info(f"Mounting StreamFS: {MAPPINGS_DIR} -> {MOUNT_POINT}")
+
+    FUSE(StreamFS(MAPPINGS_DIR), MOUNT_POINT, nothreads=True, foreground=True, allow_other=True)
diff --git a/docker/virtfs.py b/docker/virtfs.py
new file mode 100644
index 00000000..c14e6b2c
--- /dev/null
+++ b/docker/virtfs.py
@@ -0,0 +1,429 @@
+#!/usr/bin/env python3
+import errno
+import os
+import stat
+import logging
+import requests
+from fuse import FUSE, FuseOSError, Operations, LoggingMixIn
+from collections import OrderedDict
+import time
+from urllib.parse import urlparse
+import threading
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(message)s"
+)
+
+# -------------------------
+# Tunables for performance
+# -------------------------
+READ_CHUNK_BYTES = 1024 * 256          # 256 KiB chunks pulled from upstream per read iteration
+BUFFER_MAX_BYTES = 1024 * 1024 * 8     # 8 MiB sliding window per open handle
+DOWNLOAD_SLEEP_SEC = 0.0               # 
small pause per chunk to ease throttling (e.g., 0.01)
+READ_WAIT_TIMEOUT = 15.0               # max seconds a read waits for needed bytes to arrive
+HTTP_TIMEOUT_SEC = 20                  # connect/read timeout for requests
+USER_UID = 1000                        # tweak to your environment if needed
+USER_GID = 1000
+
+class VideoStreamFS(LoggingMixIn, Operations):
+    def __init__(self, source_dir):
+        self.source_dir = source_dir
+        self.fd_counter = 0
+        self.open_files = {}
+        self._global_lock = threading.Lock()
+        self.cache = OrderedDict()  # (fh, offset, size) -> (data, timestamp)
+        self.cache_max_items = 10
+        self.cache_ttl = 5  # seconds
+        self.cache_files = {}
+        logging.info(f"Initialized VideoStreamFS with source_dir={source_dir}")
+
+    # ---------- helpers ----------
+    def _full_path(self, partial):
+        if partial.startswith("/"):
+            partial = partial[1:]  # strip leading slash
+        return os.path.join(self.source_dir, partial)
+
+    def _head_follow(self, url):
+        """
+        HEAD (preferred) to follow redirects and get final URL + size.
+        Fallback to GET(stream=True) if Content-Length missing.
+        """
+        s = requests.Session()
+        r = s.head(url, allow_redirects=True, timeout=HTTP_TIMEOUT_SEC)
+        r.raise_for_status()
+
+        # X-Session-URL carries the session path on the same host; fall back
+        # to the final redirected URL when the header is absent.
+        session_path = r.headers.get("X-Session-URL")
+        if session_path:
+            url_parts = urlparse(r.url)
+            final_url = f"{url_parts.scheme}://{url_parts.netloc}{session_path}"
+        else:
+            final_url = r.url
+
+        size = r.headers.get("Content-Length")
+
+        if size is None:
+            # Fallback to GET (no Range) to fetch headers only
+            gr = s.get(final_url, stream=True, allow_redirects=True, timeout=HTTP_TIMEOUT_SEC)
+            try:
+                gr.raise_for_status()
+                size = gr.headers.get("Content-Length")
+            finally:
+                gr.close()
+
+        total_size = int(size) if size is not None and size.isdigit() else 0
+        return s, final_url, total_size
+
+    def _cache_get(self, fh, offset, size):
+        key = (fh, offset, size)
+        entry = self.cache.get(key)
+        if entry:
+            data, ts = entry
+            if time.time() - ts < self.cache_ttl:
+                logging.info(f"Cache HIT for fh={fh} offset={offset} size={size}")
+                return data
+            else:
+                logging.info(f"Cache EXPIRED for fh={fh} offset={offset} size={size}")
+                del self.cache[key]
+        return None
+
+    def _cache_set(self, fh, offset, size, data):
+        key = (fh, offset, size)
+        if len(self.cache) >= self.cache_max_items:
+            self.cache.popitem(last=False)  # remove oldest
+        self.cache[key] = (data, time.time())
+        logging.info(f"Cache SET for fh={fh} offset={offset} size={size}")
+
+    def getattr(self, path, fh=None):
+        logging.info(f"GETATTR {path}")
+        full_path = self._full_path(path)
+
+        if os.path.isdir(full_path):
+            st = os.lstat(full_path)
+            return {
+                "st_mode": stat.S_IFDIR | 0o755,
+                "st_nlink": 2,
+                "st_size": 0,
+                "st_uid": USER_UID,
+                "st_gid": USER_GID,
+                "st_ctime": st.st_ctime,
+                "st_mtime": st.st_mtime,
+                "st_atime": st.st_atime,
+            }
+
+        if not os.path.exists(full_path):
+            raise FuseOSError(errno.ENOENT)
+
+        # For regular files, do HEAD request to get size of the redirected resource
+        st = os.lstat(full_path)
+        mode = stat.S_IFREG | 0o444  # read-only file
+
+        try:
+            with open(full_path, "r") as f:
+                original_url = f.read().strip()
+
+            session, final_url, size = self._head_follow(original_url)
+
+            self.cache_files[path] = {
+                "original_url": original_url,
+                "url": final_url,
+                "size": size,
+                "session": session,
+            }
+        except Exception as e:
+            logging.warning(f"Failed HEAD request for {path}: {e}")
+            size = st.st_size
+
+        return {
+            "st_mode": mode,
+            "st_nlink": 1,
+            "st_size": size,
+            "st_uid": USER_UID,
+            "st_gid": USER_GID,
+            
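# timestamps mirror the local mapping file, as in the directory branch
+            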
"st_ctime": st.st_ctime, + "st_mtime": st.st_mtime, + "st_atime": st.st_atime, + } + + def readdir(self, path, fh): + logging.info(f"READDIR {path}") + full_path = self._full_path(path) + dirents = [".", ".."] + os.listdir(full_path) + for r in dirents: + yield r + + def open(self, path, flags): + full_path = self._full_path(path) + if not os.path.exists(full_path): + raise FileNotFoundError(f"File not found: {path}") + + # Resolve unique redirected URL & size using HEAD + session = self.cache_files[path]["session"] + stream_url = self.cache_files[path]["url"] + total_size = self.cache_files[path]["size"] + original_url = self.cache_files[path]["original_url"] + + # Allocate fh + with self._global_lock: + self.fd_counter += 1 + fh = self.fd_counter + + # Per-handle state + state = { + "session": session, # requests.Session + "original_url": original_url, # stable URL from source file + "stream_url": stream_url, # unique redirected URL (this session only) + "size": total_size, # may be 0 if unknown + "resp": None, # active requests.Response (stream) + "resp_lock": threading.Lock(), + + # Sliding buffer state + "buffer": bytearray(), + "buf_start": 0, # absolute offset of first byte in buffer + "buf_end": 0, # absolute offset *after* last byte in buffer + "desired_offset": 0, # where downloader should be streaming from + + "stop_event": threading.Event(), + "have_data": threading.Condition(), # notify readers when new bytes arrive + "downloader": None, + } + + # Start downloader; initial position is offset 0 (will adapt on first read) + t = threading.Thread(target=self._downloader_loop, args=(fh,), name=f"dl-{fh}", daemon=True) + state["downloader"] = t + self.open_files[fh] = state + t.start() + + logging.debug(f"OPEN {path} fh={fh} original={original_url} stream={stream_url} size={total_size}") + return fh + + def read(self, path, size, offset, fh): + st = self.open_files.get(fh) + if not st: + logging.error(f"READ on unknown fh={fh}") + return b"" + + # If requested range is outside current window, ask downloader to restart at offset + with st["have_data"]: + if not (st["buf_start"] <= offset < st["buf_end"] or (offset == st["buf_end"] and size == 0)): + # Request a (re)positioning of upstream stream + st["desired_offset"] = offset + # Clear buffer because we'll restart at new offset + with st["resp_lock"]: + st["buffer"].clear() + st["buf_start"] = offset + st["buf_end"] = offset + st["have_data"].notify_all() + + deadline = time.time() + READ_WAIT_TIMEOUT + # Wait until we have enough bytes or stream stopped + while True: + available = st["buf_end"] - offset + if available >= size or st["stop_event"].is_set(): + break + # If upstream known size and offset beyond EOF, return empty + if st["size"] and offset >= st["size"]: + break + remaining = deadline - time.time() + if remaining <= 0: + logging.warning(f"READ timeout fh={fh} off={offset} size={size} avail={available}") + break + st["have_data"].wait(timeout=min(0.2, remaining)) + + # Serve available bytes (may be partial near EOF/timeout) + start = max(offset, st["buf_start"]) + end = min(offset + size, st["buf_end"]) + if end <= start: + return b"" + + rel_start = start - st["buf_start"] + rel_end = end - st["buf_start"] + out = bytes(st["buffer"][rel_start:rel_end]) + logging.debug(f"READ fh={fh} off={offset} size={size} -> returned={len(out)} " + f"(buf [{st['buf_start']},{st['buf_end']}) len={len(st['buffer'])})") + return out + + def release(self, path, fh): + st = self.open_files.pop(fh, None) + if not st: + return 0 + 
logging.debug(f"RELEASE fh={fh} stream_url discarded") + st["stop_event"].set() + with st["have_data"]: + st["have_data"].notify_all() + # Close response and session + with st["resp_lock"]: + try: + if st["resp"] is not None: + st["resp"].close() + except Exception: + pass + try: + st["session"].close() + except Exception: + pass + + del self.cache_files[path] + # Let the downloader thread exit + return 0 + + # ---------- downloader logic ---------- + def _open_stream(self, st, start_offset): + """ + (Re)open the upstream HTTP stream at a specific offset using Range. + """ + # Close any existing response + with st["resp_lock"]: + if st["resp"] is not None: + try: + st["resp"].close() + except Exception: + pass + st["resp"] = None + + headers = {"Range": f"bytes={start_offset}-"} + logging.debug(f"HTTP OPEN stream from {start_offset} url={st['stream_url']}") + r = st["session"].get( + st["stream_url"], + headers=headers, + stream=True, + allow_redirects=False, # IMPORTANT: keep the resolved URL (no new redirect) + timeout=HTTP_TIMEOUT_SEC, + ) + # Accept 206 (preferred). Some servers may return 200 (no ranges) — we can handle if start_offset==0 + if r.status_code not in (200, 206): + try: + r.raise_for_status() + finally: + r.close() + + # Parse/refresh size from Content-Range if present + cr = r.headers.get("Content-Range") + if cr and "/" in cr: + try: + total = int(cr.split("/")[-1]) + st["size"] = total + except Exception: + pass + elif st["size"] == 0: + # Try Content-Length as a fallback (represents remaining bytes from start_offset) + try: + rem = int(r.headers.get("Content-Length", "0")) + if rem > 0: + st["size"] = start_offset + rem + except Exception: + pass + + with st["resp_lock"]: + st["resp"] = r + + def _downloader_loop(self, fh): + st = self.open_files.get(fh) + if not st: + return + + current_offset = 0 + # Start at desired_offset (initially 0; read() may change it) + with st["have_data"]: + current_offset = st["desired_offset"] + + try: + self._open_stream(st, current_offset) + except Exception as e: + logging.error(f"Downloader failed to open initial stream fh={fh}: {e}") + st["stop_event"].set() + with st["have_data"]: + st["have_data"].notify_all() + return + + while not st["stop_event"].is_set(): + # Check for seek request (desired_offset changed outside current window) + with st["have_data"]: + desired = st["desired_offset"] + # If desired is not the next byte to fetch, we need to restart stream at desired + if desired != st["buf_end"]: + current_offset = desired + # Reset buffer window to new start + with st["resp_lock"]: + st["buffer"].clear() + st["buf_start"] = desired + st["buf_end"] = desired + try: + self._open_stream(st, current_offset) + except Exception as e: + logging.error(f"Downloader reopen failed fh={fh} off={desired}: {e}") + st["stop_event"].set() + st["have_data"].notify_all() + break + + # Pull next chunk + try: + with st["resp_lock"]: + r = st["resp"] + if r is None: + # Stream closed unexpectedly; try to reopen at buf_end + current = st["buf_end"] + try: + self._open_stream(st, current) + with st["resp_lock"]: + r = st["resp"] + except Exception as e: + logging.error(f"Downloader reopen-after-null failed fh={fh}: {e}") + st["stop_event"].set() + with st["have_data"]: + st["have_data"].notify_all() + break + + chunk = r.raw.read(READ_CHUNK_BYTES) + if not chunk: + # EOF + logging.debug(f"Downloader EOF fh={fh}") + st["stop_event"].set() + with st["have_data"]: + st["have_data"].notify_all() + break + + # Append chunk; enforce sliding window + with 
st["have_data"]: + st["buffer"].extend(chunk) + st["buf_end"] += len(chunk) + + if len(st["buffer"]) > BUFFER_MAX_BYTES: + # Evict from front + evict = len(st["buffer"]) - BUFFER_MAX_BYTES + del st["buffer"][:evict] + st["buf_start"] += evict + + st["have_data"].notify_all() + + if DOWNLOAD_SLEEP_SEC > 0: + time.sleep(DOWNLOAD_SLEEP_SEC) + + except requests.exceptions.RequestException as e: + logging.warning(f"Downloader network error fh={fh}: {e}; retrying shortly") + time.sleep(0.5) + # Try to reopen at current buf_end + try: + self._open_stream(st, st["buf_end"]) + except Exception as e2: + logging.error(f"Downloader reopen after error failed fh={fh}: {e2}") + st["stop_event"].set() + with st["have_data"]: + st["have_data"].notify_all() + break + except Exception as e: + logging.error(f"Downloader unexpected error fh={fh}: {e}") + st["stop_event"].set() + with st["have_data"]: + st["have_data"].notify_all() + break + +def main(source_dir, mount_dir): + FUSE(VideoStreamFS(source_dir), mount_dir, nothreads=True, foreground=True, ro=True, allow_other=True) + +if __name__ == "__main__": + import sys + if len(sys.argv) != 3: + print(f"Usage: {sys.argv[0]} ") + sys.exit(1) + main(sys.argv[1], sys.argv[2])