diff --git a/manage_db.py b/manage_db.py index 222bf70..53ebb38 100644 --- a/manage_db.py +++ b/manage_db.py @@ -4,6 +4,11 @@ import sqlite3 import fcntl import uuid +from utilities.constants import APIKEY +from utilities.constants import UUID +from utilities.constants import USERS_TABLE_NAME +from utilities.constants import HISTORY_TABLE_NAME + # Function to acquire a lock on the database file def acquire_lock(lock_file): @@ -21,15 +26,15 @@ def release_lock(lock_file): USERS_TABLE_COLUMNS = [ "id INTEGER PRIMARY KEY AUTOINCREMENT", "username TEXT UNIQUE", - "apikey TEXT", + f"{APIKEY} TEXT", "quota INT DEFAULT 50", ] HISTORY_TABLE_COLUMNS = [ - "uuid TEXT PRIMARY KEY", + f"{UUID} TEXT PRIMARY KEY", "created_at TIMESTAMP", "updated_at TIMESTAMP", - "apikey TEXT", + f"{APIKEY} TEXT", "priority INT", "type TEXT", "status TEXT", @@ -58,9 +63,9 @@ def create_or_update_table(c, table_name): ) existing_table = c.fetchone() - if table_name == "users": + if table_name == USERS_TABLE_NAME: target_columns = USERS_TABLE_COLUMNS - elif table_name == "history": + elif table_name == HISTORY_TABLE_NAME: target_columns = HISTORY_TABLE_COLUMNS else: target_columns = [] @@ -109,47 +114,59 @@ def modify_table(c, table_name, operation, column_name=None, data_type=None): def create_user(c, username, apikey): """Create a user with the given username and apikey, or update the apikey if the username already exists""" - c.execute("SELECT * FROM users WHERE username=?", (username,)) + c.execute(f"SELECT * FROM {USERS_TABLE_NAME} WHERE username=?", (username,)) result = c.fetchone() if result is not None: raise ValueError(f"found exisitng user {username}, please use update") else: c.execute( - "INSERT INTO users (username, apikey) VALUES (?, ?)", (username, apikey) + f"INSERT INTO {USERS_TABLE_NAME} (username, {APIKEY}) VALUES (?, ?)", + (username, apikey), ) def update_user(c, username, apikey): """Update the apikey for the user with the given username""" - c.execute("SELECT apikey FROM users WHERE username=?", (username,)) + c.execute(f"SELECT {APIKEY} FROM {USERS_TABLE_NAME} WHERE username=?", (username,)) result = c.fetchone() if result is not None: old_apikey = result[0] - c.execute("UPDATE history SET apikey=? WHERE apikey=?", (apikey, old_apikey)) - c.execute("UPDATE users SET apikey=? WHERE username=?", (apikey, username)) + c.execute( + f"UPDATE {HISTORY_TABLE_NAME} SET {APIKEY}=? WHERE {APIKEY}=?", + (apikey, old_apikey), + ) + c.execute( + f"UPDATE {USERS_TABLE_NAME} SET {APIKEY}=? WHERE username=?", + (apikey, username), + ) else: raise ValueError("username does not exist! create it first?") def update_quota(c, apikey, quota): - c.execute("SELECT username FROM users WHERE apikey=?", (apikey,)) + c.execute(f"SELECT username FROM {USERS_TABLE_NAME} WHERE {APIKEY}=?", (apikey,)) result = c.fetchone() if result is not None: - c.execute("UPDATE users SET quota=? WHERE apikey=?", (quota, apikey)) + c.execute( + f"UPDATE {USERS_TABLE_NAME} SET quota=? WHERE {APIKEY}=?", (quota, apikey) + ) raise ValueError(f"{apikey} does not exist") def update_username(c, apikey, username): - c.execute("SELECT username FROM users WHERE apikey=?", (apikey,)) + c.execute(f"SELECT username FROM {USERS_TABLE_NAME} WHERE {APIKEY}=?", (apikey,)) result = c.fetchone() if result is not None: - c.execute("UPDATE users SET username=? WHERE apikey=?", (username, apikey)) + c.execute( + f"UPDATE {USERS_TABLE_NAME} SET username=? WHERE {APIKEY}=?", + (username, apikey), + ) def delete_user(c, username): """Delete the user with the given username, or ignore the operation if the user does not exist""" delete_jobs(c, username=username) - c.execute("DELETE FROM users WHERE username=?", (username,)) + c.execute(f"DELETE FROM {USERS_TABLE_NAME} WHERE username=?", (username,)) print(f"removed {c.rowcount} entries") @@ -157,7 +174,7 @@ def delete_jobs(c, job_uuid="", username=""): """Delete the job with the given uuid, or ignore the operation if the uuid does not exist""" if username: c.execute( - "SELECT img, ref_img, mask_img FROM history WHERE apikey=(SELECT apikey FROM users WHERE username=?)", + f"SELECT img, ref_img, mask_img FROM {HISTORY_TABLE_NAME} WHERE apikey=(SELECT {APIKEY} FROM {USERS_TABLE_NAME} WHERE username=?)", (username,), ) rows = c.fetchall() @@ -175,13 +192,13 @@ def delete_jobs(c, job_uuid="", username=""): print(f"failed to remove {filepath}") raise c.execute( - "DELETE FROM history WHERE apikey=(SELECT apikey FROM users WHERE username=?)", + f"DELETE FROM {HISTORY_TABLE_NAME} WHERE {APIKEY}=(SELECT {APIKEY} FROM {USERS_TABLE_NAME} WHERE username=?)", (username,), ) print(f"removed {c.rowcount} entries") elif job_uuid: c.execute( - "SELECT img, ref_img, mask_img FROM history WHERE uuid=?", + f"SELECT img, ref_img, mask_img FROM {HISTORY_TABLE_NAME} WHERE uuid=?", (job_uuid,), ) result = c.fetchone() @@ -197,7 +214,7 @@ def delete_jobs(c, job_uuid="", username=""): print(f"failed to remove {filepath}") raise c.execute( - "DELETE FROM history WHERE uuid=?", + f"DELETE FROM {HISTORY_TABLE_NAME} WHERE uuid=?", (job_uuid,), ) print(f"removed {c.rowcount} entries") @@ -207,15 +224,21 @@ def show_users(c, username="", details=False): """Print all users in the users table if username is not specified, or only the user with the given username otherwise""" if username: - c.execute("SELECT username, apikey FROM users WHERE username=?", (username,)) + c.execute( + f"SELECT username, {APIKEY} FROM {USERS_TABLE_NAME} WHERE username=?", + (username,), + ) user = c.fetchone() if user: - c.execute("SELECT COUNT(*) FROM history WHERE apikey=?", (user[1],)) + c.execute( + f"SELECT COUNT(*) FROM {HISTORY_TABLE_NAME} WHERE {APIKEY}=?", + (user[1],), + ) count = c.fetchone()[0] print(f"Username: {user[0]}, API Key: {user[1]}, Number of jobs: {count}") if details: c.execute( - "SELECT uuid, created_at, updated_at, type, status, width, height, steps, img, ref_img, mask_img, is_private FROM history WHERE apikey=?", + f"SELECT {UUID}, created_at, updated_at, type, status, width, height, steps, img, ref_img, mask_img, is_private FROM {HISTORY_TABLE_NAME} WHERE apikey=?", (user[1],), ) rows = c.fetchall() @@ -224,15 +247,18 @@ def show_users(c, username="", details=False): else: print(f"No user with username '{username}' found") else: - c.execute("SELECT username, apikey FROM users") + c.execute(f"SELECT username, {APIKEY} FROM {USERS_TABLE_NAME}") users = c.fetchall() for user in users: - c.execute("SELECT COUNT(*) FROM history WHERE apikey=?", (user[1],)) + c.execute( + f"SELECT COUNT(*) FROM {HISTORY_TABLE_NAME} WHERE {APIKEY}=?", + (user[1],), + ) count = c.fetchone()[0] print(f"Username: {user[0]}, API Key: {user[1]}, Number of jobs: {count}") if details: c.execute( - "SELECT * FROM history WHERE apikey=?", + f"SELECT * FROM {HISTORY_TABLE_NAME} WHERE {APIKEY}=?", (user[1],), ) rows = c.fetchall() @@ -260,8 +286,8 @@ def manage(args): c = conn.cursor() # Create the users and history tables if they don't exist - create_or_update_table(c, "users") - create_or_update_table(c, "history") + create_or_update_table(c, USERS_TABLE_NAME) + create_or_update_table(c, HISTORY_TABLE_NAME) # Perform the requested action if args.action == "create":