|
- #!/usr/bin/python3
- ###############################################################################
- # #
- # L3q - Light, light, lightweight queue #
- # Copyright (C) 2023-2024 Marcus Pedersén marcus.pedersen@slu.se #
- # #
- # This program is free software: you can redistribute it and/or modify #
- # it under the terms of the GNU General Public License as published by #
- # the Free Software Foundation, either version 3 of the License, or #
- # (at your option) any later version. #
- # #
- # This program is distributed in the hope that it will be useful, #
- # but WITHOUT ANY WARRANTY; without even the implied warranty of #
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
- # GNU General Public License for more details. #
- # #
- # You should have received a copy of the GNU General Public License #
- # along with this program. If not, see <http://www.gnu.org/licenses/>. #
- # #
- ###############################################################################
- from libl3q.daemon.daemon import L3QdConfig
- from libl3q.daemon.daemon import JobState, JobType, NodeState
- from libl3q.common.common import L3QAction
- from libl3q.daemon.l3qworker import L3QWorker
- from libl3q.daemon.sqlbackup import L3QBackup
- from libl3q.common.version import VERSION, ascii_l3q
- from libl3q.daemon import sql, queue
- from werkzeug import Response, Request
- from multiprocessing import Process
- from math import ceil
- import json
- import sys
- import pwd
- import os
- import getpass
- import string
- import random
- import logging
- import datetime
- import signal
- import socket
- import inspect
- import sqlite3
- from typing import Any, List
- class L3Qd:
- def __init__(self, conf: L3QdConfig) -> None:
- '''
- Init of l3q daemon.
- Init of needed resources for daemon to run.
- '''
- self.conf = conf
- self.conf.logger.info('Starting l3q daemon ...')
- # load validate_host
- self.validate = {}
- try:
- for r in self.conf.db_cur.execute('''SELECT * FROM validate_host'''):
- if(len(r) == 8):
- self.validate[r['key']] = r['l3qd_key']
- else:
- conf.logger.error('Row in validate_host has wrong data: {}'.format(tuple(r)))
- except Exception as e:
- print('Error reading validate_host table from database', file=sys.stderr)
- conf.logger.error('Reading validate_host table from database: {}'.format(e))
- exit(1)
-
-
- def despatch_request(self, request):
- '''
- Handles incomming requests and responds
- depending to type of action specified in
- request.
- If invalid request or request from non
- validated host an empty json dict is returned.
- '''
- if(request.method == 'POST' and request.path == '/'):
- try:
- data = json.loads(request.data)
- except Exception as e:
- print('Error decoding request.data from json')
- self.conf.logger.error('Decoding request.data from json: {}'.format(e))
- return Response(json.dumps({}))
-
- if(data.get('key') in self.validate):
- if(L3QAction.ADD_PARA == data.get('action')):
- resp = self.__add_para(data)
- elif(L3QAction.ADD_SEQ == data.get('action')):
- resp = self.__add_seq(data)
- elif(L3QAction.CANCEL_JOB == data.get('action')):
- resp = self.__cancel_job(data)
- elif(L3QAction.GET_QUEUE == data.get('action')):
- resp = self.__get_queue(data)
- elif(L3QAction.GET_JOB_INFO == data.get('action')):
- resp = self.__get_job_info(data)
- elif(L3QAction.GET_TASK_LIST == data.get('action')):
- resp = self.__get_task_list(data)
- elif(L3QAction.GET_TASK_INFO == data.get('action')):
- resp = self.__get_task_info(data)
- elif(L3QAction.GET_HISTORY == data.get('action')):
- resp = self.__get_history(data)
- elif(L3QAction.GET_NODE_STATUS == data.get('action')):
- resp = self.__get_node_status(data)
- elif(L3QAction.SET_NODE_OFFLINE == data.get('action')):
- resp = self.__set_node_offline(data)
- elif(L3QAction.SET_NODE_ONLINE == data.get('action')):
- resp = self.__set_node_online(data)
- else:
- resp = {}
- self.conf.logger.warning('Non valid action value from host: {}'.format(request.host))
- else:
- resp = {}
- self.conf.logger.warning('Non valid or missing key from host: {}'.format(request.host))
- # node post
- elif(request.method == 'POST' and request.path == '/node-l3qd'):
- try:
- data = json.loads(request.data)
- except Exception as e:
- print('Error decoding request.data from json')
- self.conf.logger.error('Decoding request.data from json: {}'.format(e))
- return Response(json.dumps({}))
- if(data.get('validate') in self.validate):
- if('data' in data and 'tasks' in data['data']['StatusTasks']):
- resp = self.__node_status_tasks(data['data']['StatusTasks']['tasks'], data['validate'])
- else:
- resp = {}
- self.conf.logger.warning('Wrong data in post from node: {}, data: {}'.format(request.host, data))
- else:
- resp = {}
- self.conf.logger.warning('Non valid or missing key from host: {}'.format(request.host))
- # end node
- else:
- resp = {}
- self.conf.logger.warning('Wrong request method: {} or wrong path: {} from client: {}'.format(request.method, request.path, request.host))
-
- return Response(json.dumps(resp))
-
- def __add_para(self, data):
- '''
- Call when L3QAction.ADD_PARA is requested.
- Will parse data dict and do required functionallity.
- Returns response dict.
- '''
- resp = {'action': L3QAction.ADD_PARA }
- if('tasks' in data and 'user' in data and 'name' in data
- and 'cores' in data and 'nodes' in data and 'depend' in data):
- dt_now = datetime.datetime.now()
- dt_now = dt_now.strftime('%Y-%m-%d %H:%M:%S')
- jobid = -1
- adjusting = ''
- nodes_needed = ceil(len(data.get('tasks'))/data.get('cores'))
- cores_needed = ceil(len(data.get('tasks'))/data.get('nodes'))
- if((nodes_needed * data.get('cores')) < (cores_needed * data.get('nodes'))):
- if(nodes_needed < data.get('nodes')):
- adjusting = 'Reduced the number of nodes from {} to {} as all nodes where not required.'.format(data.get('nodes'), nodes_needed)
- cores_needed = data.get('cores')
- else:
- nodes_needed = data.get('nodes')
- cores_needed = data.get('cores')
- else:
- if(cores_needed < data.get('cores')):
- adjusting = 'Reduced the number of cores from {} to {} as all cores where not required.'.format(data.get('cores'), cores_needed)
- nodes_needed = data.get('nodes')
- else:
- nodes_needed = data.get('nodes')
- cores_needed = data.get('cores')
-
- try:
- self.conf.db_cur.execute('SELECT total_cpus from node')
- no_nodes = 0
- for row in self.conf.db_cur:
- if(row['total_cpus'] != None):
- if(row['total_cpus'] >= cores_needed):
- no_nodes += 1
- if(no_nodes < cores_needed):
- resp['error'] = 'Error, failed to add parallel job, the amount of requested resources are not available in l3q'
- return resp
- except Exception as e:
- self.conf.logger.error('ADD_PARA: Getting cores for nodes from database: {}'.format(e))
- resp['error'] = 'Error, failed to add parallel job'
- return resp
- if(len(data.get('depend')) > 0):
- try:
- dep = data.get('depend').strip().split(',')
- sql_where = 'WHERE'
- for i, d in enumerate(dep):
- if(i >= len(dep) - 1):
- sql_where += ' id = {}'.format(d)
- else:
- sql_where += ' id = {} or'.format(d)
- sql = 'SELECT * FROM job {}'.format(sql_where)
- self.conf.db_cur.execute(sql)
- rows = self.conf.db_cur.fetchall()
- for r in rows:
- if(r['state'] == JobState.ERROR or
- r['state'] == JobState.CANCEL or
- r['state'] == JobState.CANCELED):
- resp['error'] = 'Error, failed to add parallel job, one or more dependent jobs are not in a valid state.'
- return resp
- if(len(rows) != len(dep)):
- resp['error'] = 'Error, failed to add parallel job, not all depenent jobs exist'
- return resp
- except Exception as e:
- self.conf.logger.error('ADD_PARA: Retrieving depend jobs from database: {}'.format(e))
- resp['error'] = 'Error, failed to add parallel job'
- return resp
- try:
- state = JobState.QUEUED
- if(len(data.get('depend')) > 0):
- state = JobState.DEPEND
- self.conf.db_cur.execute('''INSERT INTO job (name, type, init_date, hosts_alloc, cores_alloc, no_tasks_running, no_tasks_finished, no_tasks_err, depend, user, state)
- VALUES (?, ?, ?, ?, ?, 0, 0, 0, ?, ?, ?)''', (data.get('name'), JobType.PARALLEL, dt_now, nodes_needed, cores_needed, data.get('depend'), data.get('user'), state))
- jobid = self.conf.db_cur.lastrowid
- for t in data.get('tasks'):
- if(len(t) == 1):
- self.conf.db_cur.execute('''INSERT INTO task (command, workdir, jobid) VALUES (?, ?, ?)''', (t[0], "$HOME", jobid))
- elif(len(t) == 2):
- self.conf.db_cur.execute('''INSERT INTO task (command, workdir, jobid) VALUES (?, ?, ?)''', (t[0], t[1], jobid))
- else:
- self.conf.logger.error('ADD_SEQ: Failed to parse task from client: {}'.format(t))
- resp['error'] = 'Error, failed to add sequence job'
- return resp
- except Exception as e:
- self.conf.logger.error('ADD_PARA: Saving to database: {}'.format(e))
- resp['error'] = 'Error, failed to add parallel job'
- resp['response'] = 'Parallel job with jobid: {} successfully added to the l3q queue'.format(jobid)
- resp['response_id'] = jobid
- if(len(adjusting) > 0):
- resp['response'] += '\n{}'.format(adjusting)
- else:
- del data['key']
- self.conf.logger.error('ADD_PARA: wrong variables sent in request: {}'.format(data))
- resp['error'] = 'Error wrong variables sent in request'
-
- return resp
-
- def __add_seq(self, data):
- '''
- Call when L3QAction.ADD_SEQ is requested.
- Will parse data dict and do required functionallity.
- Returns response dict.
- '''
- resp = {'action': L3QAction.ADD_SEQ }
- if('tasks' in data and 'user' in data and 'name' in data and 'depend' in data):
- dt_now = datetime.datetime.now()
- dt_now = dt_now.strftime('%Y-%m-%d %H:%M:%S')
- jobid = -1
- if(len(data.get('depend')) > 0):
- try:
- dep = data.get('depend').strip().split(',')
- sql_where = 'WHERE'
- for i, d in enumerate(dep):
- if(i >= len(dep) - 1):
- sql_where += ' id = {}'.format(d)
- else:
- sql_where += ' id = {} or'.format(d)
- sql = 'SELECT * FROM job {}'.format(sql_where)
- self.conf.db_cur.execute(sql)
- rows = self.conf.db_cur.fetchall()
- for r in rows:
- if(r['state'] == JobState.ERROR or
- r['state'] == JobState.CANCEL or
- r['state'] == JobState.CANCELED):
- resp['error'] = 'Error, failed to add sequence job, one or more dependent jobs are not in a valid state.'
- return resp
- if(len(rows) != len(dep)):
- resp['error'] = 'Error, failed to add sequence job, not all depenent jobs exist'
- return resp
- except Exception as e:
- self.conf.logger.error('ADD_SEQ: Retrieving depend jobs from database: {}'.format(e))
- resp['error'] = 'Error, failed to add sequence job'
- return resp
- try:
- state = JobState.QUEUED
- if(len(data.get('depend')) > 0):
- state = JobState.DEPEND
- self.conf.db_cur.execute('''INSERT INTO job (name, type, init_date, hosts_alloc, cores_alloc, no_tasks_running, no_tasks_finished, no_tasks_err, depend, user, state)
- VALUES (?, ?, ?, 1, 1, 0, 0, 0, ?, ?, ?)''', (data.get('name'), JobType.SEQUENCE, dt_now, data.get('depend'), data.get('user'), state))
- jobid = self.conf.db_cur.lastrowid
- for t in data.get('tasks'):
- if(len(t) == 1):
- self.conf.db_cur.execute('''INSERT INTO task (command, workdir, jobid) VALUES (?, ?, ?)''', (t[0], "$HOME", jobid))
- elif(len(t) == 2):
- self.conf.db_cur.execute('''INSERT INTO task (command, workdir, jobid) VALUES (?, ?, ?)''', (t[0], t[1], jobid))
- else:
- self.conf.logger.error('ADD_SEQ: Failed to parse task from client: {}'.format(t))
- resp['error'] = 'Error, failed to add sequence job'
- return resp
-
- resp['response'] = 'Sequence job with jobid: {} successfully added to the l3q queue'.format(jobid)
- resp['response_id'] = jobid
- except Exception as e:
- self.conf.logger.error('ADD_SEQ: Saving to database: {}'.format(e))
- resp['error'] = 'Error, failed to add sequence job'
- else:
- del data['key']
- self.conf.logger.error('ADD_SEQ: wrong variables sent in request: {}'.format(data))
- resp['error'] = 'Error wrong variables sent in request'
-
- return resp
- def __cancel_job(self, data):
- '''
- Call when L3QAction.CANCEL_JOB is requested.
- Will parse data dict and do required functionallity.
- Returns response dict.
- '''
- resp = {'action': L3QAction.CANCEL_JOB}
- if('jobid' in data and 'user' in data):
- try:
- self.conf.db_cur.execute('SELECT * FROM job where id = ?', (data.get('jobid'),))
- rows = self.conf.db_cur.fetchall()
- if(len(rows) == 0):
- resp['error'] = 'No job with jobid: {} found.'.format(data.get('jobid'))
- else:
- if(rows[0]['user'] == data.get('user') or data.get('user') == 'root'):
- if(rows[0]['state'] == JobState.RUNNING or rows[0]['state'] == JobState.QUEUED or rows[0]['state'] == JobState.DEPEND):
- self.conf.db_cur.execute('SELECT id,depend FROM job WHERE state = ? and depend LIKE ?', (JobState.DEPEND, "%{}%".format(data.get('jobid'))))
- depend_rows = self.conf.db_cur.fetchall()
- depend: list[int] = []
- for d in depend_rows:
- dep = d['depend'].strip()
- dep = dep.split(',')
- if(str(data.get('jobid')) in dep):
- depend.append(d['id'])
- if(len(depend) == 0):
- self.conf.db_cur.execute('UPDATE job SET state = ? WHERE id = ?', (JobState.CANCEL,data.get('jobid')))
- resp['response'] = 'Job with jobid: {} successfully canceled.'.format(data.get('jobid'))
- else:
- d_str = ""
- for d in depend:
- if(len(d_str) == 0):
- d_str += f"{d}"
- else:
- d_str += f",{d}"
-
- resp['error'] = 'Job with jobid: {} has depening jobs (id): {}\nDepending jobs must be cancelled first.'.format(data.get('jobid'), d_str)
- elif(rows[0]['state'] == JobState.CANCEL):
- resp['error'] = 'Job with jobid: {} is already in Cancel state and is cancelling.'.format(data.get('jobid'))
- else:
- resp['error'] = 'Job with jobid: {} is in state {} and can not be cancelled.'.format(data.get('jobid'), rows[0]['state'])
- else:
- resp['response'] = 'Permission denied to cancel job with id: {}'.format(data.get('jobid'))
- except Exception as e:
- self.conf.logger.error('CANCEL_JOB: Saving to database: {}'.format(e))
- resp['error'] = 'Error cancel job with jobid: {}'.format(data.get('jobid'))
- else:
- del data['key']
- self.conf.logger.error('CANCEL_JOB: wrong variables sent in request: {}'.format(data))
- resp['error'] = 'Error wrong variables sent in request'
-
- return resp
-
- def __get_queue(self, data):
- '''
- Call when L3QAction.GET_QUEUE is requested.
- Will parse data dict and do required functionallity.
- Returns response dict.
- '''
- resp = {'action': L3QAction.GET_QUEUE }
- queue = []
- try:
- self.conf.db_cur.execute('SELECT * FROM job WHERE (state = ? OR state = ? OR state = ? OR state = ?) order by id', (JobState.RUNNING, JobState.QUEUED, JobState.CANCEL, JobState.DEPEND))
- for row in self.conf.db_cur:
- r = {'jobid': row['id']}
- r['user'] = row['user']
- r['init_date'] = row['init_date']
- r['cores_alloc'] = row['cores_alloc']
- r['hosts_alloc'] = row['hosts_alloc']
- r['tasks'] = str(row['no_tasks_running']) +'/' + str(row['no_tasks_finished']) + '/' + str(row['no_tasks_err'])
- #r['nodes'] = row['nodes']
- r['state'] = row['state']
- r['depend'] = row['depend']
-
- if(row['name']):
- r['name'] = row['name']
- else:
- r['name'] = '-'
- if(row['start_date']):
- r['start_date'] = row['start_date']
- else:
- r['start_date'] = '-'
- c = self.conf.db_conn.execute('''SELECT *
- FROM task
- LEFT JOIN node ON task.nodeid = node.id
- LEFT JOIN validate_host ON node.validate = validate_host.id
- WHERE task.jobid = ? ORDER BY task.id''', (r['jobid'],))
- c_row = c.fetchall()
- if(len(c_row) > 0):
- r['tasks'] += ':' + str(len(c_row))
- else:
- r['tasks'] += ':-'
- nodes = set()
- for t in c_row:
- if(t['name']):
- nodes.add(t['name'])
- if(len(nodes) == 0):
- r['nodes'] = '-'
- else:
- str_n = ''
- for n in nodes:
- if(len(str_n) > 0 ):
- str_n += f",{n}"
- else:
- str_n = n
-
- r['nodes'] = str_n
-
-
- c.close()
-
- queue.append(r)
-
- except Exception as e:
- self.conf.logger.error('GET_QUEUE: Reading from database: {}'.format(e))
- resp['error'] = 'Error reading queue.'
- resp['queue'] = queue
- return resp
- def __get_job_info(self, data):
- '''
- Call when L3QAction.GET_JOB_INFO is requested.
- Will parse data dict and do required functionallity.
- Returns response dict.
- '''
- resp = {'action': L3QAction.GET_JOB_INFO }
- if('jobid' in data):
- try:
- self.conf.db_cur.execute('SELECT * FROM job WHERE id = ?', (data.get('jobid'),))
- rows = self.conf.db_cur.fetchall()
- if(len(rows) == 0):
- resp['error'] = 'No job with jobid: {} found.'.format(data.get('jobid'))
- else:
- r = rows[0]
- resp['jobid'] = r['id']
- resp['init_date'] = r['init_date']
- resp['type'] = r['type']
- resp['hosts_alloc'] = r['hosts_alloc']
- resp['cores_alloc'] = r['cores_alloc']
- resp['no_tasks_running'] = r['no_tasks_running']
- resp['no_tasks_finished'] = r['no_tasks_finished']
- resp['no_tasks_err'] = r['no_tasks_err']
- resp['user'] = r['user']
- resp['state'] = r['state']
- resp['depend'] = r['depend']
- if(r['name']):
- resp['name'] = r['name']
- else:
- resp['name'] = '-'
- if(r['start_date']):
- resp['start_date'] = r['start_date']
- else:
- resp['start_date'] = '-'
- if(r['end_date']):
- resp['end_date'] = r['end_date']
- else:
- resp['end_date'] = '-'
- if(r['start_date'] and not r['end_date']):
- dt_now = datetime.datetime.now()
- start_d = datetime.datetime.strptime(r['start_date'], '%Y-%m-%d %H:%M:%S')
- resp['runtime'] = f"{dt_now-start_d}"
- elif(r['start_date'] and r['end_date']):
- start_d = datetime.datetime.strptime(r['start_date'], '%Y-%m-%d %H:%M:%S')
- end_d = datetime.datetime.strptime(r['end_date'], '%Y-%m-%d %H:%M:%S')
- resp['runtime'] = f"{end_d-start_d}"
- else:
- resp['runtime'] = '-'
- nodes = set()
-
- if(r['start_date']):
- c = self.conf.db_conn.execute(
- 'SELECT * FROM task LEFT JOIN node ON task.nodeid = node.id LEFT JOIN validate_host ON node.validate = validate_host.id WHERE task.jobid = ? ORDER BY seqno',
- (data.get('jobid'),))
- tasks = []
- for t in c:
- if(t['seqno']):
- task = { 'seq': t['seqno'] }
- else:
- task = { 'seq': '-' }
- task['taskid'] = t['id']
-
- if(t['exit_code'] != None):
- task['exit_code'] = t['exit_code']
- else:
- task['exit_code'] = '-'
- if(t['name']):
- task['node'] = t['name']
- nodes.add(t['name'])
- else:
- task['node'] = '-'
-
- task['command'] = t['command']
-
- if(t['start_time']):
- task['start_date'] = t['start_time']
- else:
- task['start_date'] = '-'
- if(data.get('details')):
- if(t['end_time']):
- task['end_date'] = t['end_time']
- else:
- task['end_date'] = '-'
- if(t['start_time'] and not t['end_time']):
- dt_now = datetime.datetime.now()
- start_d = datetime.datetime.strptime(t['start_time'], '%Y-%m-%d %H:%M:%S')
- task['runtime'] = f"{dt_now-start_d}"
- elif(t['start_time'] and t['end_time']):
- start_d = datetime.datetime.strptime(t['start_time'], '%Y-%m-%d %H:%M:%S')
- end_d = datetime.datetime.strptime(t['end_time'], '%Y-%m-%d %H:%M:%S')
- task['runtime'] = f"{end_d-start_d}"
- else:
- task['runtime'] = '-'
- if(t['workdir']):
- task['workdir'] = t['workdir']
- else:
- task['workdir'] = '-'
-
- if(t['stdout']):
- task['stdout'] = t['stdout']
- else:
- task['stdout'] = '-'
- if(t['stderr']):
- task['stderr'] = t['stderr']
- else:
- task['stderr'] = '-'
-
- tasks.append(task)
- resp['tasks'] = tasks
- if(len(nodes) == 0):
- resp['nodes'] = '-'
- else:
- str_n = ''
- for n in nodes:
- if(len(str_n) > 0 ):
- str_n += f",{n}"
- else:
- str_n = n
- resp['nodes'] = str_n
-
- except Exception as e:
- self.conf.logger.error('GET_JOB_INFO: Reading from database: {}'.format(e))
- resp['error'] = 'Error reading job.'
- else:
- self.conf.logger.error('GET_JOB_INFO: wrong variables sent in request: {}'.format(data))
- resp['error'] = 'Error wrong variables sent in request'
- return resp
-
- def __get_task_list(self, data):
- '''
- Call when L3QAction.GET_TASK_LIST is requested.
- Will parse data dict and do required functionallity.
- Returns response dict.
- '''
- resp = {'action': L3QAction.GET_TASK_LIST }
- if('jobid' in data):
- try:
- c = self.conf.db_conn.execute('SELECT * FROM task WHERE jobid = ? ORDER BY id', (data.get('jobid'),))
- rows = c.fetchall()
-
- if(len(rows) == 0):
- resp['error'] = 'No job with jobid: {} found.'.format(data.get('jobid'))
- else:
- resp['jobid'] = data.get('jobid')
- tasks = []
- for t in rows:
- if(t['start_time']):
- c = self.conf.db_conn.execute(
- 'SELECT * FROM task INNER JOIN node ON task.nodeid = node.id INNER JOIN validate_host ON node.validate = validate_host.id WHERE task.id = ? ORDER BY task.id',
- (t['id'],))
- t = c.fetchall()[0]
- if(t['seqno']):
- task = { 'seq': t['seqno'] }
- else:
- task = { 'seq': '-' }
- task['taskid'] = t['id']
-
- if(t['exit_code'] != None):
- task['exit_code'] = t['exit_code']
- else:
- task['exit_code'] = '-'
- if('name' in t.keys()):
- task['node'] = t['name']
- else:
- task['node'] = '-'
-
- task['command'] = t['command']
-
- if(t['start_time']):
- task['start_date'] = t['start_time']
- else:
- task['start_date'] = '-'
- if(t['end_time']):
- task['end_date'] = t['end_time']
- else:
- task['end_date'] = '-'
-
- if(t['start_time'] and not t['end_time']):
- dt_now = datetime.datetime.now()
- start_d = datetime.datetime.strptime(t['start_time'], '%Y-%m-%d %H:%M:%S')
- task['runtime'] = f"{dt_now-start_d}"
- elif(t['start_time'] and t['end_time']):
- start_d = datetime.datetime.strptime(t['start_time'], '%Y-%m-%d %H:%M:%S')
- end_d = datetime.datetime.strptime(t['end_time'], '%Y-%m-%d %H:%M:%S')
- task['runtime'] = f"{end_d-start_d}"
- else:
- task['runtime'] = '-'
- tasks.append(task)
- resp['tasks'] = tasks
-
- except Exception as e:
- self.conf.logger.error('GET_TASK_LIST: Reading from database: {}'.format(e))
- resp['error'] = 'Error reading tasks.'
- else:
- self.conf.logger.error('GET_TASK_LIST: wrong variables sent in request: {}'.format(data))
- resp['error'] = 'Error wrong variables sent in request'
- return resp
-
- def __get_task_info(self, data):
- '''
- Call when L3QAction.GET_TASK_INFO is requested.
- Will parse data dict and do required functionallity.
- Returns response dict.
- '''
- resp = {'action': L3QAction.GET_TASK_INFO }
- if('taskid' in data):
- try:
- self.conf.db_cur.execute('SELECT * FROM task WHERE id = ?', (data.get('taskid'),))
- rows = self.conf.db_cur.fetchall()
- if(len(rows) == 0):
- resp['error'] = 'No task with taskid: {} found.'.format(data.get('taskid'))
- else:
- r = rows[0]
- if(r['start_time']):
- self.conf.db_cur.execute(
- 'SELECT * FROM task INNER JOIN node ON task.nodeid = node.id INNER JOIN validate_host ON node.validate = validate_host.id WHERE task.id = ? ORDER BY seqno',
- (data.get('taskid'),))
- c = self.conf.db_cur.fetchall()
- if(len(c) == 0):
- resp['error'] = 'No task with taskid: {} found.'.format(data.get('taskid'))
- return resp
- else:
- r = c[0]
- if(r['seqno']):
- resp['seq'] = r['seqno']
- else:
- resp['seq'] = '-'
- resp['taskid'] = r['id']
-
- if(r['exit_code'] != None):
- resp['exit_code'] = r['exit_code']
- else:
- resp['exit_code'] = '-'
-
- if('name' in r):
- resp['node'] = r['name']
- else:
- resp['node'] = '-'
-
- resp['command'] = r['command']
-
- if(r['start_time']):
- resp['start_date'] = r['start_time']
- else:
- resp['start_date'] = '-'
-
- if(r['end_time']):
- resp['end_date'] = r['end_time']
- else:
- resp['end_date'] = '-'
-
- if(r['start_time'] and not r['end_time']):
- dt_now = datetime.datetime.now()
- start_d = datetime.datetime.strptime(r['start_time'], '%Y-%m-%d %H:%M:%S')
- resp['runtime'] = f"{dt_now-start_d}"
- elif(r['start_time'] and r['end_time']):
- start_d = datetime.datetime.strptime(r['start_time'], '%Y-%m-%d %H:%M:%S')
- end_d = datetime.datetime.strptime(r['end_time'], '%Y-%m-%d %H:%M:%S')
- resp['runtime'] = f"{end_d-start_d}"
- else:
- resp['runtime'] = '-'
-
- if(r['workdir']):
- resp['workdir'] = r['workdir']
- else:
- resp['workdir'] = '-'
-
- if(r['unitname']):
- resp['unitname'] = r['unitname']
- else:
- resp['unitname'] = '-'
-
- if(r['active_state']):
- resp['active_state'] = r['active_state']
- else:
- resp['active_state'] = '-'
-
- if(r['sub_state']):
- resp['sub_state'] = r['sub_state']
- else:
- resp['sub_state'] = '-'
-
- if(r['exit_code']):
- resp['exit_code'] = r['exit_code']
- else:
- resp['exit_code'] = '-'
-
- if(r['memory_peak']):
- resp['memory_peak'] = r['memory_peak']
- else:
- resp['memory_peak'] = '-'
-
- if(r['jobid']):
- resp['jobid'] = r['jobid']
- else:
- resp['active_state'] = '-'
-
- if(r['stdout']):
- resp['stdout'] = r['stdout']
- else:
- resp['stdout'] = '-'
-
- if(r['stderr']):
- resp['stderr'] = r['stderr']
- else:
- resp['stderr'] = '-'
- except Exception as e:
- self.conf.logger.error('GET_TASK_INFO: Reading from database: {}'.format(e))
- resp['error'] = 'Error reading task.'
- else:
- self.conf.logger.error('GET_TASK_INFO: wrong variables sent in request: {}'.format(data))
- resp['error'] = 'Error wrong variables sent in request'
- return resp
-
- def __get_history(self, data):
- '''
- Call when L3QAction.GET_HISTORY is requested.
- Will parse data dict and do required functionallity.
- Returns response dict.
- '''
- resp = {'action': L3QAction.GET_HISTORY }
- history = []
- try:
- if('number' in data):
- self.conf.db_cur.execute('SELECT * FROM job WHERE not (state = ? OR state = ? OR state = ? OR state = ?) ORDER BY id DESC LIMIT ?', (JobState.RUNNING, JobState.QUEUED, JobState.CANCEL, JobState.DEPEND, data.get('number')))
- else:
- self.conf.db_cur.execute('SELECT * FROM job WHERE not (state = ? OR state = ? OR state = ? OR state = ?) ORDER BY id DESC', (JobState.RUNNING, JobState.QUEUED, JobState.CANCEL, JobState.DEPEND))
- for row in self.conf.db_cur:
- r = {'jobid': row['id']}
- r['user'] = row['user']
- r['init_date'] = row['init_date']
- r['cores_alloc'] = row['cores_alloc']
- r['hosts_alloc'] = row['hosts_alloc']
- r['tasks'] = str(row['no_tasks_running']) +'/' + str(row['no_tasks_finished']) + '/' + str(row['no_tasks_err'])
- r['state'] = row['state']
- r['depend'] = row['depend']
-
- if(row['name']):
- r['name'] = row['name']
- else:
- r['name'] = '-'
- if(row['start_date']):
- r['start_date'] = row['start_date']
- else:
- r['start_date'] = '-'
- if(row['end_date']):
- r['end_date'] = row['end_date']
- else:
- r['end_date'] = '-'
- c = self.conf.db_conn.execute('''SELECT *
- FROM task
- LEFT JOIN node ON task.nodeid = node.id
- LEFT JOIN validate_host ON node.validate = validate_host.id
- WHERE task.jobid = ? ORDER BY task.id''', (r['jobid'],))
- c_row = c.fetchall()
- if(len(c_row) > 0):
- r['tasks'] += ':' + str(len(c_row))
- else:
- r['tasks'] += ':-'
- nodes = set()
- for t in c_row:
- if(t['name']):
- nodes.add(t['name'])
- if(len(nodes) == 0):
- r['nodes'] = '-'
- else:
- str_n = ''
- for n in nodes:
- if(len(str_n) > 0 ):
- str_n += f",{n}"
- else:
- str_n = n
-
- r['nodes'] = str_n
-
- c.close()
-
- history.append(r)
-
- except Exception as e:
- self.conf.logger.error('GET_HISTORY: Reading from database: {}'.format(e))
- resp['error'] = 'Error reading history.'
- resp['history'] = history
- return resp
-
- def __get_node_status(self, data):
- '''
- Call when L3QAction.GET_NODE_STATUS is requested.
- Will parse data dict and do required functionallity.
- Returns response dict.
- '''
- resp = {'action': L3QAction.GET_NODE_STATUS }
- nodes = []
- try:
- self.conf.db_cur.execute('SELECT * FROM node INNER JOIN validate_host ON node.validate = validate_host.id order by name')
-
- for row in self.conf.db_cur:
- r = {'name': row['name']}
- if(row['total_cpus']):
- r['cores'] = row['total_cpus']
- else:
- r['cores'] = '-'
- if(row['alloc_cpus'] != None):
- r['cores_alloc'] = row['alloc_cpus']
- else:
- r['cores_alloc'] = '-'
- if(row['used_cpus'] != None):
- r['cores_in_use'] = row['used_cpus']
- else:
- r['cores_in_use'] = '-'
- if(row['total_memory']):
- r['ram'] = str(row['total_memory'])
- else:
- r['ram'] = '-'
- if(row['used_memory']):
- r['ram_used'] = str(row['used_memory'])
- else:
- r['ram_used'] = '-'
- if(row['total_swap']):
- r['total_swap'] = str(row['total_swap'])
- else:
- r['total_swap'] = '-'
- if(row['used_swap']):
- r['used_swap'] = str(row['used_swap'])
- else:
- r['used_swap'] = '-'
- if(row['state']):
- r['state'] = row['state']
- else:
- r['state'] = '-'
- nodes.append(r)
-
- except Exception as e:
- self.conf.logger.error('GET_NODE_STATUS: Reading from database: {}'.format(e))
- resp['error'] = 'Error reading node status.'
- resp['nodes'] = nodes
- return resp
- def __add_node(self, data):
- '''
- Call when L3QAction.ADD_NODE is requested.
- Will parse data dict and do required functionallity.
- Returns response dict.
- '''
- resp = {'action': L3QAction.ADD_NODE }
- if('node_name' in data):
- try:
- self.conf.db_cur.execute('SELECT * FROM node where name = ?', (data.get('node_name'),))
- rows = self.conf.db_cur.fetchall()
- if(len(rows) != 0):
- resp['error'] = 'Node: {} does already exist.'.format(data.get('node_name'))
- else:
- self.conf.db_cur.execute('INSERT INTO node (name) values (?)', (data.get('node_name'),))
- resp['node'] = 'Node: {} successfully added.'.format(data.get('node_name'))
- except Exception as e:
- self.conf.logger.error('ADD_NODE: Saving to database: {}'.format(e))
- resp['error'] = 'Error saving node: {}'.format(data.get('node_name'))
- else:
- del data['key']
- self.conf.logger.error('ADD_NODE: wrong variables sent in request: {}'.format(data))
- resp['error'] = 'Error wrong variables sent in request'
-
- return resp
- def __remove_node(self, data):
- '''
- Call when L3QAction.REMOVE_NODE is requested.
- Will parse data dict and do required functionallity.
- Returns response dict.
- '''
- resp = {'action': L3QAction.REMOVE_NODE }
- if('node_name' in data):
- try:
- self.conf.db_cur.execute('SELECT * FROM node where name = ?', (data.get('node_name'),))
- rows = self.conf.db_cur.fetchall()
- if(len(rows) == 0):
- resp['error'] = 'Node: {} does not exist.'.format(data.get('node_name'))
- else:
- self.conf.db_cur.execute('DELETE FROM node where name = (?)', (data.get('node_name'),))
- resp['node'] = 'Node: {} successfully removed.'.format(data.get('node_name'))
- except Exception as e:
- self.conf.logger.error('REMOVE_NODE: Deleting from database: {}'.format(e))
- resp['error'] = 'Error removing node: {}'.format(data.get('node_name'))
- else:
- del data['key']
- self.conf.logger.error('REMOVE_NODE: wrong variables sent in request: {}'.format(data))
- resp['error'] = 'Error wrong variables sent in request'
-
- return resp
- def __set_node_offline(self, data):
- '''
- Call when L3QAction.SET_NODE_OFFLINE is requested.
- Will parse data dict and do required functionallity.
- Returns response dict.
- '''
- resp = {'action': L3QAction.SET_NODE_OFFLINE }
- if('node_name' in data):
- try:
- self.conf.db_cur.execute('SELECT id FROM validate_host where name = ?', (data.get('node_name'),))
- rows = self.conf.db_cur.fetchall()
- if(len(rows) == 0):
- resp['error'] = 'Node: {} does not exist.'.format(data.get('node_name'))
- else:
- for r in rows:
- self.conf.db_cur.execute("UPDATE node SET state = 'Maintenance draining' where validate = (?)", (r['id'],))
-
- resp['node'] = 'Node: {} successfully changed state to offline (Maintenance draining).'.format(data.get('node_name'))
- except Exception as e:
- self.conf.logger.error('SET_NODE_OFFLINE: Saving to database: {}'.format(e))
- resp['error'] = 'Error changing state to offline on node: {}'.format(data.get('node_name'))
- else:
- del data['key']
- self.conf.logger.error('SET_NODE_OFFLINE: wrong variables sent in request: {}'.format(data))
- resp['error'] = 'Error wrong variables sent in request'
-
- return resp
-
def __set_node_online(self, data):
    '''
    Handle an L3QAction.SET_NODE_ONLINE request.

    Every node row belonging to a validate_host row whose name
    equals data['node_name'] is put in state 'Soft Online'.

    Returns the response dict. It always holds 'action', plus
    either 'node' (success message) or 'error'.
    When 'node_name' is missing, data['key'] is deleted from the
    caller's dict before the request dict is logged.
    '''
    resp = {'action': L3QAction.SET_NODE_ONLINE }
    if 'node_name' not in data:
        del data['key']
        self.conf.logger.error('SET_NODE_ONLINE: wrong variables sent in request: {}'.format(data))
        resp['error'] = 'Error wrong variables sent in request'
        return resp

    node_name = data.get('node_name')
    try:
        cur = self.conf.db_cur
        cur.execute('SELECT id FROM validate_host where name = ?', (node_name,))
        matches = cur.fetchall()
        if not matches:
            resp['error'] = 'Node: {} does not exist.'.format(node_name)
        else:
            for match in matches:
                cur.execute("UPDATE node SET state = 'Soft Online' where validate = (?)", (match['id'],))
            resp['node'] = 'Node: {} successfully changed state to online (Soft Online).'.format(node_name)
    except Exception as e:
        self.conf.logger.error('SET_NODE_ONLINE: Saving to database: {}'.format(e))
        resp['error'] = 'Error changing state to online on node: {}'.format(node_name)
    return resp
-
def __send_task_to_node(self, key: str, hostname: str, ip: str, port: int, job: Any, task: Any, timeout: float = 30.0) -> bool:
    '''
    Send task(s) of a job to a node and verify the node's answer.

    Builds an ExecuteL3qdJob command from the job row (columns
    id, cores_alloc, type, user) and the task row(s) (columns id,
    command, workdir). It is POSTed as json to
    https://<hostname>:<port>/l3qd/request. If that request fails,
    https://<ip>:<port>/l3qd/request is tried instead.

    Parameters:
      key      -- value sent to the node in the 'validate' field
      hostname -- node host name, tried first
      ip       -- node IP address, tried if the hostname request fails
      port     -- port of the node daemon
      job      -- job row from the database
      task     -- a single task row, or a list/tuple of task rows
      timeout  -- seconds to wait for each HTTP request

    Returns True only when the node answers with TaskExecuted for
    the same jobid and exactly the taskids that were sent, and the
    answer's 'validate' value is a key of self.validate.
    Every other outcome is logged and returns False.
    '''
    # Accept both a single row and a list of rows; callers have passed both.
    task_rows = list(task) if isinstance(task, (list, tuple)) else [task]
    tasks = [
        {
            'taskid': t['id'],
            'command': t['command'],
            'working_directory': t['workdir']
        }
        for t in task_rows
    ]
    if not tasks:
        self.conf.logger.error('No tasks given when sending job with id: {} to node name: {}, __send_task_to_node'
                               .format(job['id'], hostname))
        return False

    node_job = {
        'cmd':
        {
            'ExecuteL3qdJob':
            {
                'job':
                {
                    'jobid': job['id'],
                    'cores_alloc': job['cores_alloc'],
                    'memory_alloc': 0,
                    'exec_type': job['type'],
                    'tasks': tasks,
                    'user': job['user'],
                    'group': '',
                }
            }
        },
        'validate': key
    }
    # test
    # NOTE(review): this debug print also writes the validate key to stdout.
    print(f"Send task to node: {node_job}")
    try:
        json_data = json.dumps(node_job)
    except Exception as e:
        self.conf.logger.error('Error encoding data into json string:\n{}, __send_task_to_node'.format(e))
        return False

    # requests is used here but was never imported by this module.
    try:
        import requests
    except ImportError as e:
        self.conf.logger.error('requests module not available: {}, __send_task_to_node'.format(e))
        return False

    # NOTE(review): certificate verification is disabled (verify=False) as in
    # the original code, presumably because nodes use self-signed certs — confirm.
    try:
        resp = requests.post('https://' + hostname + ':' + str(port) + '/l3qd/request',
                             data=json_data, verify=False, timeout=timeout)
        resp.raise_for_status()
    except Exception:
        # The host name may not work from here; retry on the node's IP.
        try:
            resp = requests.post('https://' + ip + ':' + str(port) + '/l3qd/request',
                                 data=json_data, verify=False, timeout=timeout)
            resp.raise_for_status()
        except Exception as e:
            self.conf.logger.error('Failed to send jobid: {} to node name: {}, {}, __send_task_to_node'
                                   .format(job['id'], hostname, e))
            return False

    try:
        data = json.loads(resp.text)
    except Exception as e:
        self.conf.logger.error('Failed to parse response from node name: {} when sending job with id: {}, __send_task_to_node: {}'
                               .format(hostname, job['id'], e))
        return False

    if not data:
        self.conf.logger.error('Error response from node name: {} when sending job with id: {}\nEmpty respone from node, __send_task_to_node'
                               .format(hostname, job['id']))
        return False

    body = data.get('data') if isinstance(data, dict) else None
    if not isinstance(body, dict):
        self.conf.logger.error('Unexpected response from node name: {} when sending job with id: {}: {}, __send_task_to_node'
                               .format(hostname, job['id'], data))
        return False

    if 'Error' in body:
        self.conf.logger.error('Error response from node name: {} when sending job with id: {}\nNode error: {}, __send_task_to_node'
                               .format(hostname, job['id'], body['Error']))
        return False

    if 'TaskExecuted' not in body or 'validate' not in data:
        self.conf.logger.error('Unexpected response from node name: {} when sending job with id: {}: {}, __send_task_to_node'
                               .format(hostname, job['id'], data))
        return False

    if data['validate'] not in self.validate:
        self.conf.logger.error('Validation failure from node name: {} when sending job with id: {}'
                               .format(hostname, job['id']))
        return False

    executed = body['TaskExecuted']
    resp_jobid = executed.get('jobid') if isinstance(executed, dict) else None
    if resp_jobid != job['id']:
        self.conf.logger.error('Wrong jobid: {} in answer from node name: {} when sending job with id: {}, __send_task_to_node'
                               .format(resp_jobid, hostname, job['id']))
        return False

    sent_tasks = [t['taskid'] for t in tasks]
    resp_tasks = executed.get('task')
    # NOTE(review): the node's field is named 'task'; a single value is
    # treated as a one-element list — confirm against node-l3qd.
    if not isinstance(resp_tasks, list):
        resp_tasks = [resp_tasks]
    if sorted(sent_tasks) == sorted(resp_tasks):
        return True

    self.conf.logger.error('Wrong taskids: {} in answer from node name: {} when sending job with id: {} and taskids: {}, __send_task_to_node'
                           .format(resp_tasks, hostname, job['id'], sent_tasks))
    return False
-
- # def __prepare_jobs_to_send(self, jobs: list[queue.QRun]) -> None:
- # '''
- # Collect required data
- # from database and sends
- # jobs to specified nodes.
- # '''
- #
- # node_jobs: dict[str, Any] = {}
- # for j in jobs:
- # try:
- # self.conf.db_cur.execute('SELECT hosts_alloc,cores_alloc,type FROM job WHERE id = ?', (j.jobid,))
- # job = self.conf.db_cur.fetchall()
- # if(job[0].get('hosts_alloc') != len(j.node_ids)):
- # self.conf.logger.error(f'Wrong number of nodes: {len(j.node_ids)} returned from queue, required: {job[0].get("hosts_alloc")}, in send_job_to_nodes')
- # continue
- # except Exception as e:
- # self.conf.logger.error(f'Failed to get jobid: {j.jobid} from database: {e}, in send_job_to_nodes')
- # continue
- #
- # try:
- # self.conf.db_cur.execute('SELECT id,command,seqno,workdir FROM task WHERE jobid = ?', (j.jobid,))
- # tasks = self.conf.db_cur.fetchall()
- # except Exception as e:
- # self.conf.logger.error(f'Failed to get tasks for jobid: {j.jobid} from database: {e}, in send_job_to_nodes')
- # continue
- #
- # for i in j.node_ids:
- # try:
- # self.conf.db_cur.execute('SELECT hostname,ip FROM validate_host INNER JOIN node ON validate_host.id = node.validate WHERE node.id = ?)', (i,))
- # host = self.conf.db_cur.fetchall()
- # node_jobs[host[0]['hostname']] = {}
- # node_jobs[host[0]['hostname']]['ip'] = host[0]['ip']
- # except Exception as e:
- # self.conf.logger.error(f'Failed to get node with id: {i} from database: {e}, in send_job_to_nodes')
- # continue
- #
- #
- #
- # tasks_per_node = ceil(len(tasks)/len(node_jobs))
- #
- # c = 1
- # for n in node_jobs:
- # start = c - 1
- # end = c * tasks_per_node - 1
- #
- # if(start >= len(tasks)):
- # break
- #
- # elif(end >= len(tasks)):
- # node_jobs[n]['tasks'] = tasks[start:]
- # break
- # else:
- # node_jobs[n]['tasks'] = tasks[start:end]
- # c += 1
- #
- # # self.__send_to_node(hostname, ip, tasks)
- # # send to nodes
- # # if send to node success update start_date for job
- #
- # def __update_queue(self) -> None:
- # '''
- # Update configured queue.
- # Sends all queued and depending
- # jobs and node info to queue
- # object that returns jobs to
- # be launched on the nodes.
- # Send jobs to the nodes and
- # updates the database.
- # '''
- #
- # queue = eval(f"queue.{self.conf.queue}()")
- # job_queue = []
- # try:
- # self.conf.db_cur.execute('SELECT id,init_date,hosts_alloc,cores_alloc,depend,state FROM job WHERE state = ?', (JobState.QUEUED, JobState.DEPEND))
- # db_job_queue = self.conf.db_cur.fetchall()
- #
- # for j in db_job_queue:
- # if(j.get("state") == JobState.QUEUED):
- # qj = queue.QJob.from_dict(j)
- # if(qj != None):
- # job_queue.append(qj)
- # else:
- # depend = j.get("depend")
- #
- # if(depend != None):
- # all_depend_term = True
- # depend_err = False
- # depend_cancel = False
- # for d in depend.trim().split(','):
- # try:
- # self.conf.db_cur.execute('SELECT state FROM job WHERE id = ?"', (d,))
- # job = self.conf.db_cur.fetchall()
- #
- # if(len(job) == 0):
- # self.conf.logger.error('Failed to get job with jobid: {} from database'.format(d))
- # else:
- # for dj in job:
- # if(dj.get('state') != JobState.TERMINATED):
- # all_depend_term = False
- # if(dj.get('state') == JobState.ERROR or dj.get('state') == JobState.NODE_ERROR):
- # depend_err = True
- # elif(dj.get('state') == JobState.CANCEL or dj.get('state') == JobState.CANCELED):
- # depend_cancel = True
- # except Exception as e:
- # self.conf.logger.error('Failed to get depend jobs to jobid:{} from database: {}'.format(j.get("id"),e))
- #
- # if(all_depend_term):
- # qj = queue.QJob.from_dict(dj)
- # if(qj != None):
- # job_queue.append(qj)
- # elif(depend_err):
- # try:
- # self.conf.db_cur.execute('UPDATE job SET state = ? where jobid = ?', (JobState.ERROR, j.get('id')))
- # except Exception as e:
- # self.conf.logger.error(f'Failed to update job state for jobid: {j.get("id")} in database: {e}')
- # elif(depend_cancel):
- # try:
- # self.conf.db_cur.execute('UPDATE job SET state = ? where jobid = ?', (JobState.CANCELED, j.get('id')))
- # except Exception as e:
- # self.conf.logger.error(f'Failed to update job state for jobid: {j.get("id")} in database: {e}')
- # except Exception as e:
- # self.conf.logger.error(f'Failed to get all queued jobs from database: {e}')
- #
- #
- # online_nodes = []
- # try:
- # self.conf.db_cur.execute('SELECT id,total_cpus,alloc_cpus FROM node WHERE state = ?"', (NodeState.ONLINE,))
- #
- # nodes = self.conf.db_cur.fetchall()
- #
- # for n in nodes:
- # node = queue.QNode.from_dict(n)
- #
- # if(node != None):
- # online_nodes.append(node)
- #
- # except Exception as e:
- # self.conf.logger.error(f'Failed to get all online nodes from database: {e}')
- #
- #
- # run = queue.run_queue(job_queue, online_nodes)
- # self.__prepare_jobs_to_send(run)
-
def __update_jobs(self):
    '''
    Refresh the status of all running jobs from their tasks.

    Should be run after the nodes have updated their tasks.
    For every job in state JobState.RUNNING:
      * no_tasks_running, no_tasks_finished and no_tasks_err are
        recomputed from the tasks' sub_state: 'exited' counts as
        finished, 'failed' as error, anything else as not done.
        no_tasks_running is capped at hosts_alloc * cores_alloc.
      * When every task is 'exited' or 'failed', the job state is set
        to JobState.TERMINATED (no failed task) or JobState.ERROR.
        Only if that update succeeded, cores_alloc cores are released
        on every node that ran a task of the job (alloc_cpus is never
        lowered below 0).
    All errors are logged; nothing is raised.
    '''
    try:
        self.conf.db_cur.execute('SELECT * FROM job where state = ?', (JobState.RUNNING,))
        rows = self.conf.db_cur.fetchall()
    except Exception as e:
        self.conf.logger.error('Failed to get all running jobs from database: {}'.format(e))
        return

    for row in rows:
        try:
            self.conf.db_cur.execute('SELECT * FROM task where jobid = ?', (row['id'],))
            tasks = self.conf.db_cur.fetchall()

            all_tasks_done = True
            no_tasks_finished = 0
            no_tasks_err = 0
            nodes = set()
            for t in tasks:
                nodes.add(t['nodeid'])
                if t['sub_state'] == 'failed':
                    no_tasks_err += 1
                elif t['sub_state'] == 'exited':
                    no_tasks_finished += 1
                else:
                    all_tasks_done = False

            # Tasks not yet finished, capped by the cores the job may use at once.
            tot_run = len(tasks) - no_tasks_finished - no_tasks_err
            running = min(tot_run, row['hosts_alloc'] * row['cores_alloc'])
            try:
                self.conf.db_cur.execute('UPDATE job SET no_tasks_running = ?,no_tasks_finished = ?,no_tasks_err =? where id = ?', (running, no_tasks_finished, no_tasks_err, row['id']))
            except Exception as e:
                self.conf.logger.error(f'Failed to update job task run/fin/err for jobid: {row["id"]} in database: {e}')

            if not all_tasks_done:
                continue

            new_state = JobState.ERROR if no_tasks_err > 0 else JobState.TERMINATED
            try:
                # The job table is keyed on 'id' ('jobid' is a column of task).
                self.conf.db_cur.execute('UPDATE job SET state = ? where id = ?', (new_state, row['id']))
            except Exception as e:
                self.conf.logger.error(f'Failed to update job state for jobid: {row["id"]} in database: {e}')
                # Do not release cores: the job is still selected as running
                # next time, and releasing now would release them twice.
                continue

            try:
                cores_alloc = row['cores_alloc']
                for n in nodes:
                    if n is None:
                        # Task without a node; nothing was allocated there.
                        continue
                    self.conf.db_cur.execute('SELECT alloc_cpus FROM node WHERE id = ?', (n,))
                    node_alloc = self.conf.db_cur.fetchall()
                    if len(node_alloc) != 1:
                        self.conf.logger.error(f'Failed to get node allocation with node id {n} from database')
                        continue
                    alloc = max(node_alloc[0]['alloc_cpus'] - cores_alloc, 0)
                    self.conf.db_cur.execute('UPDATE node SET alloc_cpus = ? where id = ?', (alloc, n))
            except Exception as e:
                self.conf.logger.error(f'Failed to update node allocation for jobid: {row["id"]} in database: {e}')
        except Exception as e:
            self.conf.logger.error(f'Failed to get all tasks for jobid: {row["id"]} from database: {e}')
-
def __update_running_job(self, jobid: str, host_key: str) -> None:
    '''
    Send the next unstarted task of a running job to a node.

    Called after a node reported that a task of job `jobid`
    terminated. If the job still has a task whose start_time is
    NULL, that task is sent with __send_task_to_node. The target is
    the node of the job's task with the latest end_time, i.e.
    normally the node that just terminated a task and is available
    for another one.
    host_key is passed on unchanged as the 'validate' value.
    Errors are logged and never raised.
    '''
    # test
    print(f"uptading running task: {jobid}")
    try:
        # Joining job with task yields one row per task, so select just the
        # task that ended last (NULL end_time sorts last with DESC).
        # NOTE(review): assumes end_time values sort chronologically
        # (numeric or ISO-8601 text) — confirm against what nodes report.
        self.conf.db_cur.execute(
            'SELECT job.id AS id, job.type AS type, job.cores_alloc AS cores_alloc, '
            'job.user AS user, task.nodeid AS nodeid '
            'FROM job INNER JOIN task ON job.id = task.jobid '
            'WHERE job.id = ? AND task.nodeid IS NOT NULL '
            'ORDER BY task.end_time DESC LIMIT 1',
            (jobid,))
        job = self.conf.db_cur.fetchall()
    except Exception as e:
        self.conf.logger.error(f'Failed to get job from database where jobid: {jobid}, in __update_running_para_jobs: {e}')
        return

    if len(job) != 1:
        self.conf.logger.error(f'Job not found in database where jobid: {jobid}, in __update_running_jobs')
        return
    job_row = job[0]

    try:
        self.conf.db_cur.execute('SELECT hostname,ip,port FROM validate_host INNER JOIN node ON validate_host.id = node.validate WHERE node.id = ?', (job_row['nodeid'],))
        node = self.conf.db_cur.fetchall()
    except Exception as e:
        self.conf.logger.error(f'Failed to get node where previous task executed from database where jobid: {job_row["id"]}, nodeid: {job_row["nodeid"]}, in __update_running_para_jobs: {e}')
        return

    try:
        self.conf.db_cur.execute('SELECT id,command,seqno,workdir FROM task WHERE task.jobid = ? AND start_time IS NULL LIMIT 1', (job_row['id'],))
        unstarted_task = self.conf.db_cur.fetchall()
    except Exception as e:
        self.conf.logger.error(f'Failed to get all non started tasks from database where jobid: {job_row["id"]}, in __update_running_para_jobs: {e}')
        return

    if len(unstarted_task) == 0:
        # Every task of the job has already been started.
        return
    if len(node) == 0:
        self.conf.logger.error(f'Node with id: {job_row["nodeid"]} not found in database where jobid: {job_row["id"]}, in __update_running_job')
        return

    try:
        # Pass the row itself: the payload builder reads it by column name.
        self.__send_task_to_node(host_key, node[0]['hostname'], node[0]['ip'], node[0]['port'], job_row, unstarted_task[0])
    except Exception as e:
        self.conf.logger.error(f'Failed to send next task to node where jobid: {job_row["id"]}, in __update_running_job: {e}')
-
def __node_status_tasks(self, tasks: list[dict[str,str]], key: str) -> dict[Any, Any]:
    '''
    Save task status reports from a node and refresh job state.

    Each task dict updates the task row matching id = taskid and
    jobid = jobid. Tasks whose sub_state is 'failed' or 'exited' are
    written without main_pid/memory_peak/pids_peak, and then
    __update_running_job is called for their job. After all tasks
    are processed, __update_jobs is run.

    Returns the response for the node:
      'Err'      -- only present if a reported task does not exist
                    (the last such task is reported)
      'validate' -- self.validate[key]
      'result'   -- {'Ok': ...}, or {'Err': ...} naming every task
                    that could not be saved
    '''
    finished_columns = (
        'unitname', 'start_time', 'end_time', 'mono_start_time',
        'mono_end_time', 'active_state', 'sub_state', 'exit_code',
        'stdout', 'stderr',
    )
    running_columns = (
        'unitname', 'start_time', 'end_time', 'mono_start_time',
        'mono_end_time', 'active_state', 'sub_state', 'exit_code',
        'main_pid', 'memory_peak', 'pids_peak', 'stdout', 'stderr',
    )

    def store(columns, report):
        # One "col = ?" per column; values are bound in the same order.
        assignments = ', '.join(f'{col} = ?' for col in columns)
        params = tuple(report.get(col) for col in columns) + (report.get('taskid'), report.get('jobid'))
        self.conf.db_cur.execute('UPDATE task SET ' + assignments + ' where id = ? and jobid = ?', params)

    resp: dict[Any, Any] = {}
    unsaved = []
    for t in tasks:
        try:
            self.conf.db_cur.execute('SELECT * FROM task where id = ? and jobid = ?', (t.get('taskid'), t.get('jobid')))
            if len(self.conf.db_cur.fetchall()) == 0:
                resp['Err'] = 'Taskid: {}, Jobid: {} does not exist.'.format(t.get('taskid'), t.get('jobid'))
                continue
            if t.get('sub_state') in ("failed", "exited"):
                store(finished_columns, t)
                self.__update_running_job(t['jobid'], key)
            else:
                store(running_columns, t)
        except Exception as e:
            self.conf.logger.error('Failed to save task to database, taskid: {}, jobid: {}, {}'
                                   .format(t.get('taskid'), t.get('jobid'), e))
            unsaved.append(f"Taskid: {t.get('taskid')}, Jobid: {t.get('jobid')}; ")

    resp_err = ''.join(unsaved)
    if resp_err:
        resp_err = "Failed to save tasks to database: " + resp_err

    # Queue updates are no longer triggered here (the removed code noted
    # they are done in the worker); only job bookkeeping is refreshed.
    self.__update_jobs()

    resp['validate'] = self.validate[key]
    if resp_err:
        resp['result'] = { 'Err': resp_err }
    else:
        resp['result'] = { 'Ok': 'Tasks successfully updated' }
    return resp
-
def wsgi_app(self, environ, start_response):
    '''
    WSGI entry point, called on each request.

    Wraps environ in a werkzeug Request, lets despatch_request build
    the Response, and calls that Response with environ and
    start_response.
    '''
    return self.despatch_request(Request(environ))(environ, start_response)


def __call__(self, environ, start_response):
    '''Make the daemon object a WSGI application by delegating to wsgi_app.'''
    result = self.wsgi_app(environ, start_response)
    return result
class ValidateD:
    '''
    Temporary validation server for l3q clients and node-l3qd nodes.

    On creation it prompts for a temporary password. A host that sends
    that password is stored in the validate_host table together with a
    generated key:
      * clients via POST "/"
      * compute nodes via POST "/validate"; nodes also get a node row
        with state NodeState.OFFLINE.
    '''
    def __init__(self, conf):
        '''
        Init of validation daemon.
        Prompt for a temporary password to be
        used from client side.
        '''
        self.conf = conf

        print('')
        print('Validate server is running on port: {}'.format(self.conf.port))
        print('')
        print('#### INSTRUCTIONS ####')
        print('Specify temporary password that should be used on the client host')
        print('when promped for.')
        pw = getpass.getpass('Enter temporary password: ')
        self.password = pw
        print('')
        print('Go to client host or compute node and run as root:')
        print('Client: l3q validate --port {}'.format(self.conf.port))
        print('Node: node-l3qd --validate')
        print('Enter the same password at the prompt.')
        print('')
        print('When command prints: Validation SUCCESS!!')
        print('Validation is done and you can enter CTRL+C to stop this validation server.')
        print('')

    @staticmethod
    def _generate_key(length=50):
        '''Return a random key of `length` ascii letters and digits.'''
        # secrets instead of random: the key is an authentication credential.
        import secrets
        source = string.ascii_letters + string.digits
        return ''.join(secrets.choice(source) for _ in range(length))

    def _password_ok(self, candidate):
        '''
        Return True if candidate equals the temporary password.

        A candidate that is not a str (missing field, wrong json type)
        never matches. The comparison is done in constant time.
        '''
        import hmac
        if not isinstance(candidate, str):
            return False
        return hmac.compare_digest(candidate.encode('utf-8'), self.password.encode('utf-8'))

    @staticmethod
    def _reverse_lookup(ip):
        '''
        Return the host name of ip, or ip itself if the reverse lookup
        fails (socket.gethostbyaddr raises for unknown addresses).
        '''
        try:
            return socket.gethostbyaddr(ip)[0]
        except (OSError, TypeError):
            return ip

    def _validate_client(self, request, data):
        '''
        Handle a client validation request (POST "/").

        Returns {} for a wrong action, otherwise a dict with 'action'
        and either 'key' (success) or 'error'.
        '''
        action = data.get('action') if isinstance(data, dict) else None
        if L3QAction.VALIDATE_KEY != action:
            self.conf.logger.warning('ValidateD: Wrong json data from client: {}, data: {}'.format(request.remote_addr, data))
            return {}

        resp = {'action': L3QAction.VALIDATE_KEY}
        if not self._password_ok(data.get('password')):
            resp['error'] = "Wrong password"
            print('Wrong password from host: {}'.format(request.remote_addr), file=sys.stderr)
            return resp

        key = self._generate_key()
        host = request.remote_addr
        hostname = self._reverse_lookup(host)
        name = data.get('hostname')
        try:
            cur = self.conf.db_cur
            cur.execute('''SELECT * FROM validate_host where ip = ? AND host_type = ?''', (host, "client"))
            if cur.fetchall():
                # Filter on host_type too, so that a node sharing this IP
                # keeps its own key.
                cur.execute('UPDATE validate_host SET key = ?, hostname = ?, name = ? WHERE ip = ? AND host_type = ?;',
                            (key, hostname, name, host, "client"))
            else:
                cur.execute('INSERT INTO validate_host (host_type, ip, hostname, name, key) values (?, ?, ?, ?, ?);',
                            ("client", host, hostname, name, key))
        except Exception as e:
            resp['error'] = 'Error saving to database on validate server.'
            print('Error saving to database on validate server.', file=sys.stderr)
            self.conf.logger.error('ValidateD: Saving to database: {}'.format(e))
            return resp

        resp['key'] = key
        print('Validation SUCCESS with host: {}'.format(host))
        print('Restart the l3q daemon: l3qd to use the new configuration')
        return resp

    def _validate_node(self, request, data):
        '''
        Handle a node-l3qd validation request (POST "/validate").

        Returns {} for a wrong action, otherwise {'validate': {'Ok':
        l3qd_key}} on success or {'validate': {'Err': message}}.
        '''
        action = data.get('action') if isinstance(data, dict) else None
        if L3QAction.VALIDATE_KEY != action:
            self.conf.logger.warning('ValidateD: Wrong json data from node: {}, data: {}'.format(request.remote_addr, data))
            return {}

        resp = {}
        if not self._password_ok(data.get('password')):
            resp['validate'] = { 'Err': "Wrong password" }
            print('Wrong password from node: {}'.format(request.remote_addr), file=sys.stderr)
            return resp

        l3qd_key = self._generate_key()
        key = data.get('validate')
        host = request.remote_addr
        hostname = self._reverse_lookup(host)
        name = data.get('hostname')
        port = data.get('port')
        try:
            cur = self.conf.db_cur
            cur.execute('''SELECT * FROM validate_host where ip = ? AND host_type = ?''', (host, "node"))
            if cur.fetchall():
                cur.execute('UPDATE validate_host SET key = ?, l3qd_key = ?, hostname = ?, port = ?, name = ? WHERE ip = ? AND host_type = ?;',
                            (key, l3qd_key, hostname, port, name, host, "node"))
            else:
                cur.execute('INSERT INTO validate_host (host_type, ip, hostname, port, name, key, l3qd_key) values (?, ?, ?, ?, ?, ?, ?);',
                            ("node", host, hostname, port, name, key, l3qd_key))

            cur.execute('''SELECT id FROM validate_host where key = ? AND l3qd_key = ?''', (key, l3qd_key))
            rows = cur.fetchall()
            if len(rows) != 1:
                print('Error fetching newly created record from database on validate server.', file=sys.stderr)
                self.conf.logger.error('ValidateD: Error fetching newly created validate record in database')
                # No node row could be linked, so do not report success.
                resp['validate'] = { 'Err': 'Error saving to database on validate server.' }
                return resp

            val_id = rows[0]['id']
            cur.execute('SELECT id FROM node WHERE validate = ?', (val_id,))
            if cur.fetchall():
                # node.validate references validate_host.id, so filter on val_id.
                cur.execute('UPDATE node SET state = ? WHERE validate = ?', (NodeState.OFFLINE, val_id))
            else:
                cur.execute('INSERT INTO node (state, validate) VALUES (?, ?)', (NodeState.OFFLINE, val_id))
        except Exception as e:
            resp['validate'] = { 'Err': 'Error saving to database on validate server.' }
            print('Error saving to database on validate server.', file=sys.stderr)
            self.conf.logger.error('ValidateD: Saving to database: {}'.format(e))
            return resp

        resp['validate'] = { 'Ok': l3qd_key }
        print('Validation SUCCESS with host: {}'.format(host))
        print('Restart the l3q daemon: l3qd to use the new configuration')
        return resp

    def despatch_request(self, request):
        '''
        Route an incoming request and build the json Response.

        POST "/" is a client validation and POST "/validate" a node
        validation. Any other method or path, or a body that is not
        valid json, gets an empty json dict.
        '''
        if request.method == 'POST' and request.path in ('/', '/validate'):
            try:
                data = json.loads(request.data)
            except Exception as e:
                print('Error decoding request.data from json')
                self.conf.logger.error('ValidateD: Decoding request.data from json: {}'.format(e))
                return Response(json.dumps({}))
            if request.path == '/':
                resp = self._validate_client(request, data)
            else:
                resp = self._validate_node(request, data)
        else:
            resp = {}
            self.conf.logger.warning('ValidateD: Wrong request method: {} or wrong path: {} from host: {}'.format(request.method, request.path, request.remote_addr))

        return Response(json.dumps(resp))

    def wsgi_app(self, environ, start_response):
        '''
        wsgi app
        Called on each request
        '''
        request = Request(environ)
        response = self.despatch_request(request)
        return response(environ, start_response)

    def __call__(self, environ, start_response):
        '''Called on each request'''
        return self.wsgi_app(environ, start_response)
def create_app(conf):
    '''Build the l3q daemon WSGI application from the given configuration.'''
    application = L3Qd(conf)
    return application
def start_validate_server(conf):
    '''
    Start the validate server on the first port that works.

    The first port tried is one higher than the l3q daemon port
    (conf.port + 1). The port is then increased by one after each
    failure, for at most 21 ports, and conf.port is left at the
    last port tried. A new ValidateD (with its password prompt) is
    created for every attempt. If every port fails, an error is
    printed and the program exits with status 1.
    '''
    from werkzeug.serving import run_simple
    logging.getLogger('werkzeug').disabled = True
    conf.port += 1
    first_port = conf.port
    last_port = first_port + 20

    while conf.port <= last_port:
        try:
            print('')
            print('Starting validate server on port: {} ...'.format(conf.port))
            run_simple('', conf.port, ValidateD(conf), use_debugger=False, ssl_context='adhoc')
        except Exception as err:
            print('Failed to start server on port {}.'.format(conf.port), file=sys.stderr)
            print('{}'.format(err))
            conf.port += 1
        else:
            break

    if conf.port > last_port:
        print('', file=sys.stderr)
        print('Error starting validate server', file=sys.stderr)
        print('Failed to start server on all ports in range: {} - {}'.format(first_port, conf.port - 1), file=sys.stderr)
        exit(1)
def list_validate_host(conf):
    '''
    Print a table of all validated hosts.

    Prints id, host type, IP, name and hostname of every row in
    validate_host, ordered by id. Columns that are NULL in the
    database are printed as empty strings.
    On a database error a message goes to stderr and the program
    exits with status 2.
    '''
    def cell(value):
        # NULL columns (e.g. a client that sent no hostname) would make the
        # fixed-width '{:30s}'-style fields raise TypeError.
        return '' if value is None else str(value)

    try:
        rows = conf.db_cur.execute('SELECT * FROM validate_host ORDER BY id')
        headline = '=' * 91
        line = '-' * 91
        sfrmt = '| {:^6s} | {:^9s} | {:17s} | {:30s} | {}'
        print(headline)
        print(sfrmt.format('Id', 'Host type', 'IP', 'Name', 'Hostname'))
        print(headline)
        for row in rows:
            print(sfrmt.format(cell(row['id']), cell(row['host_type']),
                               cell(row['ip']), cell(row['name']), cell(row['hostname'])))
            print(line)
    except Exception as e:
        print('Error retrieving validated IP number', file=sys.stderr)
        print(e, file=sys.stderr)
        sys.exit(2)
-
def rm_validate_host(hostid, conf):
    '''
    Delete the validated host with ID number hostid.

    Prints how many rows were deleted. Exits with status 2 if no
    row matched, and with status 3 on a database error.
    '''
    try:
        conf.db_cur.execute('DELETE FROM validate_host where id = ?', (hostid,))
        deleted = conf.db_cur.rowcount
        if deleted <= 0:
            print('Failed to delete ID number: {} from validated hosts.'.format(hostid), file=sys.stderr)
            exit(2)
        print('Deleted {} host from validated hosts, with ID: {}.'.format(deleted, hostid))
    except Exception as err:
        print('Error deleting ID number: {} from validated hosts.'.format(hostid), file=sys.stderr)
        print(err, file=sys.stderr)
        exit(3)
-
-
def _require_admin_user():
    '''
    Exit with status 2 ("Permission denied.") unless the current user
    is root (uid 0) or L3QdConfig.validate_user.
    '''
    usr = pwd.getpwuid(os.getuid())
    is_root = usr.pw_name == 'root' and usr.pw_uid == 0
    if not (is_root or usr.pw_name == L3QdConfig.validate_user):
        print('Permission denied.', file=sys.stderr)
        sys.exit(2)


def _print_help():
    '''Print the command line usage text.'''
    print('Usage: l3qd [OPTION] ...')
    print('')
    print('Execute the l3q daemon. Starts in normal mode without arguments.')
    print('')
    print('Options:')
    print(' --validate-host Executes the validate daemon.')
    print(' Instructions will be printed at start up.')
    print(' --list-validate-host List all validated hosts.')
    print(' --rm-validate-host ID Removes specified validated host with specified ID.')
    print(' --queue-info Prints information about available queues.')
    print(' --version Prints version text and exit')
    print(' -h, --help Prints this help text and exit')
    print('')
    print('More information is available in man pages:')
    print('l3q(1), l3q.conf(5), l3qd(8), l3qd.conf(5)')


def _print_queue_info():
    '''Print the name and docstring of every Queue subclass in libl3q.daemon.queue.'''
    print("Queues available in L3Q")
    print("")
    print("To use a different queue algorithm")
    print("change the config file:")
    print("/etc/l3q/l3qd.conf")
    print("")
    print("Set parameter (default value):")
    print("queue = Default")
    print("to the name of one of the described queue types below.")
    print("")
    print("Queues:")
    queues = [name for name, cls in inspect.getmembers(queue, inspect.isclass)
              if cls.__module__ == 'libl3q.daemon.queue'
              and issubclass(cls, queue.Queue)
              and name != 'Queue']
    for q in queues:
        print(q)
        print(getattr(queue, q).__doc__)


def _print_version():
    '''Print the version and license text.'''
    ascii_l3q()
    print('l3qd {}'.format(VERSION))
    print('Copyright (C) 2023-2024 Marcus Pedersén')
    print('License GPLv3+: GNU GPL version 3 or later <https://gnu.org/licenses/gpl.html>.')
    print('This is free software: you are free to change and redistribute it.')
    print('There is NO WARRANTY, to the extent permitted by law.')
    print('')
    print('Written by Marcus Pedersén.')


def _run_daemon(conf, test_mode):
    '''
    Start the worker and backup processes and serve the l3q daemon.

    In test_mode the werkzeug debugger is enabled. Both child
    processes are always terminated and joined when serving stops or
    start-up fails. A non-daemon child that is still running would
    keep the parent waiting at interpreter exit.
    Exits with status 1 if start-up or serving raised an exception.
    '''
    processes = []
    failed = False
    try:
        from werkzeug.serving import run_simple
        worker = L3QWorker(conf)
        p_worker = Process(target=worker.run)
        p_worker.start()
        processes.append(p_worker)
        backup = L3QBackup(conf)
        p_backup = Process(target=backup.schedule_backup)
        p_backup.start()
        processes.append(p_backup)
        app = create_app(conf)
        if test_mode:
            run_simple('', conf.port, app, use_debugger=True, ssl_context='adhoc')
        else:
            logging.getLogger('werkzeug').disabled = True
            run_simple('', conf.port, app, use_debugger=False, ssl_context='adhoc')
        conf.logger.info('L3q daemon terminated.')
    except Exception as e:
        print('Error starting l3q daemon', file=sys.stderr)
        print('{}'.format(e))
        failed = True
    finally:
        for proc in processes:
            proc.terminate()
            proc.join()
    if failed:
        sys.exit(1)


def main():
    '''
    Main function where program starts.

    Parses the command line arguments and runs the matching mode:
    validate server, host listing/removal, help, queue info, version,
    test mode or (without arguments) the normal daemon.
    '''
    args = sys.argv[1:]
    if args == ['--validate-host']:
        _require_admin_user()
        conf = L3QdConfig()
        start_validate_server(conf)
        sys.exit(0)
    elif args == ['--list-validate-host']:
        _require_admin_user()
        conf = L3QdConfig()
        list_validate_host(conf)
        sys.exit(0)
    elif args == ['--rm-validate-host']:
        _require_admin_user()
        print('l3qd --rm-validate-host requires the ID number of host as argument.')
        sys.exit(2)
    elif len(args) == 2 and args[0] == '--rm-validate-host':
        _require_admin_user()
        hostid = args[1]
        # isdecimal(), not isnumeric(): int() fails on numerics such as '½'.
        if not hostid.isdecimal():
            print('Argument must be a postive integer.')
            sys.exit(3)
        if int(hostid) <= 0:
            print('Argument must be a postive integer.')
            sys.exit(4)
        conf = L3QdConfig()
        print('Removing validated host with ID number: {}'.format(hostid))
        rm_validate_host(hostid, conf)
        sys.exit(0)
    elif args in (['--help'], ['-h']):
        _print_help()
        sys.exit(0)
    elif args == ['--test']:
        conf = L3QdConfig(True)
    elif args == ['--queue-info']:
        _print_queue_info()
        sys.exit(0)
    elif args == ['--version']:
        _print_version()
        sys.exit(0)
    elif args:
        print('Wrong arguments.')
        print('Try -h or --help for help.')
        sys.exit(1)
    else:
        conf = L3QdConfig()

    _run_daemon(conf, test_mode=(args == ['--test']))
-
if __name__ == "__main__":
    # Run the entry point only when executed as a script (the l3qd command).
    main()
|