[Manage] cleanup briefly to reuse constants

This commit is contained in:
HappyZ 2023-05-31 18:40:07 -07:00
parent 74c2acbe3c
commit 6eb6f9645a
1 changed files with 53 additions and 27 deletions

View File

@ -4,6 +4,11 @@ import sqlite3
import fcntl import fcntl
import uuid import uuid
from utilities.constants import APIKEY
from utilities.constants import UUID
from utilities.constants import USERS_TABLE_NAME
from utilities.constants import HISTORY_TABLE_NAME
# Function to acquire a lock on the database file # Function to acquire a lock on the database file
def acquire_lock(lock_file): def acquire_lock(lock_file):
@ -21,15 +26,15 @@ def release_lock(lock_file):
USERS_TABLE_COLUMNS = [ USERS_TABLE_COLUMNS = [
"id INTEGER PRIMARY KEY AUTOINCREMENT", "id INTEGER PRIMARY KEY AUTOINCREMENT",
"username TEXT UNIQUE", "username TEXT UNIQUE",
"apikey TEXT", f"{APIKEY} TEXT",
"quota INT DEFAULT 50", "quota INT DEFAULT 50",
] ]
HISTORY_TABLE_COLUMNS = [ HISTORY_TABLE_COLUMNS = [
"uuid TEXT PRIMARY KEY", f"{UUID} TEXT PRIMARY KEY",
"created_at TIMESTAMP", "created_at TIMESTAMP",
"updated_at TIMESTAMP", "updated_at TIMESTAMP",
"apikey TEXT", f"{APIKEY} TEXT",
"priority INT", "priority INT",
"type TEXT", "type TEXT",
"status TEXT", "status TEXT",
@ -58,9 +63,9 @@ def create_or_update_table(c, table_name):
) )
existing_table = c.fetchone() existing_table = c.fetchone()
if table_name == "users": if table_name == USERS_TABLE_NAME:
target_columns = USERS_TABLE_COLUMNS target_columns = USERS_TABLE_COLUMNS
elif table_name == "history": elif table_name == HISTORY_TABLE_NAME:
target_columns = HISTORY_TABLE_COLUMNS target_columns = HISTORY_TABLE_COLUMNS
else: else:
target_columns = [] target_columns = []
@ -109,47 +114,59 @@ def modify_table(c, table_name, operation, column_name=None, data_type=None):
def create_user(c, username, apikey): def create_user(c, username, apikey):
"""Create a user with the given username and apikey, or update the apikey if the username already exists""" """Create a user with the given username and apikey, or update the apikey if the username already exists"""
c.execute("SELECT * FROM users WHERE username=?", (username,)) c.execute(f"SELECT * FROM {USERS_TABLE_NAME} WHERE username=?", (username,))
result = c.fetchone() result = c.fetchone()
if result is not None: if result is not None:
raise ValueError(f"found exisitng user {username}, please use update") raise ValueError(f"found exisitng user {username}, please use update")
else: else:
c.execute( c.execute(
"INSERT INTO users (username, apikey) VALUES (?, ?)", (username, apikey) f"INSERT INTO {USERS_TABLE_NAME} (username, {APIKEY}) VALUES (?, ?)",
(username, apikey),
) )
def update_user(c, username, apikey): def update_user(c, username, apikey):
"""Update the apikey for the user with the given username""" """Update the apikey for the user with the given username"""
c.execute("SELECT apikey FROM users WHERE username=?", (username,)) c.execute(f"SELECT {APIKEY} FROM {USERS_TABLE_NAME} WHERE username=?", (username,))
result = c.fetchone() result = c.fetchone()
if result is not None: if result is not None:
old_apikey = result[0] old_apikey = result[0]
c.execute("UPDATE history SET apikey=? WHERE apikey=?", (apikey, old_apikey)) c.execute(
c.execute("UPDATE users SET apikey=? WHERE username=?", (apikey, username)) f"UPDATE {HISTORY_TABLE_NAME} SET {APIKEY}=? WHERE {APIKEY}=?",
(apikey, old_apikey),
)
c.execute(
f"UPDATE {USERS_TABLE_NAME} SET {APIKEY}=? WHERE username=?",
(apikey, username),
)
else: else:
raise ValueError("username does not exist! create it first?") raise ValueError("username does not exist! create it first?")
def update_quota(c, apikey, quota): def update_quota(c, apikey, quota):
c.execute("SELECT username FROM users WHERE apikey=?", (apikey,)) c.execute(f"SELECT username FROM {USERS_TABLE_NAME} WHERE {APIKEY}=?", (apikey,))
result = c.fetchone() result = c.fetchone()
if result is not None: if result is not None:
c.execute("UPDATE users SET quota=? WHERE apikey=?", (quota, apikey)) c.execute(
f"UPDATE {USERS_TABLE_NAME} SET quota=? WHERE {APIKEY}=?", (quota, apikey)
)
raise ValueError(f"{apikey} does not exist") raise ValueError(f"{apikey} does not exist")
def update_username(c, apikey, username): def update_username(c, apikey, username):
c.execute("SELECT username FROM users WHERE apikey=?", (apikey,)) c.execute(f"SELECT username FROM {USERS_TABLE_NAME} WHERE {APIKEY}=?", (apikey,))
result = c.fetchone() result = c.fetchone()
if result is not None: if result is not None:
c.execute("UPDATE users SET username=? WHERE apikey=?", (username, apikey)) c.execute(
f"UPDATE {USERS_TABLE_NAME} SET username=? WHERE {APIKEY}=?",
(username, apikey),
)
def delete_user(c, username): def delete_user(c, username):
"""Delete the user with the given username, or ignore the operation if the user does not exist""" """Delete the user with the given username, or ignore the operation if the user does not exist"""
delete_jobs(c, username=username) delete_jobs(c, username=username)
c.execute("DELETE FROM users WHERE username=?", (username,)) c.execute(f"DELETE FROM {USERS_TABLE_NAME} WHERE username=?", (username,))
print(f"removed {c.rowcount} entries") print(f"removed {c.rowcount} entries")
@ -157,7 +174,7 @@ def delete_jobs(c, job_uuid="", username=""):
"""Delete the job with the given uuid, or ignore the operation if the uuid does not exist""" """Delete the job with the given uuid, or ignore the operation if the uuid does not exist"""
if username: if username:
c.execute( c.execute(
"SELECT img, ref_img, mask_img FROM history WHERE apikey=(SELECT apikey FROM users WHERE username=?)", f"SELECT img, ref_img, mask_img FROM {HISTORY_TABLE_NAME} WHERE apikey=(SELECT {APIKEY} FROM {USERS_TABLE_NAME} WHERE username=?)",
(username,), (username,),
) )
rows = c.fetchall() rows = c.fetchall()
@ -175,13 +192,13 @@ def delete_jobs(c, job_uuid="", username=""):
print(f"failed to remove {filepath}") print(f"failed to remove {filepath}")
raise raise
c.execute( c.execute(
"DELETE FROM history WHERE apikey=(SELECT apikey FROM users WHERE username=?)", f"DELETE FROM {HISTORY_TABLE_NAME} WHERE {APIKEY}=(SELECT {APIKEY} FROM {USERS_TABLE_NAME} WHERE username=?)",
(username,), (username,),
) )
print(f"removed {c.rowcount} entries") print(f"removed {c.rowcount} entries")
elif job_uuid: elif job_uuid:
c.execute( c.execute(
"SELECT img, ref_img, mask_img FROM history WHERE uuid=?", f"SELECT img, ref_img, mask_img FROM {HISTORY_TABLE_NAME} WHERE uuid=?",
(job_uuid,), (job_uuid,),
) )
result = c.fetchone() result = c.fetchone()
@ -197,7 +214,7 @@ def delete_jobs(c, job_uuid="", username=""):
print(f"failed to remove {filepath}") print(f"failed to remove {filepath}")
raise raise
c.execute( c.execute(
"DELETE FROM history WHERE uuid=?", f"DELETE FROM {HISTORY_TABLE_NAME} WHERE uuid=?",
(job_uuid,), (job_uuid,),
) )
print(f"removed {c.rowcount} entries") print(f"removed {c.rowcount} entries")
@ -207,15 +224,21 @@ def show_users(c, username="", details=False):
"""Print all users in the users table if username is not specified, """Print all users in the users table if username is not specified,
or only the user with the given username otherwise""" or only the user with the given username otherwise"""
if username: if username:
c.execute("SELECT username, apikey FROM users WHERE username=?", (username,)) c.execute(
f"SELECT username, {APIKEY} FROM {USERS_TABLE_NAME} WHERE username=?",
(username,),
)
user = c.fetchone() user = c.fetchone()
if user: if user:
c.execute("SELECT COUNT(*) FROM history WHERE apikey=?", (user[1],)) c.execute(
f"SELECT COUNT(*) FROM {HISTORY_TABLE_NAME} WHERE {APIKEY}=?",
(user[1],),
)
count = c.fetchone()[0] count = c.fetchone()[0]
print(f"Username: {user[0]}, API Key: {user[1]}, Number of jobs: {count}") print(f"Username: {user[0]}, API Key: {user[1]}, Number of jobs: {count}")
if details: if details:
c.execute( c.execute(
"SELECT uuid, created_at, updated_at, type, status, width, height, steps, img, ref_img, mask_img, is_private FROM history WHERE apikey=?", f"SELECT {UUID}, created_at, updated_at, type, status, width, height, steps, img, ref_img, mask_img, is_private FROM {HISTORY_TABLE_NAME} WHERE apikey=?",
(user[1],), (user[1],),
) )
rows = c.fetchall() rows = c.fetchall()
@ -224,15 +247,18 @@ def show_users(c, username="", details=False):
else: else:
print(f"No user with username '{username}' found") print(f"No user with username '{username}' found")
else: else:
c.execute("SELECT username, apikey FROM users") c.execute(f"SELECT username, {APIKEY} FROM {USERS_TABLE_NAME}")
users = c.fetchall() users = c.fetchall()
for user in users: for user in users:
c.execute("SELECT COUNT(*) FROM history WHERE apikey=?", (user[1],)) c.execute(
f"SELECT COUNT(*) FROM {HISTORY_TABLE_NAME} WHERE {APIKEY}=?",
(user[1],),
)
count = c.fetchone()[0] count = c.fetchone()[0]
print(f"Username: {user[0]}, API Key: {user[1]}, Number of jobs: {count}") print(f"Username: {user[0]}, API Key: {user[1]}, Number of jobs: {count}")
if details: if details:
c.execute( c.execute(
"SELECT * FROM history WHERE apikey=?", f"SELECT * FROM {HISTORY_TABLE_NAME} WHERE {APIKEY}=?",
(user[1],), (user[1],),
) )
rows = c.fetchall() rows = c.fetchall()
@ -260,8 +286,8 @@ def manage(args):
c = conn.cursor() c = conn.cursor()
# Create the users and history tables if they don't exist # Create the users and history tables if they don't exist
create_or_update_table(c, "users") create_or_update_table(c, USERS_TABLE_NAME)
create_or_update_table(c, "history") create_or_update_table(c, HISTORY_TABLE_NAME)
# Perform the requested action # Perform the requested action
if args.action == "create": if args.action == "create":