Fixed Postgres

SQLite uses `?` for vars in it's SQL and Postgres uses `%s`, I hope this it the only difference...

Also, for some reason Postgres doesn't like fetch in single command as it errors out with:
```
'NoneType' object has no attribute 'fetchall'
```
So I divided every execute from every fetch.
This commit is contained in:
Aleksa Siriški 2023-01-13 20:51:20 +01:00 committed by GitHub
parent 51831feae4
commit 23ef83b20d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

202
rffmpeg
View file

@ -44,13 +44,25 @@ log = logging.getLogger("rffmpeg")
# Use Postgresql if specified, otherwise use SQLite
DB_TYPE = "SQLITE"
POSTGRES_HOST = os.getenv("POSTGRES_HOST")
POSTGRES_DB = os.getenv("POSTGRES_DB")
POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_PASS = os.getenv("POSTGRES_PASS")
if POSTGRES_HOST != None and POSTGRES_DB != None and POSTGRES_USER != None:
SQL_VAR_SIGN="?"
POSTGRES_DB = os.environ.get("RFFMPEG_POSTGRES_DB", "rffmpeg")
POSTGRES_USER = os.environ.get("RFFMPEG_POSTGRES_USER")
POSTGRES_PASS = os.environ.get("RFFMPEG_POSTGRES_PASS")
POSTGRES_PORT = os.environ.get("RFFMPEG_POSTGRES_PORT", "5432")
POSTGRES_HOST = os.environ.get("RFFMPEG_POSTGRES_HOST", "localhost")
if POSTGRES_DB != None and POSTGRES_USER != None:
DB_TYPE = "POSTGRES"
SQL_VAR_SIGN="%s"
POSTGRES_CREDENTIALS = {
"dbname": POSTGRES_DB,
"user": POSTGRES_USER,
"password": POSTGRES_PASS,
"port": int(POSTGRES_PORT),
"host": POSTGRES_HOST
}
from psycopg2 import connect as postgres_connect
from psycopg2 import DatabaseError as postgres_error
# Open a database connection (context manager)
@ -72,20 +84,26 @@ def dbconn(config):
cur = conn.cursor()
yield cur
conn.commit()
conn.close()
log.debug("SQLite connection closed.")
elif DB_TYPE == "POSTGRES":
conn = None
try:
log.debug("Using Postgresql as database. Connecting...")
conn = postgres_connect()
conn = postgres_connect(**POSTGRES_CREDENTIALS)
cur = conn.cursor()
cur.execute('SELECT version()')
db_version = cur.fetchone()
log.debug("Connected to Postgresql version {}".format(db_version))
yield cur
conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
except (Exception, postgres_error) as error:
print(error)
log.error(error)
conn.close()
log.debug("Database connection closed.")
finally:
if conn is not None:
conn.close()
log.debug("Postgresql connection closed.")
def fail(msg):
@ -161,11 +179,11 @@ def load_config():
# Parse the keys from the state group
config["state_dir"] = config_directories.get("state", "/var/lib/rffmpeg")
config["persist_dir"] = config_directories.get("persist", "/run/shm")
config["dir_owner"] = config_directories.get("owner", "jellyfin")
config["dir_group"] = config_directories.get("group", "sudo")
config["dir_owner"] = config_directories.get("owner", "root")
config["dir_group"] = config_directories.get("group", "root")
# Parse the keys from the remote group
config["remote_user"] = config_remote.get("user", "jellyfin")
config["remote_user"] = config_remote.get("user", "root")
config["remote_args"] = config_remote.get(
"args", ["-i", "/var/lib/jellyfin/.ssh/id_rsa"]
)
@ -217,10 +235,8 @@ def cleanup(signum="", frame=""):
global config
with dbconn(config) as cur:
cur.execute("DELETE FROM states WHERE process_id = ?", (config["current_pid"],))
cur.execute(
"DELETE FROM processes WHERE process_id = ?", (config["current_pid"],)
)
cur.execute(f"DELETE FROM states WHERE process_id = {SQL_VAR_SIGN}", (config["current_pid"],))
cur.execute(f"DELETE FROM processes WHERE process_id = {SQL_VAR_SIGN}", (config["current_pid"],))
def generate_ssh_command(config, target_hostname):
@ -282,8 +298,10 @@ def get_target_host(config):
log.debug("Determining optimal target host")
# Select all hosts and active processes from the database
with dbconn(config) as cur:
hosts = cur.execute("SELECT * FROM hosts").fetchall()
processes = cur.execute("SELECT * FROM processes").fetchall()
cur.execute("SELECT * FROM hosts")
hosts = cur.fetchall()
cur.execute("SELECT * FROM processes")
processes = cur.fetchall()
# Generate a mapping dictionary of hosts and processes
host_mappings = dict()
@ -292,9 +310,8 @@ def get_target_host(config):
# Get the latest state
with dbconn(config) as cur:
current_state = cur.execute(
"SELECT * FROM states WHERE host_id = ? ORDER BY id DESC", (hid,)
).fetchone()
cur.execute(f"SELECT * FROM states WHERE host_id = {SQL_VAR_SIGN} ORDER BY id DESC", (hid,))
current_state = cur.fetchone()
if not current_state:
current_state = "idle"
@ -352,7 +369,7 @@ def get_target_host(config):
)
with dbconn(config) as cur:
cur.execute(
"INSERT INTO states (host_id, process_id, state) VALUES (?, ?, ?)",
f"INSERT INTO states (host_id, process_id, state) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
(hid, config["current_pid"], "bad"),
)
continue
@ -415,11 +432,11 @@ def run_local_ffmpeg(config, ffmpeg_args):
with dbconn(config) as cur:
cur.execute(
"INSERT INTO processes (host_id, process_id, cmd) VALUES (?, ?, ?)",
f"INSERT INTO processes (host_id, process_id, cmd) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
(0, config["current_pid"], cmd_name + " " + " ".join(ffmpeg_args)),
)
cur.execute(
"INSERT INTO states (host_id, process_id, state) VALUES (?, ?, ?)",
f"INSERT INTO states (host_id, process_id, state) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
(0, config["current_pid"], "active"),
)
@ -472,11 +489,11 @@ def run_remote_ffmpeg(config, target_hid, target_hostname, target_servername, ff
with dbconn(config) as cur:
cur.execute(
"INSERT INTO processes (host_id, process_id, cmd) VALUES (?, ?, ?)",
f"INSERT INTO processes (host_id, process_id, cmd) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
(target_hid, config["current_pid"], cmd_name + " " + " ".join(ffmpeg_args)),
)
cur.execute(
"INSERT INTO states (host_id, process_id, state) VALUES (?, ?, ?)",
f"INSERT INTO states (host_id, process_id, state) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
(target_hid, config["current_pid"], "active"),
)
@ -547,7 +564,8 @@ def run_control(config):
# Check conditions for migrations
with dbconn(config) as cur:
# Migration for new servername (PR #36)
if cur.execute("SELECT COUNT(*) AS CNTREC FROM pragma_table_info('hosts') WHERE name='servername'").fetchone()[0] == 0:
cur.execute("SELECT COUNT(*) AS CNTREC FROM pragma_table_info('hosts') WHERE name='servername'")
if cur.fetchone()[0] == 0:
cur.execute(
"ALTER TABLE hosts ADD servername TEXT NOT NULL DEFAULT 'invalid'"
)
@ -555,10 +573,11 @@ def run_control(config):
# Migration for new servername (PR #36)
if did_alter_0001AddServername:
with dbconn(config) as cur:
for host in cur.execute("SELECT * FROM hosts").fetchall():
cur.execute("SELECT * FROM hosts")
for host in cur.fetchall():
if host[3] == 'invalid':
cur.execute(
"UPDATE hosts SET servername = ? WHERE hostname = ?", (host[1], host[1])
f"UPDATE hosts SET servername = {SQL_VAR_SIGN} WHERE hostname = {SQL_VAR_SIGN}", (host[1], host[1])
)
@click.command(name="init", short_help="Initialize the system.")
@ -602,20 +621,37 @@ def run_control(config):
)
)
if Path(config["db_path"]).is_file():
os.remove(config["db_path"])
try:
with dbconn(config) as cur:
cur.execute(
"CREATE TABLE hosts (id INTEGER PRIMARY KEY, hostname TEXT NOT NULL, weight INTEGER DEFAULT 1, servername TEXT NOT NULL)"
"DROP TABLE IF EXISTS hosts"
)
cur.execute(
"CREATE TABLE processes (id INTEGER PRIMARY KEY, host_id INTEGER, process_id INTEGER, cmd TEXT)"
"DROP TABLE IF EXISTS processes"
)
cur.execute(
"CREATE TABLE states (id INTEGER PRIMARY KEY, host_id INTEGER, process_id INTEGER, state TEXT)"
"DROP TABLE IF EXISTS states"
)
if DB_TYPE == "SQLITE":
cur.execute(
"CREATE TABLE hosts (id INTEGER PRIMARY KEY, hostname TEXT NOT NULL, weight INTEGER DEFAULT 1, servername TEXT NOT NULL)"
)
cur.execute(
"CREATE TABLE processes (id INTEGER PRIMARY KEY, host_id INTEGER, process_id INTEGER, cmd TEXT)"
)
cur.execute(
"CREATE TABLE states (id INTEGER PRIMARY KEY, host_id INTEGER, process_id INTEGER, state TEXT)"
)
elif DB_TYPE == "POSTGRES":
cur.execute(
"CREATE TABLE hosts (id SERIAL PRIMARY KEY, hostname TEXT NOT NULL, weight INTEGER DEFAULT 1, servername TEXT NOT NULL)"
)
cur.execute(
"CREATE TABLE processes (id SERIAL PRIMARY KEY, host_id INTEGER, process_id INTEGER, cmd TEXT)"
)
cur.execute(
"CREATE TABLE states (id SERIAL PRIMARY KEY, host_id INTEGER, process_id INTEGER, state TEXT)"
)
except Exception as e:
fail("Failed to create database: {}".format(e))
@ -625,12 +661,13 @@ def run_control(config):
getgrnam(config["dir_group"]).gr_gid,
)
os.chmod(config["state_dir"], 0o770)
os.chown(
config["db_path"],
getpwnam(config["dir_owner"]).pw_uid,
getgrnam(config["dir_group"]).gr_gid,
)
os.chmod(config["db_path"], 0o660)
if DB_TYPE == "SQLITE":
os.chown(
config["db_path"],
getpwnam(config["dir_owner"]).pw_uid,
getgrnam(config["dir_group"]).gr_gid,
)
os.chmod(config["db_path"], 0o660)
rffmpeg_click.add_command(rffmpeg_click_init)
@ -640,9 +677,12 @@ def run_control(config):
Show the current status of all rffmpeg target hosts and active processes.
"""
with dbconn(config) as cur:
hosts = cur.execute("SELECT * FROM hosts").fetchall()
processes = cur.execute("SELECT * FROM processes").fetchall()
states = cur.execute("SELECT * FROM states").fetchall()
cur.execute("SELECT * FROM hosts")
hosts = cur.fetchall()
cur.execute("SELECT * FROM processes")
processes = cur.fetchall()
cur.execute("SELECT * FROM states")
states = cur.fetchall()
# Determine if there are any fallback processes running
fallback_processes = list()
@ -667,9 +707,8 @@ def run_control(config):
# Get the latest state
with dbconn(config) as cur:
current_state = cur.execute(
"SELECT * FROM states WHERE host_id = ? ORDER BY id DESC", (hid,)
).fetchone()
cur.execute(f"SELECT * FROM states WHERE host_id = {SQL_VAR_SIGN} ORDER BY id DESC", (hid,))
current_state = cur.fetchone()
if not current_state:
current_state = "idle"
@ -797,9 +836,7 @@ def run_control(config):
name = host
click.echo("Adding new host '{}' ({})".format(host, name))
with dbconn(config) as cur:
cur.execute(
"INSERT INTO hosts (hostname, weight, servername) VALUES (?, ?, ?)", (host, weight, name)
)
cur.execute(f"INSERT INTO hosts (hostname, weight, servername) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})", (host, weight, name))
rffmpeg_click.add_command(rffmpeg_click_add)
@ -807,7 +844,7 @@ def run_control(config):
@click.argument("host")
def rffmpeg_click_remove(host):
"""
Remove a host with internal ID/IP/hostname/servername HOST from the database.
Remove a host with internal ID or IP or hostname or servername HOST from the database.
"""
try:
host = int(host)
@ -817,13 +854,11 @@ def run_control(config):
fieldAlt = "hostname"
with dbconn(config) as cur:
entry = cur.execute(
"SELECT * FROM hosts WHERE {} = ?".format(field), (host,)
).fetchall()
cur.execute(f"SELECT * FROM hosts WHERE {field} = {SQL_VAR_SIGN}", (host,))
entry = cur.fetchall()
if len(entry) < 1:
entry = cur.execute(
"SELECT * FROM hosts WHERE {} = ?".format(fieldAlt), (host,)
).fetchall()
cur.execute(f"SELECT * FROM hosts WHERE {fieldAlt} = {SQL_VAR_SIGN}", (host,))
entry = cur.fetchall()
if len(entry) < 1:
fail("No hosts found to delete!")
@ -834,7 +869,7 @@ def run_control(config):
for host in entry:
hid, hostname, weight, servername = host
click.echo("\tID: {}\tHostname: {}\tServername: {}".format(hid, hostname, servername))
cur.execute("DELETE FROM hosts WHERE id = ?", (hid,))
cur.execute(f"DELETE FROM hosts WHERE id = {SQL_VAR_SIGN}", (hid,))
rffmpeg_click.add_command(rffmpeg_click_remove)
@ -869,55 +904,6 @@ def run_control(config):
rffmpeg_click.add_command(rffmpeg_click_log)
@click.command(name="clear", short_help="Clear processes and states.")
@click.argument("host", required=False, default=None)
def rffmpeg_click_log(host):
"""
Clear all active process and states from the database, optionally limited to a host with internal ID/IP/hostname/servername HOST.
This command is designed to assist in rare error cases whereby stuck process states are present in the database, and should be used sparingly. This will not affect running processes negatively, though rffmpeg will no longer see them as active. It is recommended to run this command only when you are sure that no processes are actually running.
"""
with dbconn(config) as cur:
if host is not None:
try:
host = int(host)
field = "id"
except ValueError:
field = "servername"
fieldAlt = "hostname"
entry = cur.execute(
"SELECT id FROM hosts WHERE {} = ?".format(field), (host,)
).fetchall()
if len(entry) < 1:
entry = cur.execute(
"SELECT id FROM hosts WHERE {} = ?".format(fieldAlt), (host,)
).fetchall()
if len(entry) < 1:
fail("Host not found!")
if len(entry) > 1:
fail("Multiple hosts found, please be more specific!")
host_id = entry[0][0]
click.echo("Clearing all active processes and states for host ID '{}'".format(host_id))
processes = cur.execute(
"SELECT id FROM processes WHERE host_id = ?", (host_id,)
).fetchall()
states = cur.execute(
"SELECT id FROM states WHERE host_id = ?", (host_id,)
).fetchall()
for process in processes:
cur.execute("DELETE FROM processes WHERE id = ?", process)
for state in states:
cur.execute("DELETE FROM states WHERE id = ?", state)
else:
click.echo("Clearing all active processes and states")
cur.execute("DELETE FROM processes")
cur.execute("DELETE FROM states")
rffmpeg_click.add_command(rffmpeg_click_log)
return rffmpeg_click(obj={})
@ -931,6 +917,6 @@ if __name__ == "__main__":
if "rffmpeg" in cmd_name:
run_control(config)
else:
ffmpeg_args = all_args[1:]
run_ffmpeg(config, ffmpeg_args)