FOSSology  4.4.0
Open Source License Compliance by Open Source Software
job.c
Go to the documentation of this file.
1 /*
2  SPDX-FileCopyrightText: © 2010, 2011, 2012 Hewlett-Packard Development Company, L.P.
3  SPDX-FileCopyrightText: © 2015 Siemens AG
4 
5  SPDX-License-Identifier: GPL-2.0-only
6 */
12 /* local includes */
13 #include <libfossrepo.h>
14 #include <agent.h>
15 #include <database.h>
16 #include <job.h>
17 #include <scheduler.h>
18 
19 /* std library includes */
20 #include <stdlib.h>
21 
22 /* unix library includes */
23 #include <sys/types.h>
24 #include <unistd.h>
25 #include <sys/time.h>
26 #include <sys/resource.h>
27 
28 /* other library includes */
29 #include <glib.h>
30 #include <gio/gio.h>
31 
/**
 * Guard for functions returning void: when @p j is NULL, set errno to EINVAL,
 * log an error and return from the calling function.
 *
 * NOTE: this deliberately expands to a bare `if` block rather than the usual
 * do { } while(0) wrapper, so that it stays valid whether or not the call
 * site puts a semicolon after it.
 */
#define TEST_NULV(j) if(!(j)) { errno = EINVAL; ERROR("job passed is NULL, cannot proceed"); return; }
/**
 * Guard for functions returning a value: when @p j is NULL, set errno to
 * EINVAL, log an error and return @p ret from the calling function.
 */
#define TEST_NULL(j, ret) if(!(j)) { errno = EINVAL; ERROR("job passed is NULL, cannot proceed"); return (ret); }
/** Size limit constant; presumably the maximum length of an SQL statement buffer. */
#define MAX_SQL 512
35 
36 /* ************************************************************************** */
37 /* **** Locals ************************************************************** */
38 /* ************************************************************************** */
39 
44 #define SELECT_STRING(passed) MK_STRING_LIT(JOB_##passed),
45 const char* job_status_strings[] = { JOB_STATUS_TYPES(SELECT_STRING) };
46 #undef SELECT_STRING
47 
60 static int is_active(int* job_id, job_t* job, int* counter)
61 {
62  if((job->running_agents != NULL || job->failed_agents != NULL) || job->id < 0)
63  ++(*counter);
64  return 0;
65 }
66 
80 static int job_sstatus(int* job_id, job_t* job, GOutputStream* ostr)
81 {
82  gchar* status_str = g_strdup_printf(
83  "job:%d status:%s type:%s, priority:%d running:%d finished:%d failed:%d\n",
84  job->id,
85  job_status_strings[job->status],
86  job->agent_type,
87  job->priority,
88  g_list_length(job->running_agents),
89  g_list_length(job->finished_agents),
90  g_list_length(job->failed_agents));
91 
92  V_JOB("JOB_STATUS: %s", status_str);
93  g_output_stream_write(ostr, status_str, strlen(status_str), NULL, NULL);
94 
95  if(*job_id == 0)
96  g_list_foreach(job->running_agents, (GFunc)agent_print_status, ostr);
97 
98  g_free(status_str);
99  return 0;
100 }
101 
109 static void job_transition(scheduler_t* scheduler, job_t* job, job_status new_status)
110 {
111  /* book keeping */
112  TEST_NULV(job);
113  V_JOB("JOB[%d]: job status changed: %s => %s\n",
114  job->id, job_status_strings[job->status], job_status_strings[new_status]);
115 
116  /* change the job status */
117  job->status = new_status;
118 
119  /* only update database for real jobs */
120  if(job->id >= 0)
121  database_update_job(scheduler, job, new_status);
122 }
123 
135 static gint job_compare(gconstpointer a, gconstpointer b, gpointer user_data)
136 {
137  return ((job_t*)a)->priority - ((job_t*)b)->priority;
138 }
139 
140 /* ************************************************************************** */
141 /* **** Constructor Destructor ********************************************** */
142 /* ************************************************************************** */
143 
164 job_t* job_init(GTree* job_list, GSequence* job_queue,
165  char* type, char* host, int id, int parent_id, int user_id, int group_id, int priority, char *jq_cmd_args)
166 {
167  job_t* job = g_new0(job_t, 1);
168 
169  job->agent_type = g_strdup(type);
170  job->required_host = g_strdup(host);
171  job->running_agents = NULL;
172  job->finished_agents = NULL;
173  job->failed_agents = NULL;
174  job->log = NULL;
175  job->status = JB_CHECKEDOUT;
176  job->data = NULL;
177  job->db_result = NULL;
178  job->lock = NULL;
179  job->idx = 0;
180  job->message = NULL;
181  job->priority = priority;
182  job->verbose = 0;
183  job->parent_id = parent_id;
184  job->id = id;
185  job->user_id = user_id;
186  job->group_id = group_id;
187  job->jq_cmd_args = g_strdup(jq_cmd_args);
188 
189  g_tree_insert(job_list, &job->id, job);
190  if(id >= 0) g_sequence_insert_sorted(job_queue, job, job_compare, NULL);
191  return job;
192 }
193 
201 void job_destroy(job_t* job)
202 {
203  TEST_NULV(job);
204 
205  if(job->db_result != NULL)
206  {
207  SafePQclear(job->db_result);
208 
209  // Lock the mutex to prevent clearing locked mutex
210  g_mutex_lock(job->lock);
211  g_mutex_unlock(job->lock);
212 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
213  g_mutex_clear(job->lock);
214 #else
215  g_mutex_free(job->lock);
216 #endif
217  }
218 
219  if(job->log)
220  log_destroy(job->log);
221 
222  g_list_free(job->running_agents);
223  g_list_free(job->finished_agents);
224  g_list_free(job->failed_agents);
225  g_free(job->message);
226  g_free(job->agent_type);
227  g_free(job->required_host);
228  g_free(job->data);
229  if (job->jq_cmd_args) g_free(job->jq_cmd_args);
230  g_free(job);
231 }
232 
233 /* ************************************************************************** */
234 /* **** Events ************************************************************** */
235 /* ************************************************************************** */
236 
244 void job_verbose_event(scheduler_t* scheduler, job_t* job)
245 {
246  GList* iter;
247 
248  TEST_NULV(job);
249  for(iter = job->running_agents; iter != NULL; iter = iter->next)
250  aprintf(iter->data, "VERBOSE %d\n", job->verbose);
251 }
252 
266 void job_status_event(scheduler_t* scheduler, arg_int* params)
267 {
268  const char end[] = "end\n";
269  GError* error = NULL;
270 
271  int tmp = 0;
272  char buf[1024];
273 
274  if(!params->second)
275  {
276  memset(buf, '\0', sizeof(buf));
277  sprintf(buf, "scheduler:%d revision:%s daemon:%d jobs:%d log:%s port:%d verbose:%d\n",
278  scheduler->s_pid, fo_config_get(scheduler->sysconfig, "BUILD", "COMMIT_HASH", &error),
279  scheduler->s_daemon, g_tree_nnodes(scheduler->job_list), main_log->log_name,
280  scheduler->i_port, verbose);
281 
282  g_output_stream_write(params->first, buf, strlen(buf), NULL, NULL);
283  g_tree_foreach(scheduler->job_list, (GTraverseFunc)job_sstatus, params->first);
284  }
285  else
286  {
287  job_t* stat = g_tree_lookup(scheduler->job_list, &params->second);
288  if(stat)
289  {
290  job_sstatus(&tmp, g_tree_lookup(scheduler->job_list, &params->second), params->first);
291  }
292  else
293  {
294  sprintf(buf, "ERROR: invalid job id = %d\n", params->second);
295  g_output_stream_write(params->first, buf, strlen(buf), NULL, NULL);
296  }
297  }
298 
299  g_output_stream_write(params->first, end, sizeof(end), NULL, NULL);
300  g_free(params);
301 }
302 
312 void job_pause_event(scheduler_t* scheduler, arg_int* params)
313 {
314  job_t tmp_job;
315  job_t* job = params->first;
316  GList* iter;
317 
318  // if the job doesn't exist, create a fake
319  if(params->first == NULL)
320  {
321  tmp_job.id = params->second;
322  tmp_job.status = JB_NOT_AVAILABLE;
323  tmp_job.running_agents = NULL;
324  tmp_job.message = NULL;
325 
326  job = &tmp_job;
327  }
328 
329  job_transition(scheduler, job, JB_PAUSED);
330  for(iter = job->running_agents; iter != NULL; iter = iter->next)
331  agent_pause(iter->data);
332 
333  g_free(params);
334 }
335 
342 void job_restart_event(scheduler_t* scheduler, arg_int* params)
343 {
344  job_t tmp_job;
345  job_t* job = params->first;
346  GList* iter;
347 
348  // if the job doesn't exist, create a fake
349  if(job == NULL)
350  {
351  tmp_job.id = params->second;
352  tmp_job.status = JB_PAUSED;
353  tmp_job.running_agents = NULL;
354  tmp_job.message = NULL;
355 
356  event_signal(database_update_event, NULL);
357  job = &tmp_job;
358  }
359 
360  if(job->status != JB_PAUSED)
361  {
362  ERROR("attempt to restart job %d failed, job status was %s",
363  job->id, job_status_strings[job->status]);
364  g_free(params);
365  return;
366  }
367 
368  for(iter = job->running_agents; iter != NULL; iter = iter->next)
369  {
370  if(job->db_result != NULL) agent_write(iter->data, "OK\n", 3);
371  agent_unpause(iter->data);
372  }
373 
374  job_transition(scheduler, job, JB_RESTART);
375  g_free(params);
376 }
377 
385 void job_priority_event(scheduler_t* scheduler, arg_int* params)
386 {
387  GList* iter;
388 
389  database_job_priority(scheduler, params->first, params->second);
390  ((job_t*)params->first)->priority = params->second;
391  for(iter = ((job_t*)params->first)->running_agents; iter; iter = iter->next)
392  setpriority(PRIO_PROCESS, ((agent_t*)iter->data)->pid, params->second);
393  g_free(params);
394 }
395 
406 void job_fail_event(scheduler_t* scheduler, job_t* job)
407 {
408  GList* iter;
409 
410  if(job->status != JB_FAILED)
411  job_transition(scheduler, job, JB_FAILED);
412 
413  for(iter = job->running_agents; iter != NULL; iter = iter->next)
414  {
415  V_JOB("JOB[%d]: job failed, killing agents\n", job->id);
416  agent_kill(iter->data);
417  }
418 }
419 
420 /* ************************************************************************** */
421 /* **** Functions *********************************************************** */
422 /* ************************************************************************** */
423 
434 void job_add_agent(job_t* job, void* agent)
435 {
436  TEST_NULV(job);
437  TEST_NULV(agent);
438  job->running_agents = g_list_append(job->running_agents, agent);
439 }
440 
449 void job_remove_agent(job_t* job, GTree* job_list, void* agent)
450 {
451  GList* curr;
452  TEST_NULV(job);
453 
454  if(job->finished_agents && agent)
455  job->finished_agents = g_list_remove(job->finished_agents, agent);
456 
457  if(job->finished_agents == NULL && (job->status == JB_COMPLETE || job->status == JB_FAILED))
458  {
459  V_JOB("JOB[%d]: job removed from system\n", job->id);
460 
461  for(curr = job->running_agents; curr != NULL; curr = curr->next)
462  ((agent_t*)curr->data)->owner = NULL;
463  for(curr = job->failed_agents; curr != NULL; curr = curr->next)
464  ((agent_t*)curr->data)->owner = NULL;
465  for(curr = job->finished_agents; curr != NULL; curr = curr->next)
466  ((agent_t*)curr->data)->owner = NULL;
467 
468  g_tree_remove(job_list, &job->id);
469  }
470 }
471 
478 void job_finish_agent(job_t* job, void* agent)
479 {
480  TEST_NULV(job);
481  TEST_NULV(agent);
482 
483  job->running_agents = g_list_remove(job->running_agents, agent);
484  job->finished_agents = g_list_append(job->finished_agents, agent);
485 }
486 
493 void job_fail_agent(job_t* job, void* agent)
494 {
495  TEST_NULV(job);
496  TEST_NULV(agent);
497  job->running_agents = g_list_remove(job->running_agents, agent);
498  job->failed_agents = g_list_append(job->failed_agents, agent);
499 }
500 
511 void job_set_data(scheduler_t* scheduler, job_t* job, char* data, int sql)
512 {
513  job->data = g_strdup(data);
514  job->idx = 0;
515 
516  if(sql)
517  {
518  // TODO
519  //j->db_result = PQexec(db_conn, j->data);
520  //j->lock = g_mutex_new();
521  }
522 }
523 
531 void job_update(scheduler_t* scheduler, job_t* job)
532 {
533  GList* iter;
534  int finished = 1;
535 
536  TEST_NULV(job)
537 
538  for(iter = job->running_agents; iter != NULL; iter = iter->next)
539  if(((agent_t*)iter->data)->status != AG_PAUSED)
540  finished = 0;
541 
542  if(job->status != JB_PAUSED && job->status != JB_COMPLETE && finished)
543  {
544  if(job->failed_agents == NULL)
545  {
546  job_transition(scheduler, job, JB_COMPLETE);
547  for(iter = job->finished_agents; iter != NULL; iter = iter->next)
548  {
549  aprintf(iter->data, "CLOSE\n");
550  }
551  }
552  /* this indicates a failed agent */
553  else
554  {
555  g_list_free(job->failed_agents);
556  job->failed_agents = NULL;
557  job->message = NULL;
558  job_fail_event(scheduler, job);
559  }
560  }
561 }
562 
570 int job_is_open(scheduler_t* scheduler, job_t* job)
571 {
572  /* local */
573  int retval = 0;
574 
575  TEST_NULL(job, -1);
576 
577  /* check to make sure that the job status is correct */
578  if(job->status == JB_CHECKEDOUT)
579  job_transition(scheduler, job, JB_STARTED);
580 
581  /* check to see if we even need to worry about sql stuff */
582  if(job->db_result == NULL)
583  return (job->idx == 0 && job->data != NULL);
584 
585  g_mutex_lock(job->lock);
586  if(job->idx < PQntuples(job->db_result))
587  {
588  retval = 1;
589  }
590  else
591  {
592  SafePQclear(job->db_result);
593  job->db_result = database_exec(scheduler, job->data);
594  job->idx = 0;
595 
596  retval = PQntuples(job->db_result) != 0;
597  }
598 
599  g_mutex_unlock(job->lock);
600  return retval;
601 }
602 
610 char* job_next(job_t* job)
611 {
612  char* retval = NULL;
613 
614  TEST_NULL(job, NULL);
615  if(job->db_result == NULL)
616  {
617  job->idx = 1;
618  return job->data;
619  }
620 
621  g_mutex_lock(job->lock);
622 
623  if(job->idx < PQntuples(job->db_result))
624  retval = PQgetvalue(job->db_result, job->idx++, 0);
625 
626  g_mutex_unlock(job->lock);
627  return retval;
628 }
629 
638 {
639  FILE* file;
640  gchar* file_name;
641  gchar* file_path;
642 
643  if(job->id < 0)
644  return main_log;
645 
646  if(job->log)
647  return job->log;
648 
649  file_name = g_strdup_printf("%06d", job->id);
650  file_path = fo_RepMkPath("logs", file_name);
651 
652  if((file = fo_RepFwrite("logs", file_name)) == NULL)
653  {
654  ERROR("JOB[%d]: job unable to create log file: %s\n", job->id, file_path);
655  g_free(file_name);
656  free(file_path);
657  return NULL;
658  }
659 
660  V_JOB("JOB[%d]: job created log file:\n %s\n", job->id, file_path);
661  database_job_log(job->id, file_path);
662  job->log = log_new_FILE(file, file_name, job->agent_type, 0);
663 
664  g_free(file_name);
665  free(file_path);
666  return job->log;
667 }
668 
669 /* ************************************************************************** */
670 /* **** Job list Functions ************************************************** */
671 /* ************************************************************************** */
672 
681 job_t* next_job(GSequence* job_queue)
682 {
683  job_t* retval = NULL;
684  GSequenceIter* beg = g_sequence_get_begin_iter(job_queue);
685 
686  if(g_sequence_get_length(job_queue) != 0)
687  {
688  retval = g_sequence_get(beg);
689  g_sequence_remove(beg);
690  }
691 
692  return retval;
693 }
694 
701 job_t* peek_job(GSequence* job_queue)
702 {
703  GSequenceIter* beg;
704 
705  if(g_sequence_get_length(job_queue) == 0)
706  {
707  return NULL;
708  }
709 
710  beg = g_sequence_get_begin_iter(job_queue);
711  return g_sequence_get(beg);
712 }
713 
720 uint32_t active_jobs(GTree* job_list)
721 {
722  int count = 0;
723  g_tree_foreach(job_list, (GTraverseFunc)is_active, &count);
724  return count;
725 }
726 
int aprintf(agent_t *agent, const char *fmt,...)
Definition: agent.c:1238
ssize_t agent_write(agent_t *agent, const void *buf, int count)
Definition: agent.c:1274
void agent_kill(agent_t *agent)
Unclean kill of an agent.
Definition: agent.c:1223
void agent_pause(agent_t *agent)
Definition: agent.c:1164
#define TEST_NULL(a, ret)
Test if parameter is NULL.
Definition: agent.c:57
void agent_unpause(agent_t *agent)
Definition: agent.c:1177
#define TEST_NULV(a)
Test if parameter is NULL.
Definition: agent.c:47
void agent_print_status(agent_t *agent, GOutputStream *ostr)
Prints the status of the agent to the output stream provided.
Definition: agent.c:1193
Header file with agent related operations.
int verbose
The verbose flag for the cli.
Definition: fo_cli.c:38
char * fo_config_get(fo_conf *conf, const char *group, const char *key, GError **error)
Gets an element based on its group name and key name. If the group or key is not found,...
Definition: fossconfig.c:336
void job_fail_agent(job_t *job, void *agent)
Definition: job.c:493
void job_fail_event(scheduler_t *scheduler, job_t *job)
Event that causes a job to be marked as failed.
Definition: job.c:406
static gint job_compare(gconstpointer a, gconstpointer b, gpointer user_data)
Used to compare two different jobs in the priority queue.
Definition: job.c:135
void job_restart_event(scheduler_t *scheduler, arg_int *params)
Definition: job.c:342
job_t * job_init(GTree *job_list, GSequence *job_queue, char *type, char *host, int id, int parent_id, int user_id, int group_id, int priority, char *jq_cmd_args)
Create a new job.
Definition: job.c:164
job_t * next_job(GSequence *job_queue)
Gets the next job from the job queue.
Definition: job.c:681
void job_update(scheduler_t *scheduler, job_t *job)
Definition: job.c:531
static int is_active(int *job_id, job_t *job, int *counter)
Tests if a job is active.
Definition: job.c:60
static void job_transition(scheduler_t *scheduler, job_t *job, job_status new_status)
Definition: job.c:109
int job_is_open(scheduler_t *scheduler, job_t *job)
Tests to see if there is still data available for this job.
Definition: job.c:570
void job_set_data(scheduler_t *scheduler, job_t *job, char *data, int sql)
Definition: job.c:511
char * job_next(job_t *job)
Definition: job.c:610
job_t * peek_job(GSequence *job_queue)
Gets the job that is at the top of the queue if there is one.
Definition: job.c:701
void job_remove_agent(job_t *job, GTree *job_list, void *agent)
Definition: job.c:449
void job_verbose_event(scheduler_t *scheduler, job_t *job)
Definition: job.c:244
void job_pause_event(scheduler_t *scheduler, arg_int *params)
Event to pause a job.
Definition: job.c:312
void job_status_event(scheduler_t *scheduler, arg_int *params)
Event to get the status of the scheduler or a specific job.
Definition: job.c:266
void job_destroy(job_t *job)
Definition: job.c:201
log_t * job_log(job_t *job)
Definition: job.c:637
void job_add_agent(job_t *job, void *agent)
Adds a new agent to the jobs list of agents.
Definition: job.c:434
void job_priority_event(scheduler_t *scheduler, arg_int *params)
Definition: job.c:385
void job_finish_agent(job_t *job, void *agent)
Definition: job.c:478
static int job_sstatus(int *job_id, job_t *job, GOutputStream *ostr)
Prints the jobs status to the output stream.
Definition: job.c:80
#define SELECT_STRING(passed)
Definition: job.c:44
uint32_t active_jobs(GTree *job_list)
Gets the number of jobs that are not paused.
Definition: job.c:720
FILE * fo_RepFwrite(char *Type, char *Filename)
Perform an fwrite. Also creates directories.
Definition: libfossrepo.c:710
char * fo_RepMkPath(const char *Type, char *Filename)
Given a filename, construct the full path to the file.
Definition: libfossrepo.c:352
log_t * main_log
Definition: logging.c:33
log_t * log_new_FILE(FILE *log_file, gchar *log_name, gchar *pro_name, pid_t pro_pid)
Creates a log file structure based on an already created FILE*.
Definition: logging.c:126
void log_destroy(log_t *log)
Free memory associated with the log file.
Definition: logging.c:150
#define ERROR(...)
Definition: logging.h:79
PGresult * database_exec(scheduler_t *scheduler, const char *sql)
Executes an sql statement for the scheduler.
Definition: database.c:803
void database_job_priority(scheduler_t *scheduler, job_t *job, int priority)
Changes the priority of a job queue entry in the database.
Definition: database.c:1021
void database_update_job(scheduler_t *scheduler, job_t *job, job_status status)
Change the status of a job in the database.
Definition: database.c:945
void database_update_event(scheduler_t *scheduler, void *unused)
Checks the job queue for any new entries.
Definition: database.c:841
void database_job_log(int j_id, char *log_name)
Enters the name of the log file for a job into the database.
Definition: database.c:1006
Header file for the scheduler.
#define SafePQclear(pgres)
Definition: scheduler.h:129
Definition: agent.h:100
Definition: event.h:46
The job structure.
Definition: job.h:51
int32_t id
The identifier for this job.
Definition: job.h:73
GMutex * lock
Lock to maintain data integrity.
Definition: job.h:65
job_status status
The current status for the job.
Definition: job.h:61
uint32_t idx
The current index into the sql results.
Definition: job.h:66
int32_t group_id
The id of the group that created the job.
Definition: job.h:75
char * required_host
If not NULL, this job must run on a specific host machine.
Definition: job.h:54
GList * finished_agents
The list of agents that have completed their tasks.
Definition: job.h:56
gchar * message
Message that will be sent with job notification email.
Definition: job.h:69
gchar * jq_cmd_args
Command line arguments for this job.
Definition: job.h:63
int32_t user_id
The id of the user that created the job.
Definition: job.h:74
GList * failed_agents
The list of agents that failed while working.
Definition: job.h:57
char * agent_type
The type of agent used to analyze the data.
Definition: job.h:53
gchar * data
The data associated with this job (jq_args)
Definition: job.h:62
int32_t priority
Importance of the job, maps directly to unix priority.
Definition: job.h:70
int32_t parent_id
The identifier for the parent of this job (its queue id)
Definition: job.h:72
GList * running_agents
The list of agents assigned to this job that are still working.
Definition: job.h:55
PGresult * db_result
Results from the sql query (if any)
Definition: job.h:64
log_t * log
The log to print any agent logging messages to.
Definition: job.h:58
int32_t verbose
The verbose level for all of the agents in this job.
Definition: job.h:71
Definition: logging.h:35
gchar * log_name
The name of the log file that will be printed to.
Definition: logging.h:36
GTree * job_list
List of jobs that have been created.
Definition: scheduler.h:172
gboolean s_pid
The pid of the scheduler process.
Definition: scheduler.h:143
gboolean s_daemon
Is the scheduler being run as a daemon.
Definition: scheduler.h:144
fo_conf * sysconfig
Configuration information loaded from the configuration file.
Definition: scheduler.h:149
uint16_t i_port
The port that the scheduler is listening on.
Definition: scheduler.h:166