Merge pull request #51 from aleksasiriski/patch-1

This commit is contained in:
Joshua M. Boniface 2023-01-13 15:02:19 -05:00 committed by GitHub
commit ccb58e5260
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

145
rffmpeg
View file

@ -44,13 +44,25 @@ log = logging.getLogger("rffmpeg")
# Use Postgresql if specified, otherwise use SQLite # Use Postgresql if specified, otherwise use SQLite
DB_TYPE = "SQLITE" DB_TYPE = "SQLITE"
POSTGRES_HOST = os.getenv("POSTGRES_HOST") SQL_VAR_SIGN="?"
POSTGRES_DB = os.getenv("POSTGRES_DB") POSTGRES_DB = os.environ.get("RFFMPEG_POSTGRES_DB", "rffmpeg")
POSTGRES_USER = os.getenv("POSTGRES_USER") POSTGRES_USER = os.environ.get("RFFMPEG_POSTGRES_USER")
POSTGRES_PASS = os.getenv("POSTGRES_PASS") POSTGRES_PASS = os.environ.get("RFFMPEG_POSTGRES_PASS")
if POSTGRES_HOST != None and POSTGRES_DB != None and POSTGRES_USER != None: POSTGRES_PORT = os.environ.get("RFFMPEG_POSTGRES_PORT", "5432")
POSTGRES_HOST = os.environ.get("RFFMPEG_POSTGRES_HOST", "localhost")
if POSTGRES_DB != None and POSTGRES_USER != None:
DB_TYPE = "POSTGRES" DB_TYPE = "POSTGRES"
SQL_VAR_SIGN="%s"
POSTGRES_CREDENTIALS = {
"dbname": POSTGRES_DB,
"user": POSTGRES_USER,
"password": POSTGRES_PASS,
"port": int(POSTGRES_PORT),
"host": POSTGRES_HOST
}
from psycopg2 import connect as postgres_connect from psycopg2 import connect as postgres_connect
from psycopg2 import DatabaseError as postgres_error
# Open a database connection (context manager) # Open a database connection (context manager)
@ -72,20 +84,26 @@ def dbconn(config):
cur = conn.cursor() cur = conn.cursor()
yield cur yield cur
conn.commit() conn.commit()
conn.close()
log.debug("SQLite connection closed.")
elif DB_TYPE == "POSTGRES": elif DB_TYPE == "POSTGRES":
conn = None
try: try:
log.debug("Using Postgresql as database. Connecting...") log.debug("Using Postgresql as database. Connecting...")
conn = postgres_connect() conn = postgres_connect(**POSTGRES_CREDENTIALS)
cur = conn.cursor() cur = conn.cursor()
cur.execute('SELECT version()') cur.execute('SELECT version()')
db_version = cur.fetchone() db_version = cur.fetchone()
log.debug("Connected to Postgresql version {}".format(db_version)) log.debug("Connected to Postgresql version {}".format(db_version))
yield cur yield cur
conn.commit() conn.commit()
except (Exception, psycopg2.DatabaseError) as error: except (Exception, postgres_error) as error:
print(error)
log.error(error) log.error(error)
conn.close() finally:
log.debug("Database connection closed.") if conn is not None:
conn.close()
log.debug("Postgresql connection closed.")
def fail(msg): def fail(msg):
@ -217,10 +235,8 @@ def cleanup(signum="", frame=""):
global config global config
with dbconn(config) as cur: with dbconn(config) as cur:
cur.execute("DELETE FROM states WHERE process_id = ?", (config["current_pid"],)) cur.execute(f"DELETE FROM states WHERE process_id = {SQL_VAR_SIGN}", (config["current_pid"],))
cur.execute( cur.execute(f"DELETE FROM processes WHERE process_id = {SQL_VAR_SIGN}", (config["current_pid"],))
"DELETE FROM processes WHERE process_id = ?", (config["current_pid"],)
)
def generate_ssh_command(config, target_hostname): def generate_ssh_command(config, target_hostname):
@ -282,8 +298,10 @@ def get_target_host(config):
log.debug("Determining optimal target host") log.debug("Determining optimal target host")
# Select all hosts and active processes from the database # Select all hosts and active processes from the database
with dbconn(config) as cur: with dbconn(config) as cur:
hosts = cur.execute("SELECT * FROM hosts").fetchall() cur.execute("SELECT * FROM hosts")
processes = cur.execute("SELECT * FROM processes").fetchall() hosts = cur.fetchall()
cur.execute("SELECT * FROM processes")
processes = cur.fetchall()
# Generate a mapping dictionary of hosts and processes # Generate a mapping dictionary of hosts and processes
host_mappings = dict() host_mappings = dict()
@ -292,9 +310,8 @@ def get_target_host(config):
# Get the latest state # Get the latest state
with dbconn(config) as cur: with dbconn(config) as cur:
current_state = cur.execute( cur.execute(f"SELECT * FROM states WHERE host_id = {SQL_VAR_SIGN} ORDER BY id DESC", (hid,))
"SELECT * FROM states WHERE host_id = ? ORDER BY id DESC", (hid,) current_state = cur.fetchone()
).fetchone()
if not current_state: if not current_state:
current_state = "idle" current_state = "idle"
@ -352,7 +369,7 @@ def get_target_host(config):
) )
with dbconn(config) as cur: with dbconn(config) as cur:
cur.execute( cur.execute(
"INSERT INTO states (host_id, process_id, state) VALUES (?, ?, ?)", f"INSERT INTO states (host_id, process_id, state) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
(hid, config["current_pid"], "bad"), (hid, config["current_pid"], "bad"),
) )
continue continue
@ -415,11 +432,11 @@ def run_local_ffmpeg(config, ffmpeg_args):
with dbconn(config) as cur: with dbconn(config) as cur:
cur.execute( cur.execute(
"INSERT INTO processes (host_id, process_id, cmd) VALUES (?, ?, ?)", f"INSERT INTO processes (host_id, process_id, cmd) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
(0, config["current_pid"], cmd_name + " " + " ".join(ffmpeg_args)), (0, config["current_pid"], cmd_name + " " + " ".join(ffmpeg_args)),
) )
cur.execute( cur.execute(
"INSERT INTO states (host_id, process_id, state) VALUES (?, ?, ?)", f"INSERT INTO states (host_id, process_id, state) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
(0, config["current_pid"], "active"), (0, config["current_pid"], "active"),
) )
@ -472,11 +489,11 @@ def run_remote_ffmpeg(config, target_hid, target_hostname, target_servername, ff
with dbconn(config) as cur: with dbconn(config) as cur:
cur.execute( cur.execute(
"INSERT INTO processes (host_id, process_id, cmd) VALUES (?, ?, ?)", f"INSERT INTO processes (host_id, process_id, cmd) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
(target_hid, config["current_pid"], cmd_name + " " + " ".join(ffmpeg_args)), (target_hid, config["current_pid"], cmd_name + " " + " ".join(ffmpeg_args)),
) )
cur.execute( cur.execute(
"INSERT INTO states (host_id, process_id, state) VALUES (?, ?, ?)", f"INSERT INTO states (host_id, process_id, state) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})",
(target_hid, config["current_pid"], "active"), (target_hid, config["current_pid"], "active"),
) )
@ -547,7 +564,8 @@ def run_control(config):
# Check conditions for migrations # Check conditions for migrations
with dbconn(config) as cur: with dbconn(config) as cur:
# Migration for new servername (PR #36) # Migration for new servername (PR #36)
if cur.execute("SELECT COUNT(*) AS CNTREC FROM pragma_table_info('hosts') WHERE name='servername'").fetchone()[0] == 0: cur.execute("SELECT COUNT(*) AS CNTREC FROM pragma_table_info('hosts') WHERE name='servername'")
if cur.fetchone()[0] == 0:
cur.execute( cur.execute(
"ALTER TABLE hosts ADD servername TEXT NOT NULL DEFAULT 'invalid'" "ALTER TABLE hosts ADD servername TEXT NOT NULL DEFAULT 'invalid'"
) )
@ -555,10 +573,11 @@ def run_control(config):
# Migration for new servername (PR #36) # Migration for new servername (PR #36)
if did_alter_0001AddServername: if did_alter_0001AddServername:
with dbconn(config) as cur: with dbconn(config) as cur:
for host in cur.execute("SELECT * FROM hosts").fetchall(): cur.execute("SELECT * FROM hosts")
for host in cur.fetchall():
if host[3] == 'invalid': if host[3] == 'invalid':
cur.execute( cur.execute(
"UPDATE hosts SET servername = ? WHERE hostname = ?", (host[1], host[1]) f"UPDATE hosts SET servername = {SQL_VAR_SIGN} WHERE hostname = {SQL_VAR_SIGN}", (host[1], host[1])
) )
@click.command(name="init", short_help="Initialize the system.") @click.command(name="init", short_help="Initialize the system.")
@ -602,20 +621,37 @@ def run_control(config):
) )
) )
if Path(config["db_path"]).is_file():
os.remove(config["db_path"])
try: try:
with dbconn(config) as cur: with dbconn(config) as cur:
cur.execute( cur.execute(
"CREATE TABLE hosts (id INTEGER PRIMARY KEY, hostname TEXT NOT NULL, weight INTEGER DEFAULT 1, servername TEXT NOT NULL)" "DROP TABLE IF EXISTS hosts"
) )
cur.execute( cur.execute(
"CREATE TABLE processes (id INTEGER PRIMARY KEY, host_id INTEGER, process_id INTEGER, cmd TEXT)" "DROP TABLE IF EXISTS processes"
) )
cur.execute( cur.execute(
"CREATE TABLE states (id INTEGER PRIMARY KEY, host_id INTEGER, process_id INTEGER, state TEXT)" "DROP TABLE IF EXISTS states"
) )
if DB_TYPE == "SQLITE":
cur.execute(
"CREATE TABLE hosts (id INTEGER PRIMARY KEY, hostname TEXT NOT NULL, weight INTEGER DEFAULT 1, servername TEXT NOT NULL)"
)
cur.execute(
"CREATE TABLE processes (id INTEGER PRIMARY KEY, host_id INTEGER, process_id INTEGER, cmd TEXT)"
)
cur.execute(
"CREATE TABLE states (id INTEGER PRIMARY KEY, host_id INTEGER, process_id INTEGER, state TEXT)"
)
elif DB_TYPE == "POSTGRES":
cur.execute(
"CREATE TABLE hosts (id SERIAL PRIMARY KEY, hostname TEXT NOT NULL, weight INTEGER DEFAULT 1, servername TEXT NOT NULL)"
)
cur.execute(
"CREATE TABLE processes (id SERIAL PRIMARY KEY, host_id INTEGER, process_id INTEGER, cmd TEXT)"
)
cur.execute(
"CREATE TABLE states (id SERIAL PRIMARY KEY, host_id INTEGER, process_id INTEGER, state TEXT)"
)
except Exception as e: except Exception as e:
fail("Failed to create database: {}".format(e)) fail("Failed to create database: {}".format(e))
@ -625,12 +661,13 @@ def run_control(config):
getgrnam(config["dir_group"]).gr_gid, getgrnam(config["dir_group"]).gr_gid,
) )
os.chmod(config["state_dir"], 0o770) os.chmod(config["state_dir"], 0o770)
os.chown( if DB_TYPE == "SQLITE":
config["db_path"], os.chown(
getpwnam(config["dir_owner"]).pw_uid, config["db_path"],
getgrnam(config["dir_group"]).gr_gid, getpwnam(config["dir_owner"]).pw_uid,
) getgrnam(config["dir_group"]).gr_gid,
os.chmod(config["db_path"], 0o660) )
os.chmod(config["db_path"], 0o660)
rffmpeg_click.add_command(rffmpeg_click_init) rffmpeg_click.add_command(rffmpeg_click_init)
@ -640,9 +677,12 @@ def run_control(config):
Show the current status of all rffmpeg target hosts and active processes. Show the current status of all rffmpeg target hosts and active processes.
""" """
with dbconn(config) as cur: with dbconn(config) as cur:
hosts = cur.execute("SELECT * FROM hosts").fetchall() cur.execute("SELECT * FROM hosts")
processes = cur.execute("SELECT * FROM processes").fetchall() hosts = cur.fetchall()
states = cur.execute("SELECT * FROM states").fetchall() cur.execute("SELECT * FROM processes")
processes = cur.fetchall()
cur.execute("SELECT * FROM states")
states = cur.fetchall()
# Determine if there are any fallback processes running # Determine if there are any fallback processes running
fallback_processes = list() fallback_processes = list()
@ -667,9 +707,8 @@ def run_control(config):
# Get the latest state # Get the latest state
with dbconn(config) as cur: with dbconn(config) as cur:
current_state = cur.execute( cur.execute(f"SELECT * FROM states WHERE host_id = {SQL_VAR_SIGN} ORDER BY id DESC", (hid,))
"SELECT * FROM states WHERE host_id = ? ORDER BY id DESC", (hid,) current_state = cur.fetchone()
).fetchone()
if not current_state: if not current_state:
current_state = "idle" current_state = "idle"
@ -797,9 +836,7 @@ def run_control(config):
name = host name = host
click.echo("Adding new host '{}' ({})".format(host, name)) click.echo("Adding new host '{}' ({})".format(host, name))
with dbconn(config) as cur: with dbconn(config) as cur:
cur.execute( cur.execute(f"INSERT INTO hosts (hostname, weight, servername) VALUES ({SQL_VAR_SIGN}, {SQL_VAR_SIGN}, {SQL_VAR_SIGN})", (host, weight, name))
"INSERT INTO hosts (hostname, weight, servername) VALUES (?, ?, ?)", (host, weight, name)
)
rffmpeg_click.add_command(rffmpeg_click_add) rffmpeg_click.add_command(rffmpeg_click_add)
@ -817,13 +854,11 @@ def run_control(config):
fieldAlt = "hostname" fieldAlt = "hostname"
with dbconn(config) as cur: with dbconn(config) as cur:
entry = cur.execute( cur.execute(f"SELECT * FROM hosts WHERE {field} = {SQL_VAR_SIGN}", (host,))
"SELECT * FROM hosts WHERE {} = ?".format(field), (host,) entry = cur.fetchall()
).fetchall()
if len(entry) < 1: if len(entry) < 1:
entry = cur.execute( cur.execute(f"SELECT * FROM hosts WHERE {fieldAlt} = {SQL_VAR_SIGN}", (host,))
"SELECT * FROM hosts WHERE {} = ?".format(fieldAlt), (host,) entry = cur.fetchall()
).fetchall()
if len(entry) < 1: if len(entry) < 1:
fail("No hosts found to delete!") fail("No hosts found to delete!")
@ -834,7 +869,7 @@ def run_control(config):
for host in entry: for host in entry:
hid, hostname, weight, servername = host hid, hostname, weight, servername = host
click.echo("\tID: {}\tHostname: {}\tServername: {}".format(hid, hostname, servername)) click.echo("\tID: {}\tHostname: {}\tServername: {}".format(hid, hostname, servername))
cur.execute("DELETE FROM hosts WHERE id = ?", (hid,)) cur.execute(f"DELETE FROM hosts WHERE id = {SQL_VAR_SIGN}", (hid,))
rffmpeg_click.add_command(rffmpeg_click_remove) rffmpeg_click.add_command(rffmpeg_click_remove)
@ -874,7 +909,6 @@ def run_control(config):
def rffmpeg_click_log(host): def rffmpeg_click_log(host):
""" """
Clear all active process and states from the database, optionally limited to a host with internal ID/IP/hostname/servername HOST. Clear all active process and states from the database, optionally limited to a host with internal ID/IP/hostname/servername HOST.
This command is designed to assist in rare error cases whereby stuck process states are present in the database, and should be used sparingly. This will not affect running processes negatively, though rffmpeg will no longer see them as active. It is recommended to run this command only when you are sure that no processes are actually running. This command is designed to assist in rare error cases whereby stuck process states are present in the database, and should be used sparingly. This will not affect running processes negatively, though rffmpeg will no longer see them as active. It is recommended to run this command only when you are sure that no processes are actually running.
""" """
with dbconn(config) as cur: with dbconn(config) as cur:
@ -931,6 +965,7 @@ if __name__ == "__main__":
if "rffmpeg" in cmd_name: if "rffmpeg" in cmd_name:
run_control(config) run_control(config)
else: else:
ffmpeg_args = all_args[1:] ffmpeg_args = all_args[1:]
run_ffmpeg(config, ffmpeg_args) run_ffmpeg(config, ffmpeg_args)