@@ -19,6 +19,7 @@
 package worker
 
 import (
+	"bytes"
 	"encoding/json"
 	"fmt"
 	"gorm.io/gorm"
@@ -303,6 +304,7 @@ func queueDefaultFill(jobs []common.Job, nodes []common.Node, defaultQ bool) (ma
 		if j.Type == common.JobTypeSequence {
 			updJob := common.Job{
 				Id: j.Id,
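+				// startJobs reads Type from these update entries to tell
+				// sequence and parallel jobs apart, so it has to be set here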
+				Type: j.Type,
 				StartDate: now.Format("2006-01-02 15:04:05"),
 				NoTasksRunning: j.NoTasksRunning + 1,
 				Nodes: nStr,
@@ -320,6 +322,11 @@ func queueDefaultFill(jobs []common.Job, nodes []common.Node, defaultQ bool) (ma
 			r.node = common.Node{
 				Id: n.Id,
 				CpusAlloc: n.CpusAlloc + j.CoresAlloc,
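+				// startJobs reaches the node over HTTP, so the run entry
+				// needs the address and port; the keys are presumably for
+				// authenticating against the daemon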
+				Ip: n.Ip,
+				Hostname: n.Hostname,
+				Port: n.Port,
+				Key: n.Key,
+				DaemonKey: n.DaemonKey,
 			}
 
 			r.tasks = append(r.tasks, updTask)
@@ -342,6 +349,11 @@ func queueDefaultFill(jobs []common.Job, nodes []common.Node, defaultQ bool) (ma
 			r.node = common.Node{
 				Id: n.Id,
 				CpusAlloc: n.CpusAlloc + j.CoresAlloc,
+				Ip: n.Ip,
+				Hostname: n.Hostname,
+				Port: n.Port,
+				Key: n.Key,
+				DaemonKey: n.DaemonKey,
 			}
 
 			if _, ok := nodesRun[n.Id]; ok == true {
@@ -363,6 +375,7 @@ func queueDefaultFill(jobs []common.Job, nodes []common.Node, defaultQ bool) (ma
 		if len(j.Tasks) == 1 {
 			updJob := common.Job{
 				Id: j.Id,
+				Type: j.Type,
 				StartDate: j.StartDate,
 				NoTasksRunning: j.NoTasksRunning,
 				Nodes: j.Nodes,
@@ -401,6 +414,120 @@ func queueDefaultFill(jobs []common.Job, nodes []common.Node, defaultQ bool) (ma
 // and send jobs in map
 // to specified node
 func startJobs(run map[int]Run, updateJobs []common.Job) []common.Job {
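+	// ids of parallel jobs whose dispatch already failed on some node;
+	// their remaining tasks are pruned from the runs handled afterwards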
+	failedJobs := []int{}
+
+	for _, r := range run {
+		// remove tasks belonging to jobs already marked as failed
+		for _, f := range failedJobs {
+			r.tasks = slices.DeleteFunc(r.tasks, func(t common.Task) bool {
+				return int(t.JobID) == f
+			})
+		}
+
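+		// each node daemon is assumed to expose a /task endpoint; the
+		// hostname-based url is tried first, the raw ip is the fallback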
+		url := fmt.Sprintf("http://%s:%d/task", r.node.Hostname, r.node.Port)
+		ipUrl := fmt.Sprintf("http://%s:%d/task", r.node.Ip, r.node.Port)
+		jBody, err := json.Marshal(r.tasks)
+		if err != nil {
+			slog.Error(fmt.Sprintf("Worker: Failed to convert task list to json for nodeid: %d, %s", r.node.Id, err))
+			// mark parallel jobs as failed and drop this node's jobs
+			// from the update list
+			for _, t := range r.tasks {
+				j := slices.IndexFunc(updateJobs, func(j common.Job) bool {
+					return j.Id == int(t.JobID)
+				})
+
+				if j != -1 {
+					job := updateJobs[j]
+					if job.Type == common.JobTypeParallel {
+						if !slices.Contains(failedJobs, job.Id) {
+							failedJobs = append(failedJobs, job.Id)
+						}
+					}
+
+					// delete job from updateJobs
+					updateJobs = slices.Delete(updateJobs, j, j+1)
+				}
+			}
+			continue
+		}
+
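+		// send the task batch; on a connection error retry once against
+		// the ip-based url before giving up on this node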
+		byteJbody := bytes.NewBuffer(jBody)
+		resp, pErr := http.Post(url, "application/json", byteJbody)
+		if pErr != nil {
+			// the first attempt may already have drained byteJbody, so the
+			// retry gets a fresh buffer
+			resp, pErr = http.Post(ipUrl, "application/json", bytes.NewBuffer(jBody))
+			if pErr != nil {
+				slog.Error(fmt.Sprintf("Worker: Failed to send job to nodeid: %d, url: %s, %s", r.node.Id, ipUrl, pErr))
+				// mark parallel jobs as failed and drop this node's jobs
+				// from the update list
+				for _, t := range r.tasks {
+					j := slices.IndexFunc(updateJobs, func(j common.Job) bool {
+						return j.Id == int(t.JobID)
+					})
+
+					if j != -1 {
+						job := updateJobs[j]
+						if job.Type == common.JobTypeParallel {
+							if !slices.Contains(failedJobs, job.Id) {
+								failedJobs = append(failedJobs, job.Id)
+							}
+						}
+
+						// delete job from updateJobs
+						updateJobs = slices.Delete(updateJobs, j, j+1)
+					}
+				}
+				continue
+			}
+		}
+
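+		// a reachable daemon can still refuse the batch, so a non-200
+		// status is treated as a failed dispatch as well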
+		if resp.StatusCode != 200 {
+			slog.Error(fmt.Sprintf("Worker: Failed to send job to nodeid: %d, url: %s, wrong status code in response: %s", r.node.Id, url, resp.Status))
+			// mark parallel jobs as failed and drop this node's jobs
+			// from the update list
+			for _, t := range r.tasks {
+				j := slices.IndexFunc(updateJobs, func(j common.Job) bool {
+					return j.Id == int(t.JobID)
+				})
+
+				if j != -1 {
+					job := updateJobs[j]
+					if job.Type == common.JobTypeParallel {
+						if !slices.Contains(failedJobs, job.Id) {
+							failedJobs = append(failedJobs, job.Id)
+						}
+					}
+
+					// delete job from updateJobs
+					updateJobs = slices.Delete(updateJobs, j, j+1)
+				}
+			}
+		}
+
+		// the response body is never read, but it still has to be closed
+		resp.Body.Close()
+	}
+
 	return updateJobs
 }
@@ -432,9 +559,13 @@ func updateQueue(db *gorm.DB, queue string) {
 	}
 
 	updateJobs = startJobs(run, updateJobs)
-	// TODO
-	//db.....updateJobs(updateJobs)
 
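+	// persist the rows startJobs mutated; jobs whose dispatch failed were
+	// already removed from updateJobs by startJobs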
+	for _, j := range updateJobs {
+		err := sql.UpdateJob(db, j.Id, j)
+		if err != nil {
+			slog.Error(fmt.Sprintf("Worker: Failed to update started job %d: %s", j.Id, err))
+		}
+	}
 }
 
 // Starts and run the worker