committing what I've got for fuse fs

This commit is contained in:
dekzter 2025-10-11 10:40:34 -04:00
parent ca8e9d0143
commit adf84294eb
5 changed files with 1605 additions and 0 deletions

docker/dfs.py Normal file

@@ -0,0 +1,420 @@
#!/usr/bin/env python3
"""
StreamFS: a Python FUSE virtual filesystem that exposes video-on-demand URLs
(from mapping files) as regular read-only files with full byte-range support.
Design highlights
-----------------
- Mapping directory (default: /mnt/mappings) contains *text files*. Each file's
content is a single URL (source URL). The filename becomes the exposed file.
- When a client opens a file, StreamFS issues a HEAD request to the source URL.
The server returns a *unique* session URL in the `X-Session-URL` header. That
session URL is used for **all subsequent GETs** for that file handle.
- File size is taken from `Content-Length` (returned by HEAD).
- `read()` uses HTTP Range requests against the session URL so that scrubbing
(random seeks) works seamlessly.
- Read-only filesystem. Multiple simultaneous clients are supported; each open
gets its own session URL and connection pool.
Requirements
------------
- Linux with FUSE (libfuse) installed.
- Python 3.8+
- `fusepy` and `requests` packages.
pip install fusepy requests
Mount example
-------------
sudo ./streamfs.py /mnt/streamfs --maps /mnt/mappings -f -d
Unmount
-------
sudo umount /mnt/streamfs
Notes
-----
- The mapping directory is reloaded automatically on directory listing calls,
so adding/removing mapping files becomes visible to clients.
- Optional small read-ahead cache per open handle is included for sequential
reads; it is conservative and safe to disable with `--no-readahead`.
- TLS verification can be disabled (e.g., for lab networks) via `--insecure`.
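- Mapping file example (URL hypothetical): a file /mnt/mappings/big_buck_bunny.mp4
  whose sole content is the line
      https://vod.example.com/big_buck_bunny.mp4
  appears in the mount as a read-only big_buck_bunny.mp4 sized per Content-Length.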
"""
import errno
import logging
import os
import stat
import sys
import time
import threading
import argparse
from typing import Dict, Tuple, Optional
import requests
from fuse import FUSE, Operations, LoggingMixIn
# -------------------------- Helpers & Data Classes -------------------------- #
class HTTPError(IOError):
pass
def clamp(n: int, low: int, high: int) -> int:
return max(low, min(high, n))
class OpenHandle:
"""Tracks state for one open() file handle."""
def __init__(self, logical_path: str, source_url: str, session: requests.Session,
session_url: str, size: int, readahead: bool, verify_tls: bool):
self.path = logical_path
self.source_url = source_url
self.session = session
self.session_url = session_url
self.size = size
self.verify_tls = verify_tls
# Simple read-ahead cache: maps (start,end) -> bytes
self.readahead = readahead
self.cache_lock = threading.Lock()
self.cache: Dict[Tuple[int, int], bytes] = {}
self.max_cache_bytes = 4 * 1024 * 1024 # 4 MiB per handle cap
self.cache_bytes = 0
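        # Containment lookup: one cached 1 MiB response at offset 0 can serve
        # any later read that falls entirely inside [0, 1 MiB) with no new GET.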
def _add_cache(self, start: int, data: bytes):
if not self.readahead:
return
with self.cache_lock:
end = start + len(data)
key = (start, end)
# If adding exceeds cap, clear cache (simple strategy).
if self.cache_bytes + len(data) > self.max_cache_bytes:
self.cache.clear()
self.cache_bytes = 0
self.cache[key] = data
self.cache_bytes += len(data)
def _get_from_cache(self, start: int, size: int) -> Optional[bytes]:
if not self.readahead or size <= 0:
return None
with self.cache_lock:
for (s, e), blob in list(self.cache.items()):
if start >= s and (start + size) <= e:
off = start - s
return blob[off:off + size]
return None
def ranged_get(self, start: int, size: int) -> bytes:
if size <= 0:
return b""
# Clamp against EOF
end_inclusive = clamp(start + size - 1, 0, self.size - 1)
if end_inclusive < start:
return b""
# Cache lookup
cached = self._get_from_cache(start, end_inclusive - start + 1)
if cached is not None:
return cached
headers = {"Range": f"bytes={start}-{end_inclusive}"}
resp = self.session.get(self.session_url, headers=headers, stream=False,
timeout=(5, 30), verify=self.verify_tls)
if resp.status_code not in (200, 206):
raise HTTPError(f"Unexpected GET status {resp.status_code} for {self.path}")
data = resp.content
# Cache the full returned chunk
self._add_cache(start, data)
return data
def close(self):
try:
self.session.close()
except Exception:
pass
# ------------------------------ Filesystem Core ----------------------------- #
class StreamFS(LoggingMixIn, Operations):
def __init__(self, mappings_dir: str, verify_tls: bool = True, readahead: bool = True):
super().__init__()
self.mappings_dir = os.path.abspath(mappings_dir)
self.verify_tls = verify_tls
self.readahead = readahead
self._log = logging.getLogger("StreamFS")
self._log.info("Using mappings dir: %s", self.mappings_dir)
# name -> source URL string (loaded from files)
self.mappings: Dict[str, str] = {}
self._mappings_mtime = 0.0
self._mappings_lock = threading.Lock()
# name -> (size, etag/last-modified)
self.meta_cache: Dict[str, Tuple[int, Optional[str]]] = {}
self._meta_lock = threading.Lock()
# fh -> OpenHandle
self._fh_lock = threading.Lock()
self._next_fh = 3
self._open_handles: Dict[int, OpenHandle] = {}
if not os.path.isdir(self.mappings_dir):
raise RuntimeError(f"Mappings directory does not exist: {self.mappings_dir}")
self._reload_mappings(force=True)
# --------------------------- Mapping management -------------------------- #
def _reload_mappings(self, force: bool = False):
        try:
            mtime = os.stat(self.mappings_dir).st_mtime
        except FileNotFoundError:
            # Mappings dir vanished; expose an empty listing until it returns
            with self._mappings_lock:
                self.mappings = {}
            return
        if not force and mtime <= self._mappings_mtime:
            return
with self._mappings_lock:
new_map: Dict[str, str] = {}
for entry in os.listdir(self.mappings_dir):
full = os.path.join(self.mappings_dir, entry)
if not os.path.isfile(full):
continue
try:
with open(full, "r", encoding="utf-8") as f:
url = f.read().strip()
if url:
new_map[entry] = url
except Exception as e:
self._log.warning("Skipping mapping %s: %s", entry, e)
self.mappings = new_map
self._mappings_mtime = mtime
self._log.info("Loaded %d mappings", len(self.mappings))
# ------------------------------- Utilities ------------------------------- #
def _source_for_path(self, path: str) -> Optional[str]:
name = path.lstrip("/")
return self.mappings.get(name)
    def _head_fetch_meta(self, source_url: str) -> Tuple[int, Optional[str], str, requests.Session]:
        """Return (size, etag_or_lastmod, session_url, session) from a HEAD request.
        Expects the server to include an X-Session-URL header.
        """
        s = requests.Session()
        resp = s.head(source_url, allow_redirects=True, timeout=(5, 15), verify=self.verify_tls)
if resp.status_code >= 400:
s.close()
raise HTTPError(f"HEAD failed {resp.status_code} for {source_url}")
size_hdr = resp.headers.get("Content-Length")
if not size_hdr:
s.close()
raise HTTPError("No Content-Length on HEAD")
try:
size = int(size_hdr)
except ValueError:
s.close()
raise HTTPError("Invalid Content-Length")
session_url = resp.headers.get("X-Session-URL")
if not session_url:
# Fallback: use final URL if header not present
session_url = str(resp.url)
etag = resp.headers.get("ETag") or resp.headers.get("Last-Modified")
# Keep this session for the handle that will use it.
return size, etag, session_url, s
# ------------------------------- FUSE ops -------------------------------- #
def getattr(self, path, fh=None):
self._reload_mappings()
if path == "/":
mode = stat.S_IFDIR | 0o555
now = int(time.time())
return {
"st_mode": mode,
"st_nlink": 2,
"st_ctime": now,
"st_mtime": now,
"st_atime": now,
}
src = self._source_for_path(path)
if not src:
raise OSError(errno.ENOENT, "No such file", path)
# Find or refresh meta
name = path.lstrip("/")
size: Optional[int] = None
with self._meta_lock:
meta = self.meta_cache.get(name)
if meta:
size = meta[0]
if size is None:
try:
size, etag, _session_url, sess = self._head_fetch_meta(src)
sess.close()
with self._meta_lock:
self.meta_cache[name] = (size, etag)
except Exception as e:
self._log.error("HEAD meta fetch failed for %s: %s", name, e)
raise OSError(errno.EIO, str(e))
mode = stat.S_IFREG | 0o444
now = int(time.time())
return {
"st_mode": mode,
"st_nlink": 1,
"st_size": size,
"st_ctime": now,
"st_mtime": now,
"st_atime": now,
}
def readdir(self, path, fh):
self._reload_mappings()
if path != "/":
raise OSError(errno.ENOENT, "No such directory", path)
yield from [".", "..", *sorted(self.mappings.keys())]
def open(self, path, flags):
# Read-only enforcement
if flags & (os.O_WRONLY | os.O_RDWR):
raise OSError(errno.EACCES, "Read-only filesystem")
src = self._source_for_path(path)
if not src:
raise OSError(errno.ENOENT, "No such file", path)
# HEAD to obtain size and session URL
try:
size, etag, session_url, session = self._head_fetch_meta(src)
except Exception as e:
self._log.error("open() HEAD failed for %s: %s", path, e)
raise OSError(errno.EIO, str(e))
# Update meta cache
with self._meta_lock:
self.meta_cache[path.lstrip("/")] = (size, etag)
with self._fh_lock:
fh = self._next_fh
self._next_fh += 1
self._open_handles[fh] = OpenHandle(
logical_path=path,
source_url=src,
session=session,
session_url=session_url,
size=size,
readahead=self.readahead,
verify_tls=self.verify_tls,
)
self._log.debug("Opened %s (fh=%d) session_url=%s size=%d", path, fh, session_url, size)
return fh
def read(self, path, size, offset, fh):
with self._fh_lock:
handle = self._open_handles.get(fh)
if not handle:
raise OSError(errno.EBADF, "Invalid file handle")
try:
return handle.ranged_get(offset, size)
except HTTPError as e:
self._log.error("HTTP read error for %s: %s", path, e)
raise OSError(errno.EIO, str(e))
except requests.RequestException as e:
self._log.error("Network error for %s: %s", path, e)
raise OSError(errno.EIO, str(e))
# VFS is read-only; deny write ops explicitly
def write(self, path, data, offset, fh):
raise OSError(errno.EROFS, "Read-only filesystem")
def truncate(self, path, length, fh=None):
raise OSError(errno.EROFS, "Read-only filesystem")
def unlink(self, path):
raise OSError(errno.EROFS, "Read-only filesystem")
def rename(self, old, new):
raise OSError(errno.EROFS, "Read-only filesystem")
def mkdir(self, path, mode):
raise OSError(errno.EROFS, "Read-only filesystem")
def rmdir(self, path):
raise OSError(errno.EROFS, "Read-only filesystem")
def chmod(self, path, mode):
raise OSError(errno.EROFS, "Read-only filesystem")
def chown(self, path, uid, gid):
raise OSError(errno.EROFS, "Read-only filesystem")
def utimens(self, path, times=None):
# Ignore timestamp changes to avoid errors in some clients
return 0
def release(self, path, fh):
with self._fh_lock:
handle = self._open_handles.pop(fh, None)
if handle:
handle.close()
return 0
def flush(self, path, fh):
# No writeback
return 0
# ---------------------------------- Main ----------------------------------- #
def parse_args(argv=None):
p = argparse.ArgumentParser(description="VOD virtual filesystem over HTTP with Range support")
p.add_argument("mountpoint", help="Mount point for the FUSE filesystem")
p.add_argument("--maps", dest="maps", default="/mnt/mappings",
help="Directory of mapping files (default: /mnt/mappings)")
p.add_argument("-f", "--foreground", action="store_true", help="Run in foreground")
p.add_argument("-d", "--debug", action="store_true", help="Enable debug logging")
p.add_argument("--insecure", action="store_true", help="Disable TLS verification")
p.add_argument("--no-readahead", action="store_true", help="Disable read-ahead cache")
return p.parse_args(argv)
def main(argv=None):
args = parse_args(argv)
log_level = logging.DEBUG if args.debug else logging.INFO
logging.basicConfig(level=log_level, format="[%(levelname)s] %(name)s: %(message)s")
fs = StreamFS(
mappings_dir=args.maps,
verify_tls=not args.insecure,
readahead=not args.no_readahead,
)
    # Mount options: read-only, visible to other users, kernel permission checks.
    # fusepy accepts these directly as keyword arguments (True -> "-o name").
    FUSE(
        fs,
        args.mountpoint,
        nothreads=True,
        foreground=args.foreground,
        allow_other=True,
        ro=True,
        default_permissions=True,
    )
if __name__ == "__main__":
main()


@@ -37,5 +37,18 @@ services:
    ports:
      - 8081:8081
  dispatcharr-fs:
    image: dispatcharr/fuse
    container_name: dispatcharr-fs
    cap_add:
      - SYS_ADMIN
    devices:
      - /dev/fuse:/dev/fuse
    security_opt:
      - apparmor:unconfined
    volumes:
      - /appdata/dispatcharr/vod:/mnt:shared
    restart: unless-stopped
volumes:
  dispatcharr_dev_pgadmin:

docker/media-fs.py Normal file

@@ -0,0 +1,523 @@
#!/usr/bin/env python3
import errno
import os
import stat
import time
import argparse
import threading
from typing import Dict, Optional, Tuple
import requests
from requests.adapters import HTTPAdapter
from fuse import FUSE, Operations, FuseOSError
# Notes:
# - Requires: pip install fusepy requests
# - Linux: install FUSE (e.g., sudo apt-get install fuse3) and ensure user can mount
# - Mount with: sudo python3 media-fs.py --source /path/to/url-files --mount /mnt/streamfs -o allow_other
#
# Behavior:
# - Mirrors the directory structure under --source.
# - Each regular file is treated as a "URL file": first non-empty, non-comment line is the media URL.
# - getattr:
# - For files, returns a read-only regular file with size derived from HEAD Content-Length.
# - Size is cached with TTL to avoid excessive HEADs.
# - open:
# - Performs HEAD against the file's URL, reads Content-Length and X-Session-URL (if present).
# - Stores a per-open handle with "session_url" for subsequent reads.
# - read:
# - Issues GET with Range: bytes=offset-end to the per-handle session_url (or original URL if no session header).
# - Returns exactly the bytes requested by FUSE (or fewer near EOF).
# - release:
# - Closes the per-open requests.Session and discards handle state.
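#
# Example URL file (contents hypothetical); blank lines and '#' lines are skipped:
#
#   # Big Buck Bunny
#   https://vod.example.com/big_buck_bunny.mp4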
DEFAULT_TTL_SECONDS = 60 # cache TTL for HEAD-derived file size (getattr)
DEFAULT_TIMEOUT = (5, 30) # (connect, read) timeouts for requests
USER_AGENT = "URLStreamFS/1.1 (+https://github.com/)"
# Read-only stat size strategy to avoid prematurely creating sessions:
# - "zero": always report 0 (never HEAD on getattr)
# - "cached": report last-known size if we've opened before (no HEAD on getattr)
# - "head": perform HEAD on getattr (may create a session early)
DEFAULT_STAT_MODE = "cached"
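# Example invocation (paths hypothetical), probing sizes eagerly on getattr:
#   sudo python3 media-fs.py --source /srv/url-files --mount /mnt/streamfs \
#       --ext .url --stat-mode head -o allow_other --fg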
# If a GET returns one of these, we will refresh the session and retry once
SESSION_RETRY_STATUS = {401, 403, 404, 410, 412, 429, 500, 502, 503, 504}
def is_probably_url(s: str) -> bool:
s = s.strip().lower()
return s.startswith("http://") or s.startswith("https://")
def read_url_from_file(local_path: str) -> str:
# Reads first non-empty, non-comment line (comments start with '#')
with open(local_path, "r", encoding="utf-8", errors="replace") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if not is_probably_url(line):
raise FuseOSError(errno.EINVAL)
return line
raise FuseOSError(errno.EINVAL)
class URLStreamFS(Operations):
def __init__(
self,
source_root: str,
extra_headers: Optional[Dict[str, str]] = None,
stat_ttl: int = DEFAULT_TTL_SECONDS,
timeout: Tuple[int, int] = DEFAULT_TIMEOUT,
only_ext: Optional[Tuple[str, ...]] = None,
stat_mode: str = DEFAULT_STAT_MODE,
):
self.source_root = os.path.abspath(source_root)
self.extra_headers = extra_headers or {}
self.stat_ttl = stat_ttl
self.timeout = timeout
self.only_ext = tuple(e.lower() for e in only_ext) if only_ext else None
self.stat_mode = stat_mode # zero|cached|head
# Cache for HEAD-derived sizes keyed by absolute path
self._size_cache: Dict[str, Dict] = {}
self._size_lock = threading.RLock()
# Handle table for per-open sessions
self._fh_lock = threading.RLock()
self._next_fh = 3 # arbitrary start
self._handles: Dict[int, Dict] = {}
# Helpers
def _full_path(self, path: str) -> str:
assert path.startswith("/")
return os.path.join(self.source_root, path.lstrip("/"))
def _is_supported_file(self, full: str) -> bool:
if not os.path.isfile(full):
return False
if self.only_ext:
_, ext = os.path.splitext(full)
return ext.lower() in self.only_ext
return True
def _requests_session(self) -> requests.Session:
s = requests.Session()
s.headers.update(
{
"User-Agent": USER_AGENT,
"Connection": "keep-alive",
**self.extra_headers,
}
)
# A small connection pool helps concurrent readers
adapter = HTTPAdapter(pool_connections=8, pool_maxsize=8, max_retries=0)
s.mount("http://", adapter)
s.mount("https://", adapter)
return s
def _head(self, session: requests.Session, url: str) -> requests.Response:
# Allow redirects; server returns Content-Length and possibly X-Session-URL
resp = session.head(url, allow_redirects=True, timeout=self.timeout)
if resp.status_code >= 400:
raise FuseOSError(errno.EIO)
return resp
def _update_size_cache(self, full: str, size: Optional[int]) -> None:
with self._size_lock:
self._size_cache[full] = {
"ts": time.time(),
"local_mtime": os.path.getmtime(full),
"size": size,
}
def _get_size_cached(self, full: str, url: str) -> Optional[int]:
# Returns size from cache or refreshes via HEAD (using a short-lived session not tied to 'open')
now = time.time()
local_mtime = os.path.getmtime(full)
with self._size_lock:
entry = self._size_cache.get(full)
if (
entry
and entry["local_mtime"] == local_mtime
and (now - entry["ts"]) <= self.stat_ttl
and entry.get("size") is not None
):
return entry["size"]
# Avoid creating server-side viewing sessions during getattr unless explicitly requested
if self.stat_mode != "head":
return None
# Refresh with HEAD (may create/advance a session server-side)
sess = self._requests_session()
try:
resp = self._head(sess, url)
size = None
cl = resp.headers.get("Content-Length")
if cl is not None:
try:
size = int(cl)
except ValueError:
size = None
finally:
sess.close()
with self._size_lock:
self._size_cache[full] = {
"ts": now,
"local_mtime": local_mtime,
"size": size,
}
return size
def _ensure_session(self, handle: Dict) -> None:
# Lazily start or refresh a session for this open handle
if handle.get("session_url"):
return
sess: requests.Session = handle["sess"]
orig_url: str = handle["orig_url"]
head = self._head(sess, orig_url)
session_url = head.headers.get("X-Session-URL") or orig_url
size = None
cl = head.headers.get("Content-Length")
if cl is not None:
try:
size = int(cl)
except ValueError:
size = None
handle["session_url"] = session_url
handle["size"] = size
# Save for getattr(stat_mode=cached)
self._update_size_cache(handle["full"], size)
def access(self, path, mode):
full = self._full_path(path)
if not os.path.exists(full):
raise FuseOSError(errno.ENOENT)
# Read-only FS
        if mode & (os.W_OK | os.X_OK):
            # Allow execute traversal on directories, deny writes
            if os.path.isdir(full):
                return 0
            if mode & os.W_OK:
                raise FuseOSError(errno.EROFS)
        return 0
def getattr(self, path, fh=None):
full = self._full_path(path)
if not os.path.exists(full):
raise FuseOSError(errno.ENOENT)
st = os.lstat(full)
# Directory: mirror local attrs
if stat.S_ISDIR(st.st_mode):
return dict(
st_mode=(stat.S_IFDIR | 0o555), # read+execute
st_nlink=2,
st_size=0,
st_ctime=st.st_ctime,
st_mtime=st.st_mtime,
st_atime=st.st_atime,
)
# Files: expose as read-only regular files and override size from remote (per stat_mode)
if self._is_supported_file(full):
mode = stat.S_IFREG | 0o444
try:
url = read_url_from_file(full)
except FuseOSError:
url = None
size = 0
if url:
if self.stat_mode == "zero":
size = 0
elif self.stat_mode == "cached":
cached = self._get_size_cached(full, url) # will not network-call in cached mode
size = cached if cached is not None else 0
else: # head
probed = self._get_size_cached(full, url) # performs HEAD when stale
size = probed if probed is not None else 0
return dict(
st_mode=mode,
st_nlink=1,
st_size=size,
st_ctime=st.st_ctime,
st_mtime=st.st_mtime,
st_atime=st.st_atime,
)
# Non-supported regular files: show as 0444 with local size
return dict(
st_mode=(stat.S_IFREG | 0o444),
st_nlink=1,
st_size=st.st_size,
st_ctime=st.st_ctime,
st_mtime=st.st_mtime,
st_atime=st.st_atime,
)
def readdir(self, path, fh):
full = self._full_path(path)
if not os.path.isdir(full):
raise FuseOSError(errno.ENOTDIR)
entries = [".", ".."]
try:
for name in os.listdir(full):
entries.append(name)
except PermissionError:
raise FuseOSError(errno.EACCES)
for e in entries:
yield e
# Open returns a new file handle; session is created lazily on first read
def open(self, path, flags):
full = self._full_path(path)
if not os.path.exists(full):
raise FuseOSError(errno.ENOENT)
if not self._is_supported_file(full):
if flags & (os.O_WRONLY | os.O_RDWR):
raise FuseOSError(errno.EROFS)
raise FuseOSError(errno.EPERM)
if flags & (os.O_WRONLY | os.O_RDWR):
raise FuseOSError(errno.EROFS)
url = read_url_from_file(full)
sess = self._requests_session()
with self._fh_lock:
fh = self._next_fh
self._next_fh += 1
self._handles[fh] = {
"path": path,
"full": full,
"orig_url": url,
"session_url": None, # defer creating session until first read
"size": None,
"sess": sess,
"lock": threading.RLock(),
}
return fh
def _perform_range_get(self, sess: requests.Session, url: str, start: int, size: int) -> requests.Response:
end = start + size - 1 if size > 0 else None
headers = {}
if size > 0:
headers["Range"] = f"bytes={start}-{end}"
return sess.get(
url,
headers=headers,
stream=True,
allow_redirects=True,
timeout=self.timeout,
)
def read(self, path, size, offset, fh):
with self._fh_lock:
handle = self._handles.get(fh)
if handle is None:
raise FuseOSError(errno.EBADF)
sess: requests.Session = handle["sess"]
        # Lazily create or refresh the session before the first read
        with handle["lock"]:
            if not handle.get("session_url"):
                self._ensure_session(handle)
            session_url = handle["session_url"]
# Attempt GET; if it indicates session is invalid, refresh once and retry
def fetch_once(target_url: str):
return self._perform_range_get(sess, target_url, max(0, offset), size)
try:
with handle["lock"]:
resp = fetch_once(session_url)
# If server sends a new X-Session-URL on GET, update our handle
new_session_url = resp.headers.get("X-Session-URL")
if new_session_url and new_session_url != session_url:
handle["session_url"] = new_session_url
session_url = new_session_url
if resp.status_code in SESSION_RETRY_STATUS:
# Refresh session and retry once
self._ensure_session(handle)
session_url = handle["session_url"]
resp.close()
resp = fetch_once(session_url)
                if resp.status_code not in (200, 206):
                    status = resp.status_code
                    resp.close()
                    if status == 416:
                        # Requested range not satisfiable: treat as EOF
                        return b""
                    raise FuseOSError(errno.EIO)
data = b""
start = max(0, offset)
if resp.status_code == 200 and start > 0:
# Fallback: server ignored range; skip then slice
for chunk in resp.iter_content(chunk_size=1024 * 1024):
if not chunk:
break
if start >= len(chunk):
start -= len(chunk)
continue
take = min(len(chunk) - start, size - len(data))
data += chunk[start : start + take]
start = 0
if len(data) >= size:
break
else:
for chunk in resp.iter_content(chunk_size=1024 * 256):
if not chunk:
break
need = size - len(data)
if need <= 0:
break
if len(chunk) > need:
data += chunk[:need]
break
                        data += chunk
                resp.close()
                return data
except requests.Timeout:
raise FuseOSError(errno.ETIMEDOUT)
except requests.RequestException:
raise FuseOSError(errno.EIO)
def release(self, path, fh):
with self._fh_lock:
handle = self._handles.pop(fh, None)
if handle:
try:
handle["sess"].close()
except Exception:
pass
return 0
# Read-only FS
def unlink(self, path):
raise FuseOSError(errno.EROFS)
def rename(self, old, new):
raise FuseOSError(errno.EROFS)
def mkdir(self, path, mode):
raise FuseOSError(errno.EROFS)
def rmdir(self, path):
raise FuseOSError(errno.EROFS)
def chmod(self, path, mode):
raise FuseOSError(errno.EROFS)
def chown(self, path, uid, gid):
raise FuseOSError(errno.EROFS)
def truncate(self, path, length, fh=None):
raise FuseOSError(errno.EROFS)
def utimens(self, path, times=None):
# Ignore updates
return 0
def statfs(self, path):
# Provide some sane defaults; not critical for media servers
block_size = 4096
total = 1 << 30 # 1 GiB virtual
free = total // 2
return dict(
f_bsize=block_size,
f_frsize=block_size,
f_blocks=total // block_size,
f_bfree=free // block_size,
f_bavail=free // block_size,
f_files=1000000,
f_ffree=999999,
f_namemax=255,
)
def parse_headers(header_list: Optional[list]) -> Dict[str, str]:
headers = {}
if not header_list:
return headers
for h in header_list:
if ":" not in h:
continue
k, v = h.split(":", 1)
headers[k.strip()] = v.strip()
return headers
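# For example (values hypothetical):
#   parse_headers(["Authorization: Bearer abc123", "X-Api-Key: k1"])
#   -> {"Authorization": "Bearer abc123", "X-Api-Key": "k1"}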
def main():
parser = argparse.ArgumentParser(description="Mount URL-streaming FUSE filesystem")
parser.add_argument("--source", required=True, help="Local directory with URL files")
parser.add_argument("--mount", required=True, help="Mount point")
parser.add_argument(
"--ext",
action="append",
help="Only treat files with this extension as URL files (e.g., --ext .url). Repeatable.",
)
parser.add_argument(
"--header",
action="append",
help="Extra HTTP header to send to the API (e.g., --header 'Authorization: Bearer ...'). Repeatable.",
)
parser.add_argument("--stat-ttl", type=int, default=DEFAULT_TTL_SECONDS, help="Seconds to cache remote size")
parser.add_argument(
"--stat-mode",
choices=["zero", "cached", "head"],
default=DEFAULT_STAT_MODE,
help="How getattr reports size: zero (no HEAD), cached (no HEAD until opened), or head (HEAD on getattr).",
)
parser.add_argument("--fg", action="store_true", help="Run in foreground")
parser.add_argument("-o", dest="fuseopts", default="", help="Additional FUSE options (e.g., allow_other)")
args = parser.parse_args()
if not os.path.isdir(args.source):
raise SystemExit(f"Source directory not found: {args.source}")
if not os.path.isdir(args.mount):
raise SystemExit(f"Mount point must exist and be a directory: {args.mount}")
headers = parse_headers(args.header)
only_ext = tuple(args.ext) if args.ext else None
fs = URLStreamFS(
source_root=args.source,
extra_headers=headers,
stat_ttl=args.stat_ttl,
only_ext=only_ext,
stat_mode=args.stat_mode,
)
    # Prepare FUSE options; honor "allow_other" if present in the raw -o string
    allow_other = "allow_other" in (args.fuseopts or "")
    FUSE(
        fs,
        args.mount,
        nothreads=False,
        foreground=args.fg,
        allow_other=allow_other,
        fsname="urlstreamfs",
        ro=True,
        debug=args.fg,
    )
if __name__ == "__main__":
main()

docker/streamfs.py Normal file

@@ -0,0 +1,220 @@
#!/usr/bin/env python3
import os
import stat
import time
import errno
import logging
import requests
import subprocess
import json
from fuse import FUSE, Operations, FuseOSError
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s")
MAPPINGS_DIR = "/mnt/mappings" # folder with files whose content is the URL to the stream
MOUNT_POINT = "/mnt/streams" # mount point for the FUSE filesystem
CACHE_TTL = 300 # seconds for cache validity
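# Example (hypothetical): /mnt/mappings/movie.mp4 containing the single line
#   https://vod.example.com/movie.mp4
# is exposed as the read-only file /mnt/streams/movie.mp4 (plus a virtual
# /mnt/streams/movie.mp4.info with ffprobe metadata).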
class StreamFS(Operations):
def __init__(self, mappings_dir):
self.mappings_dir = mappings_dir
self._reload_mappings()
self.size_cache = {}
self.metadata_cache = {}
def _reload_mappings(self):
try:
self.files = os.listdir(self.mappings_dir)
except FileNotFoundError:
self.files = []
def _get_url_for_file(self, filename):
try:
with open(os.path.join(self.mappings_dir, filename), "r") as f:
return f.read().strip()
except Exception as e:
logging.error(f"Failed to read URL for {filename}: {e}")
return None
def _http_head_size(self, url):
now = time.time()
if url in self.size_cache:
ts, size = self.size_cache[url]
if now - ts < CACHE_TTL:
return size
try:
r = requests.head(url, timeout=5)
if r.status_code in (200, 206):
size = int(r.headers.get('Content-Length', '0'))
self.size_cache[url] = (now, size)
logging.debug(f"Fetched size {size} for {url}")
return size
else:
logging.warning(f"HEAD request returned status {r.status_code} for {url}")
except Exception as e:
logging.error(f"HEAD request failed for {url}: {e}")
return 0
def _get_metadata(self, url):
now = time.time()
if url in self.metadata_cache:
ts, metadata_str = self.metadata_cache[url]
if now - ts < CACHE_TTL:
return metadata_str
# Use ffprobe to get video metadata
# Note: ffprobe expects a URL or a local file path
try:
cmd = [
"ffprobe", "-v", "error", "-show_entries",
"format=duration:stream=codec_name,width,height",
"-of", "json", url
]
logging.debug(f"Running ffprobe for {url}")
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if proc.returncode != 0:
logging.error(f"ffprobe error for {url}: {proc.stderr.strip()}")
metadata_str = "ffprobe error: " + proc.stderr.strip()
else:
info = json.loads(proc.stdout)
format_info = info.get("format", {})
streams = info.get("streams", [])
duration = format_info.get("duration", "unknown")
duration = float(duration) if duration != "unknown" else None
# Gather codecs and resolution from first video stream
video_stream = None
for s in streams:
if s.get("codec_type") == "video":
video_stream = s
break
codec = video_stream.get("codec_name", "unknown") if video_stream else "unknown"
width = video_stream.get("width", "unknown") if video_stream else "unknown"
height = video_stream.get("height", "unknown") if video_stream else "unknown"
metadata_str = (
f"Duration: {duration:.2f} sec\n" if duration else "Duration: unknown\n"
)
metadata_str += f"Video codec: {codec}\n"
metadata_str += f"Resolution: {width}x{height}\n"
self.metadata_cache[url] = (now, metadata_str)
return metadata_str
except Exception as e:
logging.error(f"Failed to run ffprobe for {url}: {e}")
return "Metadata unavailable\n"
# -- FUSE operations --
def readdir(self, path, fh):
logging.debug(f"readdir called for {path}")
self._reload_mappings()
yield '.'
yield '..'
        for filename in self.files:
            # yield the video file plus its virtual ".info" metadata companion
            yield filename
            yield filename + ".info"
def getattr(self, path, fh=None):
logging.debug(f"getattr called for {path}")
if path == "/" or path == "":
return dict(st_mode=(stat.S_IFDIR | 0o755), st_nlink=2)
        # Regular video files and their virtual ".info" companions
        filename = path.lstrip("/")
        is_info = filename.endswith(".info")
        if is_info:
            filename = filename[: -len(".info")]
        if filename not in self.files:
            raise FuseOSError(errno.ENOENT)
        url = self._get_url_for_file(filename)
        if not url:
            raise FuseOSError(errno.ENOENT)
        if is_info:
            # Size of the metadata text (cached; first call may run ffprobe)
            size = len(self._get_metadata(url).encode("utf-8"))
        else:
            size = self._http_head_size(url)
# Get stat of mapping file for ownership and timestamps
full_path = os.path.join(self.mappings_dir, filename)
try:
st = os.lstat(full_path)
uid = st.st_uid
gid = st.st_gid
atime = st.st_atime
mtime = st.st_mtime
ctime = st.st_ctime
except Exception:
uid = os.getuid()
gid = os.getgid()
atime = mtime = ctime = time.time()
return dict(
st_mode=(stat.S_IFREG | 0o444),
st_nlink=1,
st_size=size,
st_uid=uid,
st_gid=gid,
st_atime=atime,
st_mtime=mtime,
st_ctime=ctime,
)
def getxattr(self, path, name, position=0):
logging.debug(f"getxattr called for {path} name={name}")
        # Extended attributes are not supported; signal ENOTSUP
raise FuseOSError(errno.ENOTSUP)
def open(self, path, flags):
        filename = path.lstrip("/")
        # str.rstrip(".info") strips *characters*, not the suffix, and would
        # mangle names ending in i/n/f/o; remove the literal suffix instead
        if filename.endswith(".info"):
            filename = filename[: -len(".info")]
logging.debug(f"Requested to open {filename}")
logging.debug(f"Current files: {json.dumps(self.files)}")
if filename not in self.files:
raise FuseOSError(errno.ENOENT)
logging.debug(f"open called for file: {filename}")
return 0
def read(self, path, size, offset, fh):
logging.debug(f"read called for path={path}, size={size}, offset={offset}")
if path.endswith(".info"):
base_filename = path[1:-5]
url = self._get_url_for_file(base_filename)
if not url:
return b""
metadata_str = self._get_metadata(url)
data = metadata_str.encode("utf-8")
return data[offset:offset + size]
filename = path.lstrip("/")
url = self._get_url_for_file(filename)
if not url:
return b""
headers = {"Range": f"bytes={offset}-{offset + size - 1}"}
try:
with requests.get(url, headers=headers, stream=True, timeout=15) as r:
r.raise_for_status()
data = bytearray()
for chunk in r.iter_content(chunk_size=8192):
if not chunk:
break
data.extend(chunk)
if len(data) >= size:
break
# Make sure to return exactly requested size
return bytes(data[:size])
except Exception as e:
logging.error(f"Error reading {url} range {offset}-{offset + size - 1}: {e}")
return b""
if __name__ == "__main__":
os.makedirs(MAPPINGS_DIR, exist_ok=True)
os.makedirs(MOUNT_POINT, exist_ok=True)
logging.info(f"Mounting StreamFS: {MAPPINGS_DIR} -> {MOUNT_POINT}")
FUSE(StreamFS(MAPPINGS_DIR), MOUNT_POINT, nothreads=True, foreground=True, allow_other=True)

docker/virtfs.py Normal file

@@ -0,0 +1,429 @@
#!/usr/bin/env python3
import os
import stat
import errno
import logging
import requests
from fuse import FUSE, Operations, LoggingMixIn, FuseOSError
from collections import OrderedDict
import time
from urllib.parse import urlparse
import threading
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
# -------------------------
# Tunables for performance
# -------------------------
READ_CHUNK_BYTES = 1024 * 256 # 256 KiB chunks pulled from upstream per read iteration
BUFFER_MAX_BYTES = 1024 * 1024 * 8 # 8 MiB sliding window per open handle
DOWNLOAD_SLEEP_SEC = 0.0 # small pause per chunk to ease throttling (e.g., 0.01)
READ_WAIT_TIMEOUT = 15.0 # max seconds a read waits for needed bytes to arrive
HTTP_TIMEOUT_SEC = 20 # connect/read timeout for requests
USER_UID = 1000 # tweak to your environment if needed
USER_GID = 1000
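# Per-handle sliding-window invariant (maintained by the downloader thread):
# "buffer" holds bytes [buf_start, buf_end) of the remote object, with
# buf_end - buf_start == len(buffer) <= BUFFER_MAX_BYTES; older bytes are
# evicted from the front as new chunks arrive.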
class VideoStreamFS(LoggingMixIn, Operations):
def __init__(self, source_dir):
self.source_dir = source_dir
self.fd_counter = 0
self.open_files = {}
self._global_lock = threading.Lock()
self.cache = OrderedDict() # (fh, offset, size) -> (data, timestamp)
self.cache_max_items = 10
self.cache_ttl = 5 # seconds
self.cache_files = {}
logging.info(f"Initialized VideoStreamFS with source_dir={source_dir}")
# ---------- helpers ----------
def _full_path(self, partial):
if partial.startswith("/"):
partial = partial[1:]
return os.path.join(self.source_dir, partial)
def _head_follow(self, url):
"""
HEAD (preferred) to follow redirects and get final URL + size.
Fallback to GET(stream=True) if Content-Length missing.
"""
s = requests.Session()
r = s.head(url, allow_redirects=True, timeout=HTTP_TIMEOUT_SEC)
r.raise_for_status()
url_parts = urlparse(r.url)
        session_path = r.headers.get("X-Session-URL")
        # Fall back to the final redirected URL if the header is absent
        final_url = f"{url_parts.scheme}://{url_parts.netloc}{session_path}" if session_path else str(r.url)
size = r.headers.get("Content-Length")
if size is None:
# Fallback to GET (no Range) to fetch headers only
gr = s.get(final_url, stream=True, allow_redirects=True, timeout=HTTP_TIMEOUT_SEC)
try:
gr.raise_for_status()
size = gr.headers.get("Content-Length")
finally:
gr.close()
total_size = int(size) if size is not None and size.isdigit() else 0
return s, final_url, total_size
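    # Hypothetical exchange: HEAD https://api.example.com/vod/42 redirects to
    # https://cdn.example.com/vod/42 and returns "X-Session-URL: /session/abc";
    # subsequent ranged GETs then target https://cdn.example.com/session/abc.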
def _cache_get(self, fh, offset, size):
key = (fh, offset, size)
entry = self.cache.get(key)
if entry:
data, ts = entry
if time.time() - ts < self.cache_ttl:
logging.info(f"Cache HIT for fh={fh} offset={offset} size={size}")
return data
else:
logging.info(f"Cache EXPIRED for fh={fh} offset={offset} size={size}")
del self.cache[key]
return None
def _cache_set(self, fh, offset, size, data):
key = (fh, offset, size)
if len(self.cache) >= self.cache_max_items:
self.cache.popitem(last=False) # remove oldest
self.cache[key] = (data, time.time())
logging.info(f"Cache SET for fh={fh} offset={offset} size={size}")
def getattr(self, path, fh=None):
logging.info(f"GETATTR {path}")
full_path = self._full_path(path)
if os.path.isdir(full_path):
st = os.lstat(full_path)
return {
"st_mode": stat.S_IFDIR | 0o755,
"st_nlink": 2,
"st_size": 0,
"st_uid": 1000,
"st_gid": 1000,
"st_ctime": st.st_ctime,
"st_mtime": st.st_mtime,
"st_atime": st.st_atime,
}
        if not os.path.exists(full_path):
            raise FuseOSError(errno.ENOENT)
# For regular files, do HEAD request to get size of the redirected resource
st = os.lstat(full_path)
mode = stat.S_IFREG | 0o444 # read-only file
try:
with open(full_path, "r") as f:
original_url = f.read().strip()
session, final_url, size = self._head_follow(original_url)
self.cache_files[path] = {
"original_url": original_url,
"url": final_url,
"size": size,
"session": session,
}
except Exception as e:
logging.warning(f"Failed HEAD request for {path}: {e}")
size = st.st_size
return {
"st_mode": mode,
"st_nlink": 1,
"st_size": size,
"st_uid": 1000,
"st_gid": 1000,
"st_ctime": st.st_ctime,
"st_mtime": st.st_mtime,
"st_atime": st.st_atime,
}
def readdir(self, path, fh):
logging.info(f"READDIR {path}")
full_path = self._full_path(path)
dirents = [".", ".."] + os.listdir(full_path)
for r in dirents:
yield r
    def open(self, path, flags):
        full_path = self._full_path(path)
        if not os.path.exists(full_path):
            raise FuseOSError(errno.ENOENT)
        # Reuse the session, unique stream URL and size resolved during getattr
        meta = self.cache_files.get(path)
        if meta is None:
            raise FuseOSError(errno.EIO)
        session = meta["session"]
        stream_url = meta["url"]
        total_size = meta["size"]
        original_url = meta["original_url"]
# Allocate fh
with self._global_lock:
self.fd_counter += 1
fh = self.fd_counter
# Per-handle state
state = {
"session": session, # requests.Session
"original_url": original_url, # stable URL from source file
"stream_url": stream_url, # unique redirected URL (this session only)
"size": total_size, # may be 0 if unknown
"resp": None, # active requests.Response (stream)
"resp_lock": threading.Lock(),
# Sliding buffer state
"buffer": bytearray(),
"buf_start": 0, # absolute offset of first byte in buffer
"buf_end": 0, # absolute offset *after* last byte in buffer
"desired_offset": 0, # where downloader should be streaming from
"stop_event": threading.Event(),
"have_data": threading.Condition(), # notify readers when new bytes arrive
"downloader": None,
}
# Start downloader; initial position is offset 0 (will adapt on first read)
t = threading.Thread(target=self._downloader_loop, args=(fh,), name=f"dl-{fh}", daemon=True)
state["downloader"] = t
self.open_files[fh] = state
t.start()
logging.debug(f"OPEN {path} fh={fh} original={original_url} stream={stream_url} size={total_size}")
return fh
def read(self, path, size, offset, fh):
st = self.open_files.get(fh)
if not st:
logging.error(f"READ on unknown fh={fh}")
return b""
        # If the requested range is outside the current window, ask the downloader
        # to restart at the new offset. offset == buf_end is the normal sequential
        # case at the buffer frontier: wait for bytes instead of repositioning.
        with st["have_data"]:
            if not (st["buf_start"] <= offset <= st["buf_end"]):
                # Request a (re)positioning of the upstream stream
                st["desired_offset"] = offset
                # Clear the buffer because we'll restart at the new offset
                with st["resp_lock"]:
                    st["buffer"].clear()
                st["buf_start"] = offset
                st["buf_end"] = offset
                st["have_data"].notify_all()
deadline = time.time() + READ_WAIT_TIMEOUT
# Wait until we have enough bytes or stream stopped
while True:
available = st["buf_end"] - offset
if available >= size or st["stop_event"].is_set():
break
# If upstream known size and offset beyond EOF, return empty
if st["size"] and offset >= st["size"]:
break
remaining = deadline - time.time()
if remaining <= 0:
logging.warning(f"READ timeout fh={fh} off={offset} size={size} avail={available}")
break
st["have_data"].wait(timeout=min(0.2, remaining))
# Serve available bytes (may be partial near EOF/timeout)
start = max(offset, st["buf_start"])
end = min(offset + size, st["buf_end"])
if end <= start:
return b""
rel_start = start - st["buf_start"]
rel_end = end - st["buf_start"]
out = bytes(st["buffer"][rel_start:rel_end])
logging.debug(f"READ fh={fh} off={offset} size={size} -> returned={len(out)} "
f"(buf [{st['buf_start']},{st['buf_end']}) len={len(st['buffer'])})")
return out
def release(self, path, fh):
st = self.open_files.pop(fh, None)
if not st:
return 0
logging.debug(f"RELEASE fh={fh} stream_url discarded")
st["stop_event"].set()
with st["have_data"]:
st["have_data"].notify_all()
# Close response and session
with st["resp_lock"]:
try:
if st["resp"] is not None:
st["resp"].close()
except Exception:
pass
try:
st["session"].close()
except Exception:
pass
        self.cache_files.pop(path, None)
# Let the downloader thread exit
return 0
# ---------- downloader logic ----------
def _open_stream(self, st, start_offset):
"""
(Re)open the upstream HTTP stream at a specific offset using Range.
"""
# Close any existing response
with st["resp_lock"]:
if st["resp"] is not None:
try:
st["resp"].close()
except Exception:
pass
st["resp"] = None
headers = {"Range": f"bytes={start_offset}-"}
logging.debug(f"HTTP OPEN stream from {start_offset} url={st['stream_url']}")
r = st["session"].get(
st["stream_url"],
headers=headers,
stream=True,
allow_redirects=False, # IMPORTANT: keep the resolved URL (no new redirect)
timeout=HTTP_TIMEOUT_SEC,
)
        # Accept 206 (preferred). A 200 means the server ignored Range, which is
        # only usable when we asked for the start of the file.
        if r.status_code not in (200, 206) or (r.status_code == 200 and start_offset > 0):
            status = r.status_code
            r.close()
            raise requests.HTTPError(f"Unexpected status {status} opening stream at offset {start_offset}")
# Parse/refresh size from Content-Range if present
cr = r.headers.get("Content-Range")
if cr and "/" in cr:
try:
total = int(cr.split("/")[-1])
st["size"] = total
except Exception:
pass
elif st["size"] == 0:
# Try Content-Length as a fallback (represents remaining bytes from start_offset)
try:
rem = int(r.headers.get("Content-Length", "0"))
if rem > 0:
st["size"] = start_offset + rem
except Exception:
pass
with st["resp_lock"]:
st["resp"] = r
def _downloader_loop(self, fh):
st = self.open_files.get(fh)
if not st:
return
current_offset = 0
# Start at desired_offset (initially 0; read() may change it)
with st["have_data"]:
current_offset = st["desired_offset"]
try:
self._open_stream(st, current_offset)
except Exception as e:
logging.error(f"Downloader failed to open initial stream fh={fh}: {e}")
st["stop_event"].set()
with st["have_data"]:
st["have_data"].notify_all()
return
while not st["stop_event"].is_set():
# Check for seek request (desired_offset changed outside current window)
with st["have_data"]:
desired = st["desired_offset"]
# If desired is not the next byte to fetch, we need to restart stream at desired
if desired != st["buf_end"]:
current_offset = desired
# Reset buffer window to new start
with st["resp_lock"]:
st["buffer"].clear()
st["buf_start"] = desired
st["buf_end"] = desired
try:
self._open_stream(st, current_offset)
except Exception as e:
logging.error(f"Downloader reopen failed fh={fh} off={desired}: {e}")
st["stop_event"].set()
st["have_data"].notify_all()
break
# Pull next chunk
try:
with st["resp_lock"]:
r = st["resp"]
if r is None:
# Stream closed unexpectedly; try to reopen at buf_end
current = st["buf_end"]
try:
self._open_stream(st, current)
with st["resp_lock"]:
r = st["resp"]
except Exception as e:
logging.error(f"Downloader reopen-after-null failed fh={fh}: {e}")
st["stop_event"].set()
with st["have_data"]:
st["have_data"].notify_all()
break
chunk = r.raw.read(READ_CHUNK_BYTES)
if not chunk:
# EOF
logging.debug(f"Downloader EOF fh={fh}")
st["stop_event"].set()
with st["have_data"]:
st["have_data"].notify_all()
break
# Append chunk; enforce sliding window
with st["have_data"]:
st["buffer"].extend(chunk)
st["buf_end"] += len(chunk)
if len(st["buffer"]) > BUFFER_MAX_BYTES:
# Evict from front
evict = len(st["buffer"]) - BUFFER_MAX_BYTES
del st["buffer"][:evict]
st["buf_start"] += evict
st["have_data"].notify_all()
if DOWNLOAD_SLEEP_SEC > 0:
time.sleep(DOWNLOAD_SLEEP_SEC)
except requests.exceptions.RequestException as e:
logging.warning(f"Downloader network error fh={fh}: {e}; retrying shortly")
time.sleep(0.5)
# Try to reopen at current buf_end
try:
self._open_stream(st, st["buf_end"])
except Exception as e2:
logging.error(f"Downloader reopen after error failed fh={fh}: {e2}")
st["stop_event"].set()
with st["have_data"]:
st["have_data"].notify_all()
break
except Exception as e:
logging.error(f"Downloader unexpected error fh={fh}: {e}")
st["stop_event"].set()
with st["have_data"]:
st["have_data"].notify_all()
break
def main(source_dir, mount_dir):
FUSE(VideoStreamFS(source_dir), mount_dir, nothreads=True, foreground=True, ro=True, allow_other=True)
if __name__ == "__main__":
import sys
if len(sys.argv) != 3:
print(f"Usage: {sys.argv[0]} <source_dir> <mount_dir>")
sys.exit(1)
main(sys.argv[1], sys.argv[2])