FOSSology  4.6.0
Open Source License Compliance by Open Source Software
agent.c
Go to the documentation of this file.
1 /*
2  SPDX-FileCopyrightText: © 2010, 2011, 2012 Hewlett-Packard Development Company, L.P.
3  SPDX-FileCopyrightText: © 2015 Siemens AG
4 
5  SPDX-License-Identifier: GPL-2.0-only
6 */
7 
15 /* local includes */
16 #include <agent.h>
17 #include <database.h>
18 #include <event.h>
19 #include <host.h>
20 #include <job.h>
21 #include <logging.h>
22 #include <scheduler.h>
23 
24 /* library includes */
25 #include <limits.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <time.h>
30 
31 /* unix library includes */
32 #include <fcntl.h>
33 #include <limits.h>
34 #include <sys/types.h>
35 #include <sys/wait.h>
36 #include <unistd.h>
37 
38 /* other library includes */
39 #include <glib.h>
40 
/**
 * Leave a void function early when the tested pointer is NULL.
 *
 * Sets errno to EINVAL and reports through ERROR() before returning. The
 * body is wrapped in do/while(0) so the macro acts as a single statement
 * and cannot capture an `else` that follows it.
 *
 * @param a  pointer expression to test
 */
#define TEST_NULV(a) do { if(!(a)) { \
  errno = EINVAL; ERROR("agent passed is NULL, cannot proceed"); return; } } while(0)
/**
 * Leave a non-void function early when the tested pointer is NULL.
 *
 * Sets errno to EINVAL and reports through ERROR() before returning ret.
 *
 * @param a    pointer expression to test
 * @param ret  value returned from the enclosing function when a is NULL
 */
#define TEST_NULL(a, ret) do { if(!(a)) { \
  errno = EINVAL; ERROR("agent passed is NULL, cannot proceed"); return ret; } } while(0)
59 
/**
 * Print the identifying prefix for a log line about `agent`.
 *
 * Expects a variable named `agent` in the calling scope. When the agent has
 * an owner, the prefix is "JOB[id].type[pid.host]: "; otherwise it is
 * "AGENT[type][pid=..][host=..]: ". Missing type/host names print as "?"
 * and a NULL agent prints pid -1.
 */
#define AGENT_CREDENTIAL do { \
  if(agent && agent->owner) \
    log_printf("JOB[%d].%s[%d.%s]: ", \
        agent->owner->id, \
        (agent->type ? agent->type->name : "?"), \
        agent->pid, \
        (agent->host ? agent->host->name : "?")); \
  else \
    log_printf("AGENT[%s][pid=%d][host=%s]: ", \
        ((agent && agent->type) ? agent->type->name : "?"), \
        (agent ? agent->pid : -1), \
        ((agent && agent->host) ? agent->host->name : "?")); \
} while(0)
72 
/**
 * Write the "JOB[id].type[pid.host]: " prefix to the owning job's log.
 *
 * Expects a variable named `agent` in the calling scope. Nothing is written
 * when the agent is NULL or has no owner.
 */
#define AGENT_LOG_CREDENTIAL do { \
  if(agent && agent->owner) \
    con_printf(job_log(agent->owner), "JOB[%d].%s[%d.%s]: ", \
        agent->owner->id, \
        (agent->type ? agent->type->name : "?"), \
        agent->pid, \
        (agent->host ? agent->host->name : "?")); \
} while(0)
80 
/**
 * Log an error about `agent`: source location, agent credentials, the
 * printf-style message, then a newline. Always prints.
 */
#define AGENT_ERROR(...) do { \
  log_printf("ERROR: %s.%d: ", __FILE__, __LINE__); \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); \
  log_printf("\n"); \
} while(0)
87 
/**
 * Log a notification about `agent` when TEST_NOTIFY is true: "NOTE: ",
 * agent credentials, the printf-style message, then a newline.
 *
 * The condition sits inside the do/while(0) so the macro is one complete
 * statement; placed before an `else`, it no longer steals that `else`.
 */
#define AGENT_NOTIFY(...) do { if(TEST_NOTIFY) { \
  log_printf("NOTE: "); \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); \
  log_printf("\n"); } } while(0)
94 
/**
 * Log a warning about `agent` when TEST_WARNING is true: source location,
 * agent credentials, the printf-style message, then a newline.
 *
 * The condition sits inside the do/while(0) so the macro is one complete
 * statement; placed before an `else`, it no longer steals that `else`.
 */
#define AGENT_WARNING(...) do { if(TEST_WARNING) { \
  log_printf("WARNING %s.%d: ", __FILE__, __LINE__); \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); \
  log_printf("\n"); } } while(0)
101 
/**
 * Print agent credentials followed by the printf-style message through
 * log_printf(), but only when TVERB_AGENT is true.
 *
 * The condition sits inside the do/while(0) so the macro is one complete
 * statement; placed before an `else`, it no longer steals that `else`.
 */
#define AGENT_SEQUENTIAL_PRINT(...) do { if(TVERB_AGENT) { \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); } } while(0)
106 
/**
 * Write the agent credentials followed by the printf-style message to the
 * log of the agent's owning job. Expects `agent` in the calling scope and
 * passes agent->owner to job_log() without checking it.
 */
#define AGENT_CONCURRENT_PRINT(...) do { \
  AGENT_LOG_CREDENTIAL; \
  con_printf(job_log(agent->owner), __VA_ARGS__); \
} while(0)
111 
112 /* ************************************************************************** */
113 /* **** Data Types ********************************************************** */
114 /* ************************************************************************** */
115 
120 #define SELECT_STRING(passed) MK_STRING_LIT(AGENT_##passed),
121 const char* agent_status_strings[] =
123 #undef SELECT_STRING
124 
125 /* ************************************************************************** */
126 /* **** Local Functions ***************************************************** */
127 /* ************************************************************************** */
128 
140 static int agent_close_fd(int* pid_ptr, agent_t* agent, agent_t* excepted)
141 {
142  TEST_NULL(agent, 0);
143  if (agent != excepted)
144  {
145  close(agent->from_child);
146  close(agent->to_child);
147  fclose(agent->read);
148  fclose(agent->write);
149  }
150  return 0;
151 }
152 
163 static int update(int* pid_ptr, agent_t* agent, gpointer unused)
164 {
165  TEST_NULL(agent, 0);
166  if (agent->owner == NULL)
167  {
168  log_printf("ERROR %s.%d: Agent pid %d has no owner; killing to prevent NULL deref\n", __FILE__, __LINE__, agent->pid);
169  agent_kill(agent);
170  return 0;
171  }
172  int nokill = is_agent_special(agent, SAG_NOKILL) || is_meta_special(agent->type, SAG_NOKILL);
173 
174  if (agent->status == AG_SPAWNED || agent->status == AG_RUNNING || agent->status == AG_PAUSED)
175  {
176  /* check last checkin time */
177  if (time(NULL) - agent->check_in > CONF_agent_death_timer && !(agent->owner->status == JB_PAUSED) && !nokill)
178  {
179  AGENT_CONCURRENT_PRINT("no heartbeat for %d seconds\n", (time(NULL) - agent->check_in));
180  agent_kill(agent);
181  return 0;
182  }
183 
184  /* check items processed */
185  if (agent->status != AG_PAUSED && !agent->alive)
186  {
187  agent->n_updates++;
188  }
189  else
190  {
191  agent->n_updates = 0;
192  }
193  if (agent->n_updates > CONF_agent_update_number && !nokill)
194  {
195  AGENT_CONCURRENT_PRINT("agent has not set the alive flag in at least 10 minutes, killing\n");
196  agent_kill(agent);
197  return 0;
198  }
199 
200  AGENT_SEQUENTIAL_PRINT("agent updated correctly, processed %d items: %d\n", agent->total_analyzed,
201  agent->n_updates);
202  agent->alive = 0;
203  }
204 
205  return 0;
206 }
207 
219 static int agent_kill_traverse(int* pid, agent_t* agent, gpointer unused)
220 {
221  agent_kill(agent);
222  return FALSE;
223 }
224 
234 static int agent_list(char* name, meta_agent_t* ma, GOutputStream* ostr)
235 {
236  if (ma->valid)
237  {
238  g_output_stream_write(ostr, name, strlen(name), NULL, NULL);
239  g_output_stream_write(ostr, " ", 1, NULL, NULL);
240  }
241  return FALSE;
242 }
243 
255 static int agent_test(const gchar* name, meta_agent_t* ma, scheduler_t* scheduler)
256 {
257  static int32_t id_gen = -1;
258 
259  GList* iter;
260  host_t* host;
261  char *jq_cmd_args = 0;
262 
263  for (iter = scheduler->host_queue; iter != NULL; iter = iter->next)
264  {
265  host = (host_t*) iter->data;
266  V_AGENT("META_AGENT[%s] testing on HOST[%s]\n", ma->name, host->name);
267  job_t* job = job_init(scheduler->job_list, scheduler->job_queue, ma->name, host->name, id_gen--, 0, 0, 0, 0, jq_cmd_args);
268  agent_init(scheduler, host, job);
269  }
270 
271  return 0;
272 }
273 
281 static void agent_listen(scheduler_t* scheduler, agent_t* agent)
282 {
283  /* static locals */
284 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
285  static GMutex version_lock;
286 #else
287  static GStaticMutex version_lock = G_STATIC_MUTEX_INIT;
288 #endif
289 
290  /* locals */
291  char buffer[1024]; // buffer to store c strings read from agent
292  GMatchInfo* match; // regex match information
293  char* arg; // used during regex retrievals
294  int relevant; // used during special retrievals
295 
296  TEST_NULV(agent);
297 
308  if (fgets(buffer, sizeof(buffer), agent->read) == NULL)
309  {
310  AGENT_CONCURRENT_PRINT("pipe from child closed: %s\n", strerror(errno));
311  g_thread_exit(NULL);
312  }
313 
314  /* check to make sure "VERSION" was sent */
315  buffer[strlen(buffer) - 1] = '\0';
316  if (strncmp(buffer, "VERSION: ", 9) != 0)
317  {
318  if (strncmp(buffer, "@@@1", 4) == 0)
319  {
320  THREAD_FATAL(job_log(agent->owner), "agent crashed before sending version information");
321  }
322  else
323  {
324  agent->type->valid = 0;
325  agent_fail_event(scheduler, agent);
326  agent_kill(agent);
327  con_printf(main_log, "ERROR %s.%d: agent %s.%s has been invalidated, removing from agents\n", __FILE__, __LINE__,
328  agent->host->name, agent->type->name);
329  AGENT_CONCURRENT_PRINT("agent didn't send version information: \"%s\"\n", buffer);
330  return;
331  }
332  }
333 
334  /* check that the VERSION information is correct */
335 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
336  g_mutex_lock(&version_lock);
337 #else
338  g_static_mutex_lock(&version_lock);
339 #endif
340  strcpy(buffer, &buffer[9]);
341  if (agent->type->version == NULL && agent->type->valid)
342  {
343  agent->type->version_source = agent->host->name;
344  agent->type->version = g_strdup(buffer);
345  if (TVERB_AGENT)
346  con_printf(main_log, "META_AGENT[%s.%s] version is: \"%s\"\n", agent->host->name, agent->type->name,
347  agent->type->version);
348  }
349  else if (strcmp(agent->type->version, buffer) != 0)
350  {
351  con_printf(job_log(agent->owner), "ERROR %s.%d: META_DATA[%s] invalid agent spawn check\n", __FILE__, __LINE__,
352  agent->type->name);
353  con_printf(job_log(agent->owner), "ERROR: versions don't match: %s(%s) != received: %s(%s)\n",
354  agent->type->version_source, agent->type->version, agent->host->name, buffer);
355  agent->type->valid = 0;
356  agent_kill(agent);
357 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
358  g_mutex_unlock(&version_lock);
359 #else
360  g_static_mutex_unlock(&version_lock);
361 #endif
362  return;
363  }
364 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
365  g_mutex_unlock(&version_lock);
366 #else
367  g_static_mutex_unlock(&version_lock);
368 #endif
369 
379  while (1)
380  {
381  /* get message from agent */
382  if (fgets(buffer, sizeof(buffer), agent->read) == NULL)
383  g_thread_exit(NULL);
384 
385  buffer[strlen(buffer) - 1] = '\0';
386 
387  if (strlen(buffer) == 0)
388  continue;
389 
390  if (TVERB_AGENT && (TVERB_SPECIAL || strncmp(buffer, "SPECIAL", 7) != 0))
391  AGENT_CONCURRENT_PRINT("received: \"%s\"\n", buffer);
392 
400  if (strncmp(buffer, "BYE", 3) == 0)
401  {
402  if ((agent->return_code = atoi(&(buffer[4]))) != 0)
403  {
404  AGENT_CONCURRENT_PRINT("agent failed with error code %d\n", agent->return_code);
405  event_signal(agent_fail_event, agent);
406  }
407  break;
408  }
409 
416  if (strncmp(buffer, "@@@1", 4) == 0)
417  break;
418 
425  if (strncmp(buffer, "@@@0", 4) == 0 && agent->updated)
426  {
427  aprintf(agent, "%s\n", agent->data);
428  aprintf(agent, "END\n");
429  fflush(agent->write);
430  agent->updated = 0;
431  continue;
432  }
433 
434  /* agent just checked in */
435  agent->check_in = time(NULL);
436 
445  if (strncmp(buffer, "OK", 2) == 0)
446  {
447  if (agent->status != AG_PAUSED)
448  event_signal(agent_ready_event, agent);
449  }
450 
458  else if (strncmp(buffer, "HEART", 5) == 0)
459  {
460  g_regex_match(scheduler->parse_agent_msg, buffer, 0, &match);
461 
462  arg = g_match_info_fetch(match, 3);
463  agent->total_analyzed = atoi(arg);
464  g_free(arg);
465 
466  arg = g_match_info_fetch(match, 6);
467  agent->alive = (arg[0] == '1' || agent->alive);
468  g_free(arg);
469 
470  g_match_info_free(match);
471  match = NULL;
472 
474  }
475 
482  else if (strncmp(buffer, "EMAIL", 5) == 0)
483  {
484  agent->owner->message = g_strdup(buffer + 6);
485  }
486 
494  else if (strncmp(buffer, "SPECIAL", 7) == 0)
495  {
496  relevant = INT_MAX;
497 
498  g_regex_match(scheduler->parse_agent_msg, buffer, 0, &match);
499 
500  arg = g_match_info_fetch(match, 3);
501  relevant &= atoi(arg);
502  g_free(arg);
503 
504  arg = g_match_info_fetch(match, 6);
505  if (atoi(arg))
506  {
507  if (agent->special & relevant)
508  relevant = 0;
509  }
510  else
511  {
512  if (!(agent->special & relevant))
513  relevant = 0;
514  }
515  g_free(arg);
516 
517  g_match_info_free(match);
518 
519  agent->special ^= relevant;
520  }
521 
527  else if (strncmp(buffer, "GETSPECIAL", 10) == 0)
528  {
529  g_regex_match(scheduler->parse_agent_msg, buffer, 0, &match);
530 
531  arg = g_match_info_fetch(match, 3);
532  relevant = atoi(arg);
533  g_free(arg);
534 
535  if (agent->special & relevant)
536  aprintf(agent, "VALUE: 1\n");
537  else
538  aprintf(agent, "VALUE: 0\n");
539 
540  g_match_info_free(match);
541  }
542 
548  else if (!(TVERB_AGENT))
549  AGENT_CONCURRENT_PRINT("\"%s\"\n", buffer);
550  }
551 
552  if (TVERB_AGENT)
553  AGENT_CONCURRENT_PRINT("communication thread closing\n");
554 }
555 
568 static void shell_parse(char* confdir, int user_id, int group_id, char* input, char *jq_cmd_args, int jobId, int* argc, char*** argv)
569 {
570  char* begin;
571  char* curr;
572  int idx = 0;
573 #define MAX_CMD_ARGS 30
574 
575  *argv = g_new0(char*, MAX_CMD_ARGS);
576  begin = NULL;
577 
578  for (curr = input; *curr; curr++)
579  {
580  if (*curr == ' ')
581  {
582  if (begin == NULL)
583  continue;
584 
585  if (*begin == '"')
586  continue;
587 
588  *curr = '\0';
589  (*argv)[idx++] = g_strdup(begin);
590  begin = NULL;
591  }
592  else if (begin == NULL)
593  {
594  begin = curr;
595  }
596  else if (*begin == '"' && *curr == '"')
597  {
598  *begin = '\0';
599  *curr = '\0';
600 
601  (*argv)[idx++] = g_strdup(begin + 1);
602  begin = NULL;
603  }
604  if (idx > MAX_CMD_ARGS - 7)
605  break;
606  }
607 
608  (*argv)[idx++] = g_strdup_printf("--jobId=%d", jobId);
609  (*argv)[idx++] = g_strdup_printf("--config=%s", confdir);
610  (*argv)[idx++] = g_strdup_printf("--userID=%d", user_id);
611  (*argv)[idx++] = g_strdup_printf("--groupID=%d", group_id);
612  (*argv)[idx++] = "--scheduler_start";
613  if (jq_cmd_args)
614  {
615  const char *start = jq_cmd_args;
616  const char *current = jq_cmd_args;
617  gboolean in_quotes = FALSE;
618 
619  while (*current != '\0')
620  {
621  if (*current == '\'' || *current == '"')
622  in_quotes = !in_quotes;
623  else if (*current == ' ' && !in_quotes)
624  {
625  if (current > start)
626  {
627  int len = current - start;
628  char *arg = g_strndup(start, len);
629  (*argv)[idx++] = arg;
630  }
631  start = current + 1;
632  }
633  current++;
634  }
635 
636  if (current > start)
637  {
638  char *arg = g_strndup(start, current - start);
639  (*argv)[idx++] = arg;
640  }
641  }
642  (*argc) = idx;
643 }
644 
648 typedef struct
649 {
653 
673 static void* agent_spawn(agent_spawn_args* pass)
674 {
675  /* locals */
676  scheduler_t* scheduler = pass->scheduler;
677  agent_t* agent = pass->agent;
678  gchar* tmp; // pointer to temporary string
679  gchar** args; // the arguments that will be passed to the child
680  int argc; // the number of arguments parsed
681  int len;
682  char buffer[2048]; // character buffer
683 
684  /* spawn the new process */
685  if (agent->owner == NULL)
686  {
687  log_printf("ERROR %s.%d: Agent spawn requested but agent has no owner; aborting spawn.\n", __FILE__, __LINE__);
688 
689  /* Close FILE* streams if they were opened by agent_init(). */
690  if (agent->read) { fclose(agent->read); agent->read = NULL; agent->from_child = -1; }
691  if (agent->write) { fclose(agent->write); agent->write = NULL; agent->to_child = -1; }
692 
693  /* from_child/to_child were already closed via fclose() above. */
694  if (agent->from_parent >= 0) { close(agent->from_parent); agent->from_parent = -1; }
695  if (agent->to_parent >= 0) { close(agent->to_parent); agent->to_parent = -1; }
696 
697  /* Mark failed and free the spawn args to avoid leaking the heap allocation */
698  agent->status = AG_FAILED;
699  g_free(pass);
700  return NULL;
701  }
702 
703  while ((agent->pid = fork()) < 0)
704  sleep(rand() % CONF_fork_backoff_time);
705 
706  /* we are in the child */
707  if (agent->pid == 0)
708  {
709  /* set the child's stdin and stdout to use the pipes */
710  dup2(agent->from_parent, fileno(stdin));
711  dup2(agent->to_parent, fileno(stdout));
712  dup2(agent->to_parent, fileno(stderr));
713 
714  /* close all the unnecessary file descriptors */
715  g_tree_foreach(scheduler->agents, (GTraverseFunc) agent_close_fd, agent);
716  close(agent->from_child);
717  close(agent->to_child);
718 
719  /* set the priority of the process to the job's priority */
720  if (nice(agent->owner->priority) == -1)
721  ERROR("unable to correctly set priority of agent process %d", agent->pid);
722 
723  /* if host is null, the agent will run locally to */
724  /* run the agent locally, use the commands that */
725  /* were parsed when the meta_agent was created */
726  if (strcmp(agent->host->address, LOCAL_HOST) == 0)
727  {
728  shell_parse(scheduler->sysconfigdir, agent->owner->user_id, agent->owner->group_id,
729  agent->type->raw_cmd, agent->owner->jq_cmd_args,
730  agent->owner->parent_id, &argc, &args);
731 
732  tmp = args[0];
733  args[0] = g_strdup_printf(AGENT_BINARY, scheduler->sysconfigdir,
734  AGENT_CONF, agent->type->name, tmp);
735 
736  strcpy(buffer, args[0]);
737  *strrchr(buffer, '/') = '\0';
738  if (chdir(buffer) != 0)
739  {
740  ERROR("unable to change working directory: %s\n", strerror(errno));
741  }
742 
743  execv(args[0], args);
744  }
745  /* otherwise the agent will be started using ssh */
746  /* if the agent is started using ssh we don't need */
747  /* to fully parse the arguments, just pass the run */
748  /* command as the last argument to the ssh command */
749  else
750  {
751  args = g_new0(char*, 5);
752  len = snprintf(buffer, sizeof(buffer), AGENT_BINARY " --userID=%d --groupID=%d --scheduler_start --jobId=%d",
753  agent->host->agent_dir, AGENT_CONF, agent->type->name, agent->type->raw_cmd,
754  agent->owner->user_id, agent->owner->group_id, agent->owner->parent_id);
755 
756  if (len>=sizeof(buffer)) {
757  *(buffer + sizeof(buffer) - 1) = '\0';
758  log_printf("ERROR %s.%d: JOB[%d.%s]: exec failed: truncated buffer: \"%s\"",
759  __FILE__, __LINE__, agent->owner->id, agent->owner->agent_type, buffer);
760 
761  exit(5);
762  }
763 
764  args[0] = "/usr/bin/ssh";
765  args[1] = agent->host->address;
766  args[2] = buffer;
767  args[3] = agent->owner->jq_cmd_args;
768  args[4] = NULL;
769  execv(args[0], args);
770  }
771 
772  /* If we reach here, the exec call has failed */
773  log_printf("ERROR %s.%d: JOB[%d.%s]: exec failed: pid = %d, errno = \"%s\"", __FILE__, __LINE__, agent->owner->id,
774  agent->owner->agent_type, getpid(), strerror(errno));
775  }
776  /* we are in the parent */
777  else
778  {
779  event_signal(agent_create_event, agent);
780  agent_listen(scheduler, agent);
781  }
782 
783  return NULL;
784 }
785 
786 /* ************************************************************************** */
787 /* **** Constructor Destructor ********************************************** */
788 /* ************************************************************************** */
789 
806 meta_agent_t* meta_agent_init(char* name, char* cmd, int max, int spc)
807 {
808  /* locals */
809  meta_agent_t* ma;
810 
811  /* test inputs */
812  if (!name || !cmd)
813  {
814  ERROR("invalid arguments passed to meta_agent_init()");
815  return NULL;
816  }
817 
818  /* confirm valid inputs */
819  if (strlen(name) > MAX_NAME || strlen(cmd) > MAX_CMD)
820  {
821  log_printf("ERROR failed to load %s meta agent", name);
822  return NULL;
823  }
824 
825  /* inputs are valid, create the meta_agent */
826  ma = g_new0(meta_agent_t, 1);
827 
828  strcpy(ma->name, name);
829  strcpy(ma->raw_cmd, cmd);
830  strcat(ma->raw_cmd, " --scheduler_start");
831  ma->max_run = max;
832  ma->run_count = 0;
833  ma->special = spc;
834  ma->version = NULL;
835  ma->valid = TRUE;
836 
837  return ma;
838 }
839 
847 {
848  TEST_NULV(ma);
849  g_free(ma->version);
850  g_free(ma);
851 }
852 
864 agent_t* agent_init(scheduler_t* scheduler, host_t* host, job_t* job)
865 {
866  /* local variables */
867  agent_t* agent;
868  int child_to_parent[2];
869  int parent_to_child[2];
870  agent_spawn_args* pass;
871 
872  /* check job input */
873  if (!job)
874  {
875  log_printf("ERROR %s.%d: NULL job passed to agent init\n", __FILE__, __LINE__);
876  log_printf("ERROR: no other information available\n");
877  return NULL;
878  }
879 
880  /* check that the agent type exists */
881  if (g_tree_lookup(scheduler->meta_agents, job->agent_type) == NULL)
882  {
883  log_printf("ERROR %s.%d: jq_pk %d jq_type %s does not match any module in mods-enabled\n", __FILE__, __LINE__,
884  job->id, job->agent_type);
885  job->message = NULL;
886  job_fail_event(scheduler, job);
887  job_remove_agent(job, scheduler->job_list, NULL);
888  return NULL;
889  }
890 
891  /* allocate memory and do trivial assignments */
892  agent = g_new(agent_t, 1);
893  agent->type = g_tree_lookup(scheduler->meta_agents, job->agent_type);
894  agent->status = AG_CREATED;
895 
896  /* make sure that there is a metaagent for the job */
897  if (agent->type == NULL)
898  {
899  ERROR("meta agent %s does not exist", job->agent_type);
900  return NULL;
901  }
902 
903  /* check if the agent is valid */
904  if (!agent->type->valid)
905  {
906  ERROR("agent %s has been invalidated by version information", job->agent_type);
907  return NULL;
908  }
909 
910  /* create the pipes between the child and the parent */
911  if (pipe(parent_to_child) != 0)
912  {
913  ERROR("JOB[%d.%s] failed to create parent to child pipe", job->id, job->agent_type);
914  g_free(agent);
915  return NULL;
916  }
917  if (pipe(child_to_parent) != 0)
918  {
919  ERROR("JOB[%d.%s] failed to create child to parent pipe", job->id, job->agent_type);
920  g_free(agent);
921  return NULL;
922  }
923 
924  /* set file identifiers to correctly talk to children */
925  agent->from_parent = parent_to_child[0];
926  agent->to_child = parent_to_child[1];
927  agent->from_child = child_to_parent[0];
928  agent->to_parent = child_to_parent[1];
929 
930  /* initialize other info */
931  agent->host = host;
932  agent->owner = job;
933  agent->updated = 0;
934  agent->n_updates = 0;
935  agent->data = NULL;
936  agent->return_code = -1;
937  agent->total_analyzed = 0;
938  agent->special = 0;
939 
940  /* open the relevant file pointers */
941  if ((agent->read = fdopen(agent->from_child, "r")) == NULL)
942  {
943  ERROR("JOB[%d.%s] failed to initialize read file", job->id, job->agent_type);
944  g_free(agent);
945  return NULL;
946  }
947  if ((agent->write = fdopen(agent->to_child, "w")) == NULL)
948  {
949  ERROR("JOB[%d.%s] failed to initialize write file", job->id, job->agent_type);
950  g_free(agent);
951  return NULL;
952  }
953 
954  /* increase the load on the host and count of running agents */
955  if (agent->owner->id > 0)
956  {
957  host_increase_load(agent->host);
959  }
960 
961  /* spawn the listen thread */
962  pass = g_new0(agent_spawn_args, 1);
963  pass->scheduler = scheduler;
964  pass->agent = agent;
965 
966 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
967  agent->thread = g_thread_new(agent->type->name, (GThreadFunc) agent_spawn, pass);
968 #else
969  agent->thread = g_thread_create((GThreadFunc)agent_spawn, pass, 1, NULL);
970 #endif
971 
972  return agent;
973 }
974 
986 void agent_destroy(agent_t* agent)
987 {
988  TEST_NULV(agent);
989 
990  /* close all of the files still open for this agent */
991  close(agent->from_child);
992  close(agent->to_child);
993  close(agent->from_parent);
994  close(agent->to_parent);
995  fclose(agent->write);
996  fclose(agent->read);
997 
998  /* release the child process */
999  g_free(agent);
1000 }
1001 
1002 /* ************************************************************************** */
1003 /* **** Events ************************************************************** */
1004 /* ************************************************************************** */
1005 
1014 void agent_death_event(scheduler_t* scheduler, pid_t* pid)
1015 {
1016  agent_t* agent;
1017  int status = pid[1];
1018 
1019  if ((agent = g_tree_lookup(scheduler->agents, &pid[0])) == NULL)
1020  {
1021  ERROR("invalid agent death event: pid[%d]", pid[0]);
1022  return;
1023  }
1024 
1025  if (agent->owner->id >= 0)
1026  event_signal(database_update_event, NULL);
1027 
1028  if (write(agent->to_parent, "@@@1\n", 5) != 5)
1029  AGENT_SEQUENTIAL_PRINT("write to agent unsuccessful: %s\n", strerror(errno));
1030  g_thread_join(agent->thread);
1031 
1032  if (agent->return_code != 0)
1033  {
1034  if (WIFEXITED(status))
1035  {
1036  AGENT_CONCURRENT_PRINT("agent failed, code: %d\n", (status >> 8));
1037  }
1038  else if (WIFSIGNALED(status))
1039  {
1040  AGENT_CONCURRENT_PRINT("agent was killed by signal: %d.%s\n", WTERMSIG(status), strsignal(WTERMSIG(status)));
1041  if (WCOREDUMP(status))
1042  AGENT_CONCURRENT_PRINT("agent produced core dump\n");
1043  }
1044  else
1045  {
1046  AGENT_CONCURRENT_PRINT("agent failed, code: %d\n", agent->return_code);
1047  }
1048  AGENT_WARNING("agent closed unexpectedly, agent status was %s", agent_status_strings[agent->status]);
1049  agent_fail_event(scheduler, agent);
1050  }
1051 
1052  if (agent->status != AG_PAUSED && agent->status != AG_FAILED)
1053  agent_transition(agent, AG_PAUSED);
1054 
1055  job_update(scheduler, agent->owner);
1056  if (agent->status == AG_FAILED && agent->owner->id < 0)
1057  {
1058  log_printf("ERROR %s.%d: agent %s.%s has failed scheduler startup test\n", __FILE__, __LINE__, agent->host->name,
1059  agent->type->name);
1060  agent->type->valid = 0;
1061  }
1062 
1063  if (agent->owner->id < 0 && !agent->type->valid)
1064  AGENT_SEQUENTIAL_PRINT("agent failed startup test, removing from meta agents\n");
1065 
1066  AGENT_SEQUENTIAL_PRINT("successfully remove from the system\n");
1067  job_remove_agent(agent->owner, scheduler->job_list, agent);
1068  g_tree_remove(scheduler->agents, &agent->pid);
1069  g_free(pid);
1070 }
1071 
1083 void agent_create_event(scheduler_t* scheduler, agent_t* agent)
1084 {
1085  TEST_NULV(agent);
1086 
1087  AGENT_SEQUENTIAL_PRINT("agent successfully spawned\n");
1088  g_tree_insert(scheduler->agents, &agent->pid, agent);
1089  agent_transition(agent, AG_SPAWNED);
1090  job_add_agent(agent->owner, agent);
1091 }
1092 
1104 void agent_ready_event(scheduler_t* scheduler, agent_t* agent)
1105 {
1106  int ret;
1107 
1108  TEST_NULV(agent);
1109  // If the agent has no job (owner is NULL), it shouldn't be here.
1110  // This prevents the "job passed is NULL
1111  if (agent->owner == NULL)
1112  {
1113  ERROR("Agent ready event received but agent has no owner. Terminating agent to prevent scheduler crash.");
1114  agent_kill(agent);
1115  return;
1116  }
1117 
1118  if (agent->status == AG_SPAWNED)
1119  {
1120  agent_transition(agent, AG_RUNNING);
1121  AGENT_SEQUENTIAL_PRINT("agent successfully created\n");
1122  }
1123 
1124  if ((ret = job_is_open(scheduler, agent->owner)) == 0)
1125  {
1126  agent_transition(agent, AG_PAUSED);
1127  job_finish_agent(agent->owner, agent);
1128  job_update(scheduler, agent->owner);
1129  return;
1130  }
1131  else if (ret < 0)
1132  {
1133  agent_transition(agent, AG_FAILED);
1134  return;
1135  }
1136  else
1137  {
1138  agent->data = job_next(agent->owner);
1139  agent->updated = 1;
1140  }
1141 
1142  if (write(agent->to_parent, "@@@0\n", 5) != 5)
1143  {
1144  AGENT_ERROR("failed sending new data to agent");
1145  agent_kill(agent);
1146  }
1147 }
1148 
1158 void agent_update_event(scheduler_t* scheduler, void* unused)
1159 {
1160  g_tree_foreach(scheduler->agents, (GTraverseFunc) update, NULL);
1161 }
1162 
1173 void agent_fail_event(scheduler_t* scheduler, agent_t* agent)
1174 {
1175  TEST_NULV(agent);
1176  agent_transition(agent, AG_FAILED);
1177  job_fail_agent(agent->owner, agent);
1178  if (write(agent->to_parent, "@@@1\n", 5) != 5)
1179  AGENT_ERROR("Failed to kill agent thread cleanly");
1180 }
1181 
1189 void list_agents_event(scheduler_t* scheduler, GOutputStream* ostr)
1190 {
1191  g_tree_foreach(scheduler->meta_agents, (GTraverseFunc) agent_list, ostr);
1192  g_output_stream_write(ostr, "\nend\n", 5, NULL, NULL);
1193 }
1194 
1195 /* ************************************************************************** */
1196 /* **** Modifier Functions ************************************************** */
1197 /* ************************************************************************** */
1198 
1207 void agent_transition(agent_t* agent, agent_status new_status)
1208 {
1209  AGENT_SEQUENTIAL_PRINT("agent status change: %s -> %s\n", agent_status_strings[agent->status],
1210  agent_status_strings[new_status]);
1211 
1212  if (agent->owner->id > 0)
1213  {
1214  if (agent->status == AG_PAUSED)
1215  {
1216  host_increase_load(agent->host);
1218  }
1219  if (new_status == AG_PAUSED)
1220  {
1221  host_decrease_load(agent->host);
1223  }
1224  }
1225 
1226  agent->status = new_status;
1227 }
1228 
1235 void agent_pause(agent_t* agent)
1236 {
1237  kill(agent->pid, SIGSTOP);
1238  agent_transition(agent, AG_PAUSED);
1239 }
1240 
1249 {
1250  kill(agent->pid, SIGCONT);
1251  agent_transition(agent, AG_RUNNING);
1252 }
1253 
1264 void agent_print_status(agent_t* agent, GOutputStream* ostr)
1265 {
1266  gchar* status_str;
1267  char time_buf[64];
1268  struct tm* time_info;
1269 
1270  TEST_NULV(agent);
1271  TEST_NULV(ostr);
1272 
1273  strcpy(time_buf, "(none)");
1274  time_info = localtime(&agent->check_in);
1275  if (time_info)
1276  strftime(time_buf, sizeof(time_buf), "%F %T", localtime(&agent->check_in));
1277  status_str = g_strdup_printf("agent:%d host:%s type:%s status:%s time:%s\n", agent->pid, agent->host->name,
1278  agent->type->name, agent_status_strings[agent->status], time_buf);
1279 
1280  AGENT_SEQUENTIAL_PRINT("AGENT_STATUS: %s", status_str);
1281  g_output_stream_write(ostr, status_str, strlen(status_str), NULL, NULL);
1282  g_free(status_str);
1283  return;
1284 }
1285 
1294 void agent_kill(agent_t* agent)
1295 {
1296  AGENT_SEQUENTIAL_PRINT("KILL: sending SIGKILL to pid %d\n", agent->pid);
1298  kill(agent->pid, SIGKILL);
1299 }
1300 
1309 int aprintf(agent_t* agent, const char* fmt, ...)
1310 {
1311  va_list args;
1312  int rc;
1313  char* tmp;
1314 
1315  va_start(args, fmt);
1316  if (TVERB_AGENT)
1317  {
1318  tmp = g_strdup_vprintf(fmt, args);
1319  tmp[strlen(tmp) - 1] = '\0';
1320  AGENT_CONCURRENT_PRINT("sent to agent \"%s\"\n", tmp);
1321  rc = fprintf(agent->write, "%s\n", tmp);
1322  g_free(tmp);
1323  }
1324  else
1325  {
1326  rc = vfprintf(agent->write, fmt, args);
1327  }
1328  va_end(args);
1329  fflush(agent->write);
1330 
1331  return rc;
1332 }
1333 
1345 ssize_t agent_write(agent_t* agent, const void* buf, int count)
1346 {
1347  return write(agent->to_parent, buf, count);
1348 }
1349 
1350 /* ************************************************************************** */
1351 /* **** static functions and meta agents ************************************ */
1352 /* ************************************************************************** */
1353 
1361 void test_agents(scheduler_t* scheduler)
1362 {
1363  g_tree_foreach(scheduler->meta_agents, (GTraverseFunc) agent_test, scheduler);
1364 }
1365 
1372 void kill_agents(scheduler_t* scheduler)
1373 {
1374  g_tree_foreach(scheduler->agents, (GTraverseFunc) agent_kill_traverse, NULL);
1375 }
1376 
1387 int add_meta_agent(GTree* meta_agents, char* name, char* cmd, int max, int spc)
1388 {
1389  meta_agent_t* ma;
1390 
1391  if (name == NULL)
1392  return 0;
1393 
1394  if (g_tree_lookup(meta_agents, name) == NULL)
1395  {
1396  if ((ma = meta_agent_init(name, cmd, max, spc)) == NULL)
1397  return 0;
1398  g_tree_insert(meta_agents, ma->name, ma);
1399  return 1;
1400  }
1401 
1402  return 0;
1403 }
1404 
1412 int is_meta_special(meta_agent_t* ma, int special_type)
1413 {
1414  return (ma != NULL) && ((ma->special & special_type) != 0);
1415 }
1416 
1424 int is_agent_special(agent_t* agent, int special_type)
1425 {
1426  return (agent != NULL) && ((agent->special & special_type) != 0);
1427 }
1428 
1434 {
1435  ma->run_count++;
1436  V_AGENT("AGENT[%s] run increased to %d\n", ma->name, ma->run_count);
1437 }
1438 
1444 {
1445  ma->run_count--;
1446  V_AGENT("AGENT[%s] run decreased to %d\n", ma->name, ma->run_count);
1447 }
static int agent_list(char *name, meta_agent_t *ma, GOutputStream *ostr)
GTraverseFunction that will print the name of every agent in alphabetical order separated by spaces.
Definition: agent.c:234
void agent_ready_event(scheduler_t *scheduler, agent_t *agent)
Event created when an agent is ready for more data.
Definition: agent.c:1104
static int update(int *pid_ptr, agent_t *agent, gpointer unused)
Definition: agent.c:163
int aprintf(agent_t *agent, const char *fmt,...)
Definition: agent.c:1309
static int agent_close_fd(int *pid_ptr, agent_t *agent, agent_t *excepted)
This will close all of the agent's pipes.
Definition: agent.c:140
agent_t * agent_init(scheduler_t *scheduler, host_t *host, job_t *job)
Allocate and spawn a new agent.
Definition: agent.c:864
static int agent_test(const gchar *name, meta_agent_t *ma, scheduler_t *scheduler)
GTraverseFunc that tests the current agent on every host.
Definition: agent.c:255
ssize_t agent_write(agent_t *agent, const void *buf, int count)
Definition: agent.c:1345
#define AGENT_SEQUENTIAL_PRINT(...)
Definition: agent.c:103
#define AGENT_CONCURRENT_PRINT(...)
Definition: agent.c:108
#define AGENT_ERROR(...)
Definition: agent.c:82
static int agent_kill_traverse(int *pid, agent_t *agent, gpointer unused)
GTraverseFunc that kills all of the agents.
Definition: agent.c:219
void agent_update_event(scheduler_t *scheduler, void *unused)
Definition: agent.c:1158
void meta_agent_increase_count(meta_agent_t *ma)
Definition: agent.c:1433
void agent_destroy(agent_t *agent)
Frees the memory associated with an agent.
Definition: agent.c:986
#define AGENT_WARNING(...)
Definition: agent.c:96
void agent_fail_event(scheduler_t *scheduler, agent_t *agent)
Fails an agent.
Definition: agent.c:1173
meta_agent_t * meta_agent_init(char *name, char *cmd, int max, int spc)
Creates a new meta agent.
Definition: agent.c:806
void agent_kill(agent_t *agent)
Unclean kill of an agent.
Definition: agent.c:1294
void agent_pause(agent_t *agent)
Definition: agent.c:1235
int add_meta_agent(GTree *meta_agents, char *name, char *cmd, int max, int spc)
Definition: agent.c:1387
void agent_create_event(scheduler_t *scheduler, agent_t *agent)
Event created when a new agent has been created.
Definition: agent.c:1083
void agent_transition(agent_t *agent, agent_status new_status)
Definition: agent.c:1207
#define TEST_NULL(a, ret)
Test if parameter is NULL.
Definition: agent.c:57
static void agent_listen(scheduler_t *scheduler, agent_t *agent)
Definition: agent.c:281
void agent_unpause(agent_t *agent)
Definition: agent.c:1248
void test_agents(scheduler_t *scheduler)
Calls the agent test function for every type of agent.
Definition: agent.c:1361
void meta_agent_decrease_count(meta_agent_t *ma)
Definition: agent.c:1443
void kill_agents(scheduler_t *scheduler)
Call the agent_kill function for every agent within the system.
Definition: agent.c:1372
int is_agent_special(agent_t *agent, int special_type)
tests if a particular agent has a specific special flag set
Definition: agent.c:1424
void list_agents_event(scheduler_t *scheduler, GOutputStream *ostr)
Receive agent on interface.
Definition: agent.c:1189
#define TEST_NULV(a)
Test if parameter is NULL.
Definition: agent.c:47
int is_meta_special(meta_agent_t *ma, int special_type)
tests if a particular meta agent has a specific special flag set
Definition: agent.c:1412
static void shell_parse(char *confdir, int user_id, int group_id, char *input, char *jq_cmd_args, int jobId, int *argc, char ***argv)
Parses the shell command that is found in the configuration file.
Definition: agent.c:568
#define SELECT_STRING(passed)
Definition: agent.c:120
void agent_death_event(scheduler_t *scheduler, pid_t *pid)
Definition: agent.c:1014
static void * agent_spawn(agent_spawn_args *pass)
Spawns a new agent using the command passed in using the meta agent.
Definition: agent.c:673
void meta_agent_destroy(meta_agent_t *ma)
Definition: agent.c:846
void agent_print_status(agent_t *agent, GOutputStream *ostr)
Prints the status of the agent to the output stream provided.
Definition: agent.c:1264
Header file with agent related operations.
#define MAX_CMD
the size of the agent's command buffer (arbitrary)
Definition: agent.h:26
#define SAG_NOKILL
This agent should not be killed when updating the agent.
Definition: agent.h:33
#define AGENT_STATUS_TYPES(apply)
Definition: agent.h:50
#define MAX_NAME
the size of the agent's name buffer (arbitrary)
Definition: agent.h:27
Event handling operations.
void host_decrease_load(host_t *host)
Decrease the number of running agents on a host by 1.
Definition: host.c:113
void host_increase_load(host_t *host)
Increase the number of running agents on a host by 1.
Definition: host.c:102
void job_fail_agent(job_t *job, void *agent)
Definition: job.c:493
void job_fail_event(scheduler_t *scheduler, job_t *job)
Event that causes a job to be marked as failed.
Definition: job.c:406
job_t * job_init(GTree *job_list, GSequence *job_queue, char *type, char *host, int id, int parent_id, int user_id, int group_id, int priority, char *jq_cmd_args)
Create a new job.
Definition: job.c:164
void job_update(scheduler_t *scheduler, job_t *job)
Definition: job.c:531
int job_is_open(scheduler_t *scheduler, job_t *job)
Tests to see if there is still data available for this job.
Definition: job.c:570
char * job_next(job_t *job)
Definition: job.c:610
void job_remove_agent(job_t *job, GTree *job_list, void *agent)
Definition: job.c:449
log_t * job_log(job_t *job)
Definition: job.c:637
void job_add_agent(job_t *job, void *agent)
Adds a new agent to the jobs list of agents.
Definition: job.c:434
void job_finish_agent(job_t *job, void *agent)
Definition: job.c:478
FUNCTION int max(int permGroup, int permPublic)
Get the maximum group privilege.
Definition: libfossagent.c:295
int jobId
The id of the job.
char buffer[2048]
The last thing received from the scheduler.
log_t * main_log
Definition: logging.c:33
Log related operations.
#define ERROR(...)
Definition: logging.h:79
#define THREAD_FATAL(file,...)
Definition: logging.h:71
start($application)
start the application Assumes application is restartable via /etc/init.d/<script>....
Definition: pkgConfig.php:1214
void database_update_event(scheduler_t *scheduler, void *unused)
Checks the job queue for any new entries.
Definition: database.c:841
void database_job_processed(int j_id, int num)
Updates the number of items that a job queue entry has processed.
Definition: database.c:992
Header file for the scheduler.
#define AGENT_CONF
Agent conf location.
Definition: scheduler.h:123
#define AGENT_BINARY
Format to get agent binary.
Definition: scheduler.h:122
agent_t * agent
Reference to current agent state.
Definition: agent.c:651
scheduler_t * scheduler
Reference to current scheduler state.
Definition: agent.c:650
Definition: agent.h:100
time_t check_in
the time that the agent last generated anything
Definition: agent.h:108
int to_child
file identifier to print to the child
Definition: agent.h:114
agent_status status
the state of execution the agent is currently in
Definition: agent.h:106
meta_agent_t * type
the type of agent this is i.e. bucket, copyright...
Definition: agent.h:102
int from_child
file identifier to read from child
Definition: agent.h:115
gchar * data
the data that has been sent to the agent for analysis
Definition: agent.h:122
FILE * write
FILE* that abstracts the use of the to_child socket.
Definition: agent.h:118
gboolean alive
flag to tell the scheduler if the agent is still alive
Definition: agent.h:125
job_t * owner
the job that this agent is assigned to
Definition: agent.h:121
FILE * read
FILE* that abstracts the use of the from_child socket.
Definition: agent.h:117
uint32_t special
any special flags that the agent has set
Definition: agent.h:127
int to_parent
file identifier to print to the parent (child stdout)
Definition: agent.h:116
uint8_t n_updates
keeps track of the number of times the agent has updated
Definition: agent.h:109
uint64_t total_analyzed
the total number that this agent has analyzed
Definition: agent.h:124
host_t * host
the host that this agent will start on
Definition: agent.h:103
GThread * thread
the thread that communicates with this agent
Definition: agent.h:107
int from_parent
file identifier to read from the parent (child stdin)
Definition: agent.h:113
uint8_t return_code
what was returned by the agent when it disconnected
Definition: agent.h:126
pid_t pid
the pid of the process this agent is running in
Definition: agent.h:110
gboolean updated
boolean flag to indicate if the scheduler has updated the data
Definition: agent.h:123
Definition: host.h:26
char * agent_dir
The location on the host machine where the executables are.
Definition: host.h:29
char * address
The address of the host, used by ssh when starting a new agent.
Definition: host.h:28
char * name
The name of the host, used to store host internally to scheduler.
Definition: host.h:27
The job structure.
Definition: job.h:51
int32_t id
The identifier for this job.
Definition: job.h:73
job_status status
The current status for the job.
Definition: job.h:61
int32_t group_id
The id of the group that created the job.
Definition: job.h:75
gchar * message
Message that will be sent with job notification email.
Definition: job.h:69
gchar * jq_cmd_args
Command line arguments for this job.
Definition: job.h:63
int32_t user_id
The id of the user that created the job.
Definition: job.h:74
char * agent_type
The type of agent used to analyze the data.
Definition: job.h:53
int32_t priority
Importance of the job, maps directly to unix priority.
Definition: job.h:70
int32_t parent_id
The identifier for the parent of this job (its queue id)
Definition: job.h:72
Store the results of a regex match.
Definition: scanners.hpp:28
int valid
flag indicating if the meta_agent is valid
Definition: agent.h:89
char raw_cmd[MAX_CMD+1]
the raw command that will start the agent, used for ssh
Definition: agent.h:84
int max_run
the maximum number that can run at once -1 if no limit
Definition: agent.h:85
char name[256]
the name associated with this agent i.e. nomos, copyright...
Definition: agent.h:83
int run_count
the count of agents in running state
Definition: agent.h:90
char * version
the version of the agent that is running on all hosts
Definition: agent.h:88
char * version_source
the machine that reported the version information
Definition: agent.h:87
int special
any special condition associated with the agent
Definition: agent.h:86
GTree * job_list
List of jobs that have been created.
Definition: scheduler.h:172
gchar * sysconfigdir
The system directory that contain fossology.conf.
Definition: scheduler.h:150
GList * host_queue
Round-robin queue for choosing which host to use next.
Definition: scheduler.h:161
GRegex * parse_agent_msg
Parses messages coming from the agents.
Definition: scheduler.h:186
GTree * meta_agents
List of all meta agents available to the scheduler.
Definition: scheduler.h:156
GTree * agents
List of any currently running agents.
Definition: scheduler.h:157
GSequence * job_queue
heap of jobs that still need to be started
Definition: scheduler.h:173