#!/usr/bin/env python3
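"""Dispatcharr virtfs: a read-only FUSE filesystem that exposes small
"pointer" files (each containing a single upstream URL) as streamable
video files.

getattr/open resolve each URL via HEAD (following redirects) to a
per-session stream URL; a background downloader thread then fills a
sliding in-memory window per open handle, from which read() is served.
Seeks outside the window restart the upstream Range request.
"""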
import errno
import os
import stat
import logging
import threading
import time
from collections import OrderedDict
from urllib.parse import urlparse

import requests
from fuse import FUSE, FuseOSError, Operations, LoggingMixIn

logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
# -------------------------
# Tunables for performance
# -------------------------
READ_CHUNK_BYTES = 1024 * 256 # 256 KiB chunks pulled from upstream per read iteration
BUFFER_MAX_BYTES = 1024 * 1024 * 8 # 8 MiB sliding window per open handle
DOWNLOAD_SLEEP_SEC = 0.0 # small pause per chunk to ease throttling (e.g., 0.01)
READ_WAIT_TIMEOUT = 15.0 # max seconds a read waits for needed bytes to arrive
HTTP_TIMEOUT_SEC = 20 # connect/read timeout for requests
USER_UID = 1000 # tweak to your environment if needed
USER_GID = 1000
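# With the defaults above, the window holds BUFFER_MAX_BYTES / READ_CHUNK_BYTES
# = 8 MiB / 256 KiB = 32 chunks, and a read that outruns the downloader waits
# at most READ_WAIT_TIMEOUT (15 s) for the needed bytes before returning a
# short read.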
class VideoStreamFS(LoggingMixIn, Operations):
def __init__(self, source_dir):
self.source_dir = source_dir
self.fd_counter = 0
self.open_files = {}
self._global_lock = threading.Lock()
self.cache = OrderedDict() # (fh, offset, size) -> (data, timestamp)
self.cache_max_items = 10
self.cache_ttl = 5 # seconds
self.cache_files = {}
logging.info(f"Initialized VideoStreamFS with source_dir={source_dir}")
# ---------- helpers ----------
def _full_path(self, partial):
if partial.startswith("/"):
partial = partial[1:]
return os.path.join(self.source_dir, partial)
def _head_follow(self, url):
"""
HEAD (preferred) to follow redirects and get final URL + size.
Fallback to GET(stream=True) if Content-Length missing.
"""
s = requests.Session()
r = s.head(url, allow_redirects=True, timeout=HTTP_TIMEOUT_SEC)
r.raise_for_status()
        # Rebuild the final URL from the X-Session-URL header when present;
        # otherwise keep the redirect target itself.
        session_path = r.headers.get("X-Session-URL")
        if session_path:
            url_parts = urlparse(r.url)
            final_url = f"{url_parts.scheme}://{url_parts.netloc}{session_path}"
        else:
            final_url = r.url
size = r.headers.get("Content-Length")
if size is None:
# Fallback to GET (no Range) to fetch headers only
gr = s.get(final_url, stream=True, allow_redirects=True, timeout=HTTP_TIMEOUT_SEC)
try:
gr.raise_for_status()
size = gr.headers.get("Content-Length")
finally:
gr.close()
total_size = int(size) if size is not None and size.isdigit() else 0
return s, final_url, total_size
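    # Illustrative call (URL and result are hypothetical):
    #   session, final_url, size = self._head_follow(
    #       "http://dispatcharr.local/proxy/ts/stream/abc123")
    #   # final_url e.g. "http://upstream.example/live/xyz/stream.ts",
    #   # size e.g. 734003200, or 0 when Content-Length is unavailable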
def _cache_get(self, fh, offset, size):
key = (fh, offset, size)
entry = self.cache.get(key)
if entry:
data, ts = entry
if time.time() - ts < self.cache_ttl:
logging.info(f"Cache HIT for fh={fh} offset={offset} size={size}")
return data
else:
logging.info(f"Cache EXPIRED for fh={fh} offset={offset} size={size}")
del self.cache[key]
return None
def _cache_set(self, fh, offset, size, data):
key = (fh, offset, size)
if len(self.cache) >= self.cache_max_items:
self.cache.popitem(last=False) # remove oldest
self.cache[key] = (data, time.time())
logging.info(f"Cache SET for fh={fh} offset={offset} size={size}")
def getattr(self, path, fh=None):
logging.info(f"GETATTR {path}")
full_path = self._full_path(path)
if os.path.isdir(full_path):
st = os.lstat(full_path)
return {
"st_mode": stat.S_IFDIR | 0o755,
"st_nlink": 2,
"st_size": 0,
"st_uid": 1000,
"st_gid": 1000,
"st_ctime": st.st_ctime,
"st_mtime": st.st_mtime,
"st_atime": st.st_atime,
}
        if not os.path.exists(full_path):
            raise FuseOSError(errno.ENOENT)
        # For regular files, resolve the redirected resource via HEAD to learn
        # its size (cached so repeated getattr calls stay cheap and we don't
        # leak a fresh requests.Session on every stat)
        st = os.lstat(full_path)
        mode = stat.S_IFREG | 0o444  # read-only file
        if path in self.cache_files:
            size = self.cache_files[path]["size"]
        else:
            try:
                with open(full_path, "r") as f:
                    original_url = f.read().strip()
                session, final_url, size = self._head_follow(original_url)
                self.cache_files[path] = {
                    "original_url": original_url,
                    "url": final_url,
                    "size": size,
                    "session": session,
                }
            except Exception as e:
                logging.warning(f"Failed HEAD request for {path}: {e}")
                size = st.st_size
return {
"st_mode": mode,
"st_nlink": 1,
"st_size": size,
"st_uid": 1000,
"st_gid": 1000,
"st_ctime": st.st_ctime,
"st_mtime": st.st_mtime,
"st_atime": st.st_atime,
}
def readdir(self, path, fh):
logging.info(f"READDIR {path}")
full_path = self._full_path(path)
dirents = [".", ".."] + os.listdir(full_path)
for r in dirents:
yield r
    def open(self, path, flags):
        full_path = self._full_path(path)
        if not os.path.exists(full_path):
            raise FuseOSError(errno.ENOENT)
        # Resolve the redirected URL & size via HEAD if getattr has not cached it yet
        if path not in self.cache_files:
            with open(full_path, "r") as f:
                original_url = f.read().strip()
            session, final_url, size = self._head_follow(original_url)
            self.cache_files[path] = {"original_url": original_url, "url": final_url,
                                      "size": size, "session": session}
        entry = self.cache_files[path]
        session = entry["session"]
        stream_url = entry["url"]
        total_size = entry["size"]
        original_url = entry["original_url"]
# Allocate fh
with self._global_lock:
self.fd_counter += 1
fh = self.fd_counter
# Per-handle state
state = {
"session": session, # requests.Session
"original_url": original_url, # stable URL from source file
"stream_url": stream_url, # unique redirected URL (this session only)
"size": total_size, # may be 0 if unknown
"resp": None, # active requests.Response (stream)
"resp_lock": threading.Lock(),
# Sliding buffer state
"buffer": bytearray(),
"buf_start": 0, # absolute offset of first byte in buffer
"buf_end": 0, # absolute offset *after* last byte in buffer
"desired_offset": 0, # where downloader should be streaming from
"stop_event": threading.Event(),
"have_data": threading.Condition(), # notify readers when new bytes arrive
"downloader": None,
}
# Start downloader; initial position is offset 0 (will adapt on first read)
t = threading.Thread(target=self._downloader_loop, args=(fh,), name=f"dl-{fh}", daemon=True)
state["downloader"] = t
self.open_files[fh] = state
t.start()
logging.debug(f"OPEN {path} fh={fh} original={original_url} stream={stream_url} size={total_size}")
return fh
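    # Sliding-window example (numbers illustrative): after streaming 10 MiB
    # sequentially with the default 8 MiB window, the buffer holds bytes
    # [2 MiB, 10 MiB). A read at offset 4 MiB is served from memory; a read at
    # offset 1 MiB falls outside [buf_start, buf_end] and forces the downloader
    # to restart the upstream Range request at 1 MiB.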
    def read(self, path, size, offset, fh):
        st = self.open_files.get(fh)
        if not st:
            logging.error(f"READ on unknown fh={fh}")
            return b""
        with st["have_data"]:
            # If the requested range is outside the current window, ask the
            # downloader to restart at offset. Only desired_offset changes:
            # the downloader owns the buffer window and resets it on restart,
            # which avoids racing an in-flight chunk against a cleared buffer.
            # offset == buf_end is a normal sequential read that just waits.
            if not (st["buf_start"] <= offset <= st["buf_end"]):
                st["desired_offset"] = offset
                st["have_data"].notify_all()
            deadline = time.time() + READ_WAIT_TIMEOUT
            # Wait until we have enough bytes or the stream stopped
            while True:
                available = st["buf_end"] - offset
                if (st["buf_start"] <= offset and available >= size) or st["stop_event"].is_set():
                    break
                # If upstream size is known and offset is at/past EOF, stop waiting
                if st["size"] and offset >= st["size"]:
                    break
                # If eviction slid the window past us, request a restart
                if offset < st["buf_start"]:
                    st["desired_offset"] = offset
                    st["have_data"].notify_all()
                remaining = deadline - time.time()
                if remaining <= 0:
                    logging.warning(f"READ timeout fh={fh} off={offset} size={size} avail={available}")
                    break
                st["have_data"].wait(timeout=min(0.2, remaining))
            # Serve what the window holds at offset (may be partial near EOF/timeout);
            # never return bytes that start past the requested offset
            start = max(offset, st["buf_start"])
            end = min(offset + size, st["buf_end"])
            if end <= start or start != offset:
                return b""
            rel_start = start - st["buf_start"]
            rel_end = end - st["buf_start"]
            out = bytes(st["buffer"][rel_start:rel_end])
        logging.debug(f"READ fh={fh} off={offset} size={size} -> returned={len(out)} "
                      f"(buf [{st['buf_start']},{st['buf_end']}) len={len(st['buffer'])})")
        return out
def release(self, path, fh):
st = self.open_files.pop(fh, None)
if not st:
return 0
logging.debug(f"RELEASE fh={fh} stream_url discarded")
st["stop_event"].set()
with st["have_data"]:
st["have_data"].notify_all()
# Close response and session
with st["resp_lock"]:
try:
if st["resp"] is not None:
st["resp"].close()
except Exception:
pass
        # NOTE: the session came from cache_files and is shared across handles
        # for the same path; closing it here assumes one open handle per path
        try:
            st["session"].close()
        except Exception:
            pass
        self.cache_files.pop(path, None)
# Let the downloader thread exit
return 0
# ---------- downloader logic ----------
def _open_stream(self, st, start_offset):
"""
(Re)open the upstream HTTP stream at a specific offset using Range.
"""
# Close any existing response
with st["resp_lock"]:
if st["resp"] is not None:
try:
st["resp"].close()
except Exception:
pass
st["resp"] = None
headers = {"Range": f"bytes={start_offset}-"}
logging.debug(f"HTTP OPEN stream from {start_offset} url={st['stream_url']}")
r = st["session"].get(
st["stream_url"],
headers=headers,
stream=True,
allow_redirects=False, # IMPORTANT: keep the resolved URL (no new redirect)
timeout=HTTP_TIMEOUT_SEC,
)
        # Accept 206 (preferred). Some servers return 200 (no Range support),
        # which is only usable when we asked for the start of the resource.
        if r.status_code not in (200, 206):
            status = r.status_code
            r.close()
            r.raise_for_status()
            raise IOError(f"Unexpected status {status} for ranged GET")
        if r.status_code == 200 and start_offset > 0:
            r.close()
            raise IOError(f"Server ignored Range request at offset {start_offset}")
# Parse/refresh size from Content-Range if present
cr = r.headers.get("Content-Range")
if cr and "/" in cr:
try:
total = int(cr.split("/")[-1])
st["size"] = total
except Exception:
pass
elif st["size"] == 0:
# Try Content-Length as a fallback (represents remaining bytes from start_offset)
try:
rem = int(r.headers.get("Content-Length", "0"))
if rem > 0:
st["size"] = start_offset + rem
except Exception:
pass
with st["resp_lock"]:
st["resp"] = r
    def _downloader_loop(self, fh):
        st = self.open_files.get(fh)
        if not st:
            return
        # Start at desired_offset (initially 0; read() may change it)
        with st["have_data"]:
            current_offset = st["desired_offset"]
        try:
            self._open_stream(st, current_offset)
        except Exception as e:
            logging.error(f"Downloader failed to open initial stream fh={fh}: {e}")
            st["stop_event"].set()
            with st["have_data"]:
                st["have_data"].notify_all()
            return
        while not st["stop_event"].is_set():
            # Check for a seek request: read() moves desired_offset away from
            # buf_end, while normal progress keeps the two in sync (see append
            # below). _open_stream takes resp_lock itself, so it is always
            # called here with no locks held to avoid self-deadlock.
            restart_at = None
            with st["have_data"]:
                desired = st["desired_offset"]
                if desired != st["buf_end"]:
                    restart_at = desired
                    # Reset buffer window to the new start
                    st["buffer"].clear()
                    st["buf_start"] = desired
                    st["buf_end"] = desired
            if restart_at is not None:
                current_offset = restart_at
                try:
                    self._open_stream(st, current_offset)
                except Exception as e:
                    logging.error(f"Downloader reopen failed fh={fh} off={restart_at}: {e}")
                    st["stop_event"].set()
                    with st["have_data"]:
                        st["have_data"].notify_all()
                    break
            # Pull next chunk
            try:
                with st["resp_lock"]:
                    r = st["resp"]
                if r is None:
                    # Stream closed unexpectedly; try to reopen at buf_end
                    try:
                        self._open_stream(st, st["buf_end"])
                        with st["resp_lock"]:
                            r = st["resp"]
                    except Exception as e:
                        logging.error(f"Downloader reopen-after-null failed fh={fh}: {e}")
                        st["stop_event"].set()
                        with st["have_data"]:
                            st["have_data"].notify_all()
                        break
                chunk = r.raw.read(READ_CHUNK_BYTES)
                if not chunk:
                    # EOF
                    logging.debug(f"Downloader EOF fh={fh}")
                    st["stop_event"].set()
                    with st["have_data"]:
                        st["have_data"].notify_all()
                    break
                # Append chunk; enforce sliding window
                with st["have_data"]:
                    pending_seek = st["desired_offset"] != st["buf_end"]
                    st["buffer"].extend(chunk)
                    st["buf_end"] += len(chunk)
                    if not pending_seek:
                        # Keep desired_offset tracking normal progress so the
                        # loop-top check only fires on a real seek request
                        st["desired_offset"] = st["buf_end"]
                    if len(st["buffer"]) > BUFFER_MAX_BYTES:
                        # Evict from the front
                        evict = len(st["buffer"]) - BUFFER_MAX_BYTES
                        del st["buffer"][:evict]
                        st["buf_start"] += evict
                    st["have_data"].notify_all()
                if DOWNLOAD_SLEEP_SEC > 0:
                    time.sleep(DOWNLOAD_SLEEP_SEC)
            except requests.exceptions.RequestException as e:
                logging.warning(f"Downloader network error fh={fh}: {e}; retrying shortly")
                time.sleep(0.5)
                # Try to reopen at the current buf_end
                try:
                    self._open_stream(st, st["buf_end"])
                except Exception as e2:
                    logging.error(f"Downloader reopen after error failed fh={fh}: {e2}")
                    st["stop_event"].set()
                    with st["have_data"]:
                        st["have_data"].notify_all()
                    break
            except Exception as e:
                logging.error(f"Downloader unexpected error fh={fh}: {e}")
                st["stop_event"].set()
                with st["have_data"]:
                    st["have_data"].notify_all()
                break
def main(source_dir, mount_dir):
    # nothreads=True serializes FUSE callbacks; the per-handle downloader
    # threads still run concurrently and feed the buffers
    FUSE(VideoStreamFS(source_dir), mount_dir, nothreads=True, foreground=True, ro=True, allow_other=True)
if __name__ == "__main__":
import sys
if len(sys.argv) != 3:
print(f"Usage: {sys.argv[0]} <source_dir> <mount_dir>")
sys.exit(1)
main(sys.argv[1], sys.argv[2])
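# Example invocation (paths and URL are illustrative):
#   mkdir -p /srv/links /mnt/vstream
#   echo "http://dispatcharr.local/proxy/ts/stream/abc123" > /srv/links/channel1.ts
#   python3 virtfs.py /srv/links /mnt/vstream
# Opening /mnt/vstream/channel1.ts then streams the upstream HTTP resource
# through the sliding-window buffer.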