This commit is contained in:
aloha_world_ 2026-01-20 19:14:57 +05:30 committed by GitHub
commit 69c1f7c468
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 1070 additions and 4 deletions

View file

@ -10,13 +10,13 @@ WORKDIR $HOME
######### Customize Container Here ###########
# Install unzip and recording dependencies
RUN apt-get update && apt-get install -y unzip ffmpeg python3 && rm -rf /var/lib/apt/lists/*
# Install Chromium
COPY ./src/ubuntu/install/chromium $INST_SCRIPTS/chromium/
RUN bash $INST_SCRIPTS/chromium/install_chromium.sh && rm -rf $INST_SCRIPTS/chromium/
# Install unzip
RUN apt-get update && apt-get install -y unzip
# Update the desktop environment to be optimized for a single application
RUN cp $HOME/.config/xfce4/xfconf/single-application-xfce-perchannel-xml/* $HOME/.config/xfce4/xfconf/xfce-perchannel-xml/
RUN cp /usr/share/backgrounds/bg_kasm.png /usr/share/backgrounds/bg_default.png
@ -32,6 +32,8 @@ COPY ./src/common/chrome-managed-policies/urlblocklist.json /etc/chromium/polici
COPY ./src/ubuntu/install/chromium/custom_startup.sh $STARTUPDIR/custom_startup.sh
RUN chmod +x $STARTUPDIR/custom_startup.sh
COPY ./src/ubuntu/install/chromium/recording_api.py /usr/local/bin/recording_api.py
RUN chmod +x /usr/local/bin/recording_api.py
# Install Custom Certificate Authority
# COPY ./src/ubuntu/install/certificates $INST_SCRIPTS/certificates/
@ -53,4 +55,7 @@ ENV HOME /home/kasm-user
WORKDIR $HOME
RUN mkdir -p $HOME && chown -R 1000:0 $HOME
EXPOSE 9222
EXPOSE 18080
USER 1000

View file

@ -0,0 +1,304 @@
# 录屏API测试命令
默认服务器地址: `http://localhost:18080`
如果服务器运行在其他地址,请替换 `localhost:18080` 为实际地址。
**重要:** 每个录屏会话都有一个唯一的UUID标识符。开始录屏时会返回UUID停止录屏和获取文件时需要使用这个UUID。
---
## 1. 检查录屏状态
```bash
curl -X GET http://localhost:18080/record/status
```
**格式化输出(需要 jq:**
```bash
curl -s -X GET http://localhost:18080/record/status | jq '.'
```
**预期响应:**
```json
{
"recording": false,
"current_uuid": null,
"current_file": null,
"last_file": null,
"display": ":1",
"recording_dir": "/home/kasm-user/recordings"
}
```
**响应字段说明:**
- `recording`: 是否正在录屏
- `current_uuid`: 当前录屏会话的UUID如果正在录屏
- `current_file`: 当前录制的文件路径
- `last_file`: 最后一个完成的录屏文件路径
---
## 2. 开始录屏(使用默认参数)
```bash
curl -X POST http://localhost:18080/record/start \
-H "Content-Type: application/json" \
-d '{}'
```
**格式化输出:**
```bash
curl -s -X POST http://localhost:18080/record/start \
-H "Content-Type: application/json" \
-d '{}' | jq '.'
```
**预期响应:**
```json
{
"uuid": "550e8400-e29b-41d4-a716-446655440000",
"file": "/home/kasm-user/recordings/recording-550e8400-20240101T120000Z.mp4",
"video_size": "1920x1080",
"framerate": 25,
"display": ":1",
"recording_dir": "/home/kasm-user/recordings"
}
```
**注意:** 响应中包含 `uuid` 字段这是本次录屏的唯一标识符。后续停止录屏和获取文件时需要使用这个UUID。
---
## 3. 开始录屏(自定义参数)
```bash
curl -X POST http://localhost:18080/record/start \
-H "Content-Type: application/json" \
-d '{
"video_size": "1920x1080",
"framerate": 30,
"filename": "my-recording.mp4"
}'
```
**格式化输出:**
```bash
curl -s -X POST http://localhost:18080/record/start \
-H "Content-Type: application/json" \
-d '{
"video_size": "1920x1080",
"framerate": 30,
"filename": "my-recording.mp4"
}' | jq '.'
```
**参数说明:**
- `video_size`: 视频分辨率,格式为 "WIDTHxHEIGHT"(可选)
- `framerate`: 帧率,整数(可选)
- `filename`: 自定义文件名(可选)
---
## 4. 停止录屏
**使用UUID停止录屏推荐:**
```bash
curl -X POST http://localhost:18080/record/stop \
-H "Content-Type: application/json" \
-d '{"uuid": "550e8400-e29b-41d4-a716-446655440000"}'
```
**不使用UUID停止当前录制的会话:**
```bash
curl -X POST http://localhost:18080/record/stop \
-H "Content-Type: application/json" \
-d '{}'
```
**格式化输出:**
```bash
curl -s -X POST http://localhost:18080/record/stop \
-H "Content-Type: application/json" \
-d '{"uuid": "550e8400-e29b-41d4-a716-446655440000"}' | jq '.'
```
**预期响应:**
```json
{
"uuid": "550e8400-e29b-41d4-a716-446655440000",
"file": "/home/kasm-user/recordings/recording-550e8400-20240101T120000Z.mp4",
"size": 1234567,
"exists": true
}
```
**响应字段说明:**
- `uuid`: 录屏会话的UUID
- `file`: 录屏文件的完整路径
- `size`: 文件大小(字节)
- `exists`: 文件是否存在
---
## 5. 通过UUID下载录屏文件推荐
```bash
curl -X GET "http://localhost:18080/record/file?uuid=550e8400-e29b-41d4-a716-446655440000" -o recording.mp4
```
**带进度条:**
```bash
curl -X GET "http://localhost:18080/record/file?uuid=550e8400-e29b-41d4-a716-446655440000" -o recording.mp4 --progress-bar
```
---
## 6. 下载录屏文件最后一个录制的文件不使用UUID
```bash
curl -X GET http://localhost:18080/record/file -o recording.mp4
```
---
## 7. 下载指定文件名的录屏文件
```bash
curl -X GET "http://localhost:18080/record/file?name=my-recording.mp4" -o my-recording.mp4
```
**或者使用绝对路径:**
```bash
curl -X GET "http://localhost:18080/record/file?name=/home/kasm-user/recordings/my-recording.mp4" -o my-recording.mp4
```
**注意:** 参数优先级:`uuid` > `name` > 最后一个录制的文件
---
## 完整测试流程示例使用UUID
```bash
# 1. 检查状态
curl -s -X GET http://localhost:18080/record/status | jq '.'
# 2. 开始录屏并获取UUID
START_RESPONSE=$(curl -s -X POST http://localhost:18080/record/start \
-H "Content-Type: application/json" \
-d '{"video_size": "1920x1080", "framerate": 30}')
echo "$START_RESPONSE" | jq '.'
# 3. 提取UUID
RECORDING_UUID=$(echo "$START_RESPONSE" | jq -r '.uuid')
echo "录屏UUID: $RECORDING_UUID"
# 4. 等待几秒(录屏中)
sleep 5
# 5. 检查状态(应该显示正在录屏)
curl -s -X GET http://localhost:18080/record/status | jq '.'
# 6. 使用UUID停止录屏
STOP_RESPONSE=$(curl -s -X POST http://localhost:18080/record/stop \
-H "Content-Type: application/json" \
-d "{\"uuid\": \"$RECORDING_UUID\"}")
echo "$STOP_RESPONSE" | jq '.'
# 7. 通过UUID下载文件
curl -X GET "http://localhost:18080/record/file?uuid=$RECORDING_UUID" -o "recording-${RECORDING_UUID}.mp4"
# 8. 验证文件
ls -lh "recording-${RECORDING_UUID}.mp4"
```
---
## 错误处理示例
### 尝试在录屏已运行时再次开始录屏
```bash
curl -s -X POST http://localhost:18080/record/start \
-H "Content-Type: application/json" \
-d '{}' | jq '.'
# 预期: {"error": "Recording already in progress"}
```
### 尝试在没有录屏时停止录屏
```bash
curl -s -X POST http://localhost:18080/record/stop \
-H "Content-Type: application/json" | jq '.'
# 预期: {"error": "No active recording"}
```
### 尝试下载不存在的文件
```bash
curl -s -X GET "http://localhost:18080/record/file?name=non-existent.mp4"
# 预期: {"error": "Recording not found"}
```
---
## 使用 Python requests 库测试
```python
import requests
import time
BASE_URL = "http://localhost:18080"
# 1. 检查状态
response = requests.get(f"{BASE_URL}/record/status")
print("状态:", response.json())
# 2. 开始录屏并获取UUID
response = requests.post(
f"{BASE_URL}/record/start",
json={"video_size": "1920x1080", "framerate": 30}
)
start_result = response.json()
print("开始录屏:", start_result)
# 3. 提取UUID
recording_uuid = start_result.get("uuid")
print(f"录屏UUID: {recording_uuid}")
# 4. 等待5秒
time.sleep(5)
# 5. 使用UUID停止录屏
response = requests.post(
f"{BASE_URL}/record/stop",
json={"uuid": recording_uuid}
)
stop_result = response.json()
print("停止录屏:", stop_result)
# 6. 通过UUID下载文件
if stop_result.get("exists"):
file_response = requests.get(f"{BASE_URL}/record/file?uuid={recording_uuid}")
filename = f"recording-{recording_uuid}.mp4"
with open(filename, "wb") as f:
f.write(file_response.content)
print(f"文件已下载: {filename}")
```
---
## 使用 HTTPie 测试(更简洁)
```bash
# 安装: pip install httpie
# 检查状态
http GET localhost:18080/record/status
# 开始录屏
http POST localhost:18080/record/start video_size=1920x1080 framerate=30
# 停止录屏
http POST localhost:18080/record/stop
# 下载文件
http GET localhost:18080/record/file --download
```

View file

@ -10,6 +10,22 @@ if [[ $MAXIMIZE == 'true' ]] ; then
fi
ARGS=${APP_ARGS:-$DEFAULT_ARGS}
RECORDING_API_PORT=${RECORDING_API_PORT:-18080}
RECORDINGS_DIR=${RECORDINGS_DIR:-$HOME/recordings}
RECORDING_DISPLAY=${RECORDING_DISPLAY:-${DISPLAY:-:1}}
# Ensure a writable directory for recordings.
mkdir -p "$RECORDINGS_DIR"
# Start the lightweight recording API if it is not already running.
if ! pgrep -f "recording_api.py" > /dev/null ; then
nohup python3 /usr/local/bin/recording_api.py \
--port "$RECORDING_API_PORT" \
--recording-dir "$RECORDINGS_DIR" \
--display "$RECORDING_DISPLAY" \
> /tmp/recording_api.log 2>&1 &
fi
options=$(getopt -o gau: -l go,assign,url: -n "$0" -- "$@") || exit
eval set -- "$options"

View file

@ -1,7 +1,7 @@
#!/usr/bin/env bash
set -ex
CHROME_ARGS="--password-store=basic --no-sandbox --ignore-gpu-blocklist --user-data-dir --no-first-run --simulate-outdated-no-au='Tue, 31 Dec 2099 23:59:59 GMT'"
CHROME_ARGS="--remote-debugging-port=9222 --remote-debugging-address=0.0.0.0 --password-store=basic --no-sandbox --ignore-gpu-blocklist --user-data-dir --no-first-run --simulate-outdated-no-au='Tue, 31 Dec 2099 23:59:59 GMT'"
ARCH=$(arch | sed 's/aarch64/arm64/g' | sed 's/x86_64/amd64/g')
if [[ "${DISTRO}" == @(debian|opensuse|ubuntu) ]] && [ ${ARCH} = 'amd64' ] && [ ! -z ${SKIP_CLEAN+x} ]; then

View file

@ -0,0 +1,575 @@
#!/usr/bin/env python3
"""
Lightweight recording control API for the Chromium image.
Endpoints:
- POST /record/start -> start a new recording (returns UUID)
- POST /record/stop -> stop a recording (requires UUID in body)
- GET /record/file -> download a recording (use ?uuid=... or ?name=...)
- GET /record/status -> current recorder status
Each recording session is identified by a unique UUID.
"""
import argparse
import json
import os
import platform
import shutil
import signal
import subprocess
import sys
import time
import uuid
from datetime import datetime
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import parse_qs, urlparse
# Detect operating system
IS_MACOS = platform.system() == "Darwin"
IS_LINUX = platform.system() == "Linux"
def _safe_json(handler, status, payload):
body = json.dumps(payload or {}).encode("utf-8")
handler.send_response(status)
handler.send_header("Content-Type", "application/json")
handler.send_header("Content-Length", str(len(body)))
handler.end_headers()
handler.wfile.write(body)
def _fail(handler, status, message):
_safe_json(handler, status, {"error": message})
class Recorder:
"""Simple ffmpeg-based screen recorder. Supports Linux (x11grab) and macOS (avfoundation)."""
def __init__(self, display, recording_dir, default_size, default_fps, capture_input=None):
self.display = display # X11 display for Linux
self.capture_input = capture_input # avfoundation input for macOS (e.g., "1:none")
self.recording_dir = recording_dir
self.default_size = default_size
self.default_fps = default_fps
self.process = None
self.current_file = None
self.last_file = None
# UUID-based recording management
# Format: {uuid: {"process": process, "file": file_path, "video_size": size, "framerate": fps}}
self.recordings = {}
self.current_uuid = None
# Platform info
self.platform = "macos" if IS_MACOS else "linux"
def is_running(self, recording_uuid=None):
if recording_uuid:
if recording_uuid not in self.recordings:
return False
rec = self.recordings[recording_uuid]
process = rec.get("process")
return process is not None and process.poll() is None
# Backward compatibility: check current recording
return self.process is not None and self.process.poll() is None
def start(self, video_size=None, framerate=None, filename=None):
if self.is_running():
raise RuntimeError("Recording already in progress")
if not shutil.which("ffmpeg"):
raise RuntimeError("ffmpeg is not available in PATH")
os.makedirs(self.recording_dir, exist_ok=True)
# Clean up old recording files to avoid disk bloat
self._cleanup_old_recordings()
if not video_size:
video_size = self._detect_resolution() or self.default_size
if "x" not in video_size:
raise RuntimeError("video_size must be formatted as WIDTHxHEIGHT")
framerate = int(framerate or self.default_fps)
# Generate unique UUID for this recording
recording_uuid = str(uuid.uuid4())
ts = datetime.utcnow().strftime("%Y%m%dT%H%M%SZ")
# Include UUID in filename if not provided
if filename:
name = filename
else:
name = f"recording-{recording_uuid[:8]}-{ts}.mp4"
target = os.path.join(self.recording_dir, name)
# Build ffmpeg command based on platform
cmd = self._build_ffmpeg_cmd(video_size, framerate, target)
# Start the ffmpeg process detached from stdin to avoid blocking.
process = subprocess.Popen(
cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
# Store recording info with UUID
self.recordings[recording_uuid] = {
"process": process,
"file": target,
"video_size": video_size,
"framerate": framerate,
}
# Update current recording (for backward compatibility)
self.process = process
self.current_file = target
self.current_uuid = recording_uuid
return recording_uuid, target, video_size, framerate
def _build_ffmpeg_cmd(self, video_size, framerate, target):
"""Build ffmpeg command based on platform."""
if IS_MACOS:
# macOS: use avfoundation
# capture_input format: "video_device:audio_device" e.g., "1:none" or "0:none"
capture_input = self.capture_input or "1:none"
return [
"ffmpeg",
"-y",
"-f", "avfoundation",
"-framerate", str(framerate),
"-capture_cursor", "1", # Capture mouse cursor
"-i", capture_input,
"-vf", f"scale={video_size.replace('x', ':')}", # Scale to desired size
"-codec:v", "libx264",
"-preset", "ultrafast",
"-pix_fmt", "yuv420p",
target,
]
else:
# Linux: use x11grab
return [
"ffmpeg",
"-y",
"-video_size", video_size,
"-framerate", str(framerate),
"-f", "x11grab",
"-i", self.display,
"-codec:v", "libx264",
"-preset", "ultrafast",
"-pix_fmt", "yuv420p",
target,
]
def stop(self, recording_uuid=None):
# If UUID provided, use it; otherwise use current recording
if recording_uuid:
if recording_uuid not in self.recordings:
raise RuntimeError(f"Recording with UUID {recording_uuid} not found")
rec = self.recordings[recording_uuid]
process = rec["process"]
finished_file = rec["file"]
if process is None or process.poll() is not None:
raise RuntimeError(f"Recording with UUID {recording_uuid} is not running")
else:
# Backward compatibility: use current recording
if not self.is_running():
raise RuntimeError("No active recording")
process = self.process
finished_file = self.current_file
recording_uuid = self.current_uuid
# Send SIGINT to gracefully stop ffmpeg
process.send_signal(signal.SIGINT)
try:
process.wait(timeout=10) # Increased timeout for file finalization
except subprocess.TimeoutExpired:
# If graceful shutdown fails, force kill
process.kill()
process.wait(timeout=5)
# Wait for file to be finalized and verify it exists
max_wait = 3 # Maximum seconds to wait for file
wait_interval = 0.1 # Check every 100ms
waited = 0
while waited < max_wait:
if finished_file and os.path.exists(finished_file):
# Check if file is readable and has content
try:
size = os.path.getsize(finished_file)
if size > 0:
# File exists and has content
break
except OSError:
pass
time.sleep(wait_interval)
waited += wait_interval
# Verify the file exists and is readable
if not finished_file or not os.path.exists(finished_file):
raise RuntimeError(f"Recording file not found: {finished_file}")
try:
file_size = os.path.getsize(finished_file)
if file_size == 0:
raise RuntimeError(f"Recording file is empty: {finished_file}")
except OSError as e:
raise RuntimeError(f"Cannot access recording file: {e}")
# Update recording info
if recording_uuid and recording_uuid in self.recordings:
self.recordings[recording_uuid]["process"] = None
self.last_file = finished_file
# Update current recording state (for backward compatibility)
if recording_uuid == self.current_uuid:
self.last_file = finished_file
self.process = None
self.current_file = None
self.current_uuid = None
# Ensure we always return a UUID (even if None for backward compatibility)
return recording_uuid, finished_file
def get_file_by_uuid(self, recording_uuid):
"""Get recording file path by UUID."""
if recording_uuid not in self.recordings:
return None
rec = self.recordings[recording_uuid]
file_path = rec.get("file")
if file_path and os.path.exists(file_path):
return file_path
return None
def _cleanup_old_recordings(self):
"""Clean up old recording files to avoid disk bloat.
Removes files from completed recordings (where process is None or exited).
Keeps the last_file for potential download.
"""
uuids_to_remove = []
for rec_uuid, rec_info in self.recordings.items():
process = rec_info.get("process")
file_path = rec_info.get("file")
# Skip if recording is still running
if process is not None and process.poll() is None:
continue
# Skip the last finished file (user might want to download it)
if file_path == self.last_file:
continue
# Delete the file if it exists
if file_path and os.path.exists(file_path):
try:
os.remove(file_path)
except OSError:
pass # Ignore errors during cleanup
uuids_to_remove.append(rec_uuid)
# Remove cleaned up recordings from the dictionary
for rec_uuid in uuids_to_remove:
del self.recordings[rec_uuid]
def _detect_resolution(self):
"""Best-effort detection of the current display resolution."""
if IS_MACOS:
return self._detect_resolution_macos()
else:
return self._detect_resolution_linux()
def _detect_resolution_macos(self):
"""Detect screen resolution on macOS using system_profiler."""
try:
# Use system_profiler to get display info
probe = subprocess.check_output(
["system_profiler", "SPDisplaysDataType"],
stderr=subprocess.DEVNULL
).decode()
for line in probe.splitlines():
# Look for resolution line like "Resolution: 2560 x 1440"
if "Resolution:" in line and "x" in line.lower():
parts = line.split(":")
if len(parts) >= 2:
res_part = parts[1].strip()
# Parse "2560 x 1440 (QHD/WQHD...)" or "2560 x 1440"
res_match = res_part.split()
if len(res_match) >= 3 and res_match[1].lower() == "x":
width = res_match[0].strip()
height = res_match[2].strip().split()[0] # Remove any trailing text
# Remove non-digit characters
width = ''.join(filter(str.isdigit, width))
height = ''.join(filter(str.isdigit, height))
if width and height:
return f"{width}x{height}"
except Exception:
pass
# Fallback: try using screenresolution if available
try:
if shutil.which("screenresolution"):
probe = subprocess.check_output(
["screenresolution", "get"],
stderr=subprocess.DEVNULL
).decode()
# Output format: "Display 0: 2560x1440x32@60Hz"
for line in probe.splitlines():
if "x" in line:
parts = line.split()
for part in parts:
if "x" in part and part[0].isdigit():
# Extract WIDTHxHEIGHT from "2560x1440x32@60Hz"
dims = part.split("x")
if len(dims) >= 2:
return f"{dims[0]}x{dims[1]}"
except Exception:
pass
return None
def _detect_resolution_linux(self):
"""Detect screen resolution on Linux using xdpyinfo."""
try:
# Preserve existing environment and set/override DISPLAY
env = os.environ.copy()
env["DISPLAY"] = self.display
probe = subprocess.check_output(
["xdpyinfo"], env=env, stderr=subprocess.DEVNULL
).decode()
for line in probe.splitlines():
if "dimensions:" in line:
parts = line.strip().split()
# dimensions: 1920x1080 pixels ...
if len(parts) >= 2:
return parts[1]
except Exception:
return None
return None
def build_handler(recorder):
class Handler(BaseHTTPRequestHandler):
server_version = "RecordingAPI/1.0"
protocol_version = "HTTP/1.1"
def log_message(self, fmt, *args):
# Keep stdout clean for other services.
return
def do_GET(self):
parsed = urlparse(self.path)
if parsed.path == "/record/status":
self._handle_status()
elif parsed.path == "/record/file":
self._handle_file(parsed)
else:
_fail(self, HTTPStatus.NOT_FOUND, "Endpoint not found")
def do_POST(self):
parsed = urlparse(self.path)
if parsed.path == "/record/start":
self._handle_start()
elif parsed.path == "/record/stop":
self._handle_stop()
else:
_fail(self, HTTPStatus.NOT_FOUND, "Endpoint not found")
def _read_json(self):
length = int(self.headers.get("Content-Length", 0))
if not length:
return {}
try:
return json.loads(self.rfile.read(length))
except Exception:
return {}
def _handle_start(self):
payload = self._read_json()
try:
recording_uuid, target, video_size, fps = recorder.start(
video_size=payload.get("video_size"),
framerate=payload.get("framerate"),
filename=payload.get("filename"),
)
except Exception as exc:
_fail(self, HTTPStatus.CONFLICT, str(exc))
return
response = {
"uuid": recording_uuid,
"file": target,
"video_size": video_size,
"framerate": fps,
"recording_dir": recorder.recording_dir,
"platform": recorder.platform,
}
# Add platform-specific capture info
if IS_MACOS:
response["capture_input"] = recorder.capture_input
else:
response["display"] = recorder.display
_safe_json(self, HTTPStatus.OK, response)
def _handle_stop(self):
payload = self._read_json()
recording_uuid = payload.get("uuid") if payload else None
try:
uuid_result, finished = recorder.stop(recording_uuid=recording_uuid)
except Exception as exc:
_fail(self, HTTPStatus.CONFLICT, str(exc))
return
# Get file information
file_info = {
"uuid": uuid_result,
"file": finished
}
try:
if os.path.exists(finished):
file_info["size"] = os.path.getsize(finished)
file_info["exists"] = True
else:
file_info["exists"] = False
except Exception:
file_info["exists"] = False
_safe_json(self, HTTPStatus.OK, file_info)
def _handle_status(self):
status = {
"recording": recorder.is_running(),
"current_uuid": recorder.current_uuid,
"current_file": recorder.current_file,
"last_file": recorder.last_file,
"recording_dir": recorder.recording_dir,
"platform": recorder.platform,
}
# Add platform-specific capture info
if IS_MACOS:
status["capture_input"] = recorder.capture_input
else:
status["display"] = recorder.display
_safe_json(self, HTTPStatus.OK, status)
def _handle_file(self, parsed):
params = parse_qs(parsed.query)
recording_uuid = params.get("uuid", [None])[0]
name = params.get("name", [None])[0]
target = None
# Priority: uuid > name > last_file
if recording_uuid:
target = recorder.get_file_by_uuid(recording_uuid)
elif name:
target = (
name
if os.path.isabs(name)
else os.path.join(recorder.recording_dir, name)
)
elif recorder.last_file:
target = recorder.last_file
if not target or not os.path.exists(target):
_fail(self, HTTPStatus.NOT_FOUND, "Recording not found")
return
try:
size = os.path.getsize(target)
self.send_response(HTTPStatus.OK)
self.send_header("Content-Type", "video/mp4")
self.send_header("Content-Length", str(size))
self.end_headers()
with open(target, "rb") as f:
shutil.copyfileobj(f, self.wfile)
except Exception as exc:
_fail(self, HTTPStatus.INTERNAL_SERVER_ERROR, str(exc))
return Handler
def parse_args():
parser = argparse.ArgumentParser(description="Simple recording control API server")
parser.add_argument("--host", default="0.0.0.0", help="Listening interface")
parser.add_argument("--port", type=int, default=18080, help="Listening port")
# Determine default recording directory based on platform
default_recording_dir = (
os.path.expanduser("~/recordings") if IS_MACOS
else os.environ.get("RECORDINGS_DIR", "/home/kasm-user/recordings")
)
parser.add_argument(
"--recording-dir",
default=os.environ.get("RECORDINGS_DIR", default_recording_dir),
help="Directory to store recordings",
)
parser.add_argument(
"--display",
default=os.environ.get("DISPLAY", ":1"),
help="X11 display to record (Linux only, e.g. :1)",
)
parser.add_argument(
"--capture-input",
default=os.environ.get("CAPTURE_INPUT", "2:none"),
help="macOS avfoundation capture input (e.g. '2:none' for screen without audio). Use 'ffmpeg -f avfoundation -list_devices true -i \"\"' to list devices.",
)
parser.add_argument(
"--video-size",
default=os.environ.get("RECORDING_SIZE", "1920x1080"),
help="Default video size when none provided (WIDTHxHEIGHT)",
)
parser.add_argument(
"--framerate",
type=int,
default=int(os.environ.get("RECORDING_FPS", 25)),
help="Default framerate when none provided",
)
return parser.parse_args()
def main():
args = parse_args()
recorder = Recorder(
display=args.display,
recording_dir=args.recording_dir,
default_size=args.video_size,
default_fps=args.framerate,
capture_input=args.capture_input,
)
server = ThreadingHTTPServer((args.host, args.port), build_handler(recorder))
# Platform-specific info in startup message
if IS_MACOS:
capture_info = f"capture_input={recorder.capture_input}"
else:
capture_info = f"display={recorder.display}"
print(
f"[recording_api] listening on {args.host}:{args.port}, "
f"platform={recorder.platform}, {capture_info}, dir={recorder.recording_dir}",
flush=True,
)
try:
server.serve_forever()
except KeyboardInterrupt:
pass
finally:
server.server_close()
if recorder.is_running():
try:
recorder.stop()
except Exception:
pass
if __name__ == "__main__":
sys.exit(main())

View file

@ -0,0 +1,166 @@
#!/bin/bash
# 录屏API测试脚本
# 默认服务器地址和端口
HOST="${HOST:-localhost}"
PORT="${PORT:-18080}"
BASE_URL="http://${HOST}:${PORT}"
echo "=========================================="
echo "录屏API测试脚本"
echo "服务器地址: ${BASE_URL}"
echo "=========================================="
echo ""
# 颜色输出
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
RED='\033[0;31m'
NC='\033[0m' # No Color
# 1. 检查API状态
echo -e "${YELLOW}[1] 检查录屏状态${NC}"
echo "请求: GET ${BASE_URL}/record/status"
echo ""
curl -s -X GET "${BASE_URL}/record/status" | jq '.' || curl -s -X GET "${BASE_URL}/record/status"
echo ""
echo "----------------------------------------"
echo ""
# 2. 开始录屏(使用默认参数)
echo -e "${YELLOW}[2] 开始录屏(使用默认参数)${NC}"
echo "请求: POST ${BASE_URL}/record/start"
echo "请求体: {}"
echo ""
RESPONSE=$(curl -s -X POST "${BASE_URL}/record/start" \
-H "Content-Type: application/json" \
-d '{}')
echo "$RESPONSE" | jq '.' || echo "$RESPONSE"
# 提取UUID
RECORDING_UUID=$(echo "$RESPONSE" | jq -r '.uuid' 2>/dev/null || echo "")
if [ -n "$RECORDING_UUID" ] && [ "$RECORDING_UUID" != "null" ]; then
echo -e "${GREEN}录屏UUID: ${RECORDING_UUID}${NC}"
fi
echo ""
echo "----------------------------------------"
echo ""
# 3. 开始录屏(自定义参数)
echo -e "${YELLOW}[3] 开始录屏(自定义参数)${NC}"
echo "请求: POST ${BASE_URL}/record/start"
echo "请求体: {\"video_size\": \"1920x1080\", \"framerate\": 30, \"filename\": \"test-recording.mp4\"}"
echo ""
RESPONSE=$(curl -s -X POST "${BASE_URL}/record/start" \
-H "Content-Type: application/json" \
-d '{
"video_size": "1920x1080",
"framerate": 30,
"filename": "test-recording.mp4"
}')
echo "$RESPONSE" | jq '.' || echo "$RESPONSE"
echo ""
echo "----------------------------------------"
echo ""
# 4. 再次检查状态(应该显示正在录屏)
echo -e "${YELLOW}[4] 检查录屏状态(应该显示正在录屏)${NC}"
echo "请求: GET ${BASE_URL}/record/status"
echo ""
curl -s -X GET "${BASE_URL}/record/status" | jq '.' || curl -s -X GET "${BASE_URL}/record/status"
echo ""
echo "----------------------------------------"
echo ""
# 5. 等待几秒(模拟录屏过程)
echo -e "${YELLOW}[5] 等待5秒模拟录屏过程${NC}"
sleep 5
echo "等待完成"
echo ""
echo "----------------------------------------"
echo ""
# 6. 停止录屏使用UUID
echo -e "${YELLOW}[6] 停止录屏使用UUID${NC}"
if [ -n "$RECORDING_UUID" ] && [ "$RECORDING_UUID" != "null" ]; then
echo "请求: POST ${BASE_URL}/record/stop"
echo "请求体: {\"uuid\": \"${RECORDING_UUID}\"}"
echo ""
RESPONSE=$(curl -s -X POST "${BASE_URL}/record/stop" \
-H "Content-Type: application/json" \
-d "{\"uuid\": \"${RECORDING_UUID}\"}")
else
echo "请求: POST ${BASE_URL}/record/stop"
echo "请求体: {} (使用当前录制的UUID)"
echo ""
RESPONSE=$(curl -s -X POST "${BASE_URL}/record/stop" \
-H "Content-Type: application/json" \
-d '{}')
fi
echo "$RESPONSE" | jq '.' || echo "$RESPONSE"
echo ""
echo "----------------------------------------"
echo ""
# 7. 再次检查状态(应该显示已停止)
echo -e "${YELLOW}[7] 检查录屏状态(应该显示已停止)${NC}"
echo "请求: GET ${BASE_URL}/record/status"
echo ""
curl -s -X GET "${BASE_URL}/record/status" | jq '.' || curl -s -X GET "${BASE_URL}/record/status"
echo ""
echo "----------------------------------------"
echo ""
# 8. 通过UUID下载录屏文件
echo -e "${YELLOW}[8] 通过UUID下载录屏文件${NC}"
if [ -n "$RECORDING_UUID" ] && [ "$RECORDING_UUID" != "null" ]; then
echo "请求: GET ${BASE_URL}/record/file?uuid=${RECORDING_UUID}"
echo ""
FILE_PATH=$(echo "$RESPONSE" | jq -r '.file' 2>/dev/null || echo "")
if [ -n "$FILE_PATH" ] && [ "$FILE_PATH" != "null" ]; then
FILENAME=$(basename "$FILE_PATH")
echo "正在通过UUID下载文件到: ${FILENAME}"
curl -s -X GET "${BASE_URL}/record/file?uuid=${RECORDING_UUID}" -o "${FILENAME}"
if [ $? -eq 0 ]; then
echo -e "${GREEN}下载成功: ${FILENAME}${NC}"
ls -lh "${FILENAME}"
else
echo -e "${RED}下载失败${NC}"
fi
else
echo "无法获取文件路径尝试直接通过UUID下载..."
curl -s -X GET "${BASE_URL}/record/file?uuid=${RECORDING_UUID}" -o "recording-${RECORDING_UUID}.mp4"
if [ $? -eq 0 ]; then
echo -e "${GREEN}下载成功: recording-${RECORDING_UUID}.mp4${NC}"
ls -lh "recording-${RECORDING_UUID}.mp4"
else
echo -e "${RED}下载失败${NC}"
fi
fi
else
echo "没有UUID尝试下载最后一个文件..."
curl -s -X GET "${BASE_URL}/record/file" -o "recording.mp4"
if [ $? -eq 0 ]; then
echo -e "${GREEN}下载成功: recording.mp4${NC}"
ls -lh "recording.mp4"
else
echo -e "${RED}下载失败${NC}"
fi
fi
echo ""
echo "----------------------------------------"
echo ""
# 9. 下载指定文件名的录屏文件
echo -e "${YELLOW}[9] 下载指定文件名的录屏文件${NC}"
echo "请求: GET ${BASE_URL}/record/file?name=test-recording.mp4"
echo ""
curl -s -X GET "${BASE_URL}/record/file?name=test-recording.mp4" -o "test-recording-download.mp4"
if [ $? -eq 0 ]; then
echo -e "${GREEN}下载成功: test-recording-download.mp4${NC}"
ls -lh "test-recording-download.mp4"
else
echo -e "${RED}下载失败(文件可能不存在)${NC}"
fi
echo ""
echo "=========================================="
echo "测试完成"
echo "=========================================="