123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- from typing import List, Tuple, Union, Optional
- from collections import namedtuple
- import os
- from datetime import datetime
- from datetime import timedelta
- import statistics
- import argparse
- from celery.app.base import Celery
- from celery.schedules import crontab
- class InputDataFromUser:
- """ Входные данные от пользователя """
- def get_base_object_for_arguments(self) -> argparse.ArgumentParser:
- """ Получаем базовый объект для аргументов """
- parser = argparse.ArgumentParser()
- parser.add_argument(
- '-pf',
- '--parsing-frequency',
- type=str,
- choices=['daytime'],
- default='daytime',
- help='Частота парсинга'
- )
- parser.add_argument(
- '-wp',
- '--what-parse',
- type=str,
- choices=['all_products', 'specified_products'],
- default='specified_products',
- help='Что будем парсить?'
- )
- return parser
- def get_list_of_passed_arguments(self) -> argparse.Namespace:
- """ Получаем список из переданных аргументов """
- base_object_for_arguments = self.get_base_object_for_arguments()
- return base_object_for_arguments.parse_args()
- def get_time_to_start_tasks() -> Tuple[List[int], List[Tuple]]:
- """ Получаем время для запуска задач """
- def get_time_to_start_parsing_and_sending(
- file_content: str,
- what_time: str
- ) -> Tuple:
- """ Получаем время для запуска парсинга указанных задач и отправки статистики данных """
- list_of_hours = []
- minutes = 0
- if what_time == 'product_parsing':
- execution_time, _ = file_content.split('\n\n')
- elif what_time == 'sending_statistics':
- _, execution_time = file_content.split('\n\n')
- execution_time = execution_time.split('\n')[1].strip()
- ExecutionTime = namedtuple('ExecutionTime', ['hours', 'minutes'])
- for time in execution_time.split(','):
- hour, minute = time.split(':')
- list_of_hours.append(hour)
- minutes = minute
- execution_time = ExecutionTime(
- ','.join(list_of_hours),
- minutes
- )
- return execution_time
- # ВЫШЕ ОПРЕДЕЛЕНИЕ ФУНКЦИЙ
- path_to_file = os.path.abspath(
- 'user_data/settings.txt'
- )
- if not os.path.exists(path_to_file):
- raise ValueError('Не найдено файла user_data/settings.txt')
- with open(path_to_file) as file:
- file_content = file.read().strip()
- file_content = file_content.split(
- '-----------------------------'
- )[1].strip()
- time_to_start_parsing = get_time_to_start_parsing_and_sending(
- file_content,
- 'product_parsing'
- )
- sending_time_of_statistics = get_time_to_start_parsing_and_sending(
- file_content,
- 'sending_statistics'
- )
- return (time_to_start_parsing, sending_time_of_statistics)
- def run_necessary_task(app: Celery) -> None:
- """ Запускаем необходимые задачи """
- time_to_start_parsing, sending_time_of_statistics = get_time_to_start_tasks()
- list_of_passed_arguments = InputDataFromUser().get_list_of_passed_arguments()
- match list_of_passed_arguments.what_parse:
- case 'specified_products':
- app.conf.beat_schedule = {
- 'get-statistics-of-specified-products': {
- 'task': 'celery_app.get_statistics_of_specified_products',
- 'schedule': crontab(
- hour=time_to_start_parsing.hours,
- minute=time_to_start_parsing.minutes
- ),
- },
- }
- def get_list_of_links_of_prouducts_to_parsed() -> List[str]:
- """ Получаем список ссылок продуктов для парсинга """
- path_to_file = os.path.abspath(
- 'user_data/list_of_products_for_parsing.txt'
- )
- with open(path_to_file) as file:
- file_content = file.read().strip()
- if not file_content:
- raise ValueError('Файл со списком продуктов для парсинга пуст')
- return [line.strip() for line in file_content.split('\n')]
- def show_runtime(action):
- """ Замеряем время выполнения работы какого-то действия """
- def wrapper(func):
- def inner_wrapper(*args, **kwargs):
- start_time: datetime = datetime.now()
- start_time_str: str = start_time.strftime('%H:%M %d-%m-%Y')
- print(f'[{start_time_str}] Запущено выполнение: {action}')
- result = func(*args, **kwargs)
- end_time: datetime = datetime.now()
- end_time_str: datetime = end_time.strftime('%H:%M %d-%m-%Y')
- result: timedelta = end_time - start_time
- print(f'[{end_time_str}] Закончено выполнение: {action}')
- hours = result.seconds // 3600
- minutes = (result.seconds // 60) % 60
- print(f'[*] На выполнение ушло {hours} ч. {minutes} м.')
- return result
- return inner_wrapper
- return wrapper
- def calculate_average_value(
- list_of_values: List[Union[int, float, str]],
- round_up: Optional[int] = None
- ) -> Union[int, float]:
- """ Считаем среднее значение какой-то последовательности """
- list_of_new_values: List[Union[float, int]] = []
- for value in list_of_values:
- if type(value) == str or value == 0: continue
- list_of_new_values.append(value)
- if not list_of_new_values: return 0
- value: Union[int, float] = statistics.mean(list_of_new_values)
- if round_up: value = round(value, round_up)
- return value
|