asset_database_utils.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. """
  2. Copyright (c) Contributors to the Open 3D Engine Project.
  3. For complete copyright and license terms please see the LICENSE at the root of this distribution.
  4. SPDX-License-Identifier: Apache-2.0 OR MIT
  5. """
  6. import binascii
  7. from dataclasses import dataclass, field
  8. import logging
  9. import sqlite3
  10. import os
  11. from typing import List
  12. import ly_test_tools.o3de.pipeline_utils as pipeline_utils
  13. # Index for ProductID in Products table in DB
  14. PRODUCT_ID_INDEX = 0
  15. logger = logging.getLogger(__name__)
  16. def do_select(asset_db_path, cmd):
  17. try:
  18. connection = sqlite3.connect(asset_db_path)
  19. # Get ProductID from database
  20. db_rows = connection.execute(cmd)
  21. return_result = db_rows.fetchall()
  22. connection.close()
  23. return return_result
  24. except sqlite3.Error as sqlite_error:
  25. print(f'select on db {asset_db_path} failed with exception {sqlite_error}')
  26. return []
  27. def get_active_platforms_from_db(asset_db_path) -> List[str]:
  28. """Returns a list of platforms that are active in the database, based on what jobs were run"""
  29. platform_rows = do_select(asset_db_path, f"select distinct Platform from Jobs")
  30. # Condense this into a single list of platforms.
  31. platforms = [platform[0] for platform in platform_rows]
  32. return platforms
  33. # Convert a source product path into a db product path
  34. # cache_platform/projectname/product_path
  35. def get_db_product_path(workspace, source_path, cache_platform):
  36. product_path = os.path.join(cache_platform, source_path)
  37. product_path = product_path.replace('\\', '/')
  38. return product_path
  39. def get_product_id(asset_db_path, product_name) -> str:
  40. # Get ProductID from database
  41. product_id = list(do_select(asset_db_path, f"SELECT ProductID FROM Products where ProductName='{product_name}'"))
  42. if len(product_id) == 0:
  43. return product_id # return empty list
  44. return product_id[0][PRODUCT_ID_INDEX] # Get product id from 'first' row
  45. # Retrieve a product_id given a source_path assuming the source is copied into the cache with the same
  46. # name or a product name without cache_platform or projectname prepended
  47. def get_product_id_from_relative(workspace, source_path, asset_platform):
  48. return get_product_id(workspace.paths.asset_db(), get_db_product_path(workspace, source_path, asset_platform))
  49. def get_missing_dependencies(asset_db_path, product_id) -> List[str]:
  50. return list(do_select(asset_db_path, f"SELECT * FROM MissingProductDependencies where ProductPK={product_id}"))
  51. def do_single_transaction(asset_db_path, cmd):
  52. try:
  53. connection = sqlite3.connect(asset_db_path)
  54. cursor = connection.cursor() # SQL cursor used for issuing commands
  55. cursor.execute(cmd)
  56. connection.commit() # Save changes
  57. connection.close()
  58. except sqlite3.Error as sqlite_error:
  59. print(f'transaction on db {asset_db_path} cmd {cmd} failed with exception {sqlite_error}')
  60. def clear_missing_dependencies(asset_db_path, product_id) -> None:
  61. do_single_transaction(asset_db_path, f"DELETE FROM MissingProductDependencies where ProductPK={product_id}")
  62. def clear_all_missing_dependencies(asset_db_path) -> None:
  63. do_single_transaction(asset_db_path, "DELETE FROM MissingProductDependencies;")
  64. @dataclass
  65. class DBProduct:
  66. product_name: str = None
  67. sub_id: int = 0
  68. asset_type: bytes = None
  69. @dataclass
  70. class DBJob:
  71. job_key: str = None
  72. builder_guid: bytes = None
  73. status: int = 0
  74. error_count: int = 0
  75. platform: str = None
  76. # Key: Product ID
  77. products: List[DBProduct] = field(default_factory=list)
  78. @dataclass
  79. class DBSourceAsset:
  80. source_file_name: str = None
  81. uuid: bytes = None
  82. scan_folder_key: str = field(compare=False, default=None)
  83. id: str = field(compare=False, default=None)
  84. # Key: Job ID
  85. jobs: List[DBJob] = field(default_factory=list)
  86. def compare_expected_asset_to_actual_asset(asset_db_path, expected_asset: DBSourceAsset, asset_path, cache_root):
  87. actual_asset = get_db_source_job_product_info(asset_db_path, asset_path, cache_root)
  88. assert expected_asset.uuid == actual_asset.uuid, \
  89. f"UUID for asset {expected_asset.source_file_name} is '{actual_asset.uuid}', but expected '{expected_asset.uuid}'"
  90. for expected_job in expected_asset.jobs:
  91. for actual_job in actual_asset.jobs:
  92. found_job = False
  93. if expected_job.job_key == actual_job.job_key and expected_job.platform == actual_job.platform:
  94. found_job = True
  95. assert expected_job == actual_job, \
  96. f"Expected job did not match actual job for key {expected_job.job_key} and platform {expected_job.platform}.\nExpected: {expected_job}\nActual: {actual_job}"
  97. # Remove the found job to speed up other searches.
  98. actual_asset.jobs.remove(actual_job)
  99. break
  100. assert found_job, f"For asset {expected_asset.source_file_name}, could not find job with key '{expected_job.job_key}' and platform '{expected_job.platform}'"
  101. # Don't assert on any actual jobs remaining, they could be for platforms not checked by this test, or new job keys not yet handled.
  102. def get_db_source_job_product_info(asset_db_path, filename, cache_root):
  103. source_db = get_source_info_from_filename(asset_db_path, filename)
  104. source = DBSourceAsset()
  105. source.source_file_name = filename
  106. source.id = source_db[0]
  107. source.scan_folder_key = source_db[1]
  108. source.uuid = binascii.hexlify(source_db[2])
  109. jobs_db = get_jobs_for_source(asset_db_path, source.id)
  110. for job_db in jobs_db:
  111. job_id = job_db[0]
  112. job = DBJob()
  113. job.job_key = job_db[1]
  114. job.platform = job_db[3]
  115. job.builder_guid = binascii.hexlify(job_db[4])
  116. job.status = job_db[5]
  117. job.error_count = job_db[6]
  118. job.warning_count = job_db[7]
  119. products_db = get_products_for_job(asset_db_path, job_id)
  120. for product_db in products_db:
  121. product = DBProduct()
  122. product.product_name = product_db[1]
  123. product.sub_id = product_db[2]
  124. product.asset_type = binascii.hexlify(product_db[3])
  125. job.products.append(product)
  126. source.jobs.append(job)
  127. return source
  128. def get_source_info_from_filename(asset_db_path, filename):
  129. sources = do_select(asset_db_path,
  130. f"SELECT SourceID,ScanFolderPK,SourceGuid FROM Sources where SourceName=\"{filename}\"")
  131. assert len(sources) == 1, f"Zero or multiple source assets found when only one was expected for '{filename}'"
  132. return sources[0]
  133. def get_jobs_for_source(asset_db_path, source_id):
  134. return do_select(asset_db_path,
  135. f"SELECT JobID,JobKey,Fingerprint,Platform,BuilderGuid,Status,ErrorCount,WarningCount FROM Jobs where SourcePK={source_id} order by JobKey")
  136. def get_products_for_job(asset_db_path, job_id):
  137. return do_select(asset_db_path,
  138. f"SELECT ProductID,ProductName,SubID,AssetType FROM Products where JobPK={job_id} order by ProductName")