FOSSology  4.7.1
Open Source License Compliance by Open Source Software
agent.c
Go to the documentation of this file.
1 /*
2  SPDX-FileCopyrightText: © 2010, 2011, 2012 Hewlett-Packard Development Company, L.P.
3  SPDX-FileCopyrightText: © 2015 Siemens AG
4 
5  SPDX-License-Identifier: GPL-2.0-only
6 */
7 
15 /* local includes */
16 #include <agent.h>
17 #include <database.h>
18 #include <event.h>
19 #include <host.h>
20 #include <job.h>
21 #include <logging.h>
22 #include <scheduler.h>
23 
24 /* library includes */
25 #include <limits.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <time.h>
30 
31 /* unix library includes */
32 #include <fcntl.h>
33 #include <limits.h>
34 #include <sys/types.h>
35 #include <sys/wait.h>
36 #include <unistd.h>
37 
38 /* other library includes */
39 #include <glib.h>
40 
/**
 * Guard for functions returning void: when pointer @a a is NULL, set errno to
 * EINVAL, log an error and return. The body is wrapped in do { } while (0) so
 * that "if (x) TEST_NULV(p); else ..." parses as a single statement.
 */
#define TEST_NULV(a) do { if(!(a)) { \
  errno = EINVAL; ERROR("agent passed is NULL, cannot proceed"); return; } } while(0)

/**
 * Guard for functions returning a value: like TEST_NULV, but returns @a ret.
 */
#define TEST_NULL(a, ret) do { if(!(a)) { \
  errno = EINVAL; ERROR("agent passed is NULL, cannot proceed"); return (ret); } } while(0)
59 
/*
 * The logging macros below expect a variable named `agent` (agent_t*) in the
 * calling scope. The conditional variants keep their `if` *inside*
 * do { } while (0), so they behave as one statement even in an unbraced
 * if/else (a trailing `else` can no longer bind to the macro's hidden `if`).
 */

/** Print "JOB[id].type[pid.host]: " to the main log, or "AGENT[...]" when the agent has no job. */
#define AGENT_CREDENTIAL do { \
  if(agent && agent->owner) \
    log_printf("JOB[%d].%s[%d.%s]: ", agent->owner->id, \
      agent->type ? agent->type->name : "?", \
      agent->pid, agent->host ? agent->host->name : "?"); \
  else \
    log_printf("AGENT[%s][pid=%d][host=%s]: ", \
      agent && agent->type ? agent->type->name : "?", \
      agent ? agent->pid : -1, \
      agent && agent->host ? agent->host->name : "?"); \
} while(0)

/** Same credentials written to the owning job's log; prints nothing for ownerless agents. */
#define AGENT_LOG_CREDENTIAL do { \
  if(agent && agent->owner) \
    con_printf(job_log(agent->owner), "JOB[%d].%s[%d.%s]: ", \
      agent->owner->id, agent->type ? agent->type->name : "?", \
      agent->pid, agent->host ? agent->host->name : "?"); \
} while(0)

/** Unconditional error line: source location, credentials, message, newline. */
#define AGENT_ERROR(...) do { \
  log_printf("ERROR: %s.%d: ", __FILE__, __LINE__); \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); \
  log_printf("\n"); } while(0)

/** Note line, emitted only when TEST_NOTIFY is enabled. */
#define AGENT_NOTIFY(...) do { if(TEST_NOTIFY) { \
  log_printf("NOTE: "); \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); \
  log_printf("\n"); } } while(0)

/** Warning line with source location, emitted only when TEST_WARNING is enabled. */
#define AGENT_WARNING(...) do { if(TEST_WARNING) { \
  log_printf("WARNING %s.%d: ", __FILE__, __LINE__); \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); \
  log_printf("\n"); } } while(0)

/** Verbose trace to the main log (caller supplies the newline), only when TVERB_AGENT is set. */
#define AGENT_SEQUENTIAL_PRINT(...) do { if(TVERB_AGENT) { \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); } } while(0)

/** Message written to the owning job's log (caller supplies the newline). */
#define AGENT_CONCURRENT_PRINT(...) do { \
  AGENT_LOG_CREDENTIAL; \
  con_printf(job_log(agent->owner), __VA_ARGS__); } while(0)
111 
112 /* ************************************************************************** */
113 /* **** Data Types ********************************************************** */
114 /* ************************************************************************** */
115 
120 #define SELECT_STRING(passed) MK_STRING_LIT(AGENT_##passed),
121 const char* agent_status_strings[] =
123 #undef SELECT_STRING
124 
125 /* ************************************************************************** */
126 /* **** Local Functions ***************************************************** */
127 /* ************************************************************************** */
128 
140 static gboolean agent_close_fd(int* pid_ptr, agent_t* agent, agent_t* excepted)
141 {
142  TEST_NULL(agent, FALSE);
143  if (agent != excepted)
144  {
145  // Use raw close(): fclose() would flush stdio buffers into the parent's pipes.
146  close(agent->from_child);
147  close(agent->to_child);
148  close(agent->from_parent);
149  close(agent->to_parent);
150  }
151  return FALSE;
152 }
153 
164 static int update(int* pid_ptr, agent_t* agent, gpointer unused)
165 {
166  TEST_NULL(agent, 0);
167  if (agent->owner == NULL)
168  {
169  log_printf("ERROR %s.%d: Agent pid %d has no owner; killing to prevent NULL deref\n", __FILE__, __LINE__, agent->pid);
170  agent_kill(agent);
171  return 0;
172  }
173  int nokill = is_agent_special(agent, SAG_NOKILL) || is_meta_special(agent->type, SAG_NOKILL);
174 
175  if (agent->status == AG_SPAWNED || agent->status == AG_RUNNING || agent->status == AG_PAUSED)
176  {
177  /* check last checkin time */
178  if (time(NULL) - agent->check_in > CONF_agent_death_timer && !(agent->owner->status == JB_PAUSED) && !nokill)
179  {
180  AGENT_CONCURRENT_PRINT("no heartbeat for %d seconds\n", (time(NULL) - agent->check_in));
181  agent_kill(agent);
182  return 0;
183  }
184 
185  /* check items processed */
186  if (agent->status != AG_PAUSED && !agent->alive)
187  {
188  agent->n_updates++;
189  }
190  else
191  {
192  agent->n_updates = 0;
193  }
194  if (agent->n_updates > CONF_agent_update_number && !nokill)
195  {
196  AGENT_CONCURRENT_PRINT("agent has not set the alive flag in at least 10 minutes, killing\n");
197  agent_kill(agent);
198  return 0;
199  }
200 
201  AGENT_SEQUENTIAL_PRINT("agent updated correctly, processed %d items: %d\n", agent->total_analyzed,
202  agent->n_updates);
203  agent->alive = 0;
204  }
205 
206  return 0;
207 }
208 
220 static int agent_kill_traverse(int* pid, agent_t* agent, gpointer unused)
221 {
222  agent_kill(agent);
223  return FALSE;
224 }
225 
235 static int agent_list(char* name, meta_agent_t* ma, GOutputStream* ostr)
236 {
237  if (ma->valid)
238  {
239  g_output_stream_write(ostr, name, strlen(name), NULL, NULL);
240  g_output_stream_write(ostr, " ", 1, NULL, NULL);
241  }
242  return FALSE;
243 }
244 
256 static int agent_test(const gchar* name, meta_agent_t* ma, scheduler_t* scheduler)
257 {
258  static int32_t id_gen = -1;
259 
260  GList* iter;
261  host_t* host;
262  char *jq_cmd_args = 0;
263 
264  for (iter = scheduler->host_queue; iter != NULL; iter = iter->next)
265  {
266  host = (host_t*) iter->data;
267  V_AGENT("META_AGENT[%s] testing on HOST[%s]\n", ma->name, host->name);
268  job_t* job = job_init(scheduler->job_list, scheduler->job_queue, ma->name, host->name, id_gen--, 0, 0, 0, 0, jq_cmd_args);
269  agent_init(scheduler, host, job);
270  }
271 
272  return 0;
273 }
274 
275 /* Protects meta_agent_t::version and version_source - held by both the
276  * agent_listen spawn thread and the event-loop thread (agent_meta_version_reset). */
277 #if GLIB_CHECK_VERSION(2, 32, 0)
278 static GMutex version_lock;
279 #else
280 static GStaticMutex version_lock = G_STATIC_MUTEX_INIT;
281 #endif
282 
291 {
292  if (ma == NULL) return;
293 #if GLIB_CHECK_VERSION(2, 32, 0)
294  g_mutex_lock(&version_lock);
295 #else
296  g_static_mutex_lock(&version_lock);
297 #endif
298  /* Log the old version while holding the lock so the read and free are atomic. */
299  if (ma->version != NULL)
300  log_printf("NOTE: version refresh: resetting cached version for agent "
301  "type \"%s\" (was \"%s\")\n", ma->name, ma->version);
302  g_free(ma->version);
303  ma->version = NULL;
304  ma->version_source = NULL;
305  ma->valid = TRUE;
306 #if GLIB_CHECK_VERSION(2, 32, 0)
307  g_mutex_unlock(&version_lock);
308 #else
309  g_static_mutex_unlock(&version_lock);
310 #endif
311 }
312 
319 typedef struct
320 {
321  pid_t pid;
323 } fail_pid_arg;
324 
332 static void agent_listen(scheduler_t* scheduler, agent_t* agent)
333 {
334  /* locals */
335  char buffer[1024]; // buffer to store c strings read from agent
336  GMatchInfo* match; // regex match information
337  char* arg; // used during regex retrievals
338  int relevant; // used during special retrievals
339 
340  TEST_NULV(agent);
341 
352  if (fgets(buffer, sizeof(buffer), agent->read) == NULL)
353  {
354  AGENT_CONCURRENT_PRINT("pipe from child closed: %s\n", strerror(errno));
355  g_thread_exit(NULL);
356  }
357 
358  /* check to make sure "VERSION" was sent */
359  buffer[strlen(buffer) - 1] = '\0';
360  if (strncmp(buffer, "VERSION: ", 9) != 0)
361  {
362  if (strncmp(buffer, "@@@1", 4) == 0)
363  {
364  THREAD_FATAL(job_log(agent->owner), "agent crashed before sending version information");
365  }
366  else
367  {
368  agent->type->valid = 0;
369  agent_fail_event(scheduler, agent);
370  agent_kill(agent);
371  con_printf(main_log, "ERROR %s.%d: agent %s.%s has been invalidated, removing from agents\n", __FILE__, __LINE__,
372  agent->host->name, agent->type->name);
373  AGENT_CONCURRENT_PRINT("agent didn't send version information: \"%s\"\n", buffer);
374  return;
375  }
376  }
377 
378  /* check that the VERSION information is correct */
379 #if GLIB_CHECK_VERSION(2, 32, 0)
380  g_mutex_lock(&version_lock);
381 #else
382  g_static_mutex_lock(&version_lock);
383 #endif
384  /* skip the "VERSION: " prefix */
385  const char* version_str = buffer + 9;
386  if (agent->type->version == NULL && agent->type->valid)
387  {
388  agent->type->version_source = agent->host->name;
389  agent->type->version = g_strdup(version_str);
390  if (TVERB_AGENT)
391  con_printf(main_log, "META_AGENT[%s.%s] version is: \"%s\"\n", agent->host->name, agent->type->name,
392  agent->type->version);
393  }
394  else if (strcmp(agent->type->version, version_str) != 0)
395  {
396  /* Version mismatch: hard-fail during startup tests (id<0); adopt new version at runtime. */
397  if (agent->owner != NULL && agent->owner->id < 0)
398  {
399  /* Startup test: hard-fail so version skew is caught at boot. */
400  con_printf(job_log(agent->owner),
401  "ERROR %s.%d: META_DATA[%s] invalid agent spawn check (startup)\n",
402  __FILE__, __LINE__, agent->type->name);
403  con_printf(job_log(agent->owner),
404  "ERROR: versions don't match: %s(\"%s\") != received: %s(\"%s\")\n",
405  agent->type->version_source, agent->type->version,
406  agent->host->name, version_str);
407  agent->type->valid = 0;
408  /* non-zero return_code so the startup-test job is marked failed */
409  agent->return_code = -1;
410  kill(-agent->pid, SIGKILL);
411 #if GLIB_CHECK_VERSION(2, 32, 0)
412  g_mutex_unlock(&version_lock);
413 #else
414  g_static_mutex_unlock(&version_lock);
415 #endif
416  return;
417  }
418 
419  /* Runtime: adopt new version and respawn cleanly. */
420  con_printf(main_log,
421  "WARNING %s.%d: META_AGENT[%s] version changed: \"%s\" -> \"%s\" "
422  "(was reported by: %s, now: %s). Adopting new version; agent will respawn.\n",
423  __FILE__, __LINE__, agent->type->name,
424  agent->type->version, version_str,
425  agent->type->version_source, agent->host->name);
426 
427  g_free(agent->type->version);
428  agent->type->version = g_strdup(version_str);
429  agent->type->version_source = agent->host->name;
430  /* return_code=0 so the death event respawns cleanly instead of failing the job.
431  * Kill the process group so child processes die too. */
432  agent->return_code = 0;
433  kill(-agent->pid, SIGKILL);
434 #if GLIB_CHECK_VERSION(2, 32, 0)
435  g_mutex_unlock(&version_lock);
436 #else
437  g_static_mutex_unlock(&version_lock);
438 #endif
439  return;
440  }
441 #if GLIB_CHECK_VERSION(2, 32, 0)
442  g_mutex_unlock(&version_lock);
443 #else
444  g_static_mutex_unlock(&version_lock);
445 #endif
446 
456  while (1)
457  {
458  /* get message from agent */
459  if (fgets(buffer, sizeof(buffer), agent->read) == NULL)
460  g_thread_exit(NULL);
461 
462  buffer[strlen(buffer) - 1] = '\0';
463 
464  if (strlen(buffer) == 0)
465  continue;
466 
467  if (TVERB_AGENT && (TVERB_SPECIAL || strncmp(buffer, "SPECIAL", 7) != 0))
468  AGENT_CONCURRENT_PRINT("received: \"%s\"\n", buffer);
469 
477  if (strncmp(buffer, "BYE", 3) == 0)
478  {
479  if ((agent->return_code = atoi(&(buffer[4]))) != 0)
480  {
481  AGENT_CONCURRENT_PRINT("agent failed with error code %d\n", agent->return_code);
482  /* Pass the pid, not the agent pointer: agent_death_event() can join this
483  * thread and free the agent before the queued event runs. */
484  fail_pid_arg* fail_arg = g_new0(fail_pid_arg, 1);
485  fail_arg->pid = agent->pid;
486  fail_arg->agent = agent;
487  event_signal(agent_fail_event_pid, fail_arg);
488  }
489  break;
490  }
491 
498  if (strncmp(buffer, "@@@1", 4) == 0)
499  break;
500 
507  if (strncmp(buffer, "@@@0", 4) == 0 && agent->updated)
508  {
509  aprintf(agent, "%s\n", agent->data);
510  aprintf(agent, "END\n");
511  fflush(agent->write);
512  agent->updated = 0;
513  continue;
514  }
515 
516  /* agent just checked in */
517  agent->check_in = time(NULL);
518 
527  if (strncmp(buffer, "OK", 2) == 0)
528  {
529  if (agent->status != AG_PAUSED)
530  event_signal(agent_ready_event, agent);
531  }
532 
540  else if (strncmp(buffer, "HEART", 5) == 0)
541  {
542  g_regex_match(scheduler->parse_agent_msg, buffer, 0, &match);
543 
544  arg = g_match_info_fetch(match, 3);
545  agent->total_analyzed = atoi(arg);
546  g_free(arg);
547 
548  arg = g_match_info_fetch(match, 6);
549  agent->alive = (arg[0] == '1' || agent->alive);
550  g_free(arg);
551 
552  g_match_info_free(match);
553  match = NULL;
554 
556  }
557 
564  else if (strncmp(buffer, "EMAIL", 5) == 0)
565  {
566  g_free(agent->owner->message);
567  agent->owner->message = g_strdup(buffer + 6);
568  }
569 
577  else if (strncmp(buffer, "SPECIAL", 7) == 0)
578  {
579  relevant = INT_MAX;
580 
581  g_regex_match(scheduler->parse_agent_msg, buffer, 0, &match);
582 
583  arg = g_match_info_fetch(match, 3);
584  relevant &= atoi(arg);
585  g_free(arg);
586 
587  arg = g_match_info_fetch(match, 6);
588  if (atoi(arg))
589  {
590  if (agent->special & relevant)
591  relevant = 0;
592  }
593  else
594  {
595  if (!(agent->special & relevant))
596  relevant = 0;
597  }
598  g_free(arg);
599 
600  g_match_info_free(match);
601 
602  agent->special ^= relevant;
603  }
604 
610  else if (strncmp(buffer, "GETSPECIAL", 10) == 0)
611  {
612  g_regex_match(scheduler->parse_agent_msg, buffer, 0, &match);
613 
614  arg = g_match_info_fetch(match, 3);
615  relevant = atoi(arg);
616  g_free(arg);
617 
618  if (agent->special & relevant)
619  aprintf(agent, "VALUE: 1\n");
620  else
621  aprintf(agent, "VALUE: 0\n");
622 
623  g_match_info_free(match);
624  }
625 
631  else if (!(TVERB_AGENT))
632  AGENT_CONCURRENT_PRINT("\"%s\"\n", buffer);
633  }
634 
635  if (TVERB_AGENT)
636  AGENT_CONCURRENT_PRINT("communication thread closing\n");
637 }
638 
651 static void shell_parse(char* confdir, int user_id, int group_id, char* input, char *jq_cmd_args, int jobId, int* argc, char*** argv)
652 {
653  char* begin;
654  char* curr;
655  int idx = 0;
656 #define MAX_CMD_ARGS 30
657 
658  *argv = g_new0(char*, MAX_CMD_ARGS);
659  begin = NULL;
660 
661  for (curr = input; *curr; curr++)
662  {
663  if (*curr == ' ')
664  {
665  if (begin == NULL)
666  continue;
667 
668  if (*begin == '"')
669  continue;
670 
671  *curr = '\0';
672  (*argv)[idx++] = g_strdup(begin);
673  begin = NULL;
674  }
675  else if (begin == NULL)
676  {
677  begin = curr;
678  }
679  else if (*begin == '"' && *curr == '"')
680  {
681  *begin = '\0';
682  *curr = '\0';
683 
684  (*argv)[idx++] = g_strdup(begin + 1);
685  begin = NULL;
686  }
687  if (idx > MAX_CMD_ARGS - 7)
688  break;
689  }
690 
691  (*argv)[idx++] = g_strdup_printf("--jobId=%d", jobId);
692  (*argv)[idx++] = g_strdup_printf("--config=%s", confdir);
693  (*argv)[idx++] = g_strdup_printf("--userID=%d", user_id);
694  (*argv)[idx++] = g_strdup_printf("--groupID=%d", group_id);
695  (*argv)[idx++] = "--scheduler_start";
696  if (jq_cmd_args)
697  {
698  const char *start = jq_cmd_args;
699  const char *current = jq_cmd_args;
700  gboolean in_quotes = FALSE;
701 
702  while (*current != '\0')
703  {
704  if (*current == '\'' || *current == '"')
705  in_quotes = !in_quotes;
706  else if (*current == ' ' && !in_quotes)
707  {
708  if (current > start)
709  {
710  int len = current - start;
711  char *arg = g_strndup(start, len);
712  (*argv)[idx++] = arg;
713  }
714  start = current + 1;
715  }
716  current++;
717  }
718 
719  if (current > start)
720  {
721  char *arg = g_strndup(start, current - start);
722  (*argv)[idx++] = arg;
723  }
724  }
725  (*argc) = idx;
726 }
727 
731 typedef struct
732 {
736 
756 static void* agent_spawn(agent_spawn_args* pass)
757 {
758  /* locals */
759  scheduler_t* scheduler = pass->scheduler;
760  agent_t* agent = pass->agent;
761  g_free(pass); /* all values extracted; free before fork so g_thread_exit can't leak it */
762  gchar* tmp; // pointer to temporary string
763  gchar** args; // the arguments that will be passed to the child
764  int argc; // the number of arguments parsed
765  int len;
766  char buffer[2048]; // character buffer
767 
768  /* spawn the new process */
769  if (agent->owner == NULL)
770  {
771  log_printf("ERROR %s.%d: Agent spawn requested but agent has no owner; aborting spawn.\n", __FILE__, __LINE__);
772 
773  /* Close FILE* streams if they were opened by agent_init(). */
774  if (agent->read) { fclose(agent->read); agent->read = NULL; agent->from_child = -1; }
775  if (agent->write) { fclose(agent->write); agent->write = NULL; agent->to_child = -1; }
776 
777  /* from_child/to_child were already closed via fclose() above. */
778  if (agent->from_parent >= 0) { close(agent->from_parent); agent->from_parent = -1; }
779  if (agent->to_parent >= 0) { close(agent->to_parent); agent->to_parent = -1; }
780 
781  agent->status = AG_FAILED;
782  return NULL;
783  }
784 
785  while ((agent->pid = fork()) < 0)
786  sleep(rand() % CONF_fork_backoff_time);
787 
788  /* we are in the child */
789  if (agent->pid == 0)
790  {
791  /* Own process group so kill(-pid) reaches all child processes. */
792  setpgid(0, 0);
793 
794  /* set the child's stdin and stdout to use the pipes */
795  dup2(agent->from_parent, fileno(stdin));
796  dup2(agent->to_parent, fileno(stdout));
797  dup2(agent->to_parent, fileno(stderr));
798 
799  /* close all the unnecessary file descriptors */
800  g_tree_foreach(scheduler->agents, (GTraverseFunc) agent_close_fd, agent);
801  close(agent->from_child);
802  close(agent->to_child);
803 
804  /* set the priority of the process to the job's priority */
805  if (nice(agent->owner->priority) == -1)
806  ERROR("unable to correctly set priority of agent process %d", agent->pid);
807 
808  /* if host is null, the agent will run locally to */
809  /* run the agent locally, use the commands that */
810  /* were parsed when the meta_agent was created */
811  if (strcmp(agent->host->address, LOCAL_HOST) == 0)
812  {
813  shell_parse(scheduler->sysconfigdir, agent->owner->user_id, agent->owner->group_id,
814  agent->type->raw_cmd, agent->owner->jq_cmd_args,
815  agent->owner->parent_id, &argc, &args);
816 
817  tmp = args[0];
818  args[0] = g_strdup_printf(AGENT_BINARY, scheduler->sysconfigdir,
819  AGENT_CONF, agent->type->name, tmp);
820 
821  strcpy(buffer, args[0]);
822  *strrchr(buffer, '/') = '\0';
823  if (chdir(buffer) != 0)
824  {
825  ERROR("unable to change working directory: %s\n", strerror(errno));
826  }
827 
828  execv(args[0], args);
829  }
830  /* otherwise the agent will be started using ssh */
831  /* if the agent is started using ssh we don't need */
832  /* to fully parse the arguments, just pass the run */
833  /* command as the last argument to the ssh command */
834  else
835  {
836  args = g_new0(char*, 5);
837  len = snprintf(buffer, sizeof(buffer), AGENT_BINARY " --userID=%d --groupID=%d --scheduler_start --jobId=%d",
838  agent->host->agent_dir, AGENT_CONF, agent->type->name, agent->type->raw_cmd,
839  agent->owner->user_id, agent->owner->group_id, agent->owner->parent_id);
840 
841  if (len>=sizeof(buffer)) {
842  *(buffer + sizeof(buffer) - 1) = '\0';
843  log_printf("ERROR %s.%d: JOB[%d.%s]: exec failed: truncated buffer: \"%s\"",
844  __FILE__, __LINE__, agent->owner->id, agent->owner->agent_type, buffer);
845 
846  exit(5);
847  }
848 
849  args[0] = "/usr/bin/ssh";
850  args[1] = agent->host->address;
851  args[2] = buffer;
852  args[3] = agent->owner->jq_cmd_args;
853  args[4] = NULL;
854  execv(args[0], args);
855  }
856 
857  /* If we reach here, the exec call has failed */
858  log_printf("ERROR %s.%d: JOB[%d.%s]: exec failed: pid = %d, errno = \"%s\"", __FILE__, __LINE__, agent->owner->id,
859  agent->owner->agent_type, getpid(), strerror(errno));
860  }
861  /* we are in the parent */
862  else
863  {
864  /* Mirror child's setpgid to close the race between fork and exec. */
865  setpgid(agent->pid, agent->pid);
866 
867  event_signal(agent_create_event, agent);
868  agent_listen(scheduler, agent);
869  }
870 
871  return NULL;
872 }
873 
874 /* ************************************************************************** */
875 /* **** Constructor Destructor ********************************************** */
876 /* ************************************************************************** */
877 
894 meta_agent_t* meta_agent_init(char* name, char* cmd, int max, int spc)
895 {
896  /* locals */
897  meta_agent_t* ma;
898 
899  /* test inputs */
900  if (!name || !cmd)
901  {
902  ERROR("invalid arguments passed to meta_agent_init()");
903  return NULL;
904  }
905 
906  /* confirm valid inputs */
907  if (strlen(name) > MAX_NAME || strlen(cmd) > MAX_CMD)
908  {
909  log_printf("ERROR failed to load %s meta agent", name);
910  return NULL;
911  }
912 
913  /* inputs are valid, create the meta_agent */
914  ma = g_new0(meta_agent_t, 1);
915 
916  strcpy(ma->name, name);
917  strcpy(ma->raw_cmd, cmd);
918  strcat(ma->raw_cmd, " --scheduler_start");
919  ma->max_run = max;
920  ma->run_count = 0;
921  ma->special = spc;
922  ma->version = NULL;
923  ma->valid = TRUE;
924 
925  return ma;
926 }
927 
935 {
936  TEST_NULV(ma);
937  g_free(ma->version);
938  g_free(ma);
939 }
940 
952 agent_t* agent_init(scheduler_t* scheduler, host_t* host, job_t* job)
953 {
954  /* local variables */
955  agent_t* agent;
956  int child_to_parent[2];
957  int parent_to_child[2];
958  agent_spawn_args* pass;
959 
960  /* check job input */
961  if (!job)
962  {
963  log_printf("ERROR %s.%d: NULL job passed to agent init\n", __FILE__, __LINE__);
964  log_printf("ERROR: no other information available\n");
965  return NULL;
966  }
967 
968  /* check that the agent type exists */
969  meta_agent_t* ma = g_tree_lookup(scheduler->meta_agents, job->agent_type);
970  if (ma == NULL)
971  {
972  log_printf("ERROR %s.%d: jq_pk %d jq_type %s does not match any module in mods-enabled\n", __FILE__, __LINE__,
973  job->id, job->agent_type);
974  job->message = NULL;
975  job_fail_event(scheduler, job);
976  job_remove_agent(job, scheduler->job_list, NULL);
977  return NULL;
978  }
979 
980  /* allocate memory and do trivial assignments */
981  agent = g_new(agent_t, 1);
982  agent->type = ma;
983  agent->status = AG_CREATED;
984 
985  /* check if the agent is valid */
986  if (!agent->type->valid)
987  {
988  ERROR("agent %s has been invalidated by version information", job->agent_type);
989  /* Job will never get an agent; fail and remove it to avoid a leak. */
990  g_free(agent);
991  g_free(job->message);
992  job->message = g_strdup("agent type has been invalidated");
993  job_fail_event(scheduler, job);
994  job_remove_agent(job, scheduler->job_list, NULL);
995  return NULL;
996  }
997 
998  /* create the pipes between the child and the parent */
999  if (pipe(parent_to_child) != 0)
1000  {
1001  ERROR("JOB[%d.%s] failed to create parent to child pipe", job->id, job->agent_type);
1002  g_free(agent);
1003  return NULL;
1004  }
1005  if (pipe(child_to_parent) != 0)
1006  {
1007  ERROR("JOB[%d.%s] failed to create child to parent pipe", job->id, job->agent_type);
1008  close(parent_to_child[0]);
1009  close(parent_to_child[1]);
1010  g_free(agent);
1011  return NULL;
1012  }
1013 
1014  /* set file identifiers to correctly talk to children */
1015  agent->from_parent = parent_to_child[0];
1016  agent->to_child = parent_to_child[1];
1017  agent->from_child = child_to_parent[0];
1018  agent->to_parent = child_to_parent[1];
1019 
1020  /* initialize other info */
1021  agent->host = host;
1022  agent->owner = job;
1023  agent->updated = 0;
1024  agent->n_updates = 0;
1025  agent->data = NULL;
1026  agent->return_code = -1;
1027  agent->total_analyzed = 0;
1028  agent->special = 0;
1029  agent->accounted = FALSE;
1030 
1031  /* open the relevant file pointers */
1032  if ((agent->read = fdopen(agent->from_child, "r")) == NULL)
1033  {
1034  ERROR("JOB[%d.%s] failed to initialize read file", job->id, job->agent_type);
1035  close(agent->from_child);
1036  close(agent->to_child);
1037  close(agent->from_parent);
1038  close(agent->to_parent);
1039  g_free(agent);
1040  return NULL;
1041  }
1042  if ((agent->write = fdopen(agent->to_child, "w")) == NULL)
1043  {
1044  ERROR("JOB[%d.%s] failed to initialize write file", job->id, job->agent_type);
1045  fclose(agent->read); /* closes from_child */
1046  close(agent->from_parent);
1047  close(agent->to_parent);
1048  close(agent->to_child);
1049  g_free(agent);
1050  return NULL;
1051  }
1052 
1053  /* increase the load on the host and count of running agents */
1054  if (agent->owner->id > 0)
1055  {
1056  host_increase_load(agent->host);
1058  /* mark that this agent holds a slot, so it can be released even if the job
1059  * is removed before the death event runs */
1060  agent->accounted = TRUE;
1061  }
1062 
1063  /* spawn the listen thread */
1064  pass = g_new0(agent_spawn_args, 1);
1065  pass->scheduler = scheduler;
1066  pass->agent = agent;
1067 
1068 #if GLIB_CHECK_VERSION(2, 32, 0)
1069  agent->thread = g_thread_new(agent->type->name, (GThreadFunc) agent_spawn, pass);
1070 #else
1071  agent->thread = g_thread_create((GThreadFunc)agent_spawn, pass, 1, NULL);
1072 #endif
1073 
1074  return agent;
1075 }
1076 
1089 {
1090  TEST_NULV(agent);
1091 
1092  // from_child/to_child are owned by the FILE* wrappers; fclose closes them.
1093  close(agent->from_parent);
1094  close(agent->to_parent);
1095  if (agent->write)
1096  {
1097  fclose(agent->write);
1098  }
1099  if (agent->read)
1100  {
1101  fclose(agent->read);
1102  }
1103 
1104  /* release the child process */
1105  g_free(agent);
1106 }
1107 
1108 /* ************************************************************************** */
1109 /* **** Events ************************************************************** */
1110 /* ************************************************************************** */
1111 
1120 void agent_death_event(scheduler_t* scheduler, pid_t* pid)
1121 {
1122  agent_t* agent;
1123  int status = pid[1];
1124 
1125  if ((agent = g_tree_lookup(scheduler->agents, &pid[0])) == NULL)
1126  {
1127  // SIGCHLD can race ahead of agent_create_event; retry up to 3 times.
1128  if (pid[2]++ < 3)
1129  {
1130  event_signal(agent_death_event, pid);
1131  return;
1132  }
1133  ERROR("invalid agent death event: pid[%d]", pid[0]);
1134  g_free(pid);
1135  return;
1136  }
1137 
1138  /* Ownerless agent: job_remove_agent already removed and freed the job, so the
1139  * usual decrement paths never ran. Unblock and join the thread, release the slot
1140  * this agent still holds, then drop it. */
1141  if (agent->owner == NULL)
1142  {
1143  log_printf("ERROR %s.%d: agent_death_event for ownerless agent pid %d - cleaning up\n",
1144  __FILE__, __LINE__, (int)pid[0]);
1145  if (write(agent->to_parent, "@@@1\n", 5) != 5)
1146  {
1147  AGENT_SEQUENTIAL_PRINT("write to ownerless agent unsuccessful: %s\n", strerror(errno));
1148  }
1149  g_thread_join(agent->thread);
1150  /* Release the slot if this agent was counted and still holds it (AG_PAUSED
1151  * agents already released theirs when they paused). */
1152  if (agent->accounted && agent->status != AG_PAUSED)
1153  {
1154  host_decrease_load(agent->host);
1156  agent->accounted = FALSE;
1157  }
1158  g_tree_remove(scheduler->agents, &agent->pid);
1159  g_free(pid);
1160  return;
1161  }
1162 
1163  if (agent->owner->id >= 0)
1164  {
1165  event_signal(database_update_event, NULL);
1166  }
1167 
1168  if (write(agent->to_parent, "@@@1\n", 5) != 5)
1169  {
1170  AGENT_SEQUENTIAL_PRINT("write to agent unsuccessful: %s\n", strerror(errno));
1171  }
1172  g_thread_join(agent->thread);
1173 
1174  /* Skip if the agent was already failed (the "BYE n" path may have run first);
1175  * calling it twice would add the agent to failed_agents twice. */
1176  if (agent->return_code != 0 && agent->status != AG_FAILED)
1177  {
1178  if (WIFEXITED(status))
1179  {
1180  AGENT_CONCURRENT_PRINT("agent failed, code: %d\n", (status >> 8));
1181  }
1182  else if (WIFSIGNALED(status))
1183  {
1184  AGENT_CONCURRENT_PRINT("agent was killed by signal: %d.%s\n", WTERMSIG(status), strsignal(WTERMSIG(status)));
1185  if (WCOREDUMP(status))
1186  {
1187  AGENT_CONCURRENT_PRINT("agent produced core dump\n");
1188  }
1189  }
1190  else
1191  {
1192  AGENT_CONCURRENT_PRINT("agent failed, code: %d\n", agent->return_code);
1193  }
1194  AGENT_WARNING("agent closed unexpectedly, agent status was %s", agent_status_strings[agent->status]);
1195  agent_fail_event(scheduler, agent);
1196  }
1197 
1198  /* Decrement run_count and host load once per death:
1199  * AG_FAILED: agent_fail_event skipped it, so do it here.
1200  * AG_PAUSED: already done at the PAUSED transition.
1201  * otherwise: do it through agent_transition(AG_PAUSED). */
1202  if (agent->status == AG_FAILED)
1203  {
1204  if (agent->owner != NULL && agent->owner->id > 0)
1205  {
1206  host_decrease_load(agent->host);
1208  }
1209  }
1210  else if (agent->status != AG_PAUSED)
1211  {
1212  agent_transition(agent, AG_PAUSED);
1213  }
1214 
1215  /* Move the dying agent to finished_agents so job_remove_agent can clean up.
1216  * If the job is still JB_CHECKEDOUT the agent died before it got any data
1217  * (e.g. version-mismatch respawn), so re-queue the job for a fresh dispatch
1218  * instead of completing it. */
1219  if (agent->owner != NULL &&
1220  g_list_find(agent->owner->running_agents, agent) != NULL)
1221  {
1222  if (agent->owner->status == JB_CHECKEDOUT)
1223  {
1224  agent->owner->running_agents = g_list_remove(
1225  agent->owner->running_agents, agent);
1226  /* reset the timer so the stale reaper gives it a fresh grace period */
1227  agent->owner->checkedout_at = time(NULL);
1228  g_sequence_insert_sorted(scheduler->job_queue, agent->owner,
1229  job_compare, NULL);
1230  }
1231  else
1232  {
1233  job_finish_agent(agent->owner, agent);
1234  job_update(scheduler, agent->owner);
1235  }
1236  }
1237  else if (agent->owner != NULL)
1238  {
1239  job_update(scheduler, agent->owner);
1240  }
1241  if (agent->status == AG_FAILED && agent->owner->id < 0)
1242  {
1243  log_printf("ERROR %s.%d: agent %s.%s has failed scheduler startup test\n", __FILE__, __LINE__, agent->host->name,
1244  agent->type->name);
1245  agent->type->valid = 0;
1246  }
1247 
1248  if (agent->owner->id < 0 && !agent->type->valid)
1249  AGENT_SEQUENTIAL_PRINT("agent failed startup test, removing from meta agents\n");
1250 
1251  AGENT_SEQUENTIAL_PRINT("successfully remove from the system\n");
1252  job_remove_agent(agent->owner, scheduler->job_list, agent);
1253  g_tree_remove(scheduler->agents, &agent->pid);
1254  g_free(pid);
1255 }
1256 
1268 void agent_create_event(scheduler_t* scheduler, agent_t* agent)
1269 {
1270  TEST_NULV(agent);
1271 
1272  AGENT_SEQUENTIAL_PRINT("agent successfully spawned\n");
1273  g_tree_insert(scheduler->agents, &agent->pid, agent);
1274  agent_transition(agent, AG_SPAWNED);
1275  job_add_agent(agent->owner, agent);
1276 }
1277 
1289 void agent_ready_event(scheduler_t* scheduler, agent_t* agent)
1290 {
1291  int ret;
1292 
1293  TEST_NULV(agent);
1294  // If the agent has no job (owner is NULL), it shouldn't be here.
1295  // This prevents the "job passed is NULL
1296  if (agent->owner == NULL)
1297  {
1298  ERROR("Agent ready event received but agent has no owner. Terminating agent to prevent scheduler crash.");
1299  agent_kill(agent);
1300  return;
1301  }
1302 
1303  if (agent->status == AG_SPAWNED)
1304  {
1305  agent_transition(agent, AG_RUNNING);
1306  AGENT_SEQUENTIAL_PRINT("agent successfully created\n");
1307  }
1308 
1309  if ((ret = job_is_open(scheduler, agent->owner)) == 0)
1310  {
1311  agent_transition(agent, AG_PAUSED);
1312  job_finish_agent(agent->owner, agent);
1313  job_update(scheduler, agent->owner);
1314  return;
1315  }
1316  else if (ret < 0)
1317  {
1318  /* DB lookup failed (connection lost or OOM). Fail the job and kill the agent:
1319  * the watchdog ignores AG_FAILED agents, so we must SIGKILL it ourselves or it
1320  * would run forever. agent_fail_event has already set AG_FAILED, so the later
1321  * agent_death_event will not fail it again. */
1322  agent_fail_event(scheduler, agent);
1323  job_update(scheduler, agent->owner);
1324  agent_kill(agent);
1325  return;
1326  }
1327  else
1328  {
1329  agent->data = job_next(agent->owner);
1330  agent->updated = 1;
1331  }
1332 
1333  if (write(agent->to_parent, "@@@0\n", 5) != 5)
1334  {
1335  AGENT_ERROR("failed sending new data to agent");
1336  agent_kill(agent);
1337  }
1338 }
1339 
1343 typedef struct {
1344  GList* list;
1345  time_t now;
1346 } zombie_ctx;
1347 
1353 static gboolean collect_zombie_agents(int* pid_ptr, agent_t* agent, zombie_ctx* ctx)
1354 {
1355  if (agent == NULL || agent->owner == NULL)
1356  {
1357  return FALSE;
1358  }
1359  // Job must be FAILED with agent still in running/spawned state.
1360  if (agent->owner->status != JB_FAILED)
1361  {
1362  return FALSE;
1363  }
1364  if (agent->status != AG_SPAWNED && agent->status != AG_RUNNING)
1365  {
1366  return FALSE;
1367  }
1368  // 3× timer guarantees the normal watchdog already attempted SIGKILL.
1369  if (ctx->now - agent->check_in <= (time_t)(3 * CONF_agent_death_timer))
1370  {
1371  return FALSE;
1372  }
1373 
1374  ctx->list = g_list_prepend(ctx->list, agent);
1375  return FALSE;
1376 }
1377 
1390 void agent_update_event(scheduler_t* scheduler, void* unused)
1391 {
1392  g_tree_foreach(scheduler->agents, (GTraverseFunc) update, NULL);
1393 
1394  // Second pass: force-reap D-state zombie agents stuck in FAILED jobs.
1395  zombie_ctx zctx = { NULL, time(NULL) };
1396  g_tree_foreach(scheduler->agents, (GTraverseFunc) collect_zombie_agents, &zctx);
1397 
1398  GList* iter;
1399  for (iter = zctx.list; iter != NULL; iter = iter->next)
1400  {
1401  agent_t* zombie = (agent_t*)iter->data;
1402 
1403  log_printf("WARNING %s.%d: JOB[%d].%s[%d.%s]: agent unresponsive for %ld s"
1404  " in a FAILED job - forcing death-event cleanup\n",
1405  __FILE__, __LINE__,
1406  zombie->owner ? zombie->owner->id : -1,
1407  zombie->type ? zombie->type->name : "unknown",
1408  (int)zombie->pid,
1409  zombie->host ? zombie->host->name : "unknown",
1410  (long)(zctx.now - zombie->check_in));
1411 
1412  /* Queue it rather than calling directly, which would block on g_thread_join.
1413  * This is a synthetic death (no real waitpid status), so pass status 0;
1414  * agent_death_event uses return_code, not pids[1], to decide the fail path. */
1415  pid_t* pids = g_new0(pid_t, 3); /* [0]=pid [1]=status [2]=retry */
1416  pids[0] = zombie->pid;
1417  pids[1] = 0;
1418  event_signal(agent_death_event, pids);
1419  }
1420 
1421  g_list_free(zctx.list);
1422 }
1423 
1434 void agent_fail_event(scheduler_t* scheduler, agent_t* agent)
1435 {
1436  TEST_NULV(agent);
1437 
1438  // Ownerless agent: job already removed; transition and unblock thread to avoid NULL deref.
1439  if (agent->owner == NULL)
1440  {
1441  AGENT_ERROR("agent_fail_event called on ownerless agent (pid=%d), killing cleanly", agent->pid);
1442  agent_transition(agent, AG_FAILED);
1443  if (write(agent->to_parent, "@@@1\n", 5) != 5)
1444  {
1445  AGENT_ERROR("Failed to kill ownerless agent thread cleanly");
1446  }
1447  return;
1448  }
1449 
1450  agent_transition(agent, AG_FAILED);
1451  job_fail_agent(agent->owner, agent);
1452  if (write(agent->to_parent, "@@@1\n", 5) != 5)
1453  {
1454  AGENT_ERROR("Failed to kill agent thread cleanly");
1455  }
1456 }
1457 
1470 void agent_fail_event_pid(scheduler_t* scheduler, void* arg)
1471 {
1472  fail_pid_arg* fa = (fail_pid_arg*) arg;
1473  agent_t* agent = g_tree_lookup(scheduler->agents, &fa->pid);
1474 
1475  if (agent != NULL && agent == fa->agent && agent->status != AG_FAILED)
1476  {
1477  agent_fail_event(scheduler, agent);
1478  }
1479 
1480  g_free(fa);
1481 }
1482 
1490 void list_agents_event(scheduler_t* scheduler, GOutputStream* ostr)
1491 {
1492  g_tree_foreach(scheduler->meta_agents, (GTraverseFunc) agent_list, ostr);
1493  g_output_stream_write(ostr, "\nend\n", 5, NULL, NULL);
1494 }
1495 
1496 /* ************************************************************************** */
1497 /* **** Modifier Functions ************************************************** */
1498 /* ************************************************************************** */
1499 
/**
 * @brief Moves an agent to a new status and keeps its host's load in step.
 *
 * The change is logged first. Only agents owned by a job with a positive id
 * take part in load accounting; ownerless agents and jobs with id <= 0 (e.g.
 * the negative-id startup-test jobs) skip it. For those agents, leaving
 * AG_PAUSED calls host_increase_load() and entering AG_PAUSED calls
 * host_decrease_load(). A PAUSED -> PAUSED call does both. Finally
 * agent->status is set to new_status.
 *
 * @param agent       the agent to update; dereferenced without a NULL check
 * @param new_status  the status to move to
 *
 * NOTE(review): listing lines 1518 and 1523 are absent from this copy
 * (apparently dropped by the documentation extraction). They presumably hold
 * a companion call next to each host load change; confirm against agent.c.
 */
1508 void agent_transition(agent_t* agent, agent_status new_status)
1509 {
1510  AGENT_SEQUENTIAL_PRINT("agent status change: %s -> %s\n", agent_status_strings[agent->status],
1511  agent_status_strings[new_status]);
1512 
/* load accounting only applies to agents owned by a real (positive-id) job */
1513  if (agent->owner != NULL && agent->owner->id > 0)
1514  {
/* leaving the paused state: the agent counts against its host again */
1515  if (agent->status == AG_PAUSED)
1516  {
1517  host_increase_load(agent->host);
1519  }
/* entering the paused state: release the agent's share of the host load */
1520  if (new_status == AG_PAUSED)
1521  {
1522  host_decrease_load(agent->host);
1524  }
1525  }
1526 
1527  agent->status = new_status;
1528 }
1529 
1536 void agent_pause(agent_t* agent)
1537 {
1538  kill(agent->pid, SIGSTOP);
1539  agent_transition(agent, AG_PAUSED);
1540 }
1541 
1550 {
1551  kill(agent->pid, SIGCONT);
1552  agent_transition(agent, AG_RUNNING);
1553 }
1554 
1565 void agent_print_status(agent_t* agent, GOutputStream* ostr)
1566 {
1567  gchar* status_str;
1568  char time_buf[64];
1569  struct tm* time_info;
1570 
1571  TEST_NULV(agent);
1572  TEST_NULV(ostr);
1573 
1574  strcpy(time_buf, "(none)");
1575  time_info = localtime(&agent->check_in);
1576  if (time_info)
1577  strftime(time_buf, sizeof(time_buf), "%F %T", localtime(&agent->check_in));
1578  status_str = g_strdup_printf("agent:%d host:%s type:%s status:%s time:%s\n", agent->pid, agent->host->name,
1579  agent->type->name, agent_status_strings[agent->status], time_buf);
1580 
1581  AGENT_SEQUENTIAL_PRINT("AGENT_STATUS: %s", status_str);
1582  g_output_stream_write(ostr, status_str, strlen(status_str), NULL, NULL);
1583  g_free(status_str);
1584  return;
1585 }
1586 
1601 void agent_kill(agent_t* agent)
1602 {
1603  AGENT_SEQUENTIAL_PRINT("KILL: sending SIGKILL to pid %d\n", agent->pid);
1604  /* kill the process group so child processes (e.g. sh/python3) die too.
1605  * Leave return_code alone: a non-zero value makes the job fail, not complete. */
1606  kill(-agent->pid, SIGKILL);
1607 }
1608 
1617 int aprintf(agent_t* agent, const char* fmt, ...)
1618 {
1619  va_list args;
1620  int rc;
1621  char* tmp;
1622 
1623  va_start(args, fmt);
1624  if (TVERB_AGENT)
1625  {
1626  tmp = g_strdup_vprintf(fmt, args);
1627  tmp[strlen(tmp) - 1] = '\0';
1628  AGENT_CONCURRENT_PRINT("sent to agent \"%s\"\n", tmp);
1629  rc = fprintf(agent->write, "%s\n", tmp);
1630  g_free(tmp);
1631  }
1632  else
1633  {
1634  rc = vfprintf(agent->write, fmt, args);
1635  }
1636  va_end(args);
1637  fflush(agent->write);
1638 
1639  return rc;
1640 }
1641 
1653 ssize_t agent_write(agent_t* agent, const void* buf, int count)
1654 {
1655  return write(agent->to_parent, buf, count);
1656 }
1657 
1658 /* ************************************************************************** */
1659 /* **** static functions and meta agents ************************************ */
1660 /* ************************************************************************** */
1661 
1669 void test_agents(scheduler_t* scheduler)
1670 {
1671  g_tree_foreach(scheduler->meta_agents, (GTraverseFunc) agent_test, scheduler);
1672 }
1673 
1680 void kill_agents(scheduler_t* scheduler)
1681 {
1682  g_tree_foreach(scheduler->agents, (GTraverseFunc) agent_kill_traverse, NULL);
1683 }
1684 
1695 int add_meta_agent(GTree* meta_agents, char* name, char* cmd, int max, int spc)
1696 {
1697  meta_agent_t* ma;
1698 
1699  if (name == NULL)
1700  return 0;
1701 
1702  if (g_tree_lookup(meta_agents, name) == NULL)
1703  {
1704  if ((ma = meta_agent_init(name, cmd, max, spc)) == NULL)
1705  return 0;
1706  g_tree_insert(meta_agents, ma->name, ma);
1707  return 1;
1708  }
1709 
1710  return 0;
1711 }
1712 
1720 int is_meta_special(meta_agent_t* ma, int special_type)
1721 {
1722  return (ma != NULL) && ((ma->special & special_type) != 0);
1723 }
1724 
1732 int is_agent_special(agent_t* agent, int special_type)
1733 {
1734  return (agent != NULL) && ((agent->special & special_type) != 0);
1735 }
1736 
1742 {
1743  ma->run_count++;
1744  V_AGENT("AGENT[%s] run increased to %d\n", ma->name, ma->run_count);
1745 }
1746 
1752 {
1753  ma->run_count--;
1754  V_AGENT("AGENT[%s] run decreased to %d\n", ma->name, ma->run_count);
1755 }
static int agent_list(char *name, meta_agent_t *ma, GOutputStream *ostr)
GTraverseFunc that prints the name of every meta agent in alphabetical order, separated by spaces.
Definition: agent.c:235
void agent_ready_event(scheduler_t *scheduler, agent_t *agent)
Event created when an agent is ready for more data.
Definition: agent.c:1289
static int update(int *pid_ptr, agent_t *agent, gpointer unused)
Definition: agent.c:164
int aprintf(agent_t *agent, const char *fmt,...)
Definition: agent.c:1617
agent_t * agent_init(scheduler_t *scheduler, host_t *host, job_t *job)
Allocate and spawn a new agent.
Definition: agent.c:952
static gboolean agent_close_fd(int *pid_ptr, agent_t *agent, agent_t *excepted)
This will close all of the agent's pipes.
Definition: agent.c:140
static int agent_test(const gchar *name, meta_agent_t *ma, scheduler_t *scheduler)
GTraverseFunc that tests the given meta agent on every host.
Definition: agent.c:256
static gboolean collect_zombie_agents(int *pid_ptr, agent_t *agent, zombie_ctx *ctx)
GTraverseFunc: collect agents in FAILED jobs silent for >3×agent_death_timer. Targets D-state process...
Definition: agent.c:1353
ssize_t agent_write(agent_t *agent, const void *buf, int count)
Definition: agent.c:1653
#define AGENT_SEQUENTIAL_PRINT(...)
Definition: agent.c:103
#define AGENT_CONCURRENT_PRINT(...)
Definition: agent.c:108
#define AGENT_ERROR(...)
Definition: agent.c:82
static int agent_kill_traverse(int *pid, agent_t *agent, gpointer unused)
GTraverseFunc that kills each agent it visits; used to kill all of the agents.
Definition: agent.c:220
void agent_meta_version_reset(meta_agent_t *ma)
Reset a meta_agent's cached version fields under version_lock.
Definition: agent.c:290
void agent_update_event(scheduler_t *scheduler, void *unused)
Definition: agent.c:1390
void meta_agent_increase_count(meta_agent_t *ma)
Definition: agent.c:1741
void agent_destroy(agent_t *agent)
Frees the memory associated with an agent.
Definition: agent.c:1088
#define AGENT_WARNING(...)
Definition: agent.c:96
void agent_fail_event(scheduler_t *scheduler, agent_t *agent)
Fails an agent.
Definition: agent.c:1434
meta_agent_t * meta_agent_init(char *name, char *cmd, int max, int spc)
Creates a new meta agent.
Definition: agent.c:894
void agent_kill(agent_t *agent)
Unclean kill of an agent whose job should fail.
Definition: agent.c:1601
void agent_pause(agent_t *agent)
Definition: agent.c:1536
int add_meta_agent(GTree *meta_agents, char *name, char *cmd, int max, int spc)
Definition: agent.c:1695
void agent_create_event(scheduler_t *scheduler, agent_t *agent)
Event created when a new agent has been created.
Definition: agent.c:1268
void agent_transition(agent_t *agent, agent_status new_status)
Definition: agent.c:1508
#define TEST_NULL(a, ret)
Test if parameter is NULL.
Definition: agent.c:57
static void agent_listen(scheduler_t *scheduler, agent_t *agent)
Definition: agent.c:332
void agent_unpause(agent_t *agent)
Definition: agent.c:1549
void test_agents(scheduler_t *scheduler)
Calls the agent test function for every type of agent.
Definition: agent.c:1669
void meta_agent_decrease_count(meta_agent_t *ma)
Definition: agent.c:1751
void kill_agents(scheduler_t *scheduler)
Call the agent_kill function for every agent within the system.
Definition: agent.c:1680
int is_agent_special(agent_t *agent, int special_type)
tests if a particular agent has a specific special flag set
Definition: agent.c:1732
void list_agents_event(scheduler_t *scheduler, GOutputStream *ostr)
Writes the name of every meta agent to the interface output stream, followed by an "end" line.
Definition: agent.c:1490
void agent_fail_event_pid(scheduler_t *scheduler, void *arg)
Deferred agent_fail_event() that is safe against a freed agent.
Definition: agent.c:1470
#define TEST_NULV(a)
Test if parameter is NULL.
Definition: agent.c:47
int is_meta_special(meta_agent_t *ma, int special_type)
tests if a particular meta agent has a specific special flag set
Definition: agent.c:1720
static void shell_parse(char *confdir, int user_id, int group_id, char *input, char *jq_cmd_args, int jobId, int *argc, char ***argv)
Parses the shell command that is found in the configuration file.
Definition: agent.c:651
#define SELECT_STRING(passed)
Definition: agent.c:120
void agent_death_event(scheduler_t *scheduler, pid_t *pid)
Definition: agent.c:1120
static void * agent_spawn(agent_spawn_args *pass)
Spawns a new agent using the command passed in using the meta agent.
Definition: agent.c:756
void meta_agent_destroy(meta_agent_t *ma)
Definition: agent.c:934
void agent_print_status(agent_t *agent, GOutputStream *ostr)
Prints the status of the agent to the output stream provided.
Definition: agent.c:1565
Header file with agent related operations.
#define MAX_CMD
the size of the agent's command buffer (arbitrary)
Definition: agent.h:26
#define SAG_NOKILL
This agent should not be killed when updating the agent.
Definition: agent.h:33
#define AGENT_STATUS_TYPES(apply)
Definition: agent.h:50
#define MAX_NAME
the size of the agent's name buffer (arbitrary)
Definition: agent.h:27
Event handling operations.
void host_decrease_load(host_t *host)
Decrease the number of running agents on a host by 1.
Definition: host.c:113
void host_increase_load(host_t *host)
Increase the number of running agents on a host by 1.
Definition: host.c:102
void job_fail_agent(job_t *job, void *agent)
Definition: job.c:535
void job_fail_event(scheduler_t *scheduler, job_t *job)
Events that causes a job to be marked a failed.
Definition: job.c:437
job_t * job_init(GTree *job_list, GSequence *job_queue, char *type, char *host, int id, int parent_id, int user_id, int group_id, int priority, char *jq_cmd_args)
Create a new job.
Definition: job.c:164
void job_update(scheduler_t *scheduler, job_t *job)
Definition: job.c:574
int job_is_open(scheduler_t *scheduler, job_t *job)
Tests to see if there is still data available for this job.
Definition: job.c:624
char * job_next(job_t *job)
Definition: job.c:673
void job_remove_agent(job_t *job, GTree *job_list, void *agent)
Definition: job.c:494
gint job_compare(gconstpointer a, gconstpointer b, gpointer user_data)
Used to compare two different jobs in the priority queue.
Definition: job.c:135
log_t * job_log(job_t *job)
Definition: job.c:700
void job_add_agent(job_t *job, void *agent)
Adds a new agent to the jobs list of agents.
Definition: job.c:479
void job_finish_agent(job_t *job, void *agent)
Definition: job.c:520
FUNCTION int max(int permGroup, int permPublic)
Get the maximum group privilege.
Definition: libfossagent.c:295
int jobId
The id of the job.
char buffer[2048]
The last thing received from the scheduler.
log_t * main_log
Definition: logging.c:33
Log related operations.
#define ERROR(...)
Definition: logging.h:79
#define THREAD_FATAL(file,...)
Definition: logging.h:71
start($application)
start the application Assumes application is restartable via /etc/init.d/<script>....
Definition: pkgConfig.php:1214
void database_update_event(scheduler_t *scheduler, void *unused)
Checks the job queue for any new entries.
Definition: database.c:846
void database_job_processed(int j_id, int num)
Updates the number of items that a job queue entry has processed.
Definition: database.c:999
Header file for the scheduler.
#define AGENT_CONF
Agent conf location.
Definition: scheduler.h:123
#define AGENT_BINARY
Format to get agent binary.
Definition: scheduler.h:122
agent_t * agent
Reference to current agent state.
Definition: agent.c:734
scheduler_t * scheduler
Reference to current scheduler state.
Definition: agent.c:733
Definition: agent.h:100
time_t check_in
the time that the agent last generated anything
Definition: agent.h:108
int to_child
file identifier to print to the child
Definition: agent.h:114
agent_status status
the state of execution the agent is currently in
Definition: agent.h:106
meta_agent_t * type
the type of agent this is i.e. bucket, copyright...
Definition: agent.h:102
int from_child
file identifier to read from child
Definition: agent.h:115
gboolean accounted
TRUE if this agent holds host load + run_count (owner->id > 0 at init); used to release the slot if t...
Definition: agent.h:128
gchar * data
the data that has been sent to the agent for analysis
Definition: agent.h:122
FILE * write
FILE* that abstracts the use of the to_child socket.
Definition: agent.h:118
gboolean alive
flag to tell the scheduler if the agent is still alive
Definition: agent.h:125
job_t * owner
the job that this agent is assigned to
Definition: agent.h:121
FILE * read
FILE* that abstracts the use of the from_child socket.
Definition: agent.h:117
uint32_t special
any special flags that the agent has set
Definition: agent.h:127
int to_parent
file identifier to print to the parent (child stdout)
Definition: agent.h:116
uint8_t n_updates
keeps track of the number of times the agent has updated
Definition: agent.h:109
uint64_t total_analyzed
the total number that this agent has analyzed
Definition: agent.h:124
host_t * host
the host that this agent will start on
Definition: agent.h:103
GThread * thread
the thread that communicates with this agent
Definition: agent.h:107
int from_parent
file identifier to read from the parent (child stdin)
Definition: agent.h:113
uint8_t return_code
what was returned by the agent when it disconnected
Definition: agent.h:126
pid_t pid
the pid of the process this agent is running in
Definition: agent.h:110
gboolean updated
boolean flag to indicate if the scheduler has updated the data
Definition: agent.h:123
Argument for agent_fail_event_pid().
Definition: agent.c:320
agent_t * agent
expected agent pointer, only compared
Definition: agent.c:322
pid_t pid
pid of the agent that reported failure
Definition: agent.c:321
Definition: host.h:26
char * agent_dir
The location on the host machine where the executables are.
Definition: host.h:29
char * address
The address of the host, used by ssh when starting a new agent.
Definition: host.h:28
char * name
The name of the host, used to store host internally to scheduler.
Definition: host.h:27
The job structure.
Definition: job.h:52
int32_t id
The identifier for this job.
Definition: job.h:74
job_status status
The current status for the job.
Definition: job.h:62
int32_t group_id
The id of the group that created the job.
Definition: job.h:76
gchar * message
Message that will be sent with job notification email.
Definition: job.h:70
gchar * jq_cmd_args
Command line arguments for this job.
Definition: job.h:64
time_t checkedout_at
Timestamp when job entered JB_CHECKEDOUT (for stale detection grace period)
Definition: job.h:77
int32_t user_id
The id of the user that created the job.
Definition: job.h:75
char * agent_type
The type of agent used to analyze the data.
Definition: job.h:54
int32_t priority
Importance of the job, maps directory to unix priority.
Definition: job.h:71
int32_t parent_id
The identifier for the parent of this job (its queue id)
Definition: job.h:73
GList * running_agents
The list of agents assigned to this job that are still working.
Definition: job.h:56
Store the results of a regex match.
Definition: scanners.hpp:28
int valid
flag indicating if the meta_agent is valid
Definition: agent.h:89
char raw_cmd[MAX_CMD+1]
the raw command that will start the agent, used for ssh
Definition: agent.h:84
int max_run
the maximum number that can run at once -1 if no limit
Definition: agent.h:85
char name[256]
the name associated with this agent i.e. nomos, copyright...
Definition: agent.h:83
int run_count
the count of agents in running state
Definition: agent.h:90
char * version
the version of the agent that is running on all hosts
Definition: agent.h:88
char * version_source
the machine that reported the version information
Definition: agent.h:87
int special
any special condition associated with the agent
Definition: agent.h:86
GTree * job_list
List of jobs that have been created.
Definition: scheduler.h:172
gchar * sysconfigdir
The system directory that contain fossology.conf.
Definition: scheduler.h:150
GList * host_queue
Round-robin queue for choosing which host use next.
Definition: scheduler.h:161
GRegex * parse_agent_msg
Parses messages coming from the agents.
Definition: scheduler.h:189
GTree * meta_agents
List of all meta agents available to the scheduler.
Definition: scheduler.h:156
GTree * agents
List of any currently running agents.
Definition: scheduler.h:157
GSequence * job_queue
heap of jobs that still need to be started
Definition: scheduler.h:173
Context passed to collect_zombie_agents().
Definition: agent.c:1343
time_t now
Current-time snapshot for the traversal.
Definition: agent.c:1345
GList * list
Accumulates zombie agent_t pointers.
Definition: agent.c:1344