l3qd 86 KB


  1. #!/usr/bin/python3
  2. ###############################################################################
  3. # #
  4. # L3q - Light, light, lightweight queue #
  5. # Copyright (C) 2023-2024 Marcus Pedersén marcus.pedersen@slu.se #
  6. # #
  7. # This program is free software: you can redistribute it and/or modify #
  8. # it under the terms of the GNU General Public License as published by #
  9. # the Free Software Foundation, either version 3 of the License, or #
  10. # (at your option) any later version. #
  11. # #
  12. # This program is distributed in the hope that it will be useful, #
  13. # but WITHOUT ANY WARRANTY; without even the implied warranty of #
  14. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
  15. # GNU General Public License for more details. #
  16. # #
  17. # You should have received a copy of the GNU General Public License #
  18. # along with this program. If not, see <http://www.gnu.org/licenses/>. #
  19. # #
  20. ###############################################################################
  21. from libl3q.daemon.daemon import L3QdConfig
  22. from libl3q.daemon.daemon import JobState, JobType, NodeState
  23. from libl3q.common.common import L3QAction
  24. from libl3q.daemon.l3qworker import L3QWorker
  25. from libl3q.daemon.sqlbackup import L3QBackup
  26. from libl3q.common.version import VERSION, ascii_l3q
  27. from libl3q.daemon import sql, queue
  28. from werkzeug import Response, Request
  29. from multiprocessing import Process
  30. from math import ceil
  31. import json
  32. import sys
  33. import pwd
  34. import os
  35. import getpass
  36. import string
  37. import random
  38. import logging
  39. import datetime
  40. import signal
  41. import socket
  42. import inspect
  43. import sqlite3
  44. from typing import Any, List
  45. class L3Qd:
  46. def __init__(self, conf: L3QdConfig) -> None:
  47. '''
  48. Init of l3q daemon.
  49. Init of needed resources for daemon to run.
  50. '''
  51. self.conf = conf
  52. self.conf.logger.info('Starting l3q daemon ...')
  53. # load validate_host
  54. self.validate = {}
  55. try:
  56. for r in self.conf.db_cur.execute('''SELECT * FROM validate_host'''):
  57. if(len(r) == 8):
  58. self.validate[r['key']] = r['l3qd_key']
  59. else:
  60. conf.logger.error('Row in validate_host has wrong data: {}'.format(tuple(r)))
  61. except Exception as e:
  62. print('Error reading validate_host table from database', file=sys.stderr)
  63. conf.logger.error('Reading validate_host table from database: {}'.format(e))
  64. exit(1)
  65. def despatch_request(self, request):
  66. '''
  67. Handles incomming requests and responds
  68. depending to type of action specified in
  69. request.
  70. If invalid request or request from non
  71. validated host an empty json dict is returned.
  72. '''
  73. if(request.method == 'POST' and request.path == '/'):
  74. try:
  75. data = json.loads(request.data)
  76. except Exception as e:
  77. print('Error decoding request.data from json')
  78. self.conf.logger.error('Decoding request.data from json: {}'.format(e))
  79. return Response(json.dumps({}))
  80. if(data.get('key') in self.validate):
  81. if(L3QAction.ADD_PARA == data.get('action')):
  82. resp = self.__add_para(data)
  83. elif(L3QAction.ADD_SEQ == data.get('action')):
  84. resp = self.__add_seq(data)
  85. elif(L3QAction.CANCEL_JOB == data.get('action')):
  86. resp = self.__cancel_job(data)
  87. elif(L3QAction.GET_QUEUE == data.get('action')):
  88. resp = self.__get_queue(data)
  89. elif(L3QAction.GET_JOB_INFO == data.get('action')):
  90. resp = self.__get_job_info(data)
  91. elif(L3QAction.GET_TASK_LIST == data.get('action')):
  92. resp = self.__get_task_list(data)
  93. elif(L3QAction.GET_TASK_INFO == data.get('action')):
  94. resp = self.__get_task_info(data)
  95. elif(L3QAction.GET_HISTORY == data.get('action')):
  96. resp = self.__get_history(data)
  97. elif(L3QAction.GET_NODE_STATUS == data.get('action')):
  98. resp = self.__get_node_status(data)
  99. elif(L3QAction.SET_NODE_OFFLINE == data.get('action')):
  100. resp = self.__set_node_offline(data)
  101. elif(L3QAction.SET_NODE_ONLINE == data.get('action')):
  102. resp = self.__set_node_online(data)
  103. else:
  104. resp = {}
  105. self.conf.logger.warning('Non valid action value from host: {}'.format(request.host))
  106. else:
  107. resp = {}
  108. self.conf.logger.warning('Non valid or missing key from host: {}'.format(request.host))
  109. # node post
  110. elif(request.method == 'POST' and request.path == '/node-l3qd'):
  111. try:
  112. data = json.loads(request.data)
  113. except Exception as e:
  114. print('Error decoding request.data from json')
  115. self.conf.logger.error('Decoding request.data from json: {}'.format(e))
  116. return Response(json.dumps({}))
  117. if(data.get('validate') in self.validate):
  118. if('data' in data and 'tasks' in data['data']['StatusTasks']):
  119. resp = self.__node_status_tasks(data['data']['StatusTasks']['tasks'], data['validate'])
  120. else:
  121. resp = {}
  122. self.conf.logger.warning('Wrong data in post from node: {}, data: {}'.format(request.host, data))
  123. else:
  124. resp = {}
  125. self.conf.logger.warning('Non valid or missing key from host: {}'.format(request.host))
  126. # end node
  127. else:
  128. resp = {}
  129. self.conf.logger.warning('Wrong request method: {} or wrong path: {} from client: {}'.format(request.method, request.path, request.host))
  130. return Response(json.dumps(resp))
  131. def __add_para(self, data):
  132. '''
  133. Call when L3QAction.ADD_PARA is requested.
  134. Will parse data dict and do required functionallity.
  135. Returns response dict.
  136. '''
  137. resp = {'action': L3QAction.ADD_PARA }
  138. if('tasks' in data and 'user' in data and 'name' in data
  139. and 'cores' in data and 'nodes' in data and 'depend' in data):
  140. dt_now = datetime.datetime.now()
  141. dt_now = dt_now.strftime('%Y-%m-%d %H:%M:%S')
  142. jobid = -1
  143. adjusting = ''
  144. nodes_needed = ceil(len(data.get('tasks'))/data.get('cores'))
  145. cores_needed = ceil(len(data.get('tasks'))/data.get('nodes'))
  146. if((nodes_needed * data.get('cores')) < (cores_needed * data.get('nodes'))):
  147. if(nodes_needed < data.get('nodes')):
  148. adjusting = 'Reduced the number of nodes from {} to {} as all nodes where not required.'.format(data.get('nodes'), nodes_needed)
  149. cores_needed = data.get('cores')
  150. else:
  151. nodes_needed = data.get('nodes')
  152. cores_needed = data.get('cores')
  153. else:
  154. if(cores_needed < data.get('cores')):
  155. adjusting = 'Reduced the number of cores from {} to {} as all cores where not required.'.format(data.get('cores'), cores_needed)
  156. nodes_needed = data.get('nodes')
  157. else:
  158. nodes_needed = data.get('nodes')
  159. cores_needed = data.get('cores')
  160. try:
  161. self.conf.db_cur.execute('SELECT total_cpus from node')
  162. no_nodes = 0
  163. for row in self.conf.db_cur:
  164. if(row['total_cpus'] != None):
  165. if(row['total_cpus'] >= cores_needed):
  166. no_nodes += 1
  167. if(no_nodes < cores_needed):
  168. resp['error'] = 'Error, failed to add parallel job, the amount of requested resources are not available in l3q'
  169. return resp
  170. except Exception as e:
  171. self.conf.logger.error('ADD_PARA: Getting cores for nodes from database: {}'.format(e))
  172. resp['error'] = 'Error, failed to add parallel job'
  173. return resp
  174. if(len(data.get('depend')) > 0):
  175. try:
  176. dep = data.get('depend').strip().split(',')
  177. sql_where = 'WHERE'
  178. for i, d in enumerate(dep):
  179. if(i >= len(dep) - 1):
  180. sql_where += ' id = {}'.format(d)
  181. else:
  182. sql_where += ' id = {} or'.format(d)
  183. sql = 'SELECT * FROM job {}'.format(sql_where)
  184. self.conf.db_cur.execute(sql)
  185. rows = self.conf.db_cur.fetchall()
  186. for r in rows:
  187. if(r['state'] == JobState.ERROR or
  188. r['state'] == JobState.CANCEL or
  189. r['state'] == JobState.CANCELED):
  190. resp['error'] = 'Error, failed to add parallel job, one or more dependent jobs are not in a valid state.'
  191. return resp
  192. if(len(rows) != len(dep)):
  193. resp['error'] = 'Error, failed to add parallel job, not all depenent jobs exist'
  194. return resp
  195. except Exception as e:
  196. self.conf.logger.error('ADD_PARA: Retrieving depend jobs from database: {}'.format(e))
  197. resp['error'] = 'Error, failed to add parallel job'
  198. return resp
  199. try:
  200. state = JobState.QUEUED
  201. if(len(data.get('depend')) > 0):
  202. state = JobState.DEPEND
  203. self.conf.db_cur.execute('''INSERT INTO job (name, type, init_date, hosts_alloc, cores_alloc, no_tasks_running, no_tasks_finished, no_tasks_err, depend, user, state)
  204. VALUES (?, ?, ?, ?, ?, 0, 0, 0, ?, ?, ?)''', (data.get('name'), JobType.PARALLEL, dt_now, nodes_needed, cores_needed, data.get('depend'), data.get('user'), state))
  205. jobid = self.conf.db_cur.lastrowid
  206. for t in data.get('tasks'):
  207. if(len(t) == 1):
  208. self.conf.db_cur.execute('''INSERT INTO task (command, workdir, jobid) VALUES (?, ?, ?)''', (t[0], "$HOME", jobid))
  209. elif(len(t) == 2):
  210. self.conf.db_cur.execute('''INSERT INTO task (command, workdir, jobid) VALUES (?, ?, ?)''', (t[0], t[1], jobid))
  211. else:
  212. self.conf.logger.error('ADD_SEQ: Failed to parse task from client: {}'.format(t))
  213. resp['error'] = 'Error, failed to add sequence job'
  214. return resp
  215. except Exception as e:
  216. self.conf.logger.error('ADD_PARA: Saving to database: {}'.format(e))
  217. resp['error'] = 'Error, failed to add parallel job'
  218. resp['response'] = 'Parallel job with jobid: {} successfully added to the l3q queue'.format(jobid)
  219. resp['response_id'] = jobid
  220. if(len(adjusting) > 0):
  221. resp['response'] += '\n{}'.format(adjusting)
  222. else:
  223. del data['key']
  224. self.conf.logger.error('ADD_PARA: wrong variables sent in request: {}'.format(data))
  225. resp['error'] = 'Error wrong variables sent in request'
  226. return resp
  227. def __add_seq(self, data):
  228. '''
  229. Call when L3QAction.ADD_SEQ is requested.
  230. Will parse data dict and do required functionallity.
  231. Returns response dict.
  232. '''
  233. resp = {'action': L3QAction.ADD_SEQ }
  234. if('tasks' in data and 'user' in data and 'name' in data and 'depend' in data):
  235. dt_now = datetime.datetime.now()
  236. dt_now = dt_now.strftime('%Y-%m-%d %H:%M:%S')
  237. jobid = -1
  238. if(len(data.get('depend')) > 0):
  239. try:
  240. dep = data.get('depend').strip().split(',')
  241. sql_where = 'WHERE'
  242. for i, d in enumerate(dep):
  243. if(i >= len(dep) - 1):
  244. sql_where += ' id = {}'.format(d)
  245. else:
  246. sql_where += ' id = {} or'.format(d)
  247. sql = 'SELECT * FROM job {}'.format(sql_where)
  248. self.conf.db_cur.execute(sql)
  249. rows = self.conf.db_cur.fetchall()
  250. for r in rows:
  251. if(r['state'] == JobState.ERROR or
  252. r['state'] == JobState.CANCEL or
  253. r['state'] == JobState.CANCELED):
  254. resp['error'] = 'Error, failed to add sequence job, one or more dependent jobs are not in a valid state.'
  255. return resp
  256. if(len(rows) != len(dep)):
  257. resp['error'] = 'Error, failed to add sequence job, not all depenent jobs exist'
  258. return resp
  259. except Exception as e:
  260. self.conf.logger.error('ADD_SEQ: Retrieving depend jobs from database: {}'.format(e))
  261. resp['error'] = 'Error, failed to add sequence job'
  262. return resp
  263. try:
  264. state = JobState.QUEUED
  265. if(len(data.get('depend')) > 0):
  266. state = JobState.DEPEND
  267. self.conf.db_cur.execute('''INSERT INTO job (name, type, init_date, hosts_alloc, cores_alloc, no_tasks_running, no_tasks_finished, no_tasks_err, depend, user, state)
  268. VALUES (?, ?, ?, 1, 1, 0, 0, 0, ?, ?, ?)''', (data.get('name'), JobType.SEQUENCE, dt_now, data.get('depend'), data.get('user'), state))
  269. jobid = self.conf.db_cur.lastrowid
  270. for t in data.get('tasks'):
  271. if(len(t) == 1):
  272. self.conf.db_cur.execute('''INSERT INTO task (command, workdir, jobid) VALUES (?, ?, ?)''', (t[0], "$HOME", jobid))
  273. elif(len(t) == 2):
  274. self.conf.db_cur.execute('''INSERT INTO task (command, workdir, jobid) VALUES (?, ?, ?)''', (t[0], t[1], jobid))
  275. else:
  276. self.conf.logger.error('ADD_SEQ: Failed to parse task from client: {}'.format(t))
  277. resp['error'] = 'Error, failed to add sequence job'
  278. return resp
  279. resp['response'] = 'Sequence job with jobid: {} successfully added to the l3q queue'.format(jobid)
  280. resp['response_id'] = jobid
  281. except Exception as e:
  282. self.conf.logger.error('ADD_SEQ: Saving to database: {}'.format(e))
  283. resp['error'] = 'Error, failed to add sequence job'
  284. else:
  285. del data['key']
  286. self.conf.logger.error('ADD_SEQ: wrong variables sent in request: {}'.format(data))
  287. resp['error'] = 'Error wrong variables sent in request'
  288. return resp
  289. def __cancel_job(self, data):
  290. '''
  291. Call when L3QAction.CANCEL_JOB is requested.
  292. Will parse data dict and do required functionallity.
  293. Returns response dict.
  294. '''
  295. resp = {'action': L3QAction.CANCEL_JOB}
  296. if('jobid' in data and 'user' in data):
  297. try:
  298. self.conf.db_cur.execute('SELECT * FROM job where id = ?', (data.get('jobid'),))
  299. rows = self.conf.db_cur.fetchall()
  300. if(len(rows) == 0):
  301. resp['error'] = 'No job with jobid: {} found.'.format(data.get('jobid'))
  302. else:
  303. if(rows[0]['user'] == data.get('user') or data.get('user') == 'root'):
  304. if(rows[0]['state'] == JobState.RUNNING or rows[0]['state'] == JobState.QUEUED or rows[0]['state'] == JobState.DEPEND):
  305. self.conf.db_cur.execute('SELECT id,depend FROM job WHERE state = ? and depend LIKE ?', (JobState.DEPEND, "%{}%".format(data.get('jobid'))))
  306. depend_rows = self.conf.db_cur.fetchall()
  307. depend: list[int] = []
  308. for d in depend_rows:
  309. dep = d['depend'].strip()
  310. dep = dep.split(',')
  311. if(str(data.get('jobid')) in dep):
  312. depend.append(d['id'])
  313. if(len(depend) == 0):
  314. self.conf.db_cur.execute('UPDATE job SET state = ? WHERE id = ?', (JobState.CANCEL,data.get('jobid')))
  315. resp['response'] = 'Job with jobid: {} successfully canceled.'.format(data.get('jobid'))
  316. else:
  317. d_str = ""
  318. for d in depend:
  319. if(len(d_str) == 0):
  320. d_str += f"{d}"
  321. else:
  322. d_str += f",{d}"
  323. resp['error'] = 'Job with jobid: {} has depening jobs (id): {}\nDepending jobs must be cancelled first.'.format(data.get('jobid'), d_str)
  324. elif(rows[0]['state'] == JobState.CANCEL):
  325. resp['error'] = 'Job with jobid: {} is already in Cancel state and is cancelling.'.format(data.get('jobid'))
  326. else:
  327. resp['error'] = 'Job with jobid: {} is in state {} and can not be cancelled.'.format(data.get('jobid'), rows[0]['state'])
  328. else:
  329. resp['response'] = 'Permission denied to cancel job with id: {}'.format(data.get('jobid'))
  330. except Exception as e:
  331. self.conf.logger.error('CANCEL_JOB: Saving to database: {}'.format(e))
  332. resp['error'] = 'Error cancel job with jobid: {}'.format(data.get('jobid'))
  333. else:
  334. del data['key']
  335. self.conf.logger.error('CANCEL_JOB: wrong variables sent in request: {}'.format(data))
  336. resp['error'] = 'Error wrong variables sent in request'
  337. return resp
  338. def __get_queue(self, data):
  339. '''
  340. Call when L3QAction.GET_QUEUE is requested.
  341. Will parse data dict and do required functionallity.
  342. Returns response dict.
  343. '''
  344. resp = {'action': L3QAction.GET_QUEUE }
  345. queue = []
  346. try:
  347. self.conf.db_cur.execute('SELECT * FROM job WHERE (state = ? OR state = ? OR state = ? OR state = ?) order by id', (JobState.RUNNING, JobState.QUEUED, JobState.CANCEL, JobState.DEPEND))
  348. for row in self.conf.db_cur:
  349. r = {'jobid': row['id']}
  350. r['user'] = row['user']
  351. r['init_date'] = row['init_date']
  352. r['cores_alloc'] = row['cores_alloc']
  353. r['hosts_alloc'] = row['hosts_alloc']
  354. r['tasks'] = str(row['no_tasks_running']) +'/' + str(row['no_tasks_finished']) + '/' + str(row['no_tasks_err'])
  355. #r['nodes'] = row['nodes']
  356. r['state'] = row['state']
  357. r['depend'] = row['depend']
  358. if(row['name']):
  359. r['name'] = row['name']
  360. else:
  361. r['name'] = '-'
  362. if(row['start_date']):
  363. r['start_date'] = row['start_date']
  364. else:
  365. r['start_date'] = '-'
  366. c = self.conf.db_conn.execute('''SELECT *
  367. FROM task
  368. LEFT JOIN node ON task.nodeid = node.id
  369. LEFT JOIN validate_host ON node.validate = validate_host.id
  370. WHERE task.jobid = ? ORDER BY task.id''', (r['jobid'],))
  371. c_row = c.fetchall()
  372. if(len(c_row) > 0):
  373. r['tasks'] += ':' + str(len(c_row))
  374. else:
  375. r['tasks'] += ':-'
  376. nodes = set()
  377. for t in c_row:
  378. if(t['name']):
  379. nodes.add(t['name'])
  380. if(len(nodes) == 0):
  381. r['nodes'] = '-'
  382. else:
  383. str_n = ''
  384. for n in nodes:
  385. if(len(str_n) > 0 ):
  386. str_n += f",{n}"
  387. else:
  388. str_n = n
  389. r['nodes'] = str_n
  390. c.close()
  391. queue.append(r)
  392. except Exception as e:
  393. self.conf.logger.error('GET_QUEUE: Reading from database: {}'.format(e))
  394. resp['error'] = 'Error reading queue.'
  395. resp['queue'] = queue
  396. return resp
  397. def __get_job_info(self, data):
  398. '''
  399. Call when L3QAction.GET_JOB_INFO is requested.
  400. Will parse data dict and do required functionallity.
  401. Returns response dict.
  402. '''
  403. resp = {'action': L3QAction.GET_JOB_INFO }
  404. if('jobid' in data):
  405. try:
  406. self.conf.db_cur.execute('SELECT * FROM job WHERE id = ?', (data.get('jobid'),))
  407. rows = self.conf.db_cur.fetchall()
  408. if(len(rows) == 0):
  409. resp['error'] = 'No job with jobid: {} found.'.format(data.get('jobid'))
  410. else:
  411. r = rows[0]
  412. resp['jobid'] = r['id']
  413. resp['init_date'] = r['init_date']
  414. resp['type'] = r['type']
  415. resp['hosts_alloc'] = r['hosts_alloc']
  416. resp['cores_alloc'] = r['cores_alloc']
  417. resp['no_tasks_running'] = r['no_tasks_running']
  418. resp['no_tasks_finished'] = r['no_tasks_finished']
  419. resp['no_tasks_err'] = r['no_tasks_err']
  420. resp['user'] = r['user']
  421. resp['state'] = r['state']
  422. resp['depend'] = r['depend']
  423. if(r['name']):
  424. resp['name'] = r['name']
  425. else:
  426. resp['name'] = '-'
  427. if(r['start_date']):
  428. resp['start_date'] = r['start_date']
  429. else:
  430. resp['start_date'] = '-'
  431. if(r['end_date']):
  432. resp['end_date'] = r['end_date']
  433. else:
  434. resp['end_date'] = '-'
  435. if(r['start_date'] and not r['end_date']):
  436. dt_now = datetime.datetime.now()
  437. start_d = datetime.datetime.strptime(r['start_date'], '%Y-%m-%d %H:%M:%S')
  438. resp['runtime'] = f"{dt_now-start_d}"
  439. elif(r['start_date'] and r['end_date']):
  440. start_d = datetime.datetime.strptime(r['start_date'], '%Y-%m-%d %H:%M:%S')
  441. end_d = datetime.datetime.strptime(r['end_date'], '%Y-%m-%d %H:%M:%S')
  442. resp['runtime'] = f"{end_d-start_d}"
  443. else:
  444. resp['runtime'] = '-'
  445. nodes = set()
  446. if(r['start_date']):
  447. c = self.conf.db_conn.execute(
  448. 'SELECT * FROM task LEFT JOIN node ON task.nodeid = node.id LEFT JOIN validate_host ON node.validate = validate_host.id WHERE task.jobid = ? ORDER BY seqno',
  449. (data.get('jobid'),))
  450. tasks = []
  451. for t in c:
  452. if(t['seqno']):
  453. task = { 'seq': t['seqno'] }
  454. else:
  455. task = { 'seq': '-' }
  456. task['taskid'] = t['id']
  457. if(t['exit_code'] != None):
  458. task['exit_code'] = t['exit_code']
  459. else:
  460. task['exit_code'] = '-'
  461. if(t['name']):
  462. task['node'] = t['name']
  463. nodes.add(t['name'])
  464. else:
  465. task['node'] = '-'
  466. task['command'] = t['command']
  467. if(t['start_time']):
  468. task['start_date'] = t['start_time']
  469. else:
  470. task['start_date'] = '-'
  471. if(data.get('details')):
  472. if(t['end_time']):
  473. task['end_date'] = t['end_time']
  474. else:
  475. task['end_date'] = '-'
  476. if(t['start_time'] and not t['end_time']):
  477. dt_now = datetime.datetime.now()
  478. start_d = datetime.datetime.strptime(t['start_time'], '%Y-%m-%d %H:%M:%S')
  479. task['runtime'] = f"{dt_now-start_d}"
  480. elif(t['start_time'] and t['end_time']):
  481. start_d = datetime.datetime.strptime(t['start_time'], '%Y-%m-%d %H:%M:%S')
  482. end_d = datetime.datetime.strptime(t['end_time'], '%Y-%m-%d %H:%M:%S')
  483. task['runtime'] = f"{end_d-start_d}"
  484. else:
  485. task['runtime'] = '-'
  486. if(t['workdir']):
  487. task['workdir'] = t['workdir']
  488. else:
  489. task['workdir'] = '-'
  490. if(t['stdout']):
  491. task['stdout'] = t['stdout']
  492. else:
  493. task['stdout'] = '-'
  494. if(t['stderr']):
  495. task['stderr'] = t['stderr']
  496. else:
  497. task['stderr'] = '-'
  498. tasks.append(task)
  499. resp['tasks'] = tasks
  500. if(len(nodes) == 0):
  501. resp['nodes'] = '-'
  502. else:
  503. str_n = ''
  504. for n in nodes:
  505. if(len(str_n) > 0 ):
  506. str_n += f",{n}"
  507. else:
  508. str_n = n
  509. resp['nodes'] = str_n
  510. except Exception as e:
  511. self.conf.logger.error('GET_JOB_INFO: Reading from database: {}'.format(e))
  512. resp['error'] = 'Error reading job.'
  513. else:
  514. self.conf.logger.error('GET_JOB_INFO: wrong variables sent in request: {}'.format(data))
  515. resp['error'] = 'Error wrong variables sent in request'
  516. return resp
  517. def __get_task_list(self, data):
  518. '''
  519. Call when L3QAction.GET_TASK_LIST is requested.
  520. Will parse data dict and do required functionallity.
  521. Returns response dict.
  522. '''
  523. resp = {'action': L3QAction.GET_TASK_LIST }
  524. if('jobid' in data):
  525. try:
  526. c = self.conf.db_conn.execute('SELECT * FROM task WHERE jobid = ? ORDER BY id', (data.get('jobid'),))
  527. rows = c.fetchall()
  528. if(len(rows) == 0):
  529. resp['error'] = 'No job with jobid: {} found.'.format(data.get('jobid'))
  530. else:
  531. resp['jobid'] = data.get('jobid')
  532. tasks = []
  533. for t in rows:
  534. if(t['start_time']):
  535. c = self.conf.db_conn.execute(
  536. 'SELECT * FROM task INNER JOIN node ON task.nodeid = node.id INNER JOIN validate_host ON node.validate = validate_host.id WHERE task.id = ? ORDER BY task.id',
  537. (t['id'],))
  538. t = c.fetchall()[0]
  539. if(t['seqno']):
  540. task = { 'seq': t['seqno'] }
  541. else:
  542. task = { 'seq': '-' }
  543. task['taskid'] = t['id']
  544. if(t['exit_code'] != None):
  545. task['exit_code'] = t['exit_code']
  546. else:
  547. task['exit_code'] = '-'
  548. if('name' in t.keys()):
  549. task['node'] = t['name']
  550. else:
  551. task['node'] = '-'
  552. task['command'] = t['command']
  553. if(t['start_time']):
  554. task['start_date'] = t['start_time']
  555. else:
  556. task['start_date'] = '-'
  557. if(t['end_time']):
  558. task['end_date'] = t['end_time']
  559. else:
  560. task['end_date'] = '-'
  561. if(t['start_time'] and not t['end_time']):
  562. dt_now = datetime.datetime.now()
  563. start_d = datetime.datetime.strptime(t['start_time'], '%Y-%m-%d %H:%M:%S')
  564. task['runtime'] = f"{dt_now-start_d}"
  565. elif(t['start_time'] and t['end_time']):
  566. start_d = datetime.datetime.strptime(t['start_time'], '%Y-%m-%d %H:%M:%S')
  567. end_d = datetime.datetime.strptime(t['end_time'], '%Y-%m-%d %H:%M:%S')
  568. task['runtime'] = f"{end_d-start_d}"
  569. else:
  570. task['runtime'] = '-'
  571. tasks.append(task)
  572. resp['tasks'] = tasks
  573. except Exception as e:
  574. self.conf.logger.error('GET_TASK_LIST: Reading from database: {}'.format(e))
  575. resp['error'] = 'Error reading tasks.'
  576. else:
  577. self.conf.logger.error('GET_TASK_LIST: wrong variables sent in request: {}'.format(data))
  578. resp['error'] = 'Error wrong variables sent in request'
  579. return resp
  580. def __get_task_info(self, data):
  581. '''
  582. Call when L3QAction.GET_TASK_INFO is requested.
  583. Will parse data dict and do required functionallity.
  584. Returns response dict.
  585. '''
  586. resp = {'action': L3QAction.GET_TASK_INFO }
  587. if('taskid' in data):
  588. try:
  589. self.conf.db_cur.execute('SELECT * FROM task WHERE id = ?', (data.get('taskid'),))
  590. rows = self.conf.db_cur.fetchall()
  591. if(len(rows) == 0):
  592. resp['error'] = 'No task with taskid: {} found.'.format(data.get('taskid'))
  593. else:
  594. r = rows[0]
  595. if(r['start_time']):
  596. self.conf.db_cur.execute(
  597. 'SELECT * FROM task INNER JOIN node ON task.nodeid = node.id INNER JOIN validate_host ON node.validate = validate_host.id WHERE task.id = ? ORDER BY seqno',
  598. (data.get('taskid'),))
  599. c = self.conf.db_cur.fetchall()
  600. if(len(c) == 0):
  601. resp['error'] = 'No task with taskid: {} found.'.format(data.get('taskid'))
  602. return resp
  603. else:
  604. r = c[0]
  605. if(r['seqno']):
  606. resp['seq'] = r['seqno']
  607. else:
  608. resp['seq'] = '-'
  609. resp['taskid'] = r['id']
  610. if(r['exit_code'] != None):
  611. resp['exit_code'] = r['exit_code']
  612. else:
  613. resp['exit_code'] = '-'
  614. if('name' in r):
  615. resp['node'] = r['name']
  616. else:
  617. resp['node'] = '-'
  618. resp['command'] = r['command']
  619. if(r['start_time']):
  620. resp['start_date'] = r['start_time']
  621. else:
  622. resp['start_date'] = '-'
  623. if(r['end_time']):
  624. resp['end_date'] = r['end_time']
  625. else:
  626. resp['end_date'] = '-'
  627. if(r['start_time'] and not r['end_time']):
  628. dt_now = datetime.datetime.now()
  629. start_d = datetime.datetime.strptime(r['start_time'], '%Y-%m-%d %H:%M:%S')
  630. resp['runtime'] = f"{dt_now-start_d}"
  631. elif(r['start_time'] and r['end_time']):
  632. start_d = datetime.datetime.strptime(r['start_time'], '%Y-%m-%d %H:%M:%S')
  633. end_d = datetime.datetime.strptime(r['end_time'], '%Y-%m-%d %H:%M:%S')
  634. resp['runtime'] = f"{end_d-start_d}"
  635. else:
  636. resp['runtime'] = '-'
  637. if(r['workdir']):
  638. resp['workdir'] = r['workdir']
  639. else:
  640. resp['workdir'] = '-'
  641. if(r['unitname']):
  642. resp['unitname'] = r['unitname']
  643. else:
  644. resp['unitname'] = '-'
  645. if(r['active_state']):
  646. resp['active_state'] = r['active_state']
  647. else:
  648. resp['active_state'] = '-'
  649. if(r['sub_state']):
  650. resp['sub_state'] = r['sub_state']
  651. else:
  652. resp['sub_state'] = '-'
  653. if(r['exit_code']):
  654. resp['exit_code'] = r['exit_code']
  655. else:
  656. resp['exit_code'] = '-'
  657. if(r['memory_peak']):
  658. resp['memory_peak'] = r['memory_peak']
  659. else:
  660. resp['memory_peak'] = '-'
  661. if(r['jobid']):
  662. resp['jobid'] = r['jobid']
  663. else:
  664. resp['active_state'] = '-'
  665. if(r['stdout']):
  666. resp['stdout'] = r['stdout']
  667. else:
  668. resp['stdout'] = '-'
  669. if(r['stderr']):
  670. resp['stderr'] = r['stderr']
  671. else:
  672. resp['stderr'] = '-'
  673. except Exception as e:
  674. self.conf.logger.error('GET_TASK_INFO: Reading from database: {}'.format(e))
  675. resp['error'] = 'Error reading task.'
  676. else:
  677. self.conf.logger.error('GET_TASK_INFO: wrong variables sent in request: {}'.format(data))
  678. resp['error'] = 'Error wrong variables sent in request'
  679. return resp
  680. def __get_history(self, data):
  681. '''
  682. Call when L3QAction.GET_HISTORY is requested.
  683. Will parse data dict and do required functionallity.
  684. Returns response dict.
  685. '''
  686. resp = {'action': L3QAction.GET_HISTORY }
  687. history = []
  688. try:
  689. if('number' in data):
  690. self.conf.db_cur.execute('SELECT * FROM job WHERE not (state = ? OR state = ? OR state = ? OR state = ?) ORDER BY id DESC LIMIT ?', (JobState.RUNNING, JobState.QUEUED, JobState.CANCEL, JobState.DEPEND, data.get('number')))
  691. else:
  692. self.conf.db_cur.execute('SELECT * FROM job WHERE not (state = ? OR state = ? OR state = ? OR state = ?) ORDER BY id DESC', (JobState.RUNNING, JobState.QUEUED, JobState.CANCEL, JobState.DEPEND))
  693. for row in self.conf.db_cur:
  694. r = {'jobid': row['id']}
  695. r['user'] = row['user']
  696. r['init_date'] = row['init_date']
  697. r['cores_alloc'] = row['cores_alloc']
  698. r['hosts_alloc'] = row['hosts_alloc']
  699. r['tasks'] = str(row['no_tasks_running']) +'/' + str(row['no_tasks_finished']) + '/' + str(row['no_tasks_err'])
  700. r['state'] = row['state']
  701. r['depend'] = row['depend']
  702. if(row['name']):
  703. r['name'] = row['name']
  704. else:
  705. r['name'] = '-'
  706. if(row['start_date']):
  707. r['start_date'] = row['start_date']
  708. else:
  709. r['start_date'] = '-'
  710. if(row['end_date']):
  711. r['end_date'] = row['end_date']
  712. else:
  713. r['end_date'] = '-'
  714. c = self.conf.db_conn.execute('''SELECT *
  715. FROM task
  716. LEFT JOIN node ON task.nodeid = node.id
  717. LEFT JOIN validate_host ON node.validate = validate_host.id
  718. WHERE task.jobid = ? ORDER BY task.id''', (r['jobid'],))
  719. c_row = c.fetchall()
  720. if(len(c_row) > 0):
  721. r['tasks'] += ':' + str(len(c_row))
  722. else:
  723. r['tasks'] += ':-'
  724. nodes = set()
  725. for t in c_row:
  726. if(t['name']):
  727. nodes.add(t['name'])
  728. if(len(nodes) == 0):
  729. r['nodes'] = '-'
  730. else:
  731. str_n = ''
  732. for n in nodes:
  733. if(len(str_n) > 0 ):
  734. str_n += f",{n}"
  735. else:
  736. str_n = n
  737. r['nodes'] = str_n
  738. c.close()
  739. history.append(r)
  740. except Exception as e:
  741. self.conf.logger.error('GET_HISTORY: Reading from database: {}'.format(e))
  742. resp['error'] = 'Error reading history.'
  743. resp['history'] = history
  744. return resp
  def __get_node_status(self, data):
      '''
      Call when L3QAction.GET_NODE_STATUS is requested.

      Reads every node joined with its validate_host record, ordered
      by name, and returns a response dict holding one dict per node
      under the key 'nodes'. Values that are missing in the database
      are reported as '-'.

      If reading fails the error is logged, the key 'error' is set and
      the nodes collected so far are still returned.
      The data argument is not used.
      '''
      # One entry per reported value:
      # (response key, database column, keep zero, send as string)
      # "keep zero" means only NULL is replaced by '-', otherwise every
      # false value (NULL, 0, '') is replaced.
      columns = (
          ('cores', 'total_cpus', False, False),
          ('cores_alloc', 'alloc_cpus', True, False),
          ('cores_in_use', 'used_cpus', True, False),
          ('ram', 'total_memory', False, True),
          ('ram_used', 'used_memory', False, True),
          ('total_swap', 'total_swap', False, True),
          ('used_swap', 'used_swap', False, True),
          ('state', 'state', False, False),
      )

      resp = {'action': L3QAction.GET_NODE_STATUS }
      nodes = []

      try:
          self.conf.db_cur.execute('SELECT * FROM node INNER JOIN validate_host ON node.validate = validate_host.id order by name')

          for row in self.conf.db_cur:
              entry = {'name': row['name']}

              for resp_key, column, keep_zero, as_text in columns:
                  value = row[column]
                  known = (value is not None) if keep_zero else bool(value)

                  if not known:
                      entry[resp_key] = '-'
                  elif as_text:
                      entry[resp_key] = str(value)
                  else:
                      entry[resp_key] = value

              nodes.append(entry)

      except Exception as e:
          self.conf.logger.error('GET_NODE_STATUS: Reading from database: {}'.format(e))
          resp['error'] = 'Error reading node status.'

      resp['nodes'] = nodes
      return resp
  def __add_node(self, data):
      '''
      Call when L3QAction.ADD_NODE is requested.

      Adds a node with the name in data['node_name'] to the node
      table, unless a node with that name already exists.

      Returns response dict with the key 'node' on success and the
      key 'error' on failure. Database errors are logged.
      '''
      resp = {'action': L3QAction.ADD_NODE }

      if 'node_name' not in data:
          # The 'key' entry is removed so it is not written to the log.
          # pop() with a default is used, a request without 'key' must
          # give an error response and not a KeyError.
          data.pop('key', None)
          self.conf.logger.error('ADD_NODE: wrong variables sent in request: {}'.format(data))
          resp['error'] = 'Error wrong variables sent in request'
          return resp

      node_name = data.get('node_name')

      try:
          self.conf.db_cur.execute('SELECT * FROM node where name = ?', (node_name,))
          rows = self.conf.db_cur.fetchall()

          if len(rows) != 0:
              resp['error'] = 'Node: {} does already exist.'.format(node_name)
          else:
              self.conf.db_cur.execute('INSERT INTO node (name) values (?)', (node_name,))
              resp['node'] = 'Node: {} successfully added.'.format(node_name)

      except Exception as e:
          self.conf.logger.error('ADD_NODE: Saving to database: {}'.format(e))
          resp['error'] = 'Error saving node: {}'.format(node_name)

      return resp
  def __remove_node(self, data):
      '''
      Call when L3QAction.REMOVE_NODE is requested.

      Deletes the node with the name in data['node_name'] from the
      node table, if such a node exists.

      Returns response dict with the key 'node' on success and the
      key 'error' on failure. Database errors are logged.
      '''
      resp = {'action': L3QAction.REMOVE_NODE }

      if 'node_name' not in data:
          # The 'key' entry is removed so it is not written to the log.
          # pop() with a default is used, a request without 'key' must
          # give an error response and not a KeyError.
          data.pop('key', None)
          self.conf.logger.error('REMOVE_NODE: wrong variables sent in request: {}'.format(data))
          resp['error'] = 'Error wrong variables sent in request'
          return resp

      node_name = data.get('node_name')

      try:
          self.conf.db_cur.execute('SELECT * FROM node where name = ?', (node_name,))
          rows = self.conf.db_cur.fetchall()

          if len(rows) == 0:
              resp['error'] = 'Node: {} does not exist.'.format(node_name)
          else:
              self.conf.db_cur.execute('DELETE FROM node where name = (?)', (node_name,))
              resp['node'] = 'Node: {} successfully removed.'.format(node_name)

      except Exception as e:
          self.conf.logger.error('REMOVE_NODE: Deleting from database: {}'.format(e))
          resp['error'] = 'Error removing node: {}'.format(node_name)

      return resp
  def __set_node_offline(self, data):
      '''
      Call when L3QAction.SET_NODE_OFFLINE is requested.

      Looks up the validated hosts with the name in data['node_name']
      and sets the state of the nodes connected to them to
      'Maintenance draining'.

      Returns response dict with the key 'node' on success and the
      key 'error' on failure. Database errors are logged.
      '''
      resp = {'action': L3QAction.SET_NODE_OFFLINE }

      if 'node_name' not in data:
          # The 'key' entry is removed so it is not written to the log.
          # pop() with a default is used, a request without 'key' must
          # give an error response and not a KeyError.
          data.pop('key', None)
          self.conf.logger.error('SET_NODE_OFFLINE: wrong variables sent in request: {}'.format(data))
          resp['error'] = 'Error wrong variables sent in request'
          return resp

      node_name = data.get('node_name')

      try:
          self.conf.db_cur.execute('SELECT id FROM validate_host where name = ?', (node_name,))
          rows = self.conf.db_cur.fetchall()

          if len(rows) == 0:
              resp['error'] = 'Node: {} does not exist.'.format(node_name)
          else:
              for r in rows:
                  self.conf.db_cur.execute("UPDATE node SET state = 'Maintenance draining' where validate = (?)", (r['id'],))

              resp['node'] = 'Node: {} successfully changed state to offline (Maintenance draining).'.format(node_name)

      except Exception as e:
          self.conf.logger.error('SET_NODE_OFFLINE: Saving to database: {}'.format(e))
          resp['error'] = 'Error changing state to offline on node: {}'.format(node_name)

      return resp
  def __set_node_online(self, data):
      '''
      Call when L3QAction.SET_NODE_ONLINE is requested.

      Looks up the validated hosts with the name in data['node_name']
      and sets the state of the nodes connected to them to
      'Soft Online'.

      Returns response dict with the key 'node' on success and the
      key 'error' on failure. Database errors are logged.
      '''
      resp = {'action': L3QAction.SET_NODE_ONLINE }

      if 'node_name' not in data:
          # The 'key' entry is removed so it is not written to the log.
          # pop() with a default is used, a request without 'key' must
          # give an error response and not a KeyError.
          data.pop('key', None)
          self.conf.logger.error('SET_NODE_ONLINE: wrong variables sent in request: {}'.format(data))
          resp['error'] = 'Error wrong variables sent in request'
          return resp

      node_name = data.get('node_name')

      try:
          self.conf.db_cur.execute('SELECT id FROM validate_host where name = ?', (node_name,))
          rows = self.conf.db_cur.fetchall()

          if len(rows) == 0:
              resp['error'] = 'Node: {} does not exist.'.format(node_name)
          else:
              for r in rows:
                  self.conf.db_cur.execute("UPDATE node SET state = 'Soft Online' where validate = (?)", (r['id'],))

              resp['node'] = 'Node: {} successfully changed state to online (Soft Online).'.format(node_name)

      except Exception as e:
          self.conf.logger.error('SET_NODE_ONLINE: Saving to database: {}'.format(e))
          resp['error'] = 'Error changing state to online on node: {}'.format(node_name)

      return resp
  def __send_task_to_node(self, key: str, hostname: str, ip: str, port: int, job: List[Any], task: List[Any]) -> bool:
      '''
      Creates the required json data to send to node.
      Sends task to node and verifies the response from node.

      key:      value sent to the node as 'validate'.
      hostname: hostname of the node, tried first.
      ip:       ip address of the node, tried if hostname fails.
      port:     port on the node.
      job:      job record, read by key: id, cores_alloc, type, user.
      task:     task record, read by key: id, command, workdir.
                A list of such records is accepted as well.

      Returns True only when the node answers with the job id and
      the task ids that were sent and with a validate value that is
      known to this daemon. Every failure is logged and gives False.
      '''
      # The caller may hand over one task record or a list of them.
      task_rows = task if isinstance(task, list) else [task]

      try:
          jobid = job['id']
          tasks = [
              {
                  'taskid': t['id'],
                  'command': t['command'],
                  'working_directory': t['workdir'],
              }
              for t in task_rows
          ]
          node_job = {
              'cmd': {
                  'ExecuteL3qdJob': {
                      'job': {
                          'jobid': jobid,
                          'cores_alloc': job['cores_alloc'],
                          'memory_alloc': 0,
                          'exec_type': job['type'],
                          'tasks': tasks,
                          'user': job['user'],
                          'group': '',
                      }
                  }
              },
              'validate': key,
          }
          json_data = json.dumps(node_job)
      except Exception as e:
          self.conf.logger.error('Error encoding data into json string:\n{}, __send_task_to_node'.format(e))
          return False

      sent_tasks = [t['taskid'] for t in tasks]

      # The validate key is left out of the debug output on purpose.
      self.conf.logger.debug('Send jobid: {} with taskids: {} to node name: {}'
                             .format(jobid, sent_tasks, hostname))

      # NOTE(review): `requests` is not among the imports visible at the
      # top of this file - confirm that it is imported. Without it both
      # attempts below fail with NameError and the task is never sent.
      resp = None
      send_error = None

      # First try with the hostname, then with the ip address.
      for address in (hostname, ip):
          try:
              resp = requests.post('https://' + address + ':' + str(port) + '/l3qd/request',
                                   data=json_data, verify=False)
              resp.raise_for_status()
              break
          except Exception as e:
              resp = None
              send_error = e

      if resp is None:
          self.conf.logger.error('Failed to send jobid: {} to node name: {}, {}, __send_task_to_node'
                                 .format(jobid, hostname, send_error))
          return False

      try:
          data = json.loads(resp.text)
      except Exception:
          self.conf.logger.error('Failed to parse response from node name: {} when sending job with id: {}, __send_task_to_node'
                                 .format(hostname, jobid))
          return False

      if not data:
          self.conf.logger.error('Error response from node name: {} when sending job with id: {}\nEmpty respone from node, __send_task_to_node'
                                 .format(hostname, jobid))
          return False

      payload = data.get('data') if isinstance(data, dict) else None

      if not isinstance(payload, dict):
          self.conf.logger.error('Unexpected response from node name: {} when sending job with id: {}, __send_task_to_node'
                                 .format(hostname, jobid))
          return False

      if 'Error' in payload:
          self.conf.logger.error('Error response from node name: {} when sending job with id: {}\nNode error: {}, __send_task_to_node'
                                 .format(hostname, jobid, payload['Error']))
          return False

      if 'TaskExecuted' not in payload or 'validate' not in data:
          self.conf.logger.error('Unexpected response from node name: {} when sending job with id: {}, __send_task_to_node'
                                 .format(hostname, jobid))
          return False

      try:
          validated = data['validate'] in self.validate
          resp_jobid = payload['TaskExecuted']['jobid']
          resp_tasks = payload['TaskExecuted']['task']
          tasks_match = sorted(sent_tasks) == sorted(resp_tasks)
      except (KeyError, TypeError) as e:
          self.conf.logger.error('Malformed response from node name: {} when sending job with id: {}, {}, __send_task_to_node'
                                 .format(hostname, jobid, e))
          return False

      if not validated:
          self.conf.logger.error('Validation failure from node name: {} when sending job with id: {}'
                                 .format(hostname, jobid))
          return False

      if resp_jobid != jobid:
          self.conf.logger.error('Wrong jobid: {} in answer from node name: {} when sending job with id: {}, __send_task_to_node'
                                 .format(resp_jobid, hostname, jobid))
          return False

      if not tasks_match:
          self.conf.logger.error('Wrong taskids: {} in answer from node name: {} when sending job with id: {} and taskids: {}, __send_task_to_node'
                                 .format(resp_tasks, hostname, jobid, sent_tasks))
          return False

      return True
  984. # def __prepare_jobs_to_send(self, jobs: list[queue.QRun]) -> None:
  985. # '''
  986. # Collect required data
  987. # from database and sends
  988. # jobs to specified nodes.
  989. # '''
  990. #
  991. # node_jobs: dict[str, Any] = {}
  992. # for j in jobs:
  993. # try:
  994. # self.conf.db_cur.execute('SELECT hosts_alloc,cores_alloc,type FROM job WHERE id = ?', (j.jobid,))
  995. # job = self.conf.db_cur.fetchall()
  996. # if(job[0].get('hosts_alloc') != len(j.node_ids)):
  997. # self.conf.logger.error(f'Wrong number of nodes: {len(j.node_ids)} returned from queue, required: {job[0].get("hosts_alloc")}, in send_job_to_nodes')
  998. # continue
  999. # except Exception as e:
  1000. # self.conf.logger.error(f'Failed to get jobid: {j.jobid} from database: {e}, in send_job_to_nodes')
  1001. # continue
  1002. #
  1003. # try:
  1004. # self.conf.db_cur.execute('SELECT id,command,seqno,workdir FROM task WHERE jobid = ?', (j.jobid,))
  1005. # tasks = self.conf.db_cur.fetchall()
  1006. # except Exception as e:
  1007. # self.conf.logger.error(f'Failed to get tasks for jobid: {j.jobid} from database: {e}, in send_job_to_nodes')
  1008. # continue
  1009. #
  1010. # for i in j.node_ids:
  1011. # try:
  1012. # self.conf.db_cur.execute('SELECT hostname,ip FROM validate_host INNER JOIN node ON validate_host.id = node.validate WHERE node.id = ?)', (i,))
  1013. # host = self.conf.db_cur.fetchall()
  1014. # node_jobs[host[0]['hostname']] = {}
  1015. # node_jobs[host[0]['hostname']]['ip'] = host[0]['ip']
  1016. # except Exception as e:
  1017. # self.conf.logger.error(f'Failed to get node with id: {i} from database: {e}, in send_job_to_nodes')
  1018. # continue
  1019. #
  1020. #
  1021. #
  1022. # tasks_per_node = ceil(len(tasks)/len(node_jobs))
  1023. #
  1024. # c = 1
  1025. # for n in node_jobs:
  1026. # start = c - 1
  1027. # end = c * tasks_per_node - 1
  1028. #
  1029. # if(start >= len(tasks)):
  1030. # break
  1031. #
  1032. # elif(end >= len(tasks)):
  1033. # node_jobs[n]['tasks'] = tasks[start:]
  1034. # break
  1035. # else:
  1036. # node_jobs[n]['tasks'] = tasks[start:end]
  1037. # c += 1
  1038. #
  1039. # # self.__send_to_node(hostname, ip, tasks)
  1040. # # send to nodes
  1041. # # if send to node success update start_date for job
  1042. #
  1043. # def __update_queue(self) -> None:
  1044. # '''
  1045. # Update configured queue.
  1046. # Sends all queued and depending
  1047. # jobs and node info to queue
  1048. # object that returns jobs to
  1049. # be launched on the nodes.
  1050. # Send jobs to the nodes and
  1051. # updates the database.
  1052. # '''
  1053. #
  1054. # queue = eval(f"queue.{self.conf.queue}()")
  1055. # job_queue = []
  1056. # try:
  1057. # self.conf.db_cur.execute('SELECT id,init_date,hosts_alloc,cores_alloc,depend,state FROM job WHERE state = ?', (JobState.QUEUED, JobState.DEPEND))
  1058. # db_job_queue = self.conf.db_cur.fetchall()
  1059. #
  1060. # for j in db_job_queue:
  1061. # if(j.get("state") == JobState.QUEUED):
  1062. # qj = queue.QJob.from_dict(j)
  1063. # if(qj != None):
  1064. # job_queue.append(qj)
  1065. # else:
  1066. # depend = j.get("depend")
  1067. #
  1068. # if(depend != None):
  1069. # all_depend_term = True
  1070. # depend_err = False
  1071. # depend_cancel = False
  1072. # for d in depend.trim().split(','):
  1073. # try:
  1074. # self.conf.db_cur.execute('SELECT state FROM job WHERE id = ?"', (d,))
  1075. # job = self.conf.db_cur.fetchall()
  1076. #
  1077. # if(len(job) == 0):
  1078. # self.conf.logger.error('Failed to get job with jobid: {} from database'.format(d))
  1079. # else:
  1080. # for dj in job:
  1081. # if(dj.get('state') != JobState.TERMINATED):
  1082. # all_depend_term = False
  1083. # if(dj.get('state') == JobState.ERROR or dj.get('state') == JobState.NODE_ERROR):
  1084. # depend_err = True
  1085. # elif(dj.get('state') == JobState.CANCEL or dj.get('state') == JobState.CANCELED):
  1086. # depend_cancel = True
  1087. # except Exception as e:
  1088. # self.conf.logger.error('Failed to get depend jobs to jobid:{} from database: {}'.format(j.get("id"),e))
  1089. #
  1090. # if(all_depend_term):
  1091. # qj = queue.QJob.from_dict(dj)
  1092. # if(qj != None):
  1093. # job_queue.append(qj)
  1094. # elif(depend_err):
  1095. # try:
  1096. # self.conf.db_cur.execute('UPDATE job SET state = ? where jobid = ?', (JobState.ERROR, j.get('id')))
  1097. # except Exception as e:
  1098. # self.conf.logger.error(f'Failed to update job state for jobid: {j.get("id")} in database: {e}')
  1099. # elif(depend_cancel):
  1100. # try:
  1101. # self.conf.db_cur.execute('UPDATE job SET state = ? where jobid = ?', (JobState.CANCELED, j.get('id')))
  1102. # except Exception as e:
  1103. # self.conf.logger.error(f'Failed to update job state for jobid: {j.get("id")} in database: {e}')
  1104. # except Exception as e:
  1105. # self.conf.logger.error(f'Failed to get all queued jobs from database: {e}')
  1106. #
  1107. #
  1108. # online_nodes = []
  1109. # try:
  1110. # self.conf.db_cur.execute('SELECT id,total_cpus,alloc_cpus FROM node WHERE state = ?"', (NodeState.ONLINE,))
  1111. #
  1112. # nodes = self.conf.db_cur.fetchall()
  1113. #
  1114. # for n in nodes:
  1115. # node = queue.QNode.from_dict(n)
  1116. #
  1117. # if(node != None):
  1118. # online_nodes.append(node)
  1119. #
  1120. # except Exception as e:
  1121. # self.conf.logger.error(f'Failed to get all online nodes from database: {e}')
  1122. #
  1123. #
  1124. # run = queue.run_queue(job_queue, online_nodes)
  1125. # self.__prepare_jobs_to_send(run)
  def __update_jobs(self):
      '''
      Updates status for running jobs.
      Should be run after node update of tasks.

      For every job in state running the task counters (running,
      finished, failed) are saved. When all tasks of a job have
      sub_state exited or failed the job state is set to terminated,
      or to error if any task failed, and the cores the job had
      allocated are released on every node that ran one of its tasks.

      Database errors are logged and never raised.
      '''
      try:
          self.conf.db_cur.execute('SELECT * FROM job where state = ?', (JobState.RUNNING,))

          for row in self.conf.db_cur.fetchall():
              jobid = row['id']

              try:
                  self.conf.db_cur.execute('SELECT * FROM task where jobid = ?', (jobid,))
                  tasks = self.conf.db_cur.fetchall()

                  no_tasks_finished = sum(1 for t in tasks if t['sub_state'] == 'exited')
                  no_tasks_err = sum(1 for t in tasks if t['sub_state'] == 'failed')
                  failed_tasks = no_tasks_err > 0

                  # A task is done when it has exited or failed.
                  all_tasks_done = (no_tasks_finished + no_tasks_err) == len(tasks)

                  # Tasks without a node have nothing to release.
                  nodes = {t['nodeid'] for t in tasks if t['nodeid'] is not None}

                  # Never more tasks running than cores allocated.
                  tot_run = len(tasks) - no_tasks_finished - no_tasks_err
                  tot_alloc = row['hosts_alloc'] * row['cores_alloc']
                  running = min(tot_alloc, tot_run)
              except Exception as e:
                  self.conf.logger.error(f'Failed to get all tasks for jobid: {jobid} from database: {e}')
                  continue

              try:
                  self.conf.db_cur.execute('UPDATE job SET no_tasks_running = ?,no_tasks_finished = ?,no_tasks_err =? where id = ?',
                                           (running, no_tasks_finished, no_tasks_err, jobid))
              except Exception as e:
                  self.conf.logger.error(f'Failed to update job task run/fin/err for jobid: {jobid} in database: {e}')

              if not all_tasks_done:
                  continue

              new_state = JobState.ERROR if failed_tasks else JobState.TERMINATED

              try:
                  # The key column of the job table is id, jobid is a
                  # column in the task table.
                  self.conf.db_cur.execute('UPDATE job SET state = ? where id = ?', (new_state, jobid))
              except Exception as e:
                  self.conf.logger.error(f'Failed to update job state for jobid: {jobid} in database: {e}')
                  # The job is still running in the database and will be
                  # handled again on next call. Cores are released first
                  # when the state is saved, else they are released twice.
                  continue

              cores_alloc = row['cores_alloc']

              for n in nodes:
                  try:
                      self.conf.db_cur.execute('SELECT alloc_cpus FROM node WHERE id = ?', (n,))
                      node_alloc = self.conf.db_cur.fetchall()

                      if len(node_alloc) != 1:
                          self.conf.logger.error(f'Failed to get node allocation with node id {n} from database')
                          continue

                      # alloc_cpus may be NULL, count that as 0.
                      alloc = (node_alloc[0]['alloc_cpus'] or 0) - cores_alloc
                      self.conf.db_cur.execute('UPDATE node SET alloc_cpus = ? where id = ?', (max(alloc, 0), n))
                  except Exception as e:
                      self.conf.logger.error(f'Failed to update node allocation for jobid: {jobid} in database: {e}')

      except Exception as e:
          self.conf.logger.error('Failed to get all running jobs from database: {}'.format(e))
  def __update_running_job(self, jobid: str, host_key: str) -> None:
      '''
      Updates running jobs.
      If there are still tasks that has not been sent to nodes
      this method sends a new task to the node where a task of the
      job has executed.

      jobid:    id of the job to update.
      host_key: passed on to __send_task_to_node as key.

      Errors are logged and never raised.
      '''
      self.conf.logger.debug(f'Updating running job: {jobid}')

      try:
          self.conf.db_cur.execute('SELECT job.id,type, cores_alloc, user,task.nodeid AS nodeid FROM job INNER JOIN task ON job.id = task.jobid WHERE job.id = ?', (jobid,))
          job = self.conf.db_cur.fetchall()
      except Exception as e:
          self.conf.logger.error(f'Failed to get job from database where jobid: {jobid}, in __update_running_job: {e}')
          return

      if len(job) != 1:
          # NOTE(review): the query above joins job with task and gives
          # one row per task. A job with more than one task therefore
          # always ends up here - confirm how the node that reported the
          # finished task is supposed to be found.
          self.conf.logger.error(f'Job not found in database where jobid: {jobid}, in __update_running_job')
          return

      job_row = job[0]

      try:
          self.conf.db_cur.execute('SELECT hostname,ip,port FROM validate_host INNER JOIN node ON validate_host.id = node.validate WHERE node.id = ?', (job_row['nodeid'],))
          node = self.conf.db_cur.fetchall()
      except Exception as e:
          self.conf.logger.error(f'Failed to get node where previous task executed from database where jobid: {jobid}, in __update_running_job: {e}')
          return

      try:
          self.conf.db_cur.execute('SELECT id,command,seqno,workdir FROM task WHERE task.jobid = ? AND start_time IS NULL LIMIT 1', (job_row['id'],))
          unstarted_task = self.conf.db_cur.fetchall()
      except Exception as e:
          self.conf.logger.error(f'Failed to get all non started tasks from database where jobid: {jobid}, in __update_running_job: {e}')
          return

      if len(unstarted_task) == 0 or len(node) == 0:
          return

      try:
          # The task is sent as one record, it is read by key in
          # __send_task_to_node, which also logs its own failures.
          self.__send_task_to_node(host_key, node[0]['hostname'], node[0]['ip'], node[0]['port'], job_row, unstarted_task[0])
      except Exception as e:
          self.conf.logger.error(f'Failed to send task for jobid: {jobid} to node, in __update_running_job: {e}')
  def __node_status_tasks(self, tasks: list[dict[str,str]], key: str) -> dict[Any, Any]:
      '''
      Saves task values received from a node in the database and
      updates job status afterwards.

      tasks: one dict per task with the values reported by the node.
      key:   key into self.validate, the value found is returned
             under 'validate'.

      Returns response dict where 'result' holds 'Ok', or 'Err' with
      the tasks that failed to be saved. A task that does not exist
      in the database is reported under the key 'Err' of the response.
      '''
      # Values saved when a task has exited or failed.
      done_fields = ('unitname', 'start_time', 'end_time', 'mono_start_time',
                     'mono_end_time', 'active_state', 'sub_state', 'exit_code',
                     'stdout', 'stderr', 'taskid', 'jobid')
      # Values saved for all other tasks, including process information.
      live_fields = ('unitname', 'start_time', 'end_time', 'mono_start_time',
                     'mono_end_time', 'active_state', 'sub_state', 'exit_code',
                     'main_pid', 'memory_peak', 'pids_peak',
                     'stdout', 'stderr', 'taskid', 'jobid')

      resp: dict[Any,Any] = {}
      not_saved = []

      for t in tasks:
          taskid = t.get('taskid')
          jobid = t.get('jobid')

          try:
              self.conf.db_cur.execute('SELECT * FROM task where id = ? and jobid = ?', (taskid, jobid))

              if len(self.conf.db_cur.fetchall()) == 0:
                  resp['Err'] = 'Taskid: {}, Jobid: {} does not exist.'.format(taskid, jobid)
                  continue

              if t.get('sub_state') in ("failed", "exited"):
                  self.conf.db_cur.execute(
                      '''UPDATE task SET
                      unitname = ?,
                      start_time = ?,
                      end_time = ?,
                      mono_start_time = ?,
                      mono_end_time = ?,
                      active_state = ?,
                      sub_state = ?,
                      exit_code = ?,
                      stdout = ?,
                      stderr = ?
                      where id = ? and jobid = ?''',
                      tuple(t.get(name) for name in done_fields))

                  # The node is free for another task of the job.
                  self.__update_running_job(jobid, key)
                  # TODO: create
                  # self.__update_job_info(t.get('sub_state')), update task count exit, error aso
                  # self.__update_node_info....., update alloc if job is done
              else:
                  self.conf.db_cur.execute(
                      '''UPDATE task SET
                      unitname = ?,
                      start_time = ?,
                      end_time = ?,
                      mono_start_time = ?,
                      mono_end_time = ?,
                      active_state = ?,
                      sub_state = ?,
                      exit_code = ?,
                      main_pid = ?,
                      memory_peak = ?,
                      pids_peak = ?,
                      stdout = ?,
                      stderr = ?
                      where id = ? and jobid = ?''',
                      tuple(t.get(name) for name in live_fields))

          except Exception as e:
              self.conf.logger.error('Failed to save task to database, taskid: {}, jobid: {}, {}'
                                     .format(taskid, jobid, e))
              not_saved.append(f"Taskid: {taskid}, Jobid: {jobid}; ")

      resp_err = ''.join(not_saved)

      if resp_err:
          resp_err = "Failed to save tasks to database: " + resp_err

      # Queue update is done in the worker today.
      self.__update_jobs()

      resp['validate'] = self.validate[key]

      if resp_err:
          resp['result'] = { 'Err': resp_err }
      else:
          resp['result'] = { 'Ok': 'Tasks successfully updated' }

      return resp
  def wsgi_app(self, environ, start_response):
      '''
      wsgi app, called on each request.

      Wraps environ in a Request, lets despatch_request build the
      response and calls that response with environ and
      start_response.
      '''
      return self.despatch_request(Request(environ))(environ, start_response)
  def __call__(self, environ, start_response):
      '''
      Called on each request, makes the object itself usable as
      wsgi application by passing the call on to wsgi_app.
      '''
      app = self.wsgi_app
      return app(environ, start_response)
  class ValidateD:
      '''
      Wsgi application used to validate clients and compute nodes.

      A temporary password is prompted for at start. A host that sends
      this password in a VALIDATE_KEY request is saved in the table
      validate_host together with its key. Clients post to '/' and
      nodes post to '/validate'.
      '''

      def __init__(self, conf):
          '''
          Init of validation daemon.
          Prompt for a temporary password to be
          used from client side.
          '''
          self.conf = conf
          print('')
          print('Validate server is running on port: {}'.format(self.conf.port))
          print('')
          print('#### INSTRUCTIONS ####')
          print('Specify temporary password that should be used on the client host')
          print('when promped for.')
          pw = getpass.getpass('Enter temporary password: ')
          self.password = pw
          print('')
          print('Go to client host or compute node and run as root:')
          print('Client: l3q validate --port {}'.format(self.conf.port))
          print('Node: node-l3qd --validate')
          print('Enter the same password at the prompt.')
          print('')
          print('When command prints: Validation SUCCESS!!')
          print('Validation is done and you can enter CTRL+C to stop this validation server.')
          print('')

      @staticmethod
      def _new_key():
          '''
          Returns a new random key of 50 letters and digits.
          '''
          # The key is a credential, so it is drawn from the secrets
          # module and not from the predictable random module.
          import secrets

          source = string.ascii_letters + string.digits
          return ''.join(secrets.choice(source) for _ in range(50))

      @staticmethod
      def _lookup_hostname(ip):
          '''
          Returns the hostname for ip, or ip itself if the
          reverse lookup fails.
          '''
          try:
              return socket.gethostbyaddr(ip)[0]
          except OSError:
              return ip

      def _validate_client(self, request, data):
          '''
          Handles a validation request from a client.
          Returns response dict with 'key' on success
          and 'error' on failure.
          '''
          resp = {'action': L3QAction.VALIDATE_KEY}
          host = request.remote_addr

          if data.get('password') != self.password:
              resp['error'] = "Wrong password"
              print('Wrong password from host: {}'.format(host), file=sys.stderr)
              return resp

          key = self._new_key()
          hostname = self._lookup_hostname(host)
          name = data.get('hostname')

          try:
              self.conf.db_cur.execute('''SELECT * FROM validate_host where ip = ? AND host_type = ?''', (host, "client"))
              rows = self.conf.db_cur.fetchall()

              if len(rows) != 0:
                  # Only the client record is updated, a node validated
                  # from the same ip must keep its own keys.
                  self.conf.db_cur.execute('UPDATE validate_host SET key = ?, hostname = ?, name = ? WHERE ip = ? AND host_type = ?;',
                                           (key, hostname, name, host, "client"))
              else:
                  self.conf.db_cur.execute('INSERT INTO validate_host (host_type, ip, hostname, name, key) values (?, ?, ?, ?, ?);',
                                           ("client", host, hostname, name, key))
          except Exception as e:
              resp['error'] = 'Error saving to database on validate server.'
              print('Error saving to database on validate server.', file=sys.stderr)
              self.conf.logger.error('ValidateD: Saving to database: {}'.format(e))
          else:
              resp['key'] = key
              print('Validation SUCCESS with host: {}'.format(host))
              print('Restart the l3q daemon: l3qd to use the new configuration')

          return resp

      def _validate_node(self, request, data):
          '''
          Handles a validation request from a node-l3qd.
          Saves the validated host and a node record in state
          offline connected to it.
          Returns response dict where 'validate' holds 'Ok' with
          the new l3qd key, or 'Err' with an error message.
          '''
          resp = {}
          host = request.remote_addr

          if data.get('password') != self.password:
              resp['validate'] = { 'Err': "Wrong password" }
              print('Wrong password from node: {}'.format(host), file=sys.stderr)
              return resp

          l3qd_key = self._new_key()
          key = data.get('validate')
          hostname = self._lookup_hostname(host)
          name = data.get('hostname')
          port = data.get('port')
          error = None

          try:
              self.conf.db_cur.execute('''SELECT * FROM validate_host where ip = ? AND host_type = ?''', (host, "node"))
              rows = self.conf.db_cur.fetchall()

              if len(rows) != 0:
                  self.conf.db_cur.execute('UPDATE validate_host SET key = ?, l3qd_key = ?, hostname = ?, port = ?, name = ? WHERE ip = ? AND host_type = ?;',
                                           (key, l3qd_key, hostname, port, name, host, "node"))
              else:
                  self.conf.db_cur.execute('INSERT INTO validate_host (host_type, ip, hostname, port, name, key, l3qd_key) values (?, ?, ?, ?, ?, ?, ?);',
                                           ("node", host, hostname, port, name, key, l3qd_key))

              self.conf.db_cur.execute('''SELECT id FROM validate_host where key = ? AND l3qd_key = ?''', (key, l3qd_key))
              rows = self.conf.db_cur.fetchall()

              if len(rows) == 1:
                  val_id = rows[0]['id']
                  self.conf.db_cur.execute('SELECT * FROM node WHERE validate = ?',
                                           (val_id,))

                  if len(self.conf.db_cur.fetchall()) != 0:
                      # node.validate holds the id of the validate_host
                      # record, so val_id is used and not the node id.
                      self.conf.db_cur.execute('UPDATE node SET state = ? WHERE validate = ?',
                                               (NodeState.OFFLINE, val_id))
                  else:
                      self.conf.db_cur.execute('INSERT INTO node (state, validate) VALUES (?, ?)',
                                               (NodeState.OFFLINE, val_id))
              else:
                  # Without the record no node can be saved, this must
                  # not be reported to the node as a success.
                  error = 'Error fetching newly created record from database on validate server.'
                  print(error, file=sys.stderr)
                  self.conf.logger.error('ValidateD: Error fetching newly created validate record in database')
          except Exception as e:
              error = 'Error saving to database on validate server.'
              print(error, file=sys.stderr)
              self.conf.logger.error('ValidateD: Saving to database: {}'.format(e))

          if error is None:
              resp['validate'] = { 'Ok': l3qd_key }
              print('Validation SUCCESS with host: {}'.format(host))
              print('Restart the l3q daemon: l3qd to use the new configuration')
          else:
              resp['validate'] = { 'Err': error }

          return resp

      def despatch_request(self, request):
          '''
          Handles incomming requests and responds
          depending if sent password was correct.
          If invalid request an empty json dict is
          returned.
          '''
          if request.method == 'POST' and request.path == '/':
              # Validation request from client
              handler, sender = self._validate_client, 'client'
          elif request.method == 'POST' and request.path == '/validate':
              # Validation request from node-l3qd
              handler, sender = self._validate_node, 'node'
          else:
              self.conf.logger.warning('ValidateD: Wrong request method: {} or wrong path: {} from host: {}'
                                       .format(request.method, request.path, request.remote_addr))
              return Response(json.dumps({}))

          try:
              data = json.loads(request.data)
          except Exception as e:
              print('Error decoding request.data from json')
              self.conf.logger.error('ValidateD: Decoding request.data from json: {}'.format(e))
              return Response(json.dumps({}))

          # Valid json that is not a dict has no action to read.
          if not isinstance(data, dict) or not (L3QAction.VALIDATE_KEY == data.get('action')):
              self.conf.logger.warning('ValidateD: Wrong json data from {}: {}, data: {}'
                                       .format(sender, request.remote_addr, data))
              return Response(json.dumps({}))

          return Response(json.dumps(handler(request, data)))

      def wsgi_app(self, environ, start_response):
          '''
          wsgi app
          Called on each request
          '''
          request = Request(environ)
          response = self.despatch_request(request)
          return response(environ, start_response)

      def __call__(self, environ, start_response):
          '''Called on each request'''
          return self.wsgi_app(environ, start_response)
  def create_app(conf):
      '''
      Creates the l3q daemon wsgi application from conf.
      '''
      app = L3Qd(conf)
      return app
  def start_validate_server(conf):
      '''
      Starts the validate server on the first port that works.

      The first port tried is one higher than the port of the l3q
      daemon. Each time the server fails to start the port number is
      increased with one and a new attempt is made. When the first
      port and the 20 ports after it all have failed the program
      exits with an error.
      '''
      from werkzeug.serving import run_simple

      logging.getLogger('werkzeug').disabled = True

      conf.port += 1
      first_port = conf.port
      last_port = first_port + 20

      while conf.port <= last_port:
          try:
              print('')
              print('Starting validate server on port: {} ...'.format(conf.port))
              run_simple('', conf.port, ValidateD(conf), use_debugger=False, ssl_context='adhoc')
          except Exception as err:
              print('Failed to start server on port {}.'.format(conf.port), file=sys.stderr)
              print('{}'.format(err))
              conf.port += 1
          else:
              # Server has run and is stopped, no more ports to try.
              break
      else:
          # Reached only when every port in the range has failed.
          print('', file=sys.stderr)
          print('Error starting validate server', file=sys.stderr)
          print('Failed to start server on all ports in range: {} - {}'.format(first_port, conf.port - 1), file=sys.stderr)
          exit(1)
  def list_validate_host(conf):
      '''
      List all validated hosts as a table on stdout.

      Columns are id, host type, ip, name and hostname. A value that
      is missing in the database is printed as '-'.

      Exits the program with status 2 if the hosts can not be read.
      '''
      sfrmt = '| {:^6s} | {:^9s} | {:17s} | {:30s} | {}'
      headline = '=' * 91
      line = '-' * 91

      def text(value):
          # The column formats only take strings. A NULL value would
          # raise in format() and stop the whole listing.
          return '-' if value is None else str(value)

      try:
          rows = conf.db_cur.execute('SELECT * FROM validate_host ORDER BY id')

          print(headline)
          print(sfrmt.format('Id', 'Host type', 'IP', 'Name', 'Hostname'))
          print(headline)

          for row in rows:
              print(sfrmt.format(text(row['id']), text(row['host_type']),
                                 text(row['ip']), text(row['name']), text(row['hostname'])))
              print(line)

      except Exception as e:
          print('Error retrieving validated IP number', file=sys.stderr)
          print(e, file=sys.stderr)
          # sys.exit is used, the builtin exit only exists when the
          # site module is loaded.
          sys.exit(2)
  1555. def rm_validate_host(hostid, conf):
  1556. '''
  1557. Remove host with ID number from
  1558. validated hosts.
  1559. '''
  1560. try:
  1561. rows = conf.db_cur.execute('DELETE FROM validate_host where id = ?', (hostid,))
  1562. no_del = conf.db_cur.rowcount
  1563. if(no_del > 0):
  1564. print('Deleted {} host from validated hosts, with ID: {}.'.format(no_del, hostid))
  1565. else:
  1566. print('Failed to delete ID number: {} from validated hosts.'.format(hostid), file=sys.stderr)
  1567. exit(2)
  1568. except Exception as e:
  1569. print('Error deleting ID number: {} from validated hosts.'.format(hostid), file=sys.stderr)
  1570. print(e, file=sys.stderr)
  1571. exit(3)
  1572. def main():
  1573. '''
  1574. Main function where program starts.
  1575. Parse args and starts daemon in different
  1576. modes depending on arguments.
  1577. '''
  1578. from werkzeug.serving import run_simple
  1579. conf = ''
  1580. if(len(sys.argv) == 2 and sys.argv[1] == '--validate-host'):
  1581. conf = L3QdConfig()
  1582. '''User must be root or daemon user to run this command.'''
  1583. usr = pwd.getpwuid(os.getuid())
  1584. if(not ((usr[0] == 'root' and usr[2] == 0) or usr[0] == L3QdConfig.validate_user)):
  1585. print('Permission denied.', file=sys.stderr)
  1586. exit(2)
  1587. start_validate_server(conf)
  1588. exit(0)
  1589. elif(len(sys.argv) == 2 and sys.argv[1] == '--list-validate-host'):
  1590. '''User must be root or daemon user to run this command.'''
  1591. usr = pwd.getpwuid(os.getuid())
  1592. if(not ((usr[0] == 'root' and usr[2] == 0) or usr[0] == L3QdConfig.validate_user)):
  1593. print('Permission denied.', file=sys.stderr)
  1594. exit(2)
  1595. conf = L3QdConfig()
  1596. list_validate_host(conf)
  1597. exit(0)
  1598. elif(len(sys.argv) == 2 and sys.argv[1] == '--rm-validate-host'):
  1599. '''User must be root or daemon user to run this command.'''
  1600. usr = pwd.getpwuid(os.getuid())
  1601. if(not ((usr[0] == 'root' and usr[2] == 0) or usr[0] == L3QdConfig.validate_user)):
  1602. print('Permission denied.', file=sys.stderr)
  1603. exit(2)
  1604. print('l3qd --rm-validate-host requires the ID number of host as argument.')
  1605. exit(2)
  1606. elif(len(sys.argv) == 3 and sys.argv[1] == '--rm-validate-host'):
  1607. '''User must be root or daemon user to run this command.'''
  1608. usr = pwd.getpwuid(os.getuid())
  1609. if(not ((usr[0] == 'root' and usr[2] == 0) or usr[0] == L3QdConfig.validate_user)):
  1610. print('Permission denied.', file=sys.stderr)
  1611. exit(2)
  1612. if(not sys.argv[2].isnumeric()):
  1613. print('Argument must be a postive integer.')
  1614. exit(3)
  1615. if(sys.argv[2].isnumeric() and int(sys.argv[2]) <= 0):
  1616. print('Argument must be a postive integer.')
  1617. exit(4)
  1618. conf = L3QdConfig()
  1619. print('Removing validated host with ID number: {}'.format(sys.argv[2]))
  1620. rm_validate_host(sys.argv[2], conf)
  1621. exit(0)
  1622. elif(len(sys.argv) == 2 and (sys.argv[1] == '--help' or sys.argv[1] == '-h')):
  1623. print('Usage: l3qd [OPTION] ...')
  1624. print('')
  1625. print('Execute the l3q daemon. Starts in normal mode without arguments.')
  1626. print('')
  1627. print('Options:')
  1628. print(' --validate-host Executes the validate daemon.')
  1629. print(' Instructions will be printed at start up.')
  1630. print(' --list-validate-host List all validated hosts.')
  1631. print(' --rm-validate-host ID Removes specified validated host with specified ID.')
  1632. print(' --queue-info Prints information about available queues.')
  1633. print(' --version Prints version text and exit')
  1634. print(' -h, --help Prints this help text and exit')
  1635. print('')
  1636. print('More information is available in man pages:')
  1637. print('l3q(1), l3q.conf(5), l3qd(8), l3qd.conf(5)')
  1638. exit(0)
  1639. elif(len(sys.argv) == 2 and sys.argv[1] == '--test'):
  1640. conf = L3QdConfig(True)
  1641. elif(len(sys.argv) == 2 and sys.argv[1] == '--queue-info'):
  1642. print("Queues available in L3Q")
  1643. print("")
  1644. print("To use a different queue algorithm")
  1645. print("change the config file:")
  1646. print("/etc/l3q/l3qd.conf")
  1647. print("")
  1648. print("Set parameter (default value):")
  1649. print("queue = Default")
  1650. print("to the name of one of the described queue types below.")
  1651. print("")
  1652. print("Queues:")
  1653. queues = [m[0] for m in inspect.getmembers(queue, inspect.isclass)
  1654. if m[1].__module__ == 'libl3q.daemon.queue' and
  1655. issubclass(eval(f"queue.{m[0]}"), queue.Queue) and
  1656. m[0] != 'Queue']
  1657. for q in queues:
  1658. print(q)
  1659. doc = eval(f"queue.{q}.__doc__")
  1660. print(doc)
  1661. exit(0)
  1662. elif(len(sys.argv) == 2 and sys.argv[1] == '--version'):
  1663. ascii_l3q()
  1664. print('l3qd {}'.format(VERSION))
  1665. print('Copyright (C) 2023-2024 Marcus Pedersén')
  1666. print('License GPLv3+: GNU GPL version 3 or later <https://gnu.org/licenses/gpl.html>.')
  1667. print('This is free software: you are free to change and redistribute it.')
  1668. print('There is NO WARRANTY, to the extent permitted by law.')
  1669. print('')
  1670. print('Written by Marcus Pedersén.')
  1671. exit(0)
  1672. elif(len(sys.argv) >= 2):
  1673. print('Wrong arguments.')
  1674. print('Try -h or --help for help.')
  1675. exit(1)
  1676. else:
  1677. conf = L3QdConfig()
  1678. worker = L3QWorker(conf)
  1679. p_worker = Process(target=worker.run)
  1680. p_worker.start()
  1681. backup = L3QBackup(conf)
  1682. p_backup = Process(target=backup.schedule_backup)
  1683. p_backup.start()
  1684. app = create_app(conf)
  1685. try:
  1686. if(len(sys.argv) == 2 and sys.argv[1] == '--test'):
  1687. run_simple('', conf.port, app, use_debugger=True, ssl_context='adhoc')
  1688. else:
  1689. logging.getLogger('werkzeug').disabled = True
  1690. run_simple('', conf.port, app, use_debugger=False, ssl_context='adhoc')
  1691. conf.logger.info('L3q daemon terminated.')
  1692. p_worker.terminate()
  1693. p_worker.join()
  1694. p_backup.terminate()
  1695. p_backup.join()
  1696. except Exception as e:
  1697. print('Error starting l3q daemon', file=sys.stderr)
  1698. print('{}'.format(e))
  1699. p_worker.terminate()
  1700. p_worker.join()
  1701. exit(1)
# Run main() only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()