utils.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. from typing import List, Tuple, Union, Optional
  2. from collections import namedtuple
  3. import os
  4. from datetime import datetime
  5. from datetime import timedelta
  6. import statistics
  7. import argparse
  8. from celery.app.base import Celery
  9. from celery.schedules import crontab
  10. class InputDataFromUser:
  11. """ Входные данные от пользователя """
  12. def get_base_object_for_arguments(self) -> argparse.ArgumentParser:
  13. """ Получаем базовый объект для аргументов """
  14. parser = argparse.ArgumentParser()
  15. parser.add_argument(
  16. '-pf',
  17. '--parsing-frequency',
  18. type=str,
  19. choices=['daytime'],
  20. default='daytime',
  21. help='Частота парсинга'
  22. )
  23. parser.add_argument(
  24. '-wp',
  25. '--what-parse',
  26. type=str,
  27. choices=['all_products', 'specified_products'],
  28. default='specified_products',
  29. help='Что будем парсить?'
  30. )
  31. return parser
  32. def get_list_of_passed_arguments(self) -> argparse.Namespace:
  33. """ Получаем список из переданных аргументов """
  34. base_object_for_arguments = self.get_base_object_for_arguments()
  35. return base_object_for_arguments.parse_args()
  36. def get_time_to_start_tasks() -> Tuple[List[int], List[Tuple]]:
  37. """ Получаем время для запуска задач """
  38. def get_time_to_start_parsing_and_sending(
  39. file_content: str,
  40. what_time: str
  41. ) -> Tuple:
  42. """ Получаем время для запуска парсинга указанных задач и отправки статистики данных """
  43. list_of_hours = []
  44. minutes = 0
  45. if what_time == 'product_parsing':
  46. execution_time, _ = file_content.split('\n\n')
  47. elif what_time == 'sending_statistics':
  48. _, execution_time = file_content.split('\n\n')
  49. execution_time = execution_time.split('\n')[1].strip()
  50. ExecutionTime = namedtuple('ExecutionTime', ['hours', 'minutes'])
  51. for time in execution_time.split(','):
  52. hour, minute = time.split(':')
  53. list_of_hours.append(hour)
  54. minutes = minute
  55. execution_time = ExecutionTime(
  56. ','.join(list_of_hours),
  57. minutes
  58. )
  59. return execution_time
  60. # ВЫШЕ ОПРЕДЕЛЕНИЕ ФУНКЦИЙ
  61. path_to_file = os.path.abspath(
  62. 'user_data/settings.txt'
  63. )
  64. if not os.path.exists(path_to_file):
  65. raise ValueError('Не найдено файла user_data/settings.txt')
  66. with open(path_to_file) as file:
  67. file_content = file.read().strip()
  68. file_content = file_content.split(
  69. '-----------------------------'
  70. )[1].strip()
  71. time_to_start_parsing = get_time_to_start_parsing_and_sending(
  72. file_content,
  73. 'product_parsing'
  74. )
  75. sending_time_of_statistics = get_time_to_start_parsing_and_sending(
  76. file_content,
  77. 'sending_statistics'
  78. )
  79. return (time_to_start_parsing, sending_time_of_statistics)
  80. def run_necessary_task(app: Celery) -> None:
  81. """ Запускаем необходимые задачи """
  82. time_to_start_parsing, sending_time_of_statistics = get_time_to_start_tasks()
  83. list_of_passed_arguments = InputDataFromUser().get_list_of_passed_arguments()
  84. match list_of_passed_arguments.what_parse:
  85. case 'specified_products':
  86. app.conf.beat_schedule = {
  87. 'get-statistics-of-specified-products': {
  88. 'task': 'celery_app.get_statistics_of_specified_products',
  89. 'schedule': crontab(
  90. hour=time_to_start_parsing.hours,
  91. minute=time_to_start_parsing.minutes
  92. ),
  93. },
  94. }
  95. def get_list_of_links_of_prouducts_to_parsed() -> List[str]:
  96. """ Получаем список ссылок продуктов для парсинга """
  97. path_to_file = os.path.abspath(
  98. 'user_data/list_of_products_for_parsing.txt'
  99. )
  100. with open(path_to_file) as file:
  101. file_content = file.read().strip()
  102. if not file_content:
  103. raise ValueError('Файл со списком продуктов для парсинга пуст')
  104. return [line.strip() for line in file_content.split('\n')]
  105. def show_runtime(action):
  106. """ Замеряем время выполнения работы какого-то действия """
  107. def wrapper(func):
  108. def inner_wrapper(*args, **kwargs):
  109. start_time: datetime = datetime.now()
  110. start_time_str: str = start_time.strftime('%H:%M %d-%m-%Y')
  111. print(f'[{start_time_str}] Запущено выполнение: {action}')
  112. result = func(*args, **kwargs)
  113. end_time: datetime = datetime.now()
  114. end_time_str: datetime = end_time.strftime('%H:%M %d-%m-%Y')
  115. result: timedelta = end_time - start_time
  116. print(f'[{end_time_str}] Закончено выполнение: {action}')
  117. hours = result.seconds // 3600
  118. minutes = (result.seconds // 60) % 60
  119. print(f'[*] На выполнение ушло {hours} ч. {minutes} м.')
  120. return result
  121. return inner_wrapper
  122. return wrapper
  123. def calculate_average_value(
  124. list_of_values: List[Union[int, float, str]],
  125. round_up: Optional[int] = None
  126. ) -> Union[int, float]:
  127. """ Считаем среднее значение какой-то последовательности """
  128. list_of_new_values: List[Union[float, int]] = []
  129. for value in list_of_values:
  130. if type(value) == str or value == 0: continue
  131. list_of_new_values.append(value)
  132. if not list_of_new_values: return 0
  133. value: Union[int, float] = statistics.mean(list_of_new_values)
  134. if round_up: value = round(value, round_up)
  135. return value