FileManagement.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. """
  2. Copyright (c) Contributors to the Open 3D Engine Project.
  3. For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. SPDX-License-Identifier: Apache-2.0 OR MIT
  5. """
  6. # Imports
  7. import os
  8. import shutil
  9. import json
  10. import logging
  11. import ly_test_tools.environment.file_system as fs
  12. class FileManagement:
  13. """
  14. File Management static class: Handles file backup and restoration.
  15. Organizes backing up files into a single directory and
  16. mapping the back up files to their original files.
  17. Features accessible via the exposed decorator functions.
  18. """
  19. # Static variables
  20. MAX_BACKUPS = 10000 # Arbitrary number to cap same-file-name name generating
  21. backup_folder_name = ".backup"
  22. backup_folder_path = os.path.join(os.path.split(__file__)[0], backup_folder_name) # CWD plus backup folder name
  23. file_map_name = "_filebackup_map.json" # JSON file to store original to back up file mappings
  24. @staticmethod
  25. def _load_file_map():
  26. # type: () -> {str: str}
  27. """
  28. Loads the FileManagement's json file map.
  29. The returned dictionary has key: value sets such that:
  30. keys: full path to an original file currently being backed up
  31. values: name of the original file's backup file. Uses a numbering system for
  32. storing multiple file names that are identical but located in different directories.
  33. If there is no current json file or the file cannot be parsed, an empty dictionary is returned.
  34. :return: Dictionary to map original file's to their back up file names.
  35. """
  36. json_file_path = os.path.join(FileManagement.backup_folder_path, FileManagement.file_map_name)
  37. if os.path.exists(json_file_path):
  38. try:
  39. with open(json_file_path, "r") as f:
  40. return json.load(f)
  41. except ValueError:
  42. logging.info("Decoding JSON file {} failed. Empty dictionary used".format(json_file_path))
  43. return {}
  44. @staticmethod
  45. def _save_file_map(file_map):
  46. # type: ({str: str}) -> None
  47. """
  48. Saves the [file_map] dictionary as a json file.
  49. If [file_map] is empty, deletes the json file.
  50. :param file_map: A dictionary mapping original file paths to their back up file names.
  51. :return: None
  52. """
  53. file_path = os.path.join(FileManagement.backup_folder_path, FileManagement.file_map_name)
  54. if os.path.exists(file_path):
  55. fs.unlock_file(file_path)
  56. if len(file_map) > 0:
  57. with open(file_path, "w") as f:
  58. json.dump(file_map, f)
  59. fs.lock_file(file_path)
  60. else:
  61. fs.delete([file_path], True, False)
  62. @staticmethod
  63. def _next_available_name(file_name, file_map):
  64. # type: (str, {str: str}) -> str or None
  65. """
  66. Generates the next available backup file name using a FILE_NAME_x.ext naming convention
  67. where x is a number. Returns None if we've maxed out the numbering system.
  68. :param file_name: The base file to generate a back up file name for.
  69. :param file_map: The file_map for the FileManagement system
  70. :return: str
  71. """
  72. suffix, extension = file_name.split(".", 1)
  73. for i in range(FileManagement.MAX_BACKUPS):
  74. potential_name = suffix + "_" + str(i) + "." + extension
  75. if potential_name not in file_map.values():
  76. return potential_name
  77. return None
  78. @staticmethod
  79. def _backup_file(file_name, file_path):
  80. # type: (str, str) -> None
  81. """
  82. Backs up the [file_name] located at [file_path] into the FileManagement's backup folder.
  83. Leaves the backup file locked from writing privileges.
  84. :param file_name: The file's name that will be backed up
  85. :param file_path: The path to the file to back up
  86. :return: None
  87. """
  88. file_map = FileManagement._load_file_map()
  89. backup_path = FileManagement.backup_folder_path
  90. # If backup directory DNE, make one
  91. if not os.path.exists(backup_path):
  92. os.mkdir(backup_path)
  93. # Find my next storage name (myFile_1.txt)
  94. backup_file_name = FileManagement._next_available_name(file_name, file_map)
  95. if backup_file_name is None:
  96. # If _next_available_name returns None, we have backed up MAX_BACKUPS of files name [file_name]
  97. raise Exception(
  98. "FileManagement class ran out of backups per name. Max: {}".format(FileManagement.MAX_BACKUPS)
  99. )
  100. # If this backup file already exists, delete it.
  101. backup_storage_file = "{}.bak".format(os.path.normpath(os.path.join(backup_path, backup_file_name)))
  102. if os.path.exists(backup_storage_file):
  103. fs.delete([backup_storage_file], True, False)
  104. # Create backup file (myFile_1.txt.bak)
  105. original_file = os.path.normpath(os.path.join(file_path, file_name))
  106. fs.create_backup(original_file, backup_path, backup_file_name)
  107. # Update file map with new file
  108. file_map[original_file] = backup_file_name
  109. FileManagement._save_file_map(file_map)
  110. # Unlock original file to get it ready to be edited by the test
  111. fs.unlock_file(original_file)
  112. @staticmethod
  113. def _restore_file(file_name, file_path):
  114. # type: (str, str) -> None
  115. """
  116. Restores the [file_name] located at [file_path] which is backed up in FileManagement's backup folder.
  117. Locks [file_name] from writing privileges when done restoring.
  118. :param file_name: The Original file that was backed up, and now restored
  119. :param file_path: The location of the original file that was backed up
  120. :return: None
  121. """
  122. file_map = FileManagement._load_file_map()
  123. backup_path = FileManagement.backup_folder_path
  124. src_file = os.path.normpath(os.path.join(file_path, file_name))
  125. if src_file in file_map:
  126. backup_file_name = file_map[src_file]
  127. backup_file = "{}.bak".format(os.path.join(backup_path, backup_file_name))
  128. fs.unlock_file(src_file)
  129. if fs.restore_backup(src_file, backup_path, backup_file_name):
  130. fs.delete([backup_file], True, False)
  131. # Remove from file map
  132. del file_map[src_file]
  133. FileManagement._save_file_map(file_map)
  134. if not os.listdir(backup_path):
  135. # Delete backup folder if it's empty now
  136. os.rmdir(backup_path)
  137. @staticmethod
  138. def _find_files(file_names, root_dir, search_subdirs=False):
  139. # type: ([str], str, bool) -> {str:str}
  140. """
  141. Searches the [root_dir] directory tree for each of the file names in [file_names]. Returns a dictionary
  142. where keys = the entries in file_names, and their values are the paths they were found at.
  143. Raises a runtime warning if not exactly one copy of each file is found.
  144. :param file_names: A list of file_names to look for
  145. :param root_dir: The root directory to begin the search
  146. collect all file paths matching that file_name
  147. :param search_subdirs: Optional boolean flag for searching sub-directories of [parent_path]
  148. :return: A dictionary where keys are file names, and values are the file's path.
  149. """
  150. file_list = {}
  151. found_count = 0
  152. for file_name in file_names:
  153. file_list[file_name] = None
  154. if search_subdirs:
  155. # Search parent path for all file name arguments
  156. for dir_path, _, dir_files in os.walk(root_dir):
  157. for dir_file in dir_files:
  158. if dir_file in file_list.keys():
  159. if file_list[dir_file] is None:
  160. file_list[dir_file] = dir_path
  161. found_count += 1
  162. else:
  163. # Found multiple files with same name. Raise warning.
  164. raise RuntimeWarning("Found multiple files with the name {}.".format(dir_file))
  165. else:
  166. for dir_file in os.listdir(root_dir):
  167. if os.path.isfile(os.path.join(root_dir, dir_file)) and dir_file in file_names:
  168. file_list[dir_file] = root_dir
  169. not_found = [file_name for file_name in file_names if file_list[file_name] is None]
  170. if not_found:
  171. raise RuntimeWarning("Could not find the following files: {}".format(not_found))
  172. return file_list
  173. @staticmethod
  174. def _copy_file(src_file, src_path, target_file, target_path):
  175. # type: (str, str, str, str) -> None
  176. """
  177. Copies the [src_file] at located in [src_path] to the [target_file] located at [target_path].
  178. Leaves the [target_file] unlocked for reading and writing privileges
  179. :param src_file: The source file to copy (file name)
  180. :param src_path: The source file's path
  181. :param target_file: The target file to copy into (file name)
  182. :param target_path: The target file's path
  183. :return: None
  184. """
  185. target_file_path = os.path.join(target_path, target_file)
  186. src_file_path = os.path.join(src_path, src_file)
  187. if os.path.exists(target_file_path):
  188. fs.unlock_file(target_file_path)
  189. os.makedirs(target_path, exist_ok=True)
  190. shutil.copyfile(src_file_path, target_file_path)
  191. @staticmethod
  192. def file_revert(file_name, parent_path=None, search_subdirs=False):
  193. # type: (str, str, bool) -> function
  194. """
  195. Function decorator:
  196. Convenience version of file_revert_list for passing a single file
  197. Calls file_revert_list on a list one [file_name]
  198. :param file_name: The name of a file to backup (not path)
  199. :param parent_path: The root directory to search for the [file_names] (relative to the dev folder).
  200. Defaults to the project folder described by the inner function's workspace fixture
  201. :param search_subdirs: A boolean option for searching sub-directories for the files in [file_names]
  202. :return: function as per the function decorator pattern
  203. """
  204. return FileManagement.file_revert_list([file_name], parent_path, search_subdirs)
  205. @staticmethod
  206. def file_revert_list(file_names, parent_path=None, search_subdirs=False):
  207. # type: ([str], str, bool) -> function
  208. """
  209. Function decorator:
  210. Collects file names specified in [file_names] in the [parent_path] directory.
  211. Backs up these files before executing the parameter to the "wrap" function.
  212. If the search_subdirs flag is True, searches all subdirectories of [parent_path] for files.
  213. :param file_names: A list of file names (not paths)
  214. :param parent_path: The root directory to search for the [file_names] (relative to the dev folder).
  215. Defaults to the project folder described by the inner function's workspace fixture
  216. :param search_subdirs: A boolean option for searching sub-directories for the files in [file_names]
  217. :return: function as per the function decorator pattern
  218. """
  219. def wrap(func):
  220. # type: (function) -> function
  221. def inner(self, request, workspace, editor, launcher_platform):
  222. # type: (...) -> None
  223. """
  224. Inner function to wrap around decorated function. Function being decorated must match this
  225. functions parameter order.
  226. """
  227. root_path = parent_path
  228. if root_path is not None:
  229. root_path = os.path.join(workspace.paths.engine_root(), root_path)
  230. else:
  231. # Default to project folder (AutomatedTesting)
  232. root_path = workspace.paths.project()
  233. # Try to locate files
  234. try:
  235. file_list = FileManagement._find_files(file_names, root_path, search_subdirs)
  236. except RuntimeWarning as w:
  237. assert False, (
  238. w.args[0]
  239. + " Please check use of search_subdirs; make sure you are using the correct parent directory."
  240. )
  241. # Restore from backups
  242. for file_name, file_path in file_list.items():
  243. FileManagement._restore_file(file_name, file_path)
  244. # Create backups for each discovered path for each specified filename
  245. for file_name, file_path in file_list.items():
  246. FileManagement._backup_file(file_name, file_path)
  247. try:
  248. # Run the test
  249. func(self, request, workspace, editor, launcher_platform)
  250. finally:
  251. # Restore backups for each discovered path for each specified filename if they exist
  252. for file_name, file_path in file_list.items():
  253. FileManagement._restore_file(file_name, file_path)
  254. return inner
  255. return wrap
  256. @staticmethod
  257. def file_override(target_file, src_file, parent_path=None, search_subdirs=False):
  258. # type: (str, str, str, bool) -> function
  259. """
  260. Function decorator:
  261. Searches for [target_file] and [src_file] in [parent_path](relative to the project dev folder)
  262. and performs backup and file swapping. [target_file] is backed up and replaced with [src_file] before the
  263. decorated function executes.
  264. After the decorated function is executed [target_file] is restored to the state before being swapped.
  265. If [parent_path] is not specified, the project folder described by the current workspace will be used.
  266. If the search_subdirs flag is True, searches all subdirectories of [parent_path] for files.
  267. :param target_file: The name of the file being backed up and swapped into
  268. :param src_file: The name of the file to swap into [target_file]
  269. :param parent_path: The root directory to search for the [file_names] (relative to the dev folder).
  270. Defaults to the project folder described by the inner function's workspace fixture
  271. :param search_subdirs: A boolean option for searching sub-directories for the files in [file_names]
  272. :return: The decorated function
  273. """
  274. def wrap(func):
  275. def inner(self, request, workspace, editor, launcher_platform):
  276. """
  277. Inner function to wrap around decorated function. Function being decorated must match this
  278. functions parameter order.
  279. """
  280. root_path = parent_path
  281. if root_path is not None:
  282. root_path = os.path.join(workspace.paths.engine_root(), root_path)
  283. else:
  284. # Default to project folder (AutomatedTesting)
  285. root_path = workspace.paths.project()
  286. # Try to locate both target and source files
  287. try:
  288. file_list = FileManagement._find_files([target_file, src_file], root_path, search_subdirs)
  289. except RuntimeWarning as w:
  290. assert False, (
  291. w.args[0]
  292. + " Please check use of search_subdirs; make sure you are using the correct parent directory."
  293. )
  294. FileManagement._restore_file(target_file, file_list[target_file])
  295. FileManagement._backup_file(target_file, file_list[target_file])
  296. FileManagement._copy_file(src_file, file_list[src_file], target_file, file_list[target_file])
  297. try:
  298. # Run Test
  299. func(self, request, workspace, editor, launcher_platform)
  300. finally:
  301. FileManagement._restore_file(target_file, file_list[target_file])
  302. return inner
  303. return wrap