2 Комити 29cddef5d1 ... c05c34bb49

Аутор SHA1 Порука Датум
  marcux c05c34bb49 Updated with gofmt пре 4 месеци
  marcux fb45ab964e Added updateQueue for worker пре 4 месеци
1 измењених фајлова са 133 додато и 2 уклоњено
  1. 133 2
      go/internal/daemon/worker/worker.go

+ 133 - 2
go/internal/daemon/worker/worker.go

@@ -19,6 +19,7 @@
 package worker
 
 import (
+	"bytes"
 	"encoding/json"
 	"fmt"
 	"gorm.io/gorm"
@@ -303,6 +304,7 @@ func queueDefaultFill(jobs []common.Job, nodes []common.Node, defaultQ bool) (ma
 				if j.Type == common.JobTypeSequence {
 					updJob := common.Job{
 						Id:             j.Id,
+						Type:           j.Type,
 						StartDate:      now.Format("2006-01-02 15:04:05"),
 						NoTasksRunning: j.NoTasksRunning + 1,
 						Nodes:          nStr,
@@ -320,6 +322,11 @@ func queueDefaultFill(jobs []common.Job, nodes []common.Node, defaultQ bool) (ma
 						r.node = common.Node{
 							Id:        n.Id,
 							CpusAlloc: n.CpusAlloc + j.CoresAlloc,
+							Ip:        n.Ip,
+							Hostname:  n.Hostname,
+							Port:      n.Port,
+							Key:       n.Key,
+							DaemonKey: n.DaemonKey,
 						}
 
 						r.tasks = append(r.tasks, updTask)
@@ -342,6 +349,11 @@ func queueDefaultFill(jobs []common.Job, nodes []common.Node, defaultQ bool) (ma
 						r.node = common.Node{
 							Id:        n.Id,
 							CpusAlloc: n.CpusAlloc + j.CoresAlloc,
+							Ip:        n.Ip,
+							Hostname:  n.Hostname,
+							Port:      n.Port,
+							Key:       n.Key,
+							DaemonKey: n.DaemonKey,
 						}
 
 						if _, ok := nodesRun[n.Id]; ok == true {
@@ -363,6 +375,7 @@ func queueDefaultFill(jobs []common.Job, nodes []common.Node, defaultQ bool) (ma
 					if len(j.Tasks) == 1 {
 						updJob := common.Job{
 							Id:             j.Id,
+							Type:           j.Type,
 							StartDate:      j.StartDate,
 							NoTasksRunning: j.NoTasksRunning,
 							Nodes:          j.Nodes,
@@ -401,6 +414,120 @@ func queueDefaultFill(jobs []common.Job, nodes []common.Node, defaultQ bool) (ma
 // and send jobs in map
 // to specified node
 func startJobs(run map[int]Run, updateJobs []common.Job) []common.Job {
+	failedJobs := []int{}
+
+	for _, r := range run {
+		// check jobid list on tasks that should
+		// be removed from r.tasks
+		for _, f := range failedJobs {
+			for {
+				i := slices.IndexFunc(r.tasks, func(t common.Task) bool {
+					return int(t.JobID) == f
+				})
+
+				if i != -1 {
+					slices.Delete(r.tasks, i, i+1)
+				} else {
+					break
+				}
+			}
+		}
+
+		url := fmt.Sprintf("http://%s:%d/task", r.node.Hostname, r.node.Port)
+		ipUrl := fmt.Sprintf("http://%s:%d/task", r.node.Ip, r.node.Port)
+		jBody, err := json.Marshal(r.tasks)
+		if err != nil {
+			slog.Error(fmt.Sprintf("Worker: Failed to convert task list to json for nodeid: %d, %s", r.node.Id, err))
+			// update job
+			for _, t := range r.tasks {
+				j := slices.IndexFunc(updateJobs, func(j common.Job) bool {
+					return j.Id == int(t.JobID)
+				})
+
+				if j != -1 {
+					job := updateJobs[j]
+					if job.Type == common.JobTypeParallel {
+						if !slices.Contains(failedJobs, job.Id) {
+							failedJobs = append(failedJobs, job.Id)
+						}
+					}
+
+					// delete job from updateJobs
+					if j == len(updateJobs)-1 {
+						updateJobs = updateJobs[:j]
+					} else if j == len(updateJobs)-2 {
+						updateJobs = append(updateJobs[:j], updateJobs[j+1])
+					} else {
+						updateJobs = append(updateJobs[:j], updateJobs[j+1:len(updateJobs)-1]...)
+					}
+				}
+			}
+			continue
+		}
+
+		byteJbody := bytes.NewBuffer(jBody)
+		resp, pErr := http.Post(url, "application/json", byteJbody)
+		if pErr != nil {
+			resp, pErr = http.Post(ipUrl, "application/json", byteJbody)
+			if pErr != nil {
+				slog.Error(fmt.Sprintf("Worker: Failed to send job to nodeid: %d, url: %s, %s, %s", r.node.Id, ipUrl, resp.Status, pErr))
+				// update job
+				for _, t := range r.tasks {
+					j := slices.IndexFunc(updateJobs, func(j common.Job) bool {
+						return j.Id == int(t.JobID)
+					})
+
+					if j != -1 {
+						job := updateJobs[j]
+						if job.Type == common.JobTypeParallel {
+							if !slices.Contains(failedJobs, job.Id) {
+								failedJobs = append(failedJobs, job.Id)
+							}
+						}
+
+						// delete job from updateJobs
+						if j == len(updateJobs)-1 {
+							updateJobs = updateJobs[:j]
+						} else if j == len(updateJobs)-2 {
+							updateJobs = append(updateJobs[:j], updateJobs[j+1])
+						} else {
+							updateJobs = append(updateJobs[:j], updateJobs[j+1:len(updateJobs)-1]...)
+						}
+					}
+				}
+				continue
+			}
+		}
+
+		if resp.StatusCode != 200 {
+			slog.Error(fmt.Sprintf("Worker: Failed to send job to nodeid: %d, url: %s, wrong status code in response: %s", r.node.Id, url, resp.Status))
+			// update job
+			for _, t := range r.tasks {
+				j := slices.IndexFunc(updateJobs, func(j common.Job) bool {
+					return j.Id == int(t.JobID)
+				})
+
+				if j != -1 {
+					job := updateJobs[j]
+					if job.Type == common.JobTypeParallel {
+						if !slices.Contains(failedJobs, job.Id) {
+							failedJobs = append(failedJobs, job.Id)
+						}
+					}
+
+					// delete job from updateJobs
+					if j == len(updateJobs)-1 {
+						updateJobs = updateJobs[:j]
+					} else if j == len(updateJobs)-2 {
+						updateJobs = append(updateJobs[:j], updateJobs[j+1])
+					} else {
+						updateJobs = append(updateJobs[:j], updateJobs[j+1:len(updateJobs)-1]...)
+					}
+				}
+			}
+		}
+	}
+
 	return updateJobs
 }
 
@@ -432,9 +559,13 @@ func updateQueue(db *gorm.DB, queue string) {
 	}
 
 	updateJobs = startJobs(run, updateJobs)
-	// TODO
-	//db.....updateJobs(updateJobs)
 
+	for _, j := range updateJobs {
+		err := sql.UpdateJob(db, j.Id, j)
+		if err != nil {
+			slog.Error(fmt.Sprintf("Worker: Failed to update just started jobs: %s", err))
+		}
+	}
 }
 
 // Starts and run the worker