general_utils.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. #
  2. # Copyright (c) Contributors to the Open 3D Engine Project.
  3. # For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. #
  5. # SPDX-License-Identifier: Apache-2.0 OR MIT
  6. #
  7. #
  8. # -------------------------------------------------------------------------
  9. """! @brief This module contains several common utilities/operations for Python when creating tools for the DCCsi. """
  10. ##
  11. # @file general_utilities.py
  12. #
  13. # @brief This module contains several helper functions useful when creating Python tool scripts.
  14. #
  15. # @section General Utilities Description
  16. # Python module containing many commonly needed helper functions for Python tool development in the DCCsi.
  17. # DCCsi (Digital Content Creation Scripting Interface)
  18. #
  19. # @section QT Utilities Notes
  20. # - Comments are Doxygen compatible
  21. # standard imports
  22. import sqlite3
  23. from sqlite3 import Error
  24. from pathlib import Path
  25. from box import Box
  26. import logging as _logging
  27. import json
  28. import re
  29. import os
  30. # module global scope
  31. from DccScriptingInterface.azpy import _PACKAGENAME
  32. _MODULENAME = f'{_PACKAGENAME}.general_utilities'
  33. _LOGGER = _logging.getLogger(_MODULENAME)
  34. _LOGGER.debug('Initializing: {0}.'.format({_MODULENAME}))
  35. # -------------------------------------------------------------------------
  36. def get_incremented_filename(name):
  37. last_number = re.compile(r'(?:[^\d]*(\d+)[^\d]*)+')
  38. number_found = last_number.search(name)
  39. if number_found:
  40. next_number = str(int(number_found.group(1)) + 1)
  41. start, end = number_found.span(1)
  42. name = name[:max(end - len(next_number), start)] + next_number + name[end:]
  43. return name
  44. ##############
  45. # VALIDATION #
  46. ##############
  47. def validate_markdown(target_path: Path) -> bool:
  48. with open(target_path) as markdown_file:
  49. for line in markdown_file:
  50. if '### File Under' in line:
  51. return True
  52. return False
  53. def validate_path(target_path: Path) -> Path:
  54. if target_path.is_file():
  55. return target_path
  56. return Path('')
  57. #######################
  58. # GETTERS AND SETTERS #
  59. #######################
  60. def get_json_data(target_path: Path) -> dict:
  61. try:
  62. with open(target_path) as json_file:
  63. return json.load(json_file)
  64. except IOError:
  65. raise Exception(f'File [{target_path}] not found.')
  66. def get_commented_json_data(target_path: Path) -> dict:
  67. try:
  68. with open(target_path) as json_file:
  69. data = json_file.read()
  70. data = re.sub("//.*", "", data, flags=re.MULTILINE)
  71. return json.loads(data)
  72. except IOError:
  73. raise Exception(f'File [{target_path}] not found.')
  74. def get_markdown_information(target_path: Path) -> dict:
  75. markdown = {}
  76. current_section = ''
  77. content_string = ''
  78. with open(target_path) as markdown_file:
  79. for line in markdown_file:
  80. heading = re.match(r'[#]+[ ]{1,}[\w\s]+', line)
  81. if heading:
  82. heading_parts = heading.group(0).split(' ')
  83. section = ' '.join(heading_parts[1:])
  84. if current_section:
  85. markdown[current_section] = content_string
  86. content_string = ''
  87. current_section = section.rstrip()
  88. else:
  89. content_string += line.rstrip()
  90. markdown[current_section] = content_string
  91. return markdown
  92. def get_clean_path(target_path) -> str:
  93. return target_path.replace('\\', '/')
  94. def get_files_by_name(base_directory: Path, target_file: str) -> list:
  95. file_list = []
  96. for (root, dirs, files) in os.walk(base_directory, topdown=True):
  97. root = Path(root)
  98. for file in files:
  99. if file == target_file:
  100. file_list.append(root / file)
  101. return file_list
  102. def set_json_data(target_path: Path, json_data: Box):
  103. with open(str(target_path), 'w') as json_file:
  104. json.dump(json_data, json_file, indent=4)
  105. ###########################
  106. # SQLITE DATABASE HELPERS #
  107. ###########################
  108. def get_database(database_path: str) -> sqlite3.Connection:
  109. try:
  110. db = sqlite3.connect(database_path)
  111. _LOGGER.info(f'DB Connection Made: {sqlite3.version}')
  112. return db
  113. except Error as e:
  114. _LOGGER.info(f'Database connection failed. Error: {e}')
  115. return None
  116. def create_table(db: sqlite3.Connection, cursor: sqlite3.Cursor, table_name: str, headers: list):
  117. header_string = ', '.join(headers)
  118. _LOGGER.info(f'CreatingTable [{table_name}]: {header_string}')
  119. execute_db_command(db, cursor, f"""CREATE TABLE IF NOT EXISTS {table_name} ({header_string})""")
  120. def remove_table(db: sqlite3.Connection, cursor: sqlite3.Cursor, table_name: str):
  121. execute_db_command(db, cursor, f"""DROP TABLE {table_name}""")
  122. def create_table_entry(db: sqlite3.Connection, cursor: sqlite3.Cursor, table_name: str, values: list):
  123. values_string = ', '.join(values)
  124. execute_db_command(db, cursor, f"""INSERT INTO {table_name} VALUES ({values_string})""")
  125. def remove_table_entry(db: sqlite3.Connection, cursor: sqlite3.Cursor, table_name: str, row_id: int):
  126. execute_db_command(db, cursor, f"""DELETE from {table_name} where id = {row_id}""")
  127. def execute_db_command(db: sqlite3.Connection, cursor: sqlite3.Cursor, command: str):
  128. _LOGGER.info(f'Executing: {command}')
  129. cursor.execute(command)
  130. db.commit()
  131. def get_table_values(cursor: sqlite3.Cursor, table: str) -> dict:
  132. table_dict = {}
  133. cursor.execute(f"""SELECT * from {table}""")
  134. values = cursor.fetchall()
  135. for value in values:
  136. table_dict[value[0]] = value[1]
  137. return table_dict
  138. def get_database_values(cursor: sqlite3.Cursor, tables: list) -> dict:
  139. database_values = {}
  140. for table in tables:
  141. cursor.execute(f"""SELECT * from {table}""")
  142. table_values = cursor.fetchall()
  143. database_values[table] = table_values
  144. return database_values