From 23ef83b20d3f7a8677a0acc8e1bb48a80db97574 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksa=20Siri=C5=A1ki?= <31509435+aleksasiriski@users.noreply.github.com> Date: Fri, 13 Jan 2023 20:51:20 +0100 Subject: [PATCH 1/3] Fixed Postgres SQLite uses `?` for vars in it's SQL and Postgres uses `%s`, I hope this it the only difference... Also, for some reason Postgres doesn't like fetch in single command as it errors out with: ``` 'NoneType' object has no attribute 'fetchall' ``` So I divided every execute from every fetch. --- rffmpeg | 202 ++++++++++++++++++++++++++------------------------------ 1 file changed, 94 insertions(+), 108 deletions(-) diff --git a/rffmpeg b/rffmpeg index 4f8a53c..668cf3c 100755 --- a/rffmpeg +++ b/rffmpeg @@ -44,13 +44,25 @@ log = logging.getLogger("rffmpeg") # Use Postgresql if specified, otherwise use SQLite DB_TYPE = "SQLITE" -POSTGRES_HOST = os.getenv("POSTGRES_HOST") -POSTGRES_DB = os.getenv("POSTGRES_DB") -POSTGRES_USER = os.getenv("POSTGRES_USER") -POSTGRES_PASS = os.getenv("POSTGRES_PASS") -if POSTGRES_HOST != None and POSTGRES_DB != None and POSTGRES_USER != None: +SQL_VAR_SIGN="?" +POSTGRES_DB = os.environ.get("RFFMPEG_POSTGRES_DB", "rffmpeg") +POSTGRES_USER = os.environ.get("RFFMPEG_POSTGRES_USER") +POSTGRES_PASS = os.environ.get("RFFMPEG_POSTGRES_PASS") +POSTGRES_PORT = os.environ.get("RFFMPEG_POSTGRES_PORT", "5432") +POSTGRES_HOST = os.environ.get("RFFMPEG_POSTGRES_HOST", "localhost") + +if POSTGRES_DB != None and POSTGRES_USER != None: DB_TYPE = "POSTGRES" + SQL_VAR_SIGN="%s" + POSTGRES_CREDENTIALS = { + "dbname": POSTGRES_DB, + "user": POSTGRES_USER, + "password": POSTGRES_PASS, + "port": int(POSTGRES_PORT), + "host": POSTGRES_HOST + } from psycopg2 import connect as postgres_connect + from psycopg2 import DatabaseError as postgres_error # Open a database connection (context manager) @@ -72,20 +84,26 @@ def dbconn(config): cur = conn.cursor() yield cur conn.commit() + conn.close() + log.debug("SQLite connection closed.") elif DB_TYPE == "POSTGRES": + conn = None try: log.debug("Using Postgresql as database. Connecting...") - conn = postgres_connect() + conn = postgres_connect(**POSTGRES_CREDENTIALS) cur = conn.cursor() cur.execute('SELECT version()') db_version = cur.fetchone() log.debug("Connected to Postgresql version {}".format(db_version)) yield cur conn.commit() - except (Exception, psycopg2.DatabaseError) as error: + except (Exception, postgres_error) as error: + print(error) log.error(error) - conn.close() - log.debug("Database connection closed.") + finally: + if conn is not None: + conn.close() + log.debug("Postgresql connection closed.") def fail(msg): @@ -161,11 +179,11 @@ def load_config(): # Parse the keys from the state group config["state_dir"] = config_directories.get("state", "/var/lib/rffmpeg") config["persist_dir"] = config_directories.get("persist", "/run/shm") - config["dir_owner"] = config_directories.get("owner", "jellyfin") - config["dir_group"] = config_directories.get("group", "sudo") + config["dir_owner"] = config_directories.get("owner", "root") + config["dir_group"] = config_directories.get("group", "root") # Parse the keys from the remote group - config["remote_user"] = config_remote.get("user", "jellyfin") + config["remote_user"] = config_remote.get("user", "root") config["remote_args"] = config_remote.get( "args", ["-i", "/var/lib/jellyfin/.ssh/id_rsa"] ) @@ -217,10 +235,8 @@ def cleanup(signum="", frame=""): global config with dbconn(config) as cur: - cur.execute("DELETE FROM states WHERE process_id = ?", (config["current_pid"],)) - cur.execute( - "DELETE FROM processes WHERE process_id = ?", (config["current_pid"],) - ) + cur.execute(f"DELETE FROM states WHERE process_id = {SQL_VAR_SIGN}", (config["current_pid"],)) + cur.execute(f"DELETE FROM processes WHERE process_id = {SQL_VAR_SIGN}", (config["current_pid"],)) def generate_ssh_command(config, target_hostname): @@ -282,8 +298,10 @@ def get_target_host(config): log.debug("Determining optimal target host") # Select all hosts and active processes from the database with dbconn(config) as cur: - hosts = cur.execute("SELECT * FROM hosts").fetchall() - processes = cur.execute("SELECT * FROM processes").fetchall() + cur.execute("SELECT * FROM hosts") + hosts = cur.fetchall() + cur.execute("SELECT * FROM processes") + processes = cur.fetchall() # Generate a mapping dictionary of hosts and processes host_mappings = dict() @@ -292,9 +310,8 @@ def get_target_host(config): # Get the latest state with dbconn(config) as cur: - current_state = cur.execute( - "SELECT * FROM states WHERE host_id = ? ORDER BY id DESC", (hid,) - ).fetchone() + cur.execute(f"SELECT * FROM states WHERE host_id = {SQL_VAR_SIGN} ORDER BY id DESC", (hid,)) + current_state = cur.fetchone() if not current_state: current_state = "idle" @@ -352,7 +369,7 @@ def get_target_host(config): ) with dbconn(config) as cur: cur.execute( - "INSERT INTO states (host_id, process_id, state) VALUES (?, ?, ?)", + f"INSERT INTO states (host_id, process_id, state) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})", (hid, config["current_pid"], "bad"), ) continue @@ -415,11 +432,11 @@ def run_local_ffmpeg(config, ffmpeg_args): with dbconn(config) as cur: cur.execute( - "INSERT INTO processes (host_id, process_id, cmd) VALUES (?, ?, ?)", + f"INSERT INTO processes (host_id, process_id, cmd) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})", (0, config["current_pid"], cmd_name + " " + " ".join(ffmpeg_args)), ) cur.execute( - "INSERT INTO states (host_id, process_id, state) VALUES (?, ?, ?)", + f"INSERT INTO states (host_id, process_id, state) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})", (0, config["current_pid"], "active"), ) @@ -472,11 +489,11 @@ def run_remote_ffmpeg(config, target_hid, target_hostname, target_servername, ff with dbconn(config) as cur: cur.execute( - "INSERT INTO processes (host_id, process_id, cmd) VALUES (?, ?, ?)", + f"INSERT INTO processes (host_id, process_id, cmd) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})", (target_hid, config["current_pid"], cmd_name + " " + " ".join(ffmpeg_args)), ) cur.execute( - "INSERT INTO states (host_id, process_id, state) VALUES (?, ?, ?)", + f"INSERT INTO states (host_id, process_id, state) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})", (target_hid, config["current_pid"], "active"), ) @@ -547,7 +564,8 @@ def run_control(config): # Check conditions for migrations with dbconn(config) as cur: # Migration for new servername (PR #36) - if cur.execute("SELECT COUNT(*) AS CNTREC FROM pragma_table_info('hosts') WHERE name='servername'").fetchone()[0] == 0: + cur.execute("SELECT COUNT(*) AS CNTREC FROM pragma_table_info('hosts') WHERE name='servername'") + if cur.fetchone()[0] == 0: cur.execute( "ALTER TABLE hosts ADD servername TEXT NOT NULL DEFAULT 'invalid'" ) @@ -555,10 +573,11 @@ def run_control(config): # Migration for new servername (PR #36) if did_alter_0001AddServername: with dbconn(config) as cur: - for host in cur.execute("SELECT * FROM hosts").fetchall(): + cur.execute("SELECT * FROM hosts") + for host in cur.fetchall(): if host[3] == 'invalid': cur.execute( - "UPDATE hosts SET servername = ? WHERE hostname = ?", (host[1], host[1]) + f"UPDATE hosts SET servername = {SQL_VAR_SIGN} WHERE hostname = {SQL_VAR_SIGN}", (host[1], host[1]) ) @click.command(name="init", short_help="Initialize the system.") @@ -602,20 +621,37 @@ def run_control(config): ) ) - if Path(config["db_path"]).is_file(): - os.remove(config["db_path"]) - try: with dbconn(config) as cur: cur.execute( - "CREATE TABLE hosts (id INTEGER PRIMARY KEY, hostname TEXT NOT NULL, weight INTEGER DEFAULT 1, servername TEXT NOT NULL)" + "DROP TABLE IF EXISTS hosts" ) cur.execute( - "CREATE TABLE processes (id INTEGER PRIMARY KEY, host_id INTEGER, process_id INTEGER, cmd TEXT)" + "DROP TABLE IF EXISTS processes" ) cur.execute( - "CREATE TABLE states (id INTEGER PRIMARY KEY, host_id INTEGER, process_id INTEGER, state TEXT)" + "DROP TABLE IF EXISTS states" ) + if DB_TYPE == "SQLITE": + cur.execute( + "CREATE TABLE hosts (id INTEGER PRIMARY KEY, hostname TEXT NOT NULL, weight INTEGER DEFAULT 1, servername TEXT NOT NULL)" + ) + cur.execute( + "CREATE TABLE processes (id INTEGER PRIMARY KEY, host_id INTEGER, process_id INTEGER, cmd TEXT)" + ) + cur.execute( + "CREATE TABLE states (id INTEGER PRIMARY KEY, host_id INTEGER, process_id INTEGER, state TEXT)" + ) + elif DB_TYPE == "POSTGRES": + cur.execute( + "CREATE TABLE hosts (id SERIAL PRIMARY KEY, hostname TEXT NOT NULL, weight INTEGER DEFAULT 1, servername TEXT NOT NULL)" + ) + cur.execute( + "CREATE TABLE processes (id SERIAL PRIMARY KEY, host_id INTEGER, process_id INTEGER, cmd TEXT)" + ) + cur.execute( + "CREATE TABLE states (id SERIAL PRIMARY KEY, host_id INTEGER, process_id INTEGER, state TEXT)" + ) except Exception as e: fail("Failed to create database: {}".format(e)) @@ -625,12 +661,13 @@ def run_control(config): getgrnam(config["dir_group"]).gr_gid, ) os.chmod(config["state_dir"], 0o770) - os.chown( - config["db_path"], - getpwnam(config["dir_owner"]).pw_uid, - getgrnam(config["dir_group"]).gr_gid, - ) - os.chmod(config["db_path"], 0o660) + if DB_TYPE == "SQLITE": + os.chown( + config["db_path"], + getpwnam(config["dir_owner"]).pw_uid, + getgrnam(config["dir_group"]).gr_gid, + ) + os.chmod(config["db_path"], 0o660) rffmpeg_click.add_command(rffmpeg_click_init) @@ -640,9 +677,12 @@ def run_control(config): Show the current status of all rffmpeg target hosts and active processes. """ with dbconn(config) as cur: - hosts = cur.execute("SELECT * FROM hosts").fetchall() - processes = cur.execute("SELECT * FROM processes").fetchall() - states = cur.execute("SELECT * FROM states").fetchall() + cur.execute("SELECT * FROM hosts") + hosts = cur.fetchall() + cur.execute("SELECT * FROM processes") + processes = cur.fetchall() + cur.execute("SELECT * FROM states") + states = cur.fetchall() # Determine if there are any fallback processes running fallback_processes = list() @@ -667,9 +707,8 @@ def run_control(config): # Get the latest state with dbconn(config) as cur: - current_state = cur.execute( - "SELECT * FROM states WHERE host_id = ? ORDER BY id DESC", (hid,) - ).fetchone() + cur.execute(f"SELECT * FROM states WHERE host_id = {SQL_VAR_SIGN} ORDER BY id DESC", (hid,)) + current_state = cur.fetchone() if not current_state: current_state = "idle" @@ -797,9 +836,7 @@ def run_control(config): name = host click.echo("Adding new host '{}' ({})".format(host, name)) with dbconn(config) as cur: - cur.execute( - "INSERT INTO hosts (hostname, weight, servername) VALUES (?, ?, ?)", (host, weight, name) - ) + cur.execute(f"INSERT INTO hosts (hostname, weight, servername) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})", (host, weight, name)) rffmpeg_click.add_command(rffmpeg_click_add) @@ -807,7 +844,7 @@ def run_control(config): @click.argument("host") def rffmpeg_click_remove(host): """ - Remove a host with internal ID/IP/hostname/servername HOST from the database. + Remove a host with internal ID or IP or hostname or servername HOST from the database. """ try: host = int(host) @@ -817,13 +854,11 @@ def run_control(config): fieldAlt = "hostname" with dbconn(config) as cur: - entry = cur.execute( - "SELECT * FROM hosts WHERE {} = ?".format(field), (host,) - ).fetchall() + cur.execute(f"SELECT * FROM hosts WHERE {field} = {SQL_VAR_SIGN}", (host,)) + entry = cur.fetchall() if len(entry) < 1: - entry = cur.execute( - "SELECT * FROM hosts WHERE {} = ?".format(fieldAlt), (host,) - ).fetchall() + cur.execute(f"SELECT * FROM hosts WHERE {fieldAlt} = {SQL_VAR_SIGN}", (host,)) + entry = cur.fetchall() if len(entry) < 1: fail("No hosts found to delete!") @@ -834,7 +869,7 @@ def run_control(config): for host in entry: hid, hostname, weight, servername = host click.echo("\tID: {}\tHostname: {}\tServername: {}".format(hid, hostname, servername)) - cur.execute("DELETE FROM hosts WHERE id = ?", (hid,)) + cur.execute(f"DELETE FROM hosts WHERE id = {SQL_VAR_SIGN}", (hid,)) rffmpeg_click.add_command(rffmpeg_click_remove) @@ -869,55 +904,6 @@ def run_control(config): rffmpeg_click.add_command(rffmpeg_click_log) - @click.command(name="clear", short_help="Clear processes and states.") - @click.argument("host", required=False, default=None) - def rffmpeg_click_log(host): - """ - Clear all active process and states from the database, optionally limited to a host with internal ID/IP/hostname/servername HOST. - - This command is designed to assist in rare error cases whereby stuck process states are present in the database, and should be used sparingly. This will not affect running processes negatively, though rffmpeg will no longer see them as active. It is recommended to run this command only when you are sure that no processes are actually running. - """ - with dbconn(config) as cur: - if host is not None: - try: - host = int(host) - field = "id" - except ValueError: - field = "servername" - fieldAlt = "hostname" - - entry = cur.execute( - "SELECT id FROM hosts WHERE {} = ?".format(field), (host,) - ).fetchall() - if len(entry) < 1: - entry = cur.execute( - "SELECT id FROM hosts WHERE {} = ?".format(fieldAlt), (host,) - ).fetchall() - if len(entry) < 1: - fail("Host not found!") - if len(entry) > 1: - fail("Multiple hosts found, please be more specific!") - host_id = entry[0][0] - - click.echo("Clearing all active processes and states for host ID '{}'".format(host_id)) - processes = cur.execute( - "SELECT id FROM processes WHERE host_id = ?", (host_id,) - ).fetchall() - states = cur.execute( - "SELECT id FROM states WHERE host_id = ?", (host_id,) - ).fetchall() - - for process in processes: - cur.execute("DELETE FROM processes WHERE id = ?", process) - for state in states: - cur.execute("DELETE FROM states WHERE id = ?", state) - else: - click.echo("Clearing all active processes and states") - cur.execute("DELETE FROM processes") - cur.execute("DELETE FROM states") - - rffmpeg_click.add_command(rffmpeg_click_log) - return rffmpeg_click(obj={}) @@ -931,6 +917,6 @@ if __name__ == "__main__": if "rffmpeg" in cmd_name: run_control(config) - else: + ffmpeg_args = all_args[1:] run_ffmpeg(config, ffmpeg_args) From 7963c0713d8999fe6c787beefb36604604033bf2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksa=20Siri=C5=A1ki?= <31509435+aleksasiriski@users.noreply.github.com> Date: Fri, 13 Jan 2023 20:57:26 +0100 Subject: [PATCH 2/3] Fixed Postgres and updated to master --- rffmpeg | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/rffmpeg b/rffmpeg index 668cf3c..4638dd3 100755 --- a/rffmpeg +++ b/rffmpeg @@ -844,7 +844,7 @@ def run_control(config): @click.argument("host") def rffmpeg_click_remove(host): """ - Remove a host with internal ID or IP or hostname or servername HOST from the database. + Remove a host with internal ID/IP/hostname/servername HOST from the database. """ try: host = int(host) @@ -904,6 +904,54 @@ def run_control(config): rffmpeg_click.add_command(rffmpeg_click_log) + @click.command(name="clear", short_help="Clear processes and states.") + @click.argument("host", required=False, default=None) + def rffmpeg_click_log(host): + """ + Clear all active process and states from the database, optionally limited to a host with internal ID/IP/hostname/servername HOST. + This command is designed to assist in rare error cases whereby stuck process states are present in the database, and should be used sparingly. This will not affect running processes negatively, though rffmpeg will no longer see them as active. It is recommended to run this command only when you are sure that no processes are actually running. + """ + with dbconn(config) as cur: + if host is not None: + try: + host = int(host) + field = "id" + except ValueError: + field = "servername" + fieldAlt = "hostname" + + entry = cur.execute( + "SELECT id FROM hosts WHERE {} = ?".format(field), (host,) + ).fetchall() + if len(entry) < 1: + entry = cur.execute( + "SELECT id FROM hosts WHERE {} = ?".format(fieldAlt), (host,) + ).fetchall() + if len(entry) < 1: + fail("Host not found!") + if len(entry) > 1: + fail("Multiple hosts found, please be more specific!") + host_id = entry[0][0] + + click.echo("Clearing all active processes and states for host ID '{}'".format(host_id)) + processes = cur.execute( + "SELECT id FROM processes WHERE host_id = ?", (host_id,) + ).fetchall() + states = cur.execute( + "SELECT id FROM states WHERE host_id = ?", (host_id,) + ).fetchall() + + for process in processes: + cur.execute("DELETE FROM processes WHERE id = ?", process) + for state in states: + cur.execute("DELETE FROM states WHERE id = ?", state) + else: + click.echo("Clearing all active processes and states") + cur.execute("DELETE FROM processes") + cur.execute("DELETE FROM states") + + rffmpeg_click.add_command(rffmpeg_click_log) + return rffmpeg_click(obj={}) @@ -918,5 +966,6 @@ if __name__ == "__main__": if "rffmpeg" in cmd_name: run_control(config) + else: ffmpeg_args = all_args[1:] run_ffmpeg(config, ffmpeg_args) From 94e4402dd2a01820de088208d95380c8f75f7111 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksa=20Siri=C5=A1ki?= <31509435+aleksasiriski@users.noreply.github.com> Date: Fri, 13 Jan 2023 21:00:07 +0100 Subject: [PATCH 3/3] Reverted root to jellyfin user I needed root for testing --- rffmpeg | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rffmpeg b/rffmpeg index 4638dd3..80bf951 100755 --- a/rffmpeg +++ b/rffmpeg @@ -179,11 +179,11 @@ def load_config(): # Parse the keys from the state group config["state_dir"] = config_directories.get("state", "/var/lib/rffmpeg") config["persist_dir"] = config_directories.get("persist", "/run/shm") - config["dir_owner"] = config_directories.get("owner", "root") - config["dir_group"] = config_directories.get("group", "root") + config["dir_owner"] = config_directories.get("owner", "jellyfin") + config["dir_group"] = config_directories.get("group", "sudo") # Parse the keys from the remote group - config["remote_user"] = config_remote.get("user", "root") + config["remote_user"] = config_remote.get("user", "jellyfin") config["remote_args"] = config_remote.get( "args", ["-i", "/var/lib/jellyfin/.ssh/id_rsa"] )