FOSSology  4.5.0-rc1
Open Source License Compliance by Open Source Software
agent.c
Go to the documentation of this file.
1 /*
2  SPDX-FileCopyrightText: © 2010, 2011, 2012 Hewlett-Packard Development Company, L.P.
3  SPDX-FileCopyrightText: © 2015 Siemens AG
4 
5  SPDX-License-Identifier: GPL-2.0-only
6 */
7 
15 /* local includes */
16 #include <agent.h>
17 #include <database.h>
18 #include <event.h>
19 #include <host.h>
20 #include <job.h>
21 #include <logging.h>
22 #include <scheduler.h>
23 
24 /* library includes */
25 #include <limits.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <time.h>
30 
31 /* unix library includes */
32 #include <fcntl.h>
33 #include <limits.h>
34 #include <sys/types.h>
35 #include <sys/wait.h>
36 #include <unistd.h>
37 
38 /* other library includes */
39 #include <glib.h>
40 
/**
 * @brief Test if parameter is NULL.
 *
 * If @p a is NULL, sets errno to EINVAL, logs an error and returns from the
 * enclosing void function. Wrapped in do/while(0) so that the macro is a
 * single statement and can never capture a following `else`.
 */
#define TEST_NULV(a) do { if (!(a)) { \
  errno = EINVAL; ERROR("agent passed is NULL, cannot proceed"); return; } } while (0)

/**
 * @brief Test if parameter is NULL.
 *
 * Same as TEST_NULV(), but returns @p ret from the enclosing function.
 */
#define TEST_NULL(a, ret) do { if (!(a)) { \
  errno = EINVAL; ERROR("agent passed is NULL, cannot proceed"); return ret; } } while (0)

/*
 * NOTE: the macros below implicitly use a variable named `agent`
 * (agent_t*) from the scope they are expanded in.
 */

/** Logs the "JOB[id].type[pid.host]: " prefix for `agent` via log_printf(). */
#define AGENT_CREDENTIAL \
  log_printf("JOB[%d].%s[%d.%s]: ", agent->owner->id, agent->type->name, \
  agent->pid, agent->host->name)

/** Writes the "JOB[id].type[pid.host]: " prefix for `agent` to job_log(agent->owner). */
#define AGENT_LOG_CREDENTIAL \
  con_printf(job_log(agent->owner), "JOB[%d].%s[%d.%s]: ", \
  agent->owner->id, agent->type->name, agent->pid, agent->host->name)

/** Logs an error (with file/line and agent credentials) via log_printf(). */
#define AGENT_ERROR(...) do { \
  log_printf("ERROR: %s.%d: ", __FILE__, __LINE__); \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); \
  log_printf("\n"); } while (0)

/** Logs a note when TEST_NOTIFY is enabled. */
#define AGENT_NOTIFY(...) do { if (TEST_NOTIFY) { \
  log_printf("NOTE: "); \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); \
  log_printf("\n"); } } while (0)

/** Logs a warning (with file/line) when TEST_WARNING is enabled. */
#define AGENT_WARNING(...) do { if (TEST_WARNING) { \
  log_printf("WARNING %s.%d: ", __FILE__, __LINE__); \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); \
  log_printf("\n"); } } while (0)

/** Logs a verbose agent message via log_printf() when TVERB_AGENT is enabled. */
#define AGENT_SEQUENTIAL_PRINT(...) do { if (TVERB_AGENT) { \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); } } while (0)

/** Writes an agent message, prefixed with the agent credentials, to the job's log. */
#define AGENT_CONCURRENT_PRINT(...) do { \
  AGENT_LOG_CREDENTIAL; \
  con_printf(job_log(agent->owner), __VA_ARGS__); } while (0)
100 
101 /* ************************************************************************** */
102 /* **** Data Types ********************************************************** */
103 /* ************************************************************************** */
104 
109 #define SELECT_STRING(passed) MK_STRING_LIT(AGENT_##passed),
110 const char* agent_status_strings[] =
112 #undef SELECT_STRING
113 
114 /* ************************************************************************** */
115 /* **** Local Functions ***************************************************** */
116 /* ************************************************************************** */
117 
129 static int agent_close_fd(int* pid_ptr, agent_t* agent, agent_t* excepted)
130 {
131  TEST_NULL(agent, 0);
132  if (agent != excepted)
133  {
134  close(agent->from_child);
135  close(agent->to_child);
136  fclose(agent->read);
137  fclose(agent->write);
138  }
139  return 0;
140 }
141 
152 static int update(int* pid_ptr, agent_t* agent, gpointer unused)
153 {
154  TEST_NULL(agent, 0);
155  int nokill = is_agent_special(agent, SAG_NOKILL) || is_meta_special(agent->type, SAG_NOKILL);
156 
157  if (agent->status == AG_SPAWNED || agent->status == AG_RUNNING || agent->status == AG_PAUSED)
158  {
159  /* check last checkin time */
160  if (time(NULL) - agent->check_in > CONF_agent_death_timer && !(agent->owner->status == JB_PAUSED) && !nokill)
161  {
162  AGENT_CONCURRENT_PRINT("no heartbeat for %d seconds\n", (time(NULL) - agent->check_in));
163  agent_kill(agent);
164  return 0;
165  }
166 
167  /* check items processed */
168  if (agent->status != AG_PAUSED && !agent->alive)
169  {
170  agent->n_updates++;
171  }
172  else
173  {
174  agent->n_updates = 0;
175  }
176  if (agent->n_updates > CONF_agent_update_number && !nokill)
177  {
178  AGENT_CONCURRENT_PRINT("agent has not set the alive flag in at least 10 minutes, killing\n");
179  agent_kill(agent);
180  return 0;
181  }
182 
183  AGENT_SEQUENTIAL_PRINT("agent updated correctly, processed %d items: %d\n", agent->total_analyzed,
184  agent->n_updates);
185  agent->alive = 0;
186  }
187 
188  return 0;
189 }
190 
202 static int agent_kill_traverse(int* pid, agent_t* agent, gpointer unused)
203 {
204  agent_kill(agent);
205  return FALSE;
206 }
207 
217 static int agent_list(char* name, meta_agent_t* ma, GOutputStream* ostr)
218 {
219  if (ma->valid)
220  {
221  g_output_stream_write(ostr, name, strlen(name), NULL, NULL);
222  g_output_stream_write(ostr, " ", 1, NULL, NULL);
223  }
224  return FALSE;
225 }
226 
238 static int agent_test(const gchar* name, meta_agent_t* ma, scheduler_t* scheduler)
239 {
240  static int32_t id_gen = -1;
241 
242  GList* iter;
243  host_t* host;
244  char *jq_cmd_args = 0;
245 
246  for (iter = scheduler->host_queue; iter != NULL; iter = iter->next)
247  {
248  host = (host_t*) iter->data;
249  V_AGENT("META_AGENT[%s] testing on HOST[%s]\n", ma->name, host->name);
250  job_t* job = job_init(scheduler->job_list, scheduler->job_queue, ma->name, host->name, id_gen--, 0, 0, 0, 0, jq_cmd_args);
251  agent_init(scheduler, host, job);
252  }
253 
254  return 0;
255 }
256 
264 static void agent_listen(scheduler_t* scheduler, agent_t* agent)
265 {
266  /* static locals */
267 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
268  static GMutex version_lock;
269 #else
270  static GStaticMutex version_lock = G_STATIC_MUTEX_INIT;
271 #endif
272 
273  /* locals */
274  char buffer[1024]; // buffer to store c strings read from agent
275  GMatchInfo* match; // regex match information
276  char* arg; // used during regex retrievals
277  int relevant; // used during special retrievals
278 
279  TEST_NULV(agent);
280 
291  if (fgets(buffer, sizeof(buffer), agent->read) == NULL)
292  {
293  AGENT_CONCURRENT_PRINT("pipe from child closed: %s\n", strerror(errno));
294  g_thread_exit(NULL);
295  }
296 
297  /* check to make sure "VERSION" was sent */
298  buffer[strlen(buffer) - 1] = '\0';
299  if (strncmp(buffer, "VERSION: ", 9) != 0)
300  {
301  if (strncmp(buffer, "@@@1", 4) == 0)
302  {
303  THREAD_FATAL(job_log(agent->owner), "agent crashed before sending version information");
304  }
305  else
306  {
307  agent->type->valid = 0;
308  agent_fail_event(scheduler, agent);
309  agent_kill(agent);
310  con_printf(main_log, "ERROR %s.%d: agent %s.%s has been invalidated, removing from agents\n", __FILE__, __LINE__,
311  agent->host->name, agent->type->name);
312  AGENT_CONCURRENT_PRINT("agent didn't send version information: \"%s\"\n", buffer);
313  return;
314  }
315  }
316 
317  /* check that the VERSION information is correct */
318 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
319  g_mutex_lock(&version_lock);
320 #else
321  g_static_mutex_lock(&version_lock);
322 #endif
323  strcpy(buffer, &buffer[9]);
324  if (agent->type->version == NULL && agent->type->valid)
325  {
326  agent->type->version_source = agent->host->name;
327  agent->type->version = g_strdup(buffer);
328  if (TVERB_AGENT)
329  con_printf(main_log, "META_AGENT[%s.%s] version is: \"%s\"\n", agent->host->name, agent->type->name,
330  agent->type->version);
331  }
332  else if (strcmp(agent->type->version, buffer) != 0)
333  {
334  con_printf(job_log(agent->owner), "ERROR %s.%d: META_DATA[%s] invalid agent spawn check\n", __FILE__, __LINE__,
335  agent->type->name);
336  con_printf(job_log(agent->owner), "ERROR: versions don't match: %s(%s) != received: %s(%s)\n",
337  agent->type->version_source, agent->type->version, agent->host->name, buffer);
338  agent->type->valid = 0;
339  agent_kill(agent);
340 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
341  g_mutex_unlock(&version_lock);
342 #else
343  g_static_mutex_unlock(&version_lock);
344 #endif
345  return;
346  }
347 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
348  g_mutex_unlock(&version_lock);
349 #else
350  g_static_mutex_unlock(&version_lock);
351 #endif
352 
362  while (1)
363  {
364  /* get message from agent */
365  if (fgets(buffer, sizeof(buffer), agent->read) == NULL)
366  g_thread_exit(NULL);
367 
368  buffer[strlen(buffer) - 1] = '\0';
369 
370  if (strlen(buffer) == 0)
371  continue;
372 
373  if (TVERB_AGENT && (TVERB_SPECIAL || strncmp(buffer, "SPECIAL", 7) != 0))
374  AGENT_CONCURRENT_PRINT("received: \"%s\"\n", buffer);
375 
383  if (strncmp(buffer, "BYE", 3) == 0)
384  {
385  if ((agent->return_code = atoi(&(buffer[4]))) != 0)
386  {
387  AGENT_CONCURRENT_PRINT("agent failed with error code %d\n", agent->return_code);
388  event_signal(agent_fail_event, agent);
389  }
390  break;
391  }
392 
399  if (strncmp(buffer, "@@@1", 4) == 0)
400  break;
401 
408  if (strncmp(buffer, "@@@0", 4) == 0 && agent->updated)
409  {
410  aprintf(agent, "%s\n", agent->data);
411  aprintf(agent, "END\n");
412  fflush(agent->write);
413  agent->updated = 0;
414  continue;
415  }
416 
417  /* agent just checked in */
418  agent->check_in = time(NULL);
419 
428  if (strncmp(buffer, "OK", 2) == 0)
429  {
430  if (agent->status != AG_PAUSED)
431  event_signal(agent_ready_event, agent);
432  }
433 
441  else if (strncmp(buffer, "HEART", 5) == 0)
442  {
443  g_regex_match(scheduler->parse_agent_msg, buffer, 0, &match);
444 
445  arg = g_match_info_fetch(match, 3);
446  agent->total_analyzed = atoi(arg);
447  g_free(arg);
448 
449  arg = g_match_info_fetch(match, 6);
450  agent->alive = (arg[0] == '1' || agent->alive);
451  g_free(arg);
452 
453  g_match_info_free(match);
454  match = NULL;
455 
457  }
458 
465  else if (strncmp(buffer, "EMAIL", 5) == 0)
466  {
467  agent->owner->message = g_strdup(buffer + 6);
468  }
469 
477  else if (strncmp(buffer, "SPECIAL", 7) == 0)
478  {
479  relevant = INT_MAX;
480 
481  g_regex_match(scheduler->parse_agent_msg, buffer, 0, &match);
482 
483  arg = g_match_info_fetch(match, 3);
484  relevant &= atoi(arg);
485  g_free(arg);
486 
487  arg = g_match_info_fetch(match, 6);
488  if (atoi(arg))
489  {
490  if (agent->special & relevant)
491  relevant = 0;
492  }
493  else
494  {
495  if (!(agent->special & relevant))
496  relevant = 0;
497  }
498  g_free(arg);
499 
500  g_match_info_free(match);
501 
502  agent->special ^= relevant;
503  }
504 
510  else if (strncmp(buffer, "GETSPECIAL", 10) == 0)
511  {
512  g_regex_match(scheduler->parse_agent_msg, buffer, 0, &match);
513 
514  arg = g_match_info_fetch(match, 3);
515  relevant = atoi(arg);
516  g_free(arg);
517 
518  if (agent->special & relevant)
519  aprintf(agent, "VALUE: 1\n");
520  else
521  aprintf(agent, "VALUE: 0\n");
522 
523  g_match_info_free(match);
524  }
525 
531  else if (!(TVERB_AGENT))
532  AGENT_CONCURRENT_PRINT("\"%s\"\n", buffer);
533  }
534 
535  if (TVERB_AGENT)
536  AGENT_CONCURRENT_PRINT("communication thread closing\n");
537 }
538 
551 static void shell_parse(char* confdir, int user_id, int group_id, char* input, char *jq_cmd_args, int jobId, int* argc, char*** argv)
552 {
553  char* begin;
554  char* curr;
555  int idx = 0;
556 #define MAX_CMD_ARGS 30
557 
558  *argv = g_new0(char*, MAX_CMD_ARGS);
559  begin = NULL;
560 
561  for (curr = input; *curr; curr++)
562  {
563  if (*curr == ' ')
564  {
565  if (begin == NULL)
566  continue;
567 
568  if (*begin == '"')
569  continue;
570 
571  *curr = '\0';
572  (*argv)[idx++] = g_strdup(begin);
573  begin = NULL;
574  }
575  else if (begin == NULL)
576  {
577  begin = curr;
578  }
579  else if (*begin == '"' && *curr == '"')
580  {
581  *begin = '\0';
582  *curr = '\0';
583 
584  (*argv)[idx++] = g_strdup(begin + 1);
585  begin = NULL;
586  }
587  if (idx > MAX_CMD_ARGS - 7)
588  break;
589  }
590 
591  (*argv)[idx++] = g_strdup_printf("--jobId=%d", jobId);
592  (*argv)[idx++] = g_strdup_printf("--config=%s", confdir);
593  (*argv)[idx++] = g_strdup_printf("--userID=%d", user_id);
594  (*argv)[idx++] = g_strdup_printf("--groupID=%d", group_id);
595  (*argv)[idx++] = "--scheduler_start";
596  if (jq_cmd_args)
597  {
598  const char *start = jq_cmd_args;
599  const char *current = jq_cmd_args;
600  gboolean in_quotes = FALSE;
601 
602  while (*current != '\0')
603  {
604  if (*current == '\'' || *current == '"')
605  in_quotes = !in_quotes;
606  else if (*current == ' ' && !in_quotes)
607  {
608  if (current > start)
609  {
610  int len = current - start;
611  char *arg = g_strndup(start, len);
612  (*argv)[idx++] = arg;
613  }
614  start = current + 1;
615  }
616  current++;
617  }
618 
619  if (current > start)
620  {
621  char *arg = g_strndup(start, current - start);
622  (*argv)[idx++] = arg;
623  }
624  }
625  (*argc) = idx;
626 }
627 
631 typedef struct
632 {
636 
656 static void* agent_spawn(agent_spawn_args* pass)
657 {
658  /* locals */
659  scheduler_t* scheduler = pass->scheduler;
660  agent_t* agent = pass->agent;
661  gchar* tmp; // pointer to temporary string
662  gchar** args; // the arguments that will be passed to the child
663  int argc; // the number of arguments parsed
664  int len;
665  char buffer[2048]; // character buffer
666 
667  /* spawn the new process */
668  while ((agent->pid = fork()) < 0)
669  sleep(rand() % CONF_fork_backoff_time);
670 
671  /* we are in the child */
672  if (agent->pid == 0)
673  {
674  /* set the child's stdin and stdout to use the pipes */
675  dup2(agent->from_parent, fileno(stdin));
676  dup2(agent->to_parent, fileno(stdout));
677  dup2(agent->to_parent, fileno(stderr));
678 
679  /* close all the unnecessary file descriptors */
680  g_tree_foreach(scheduler->agents, (GTraverseFunc) agent_close_fd, agent);
681  close(agent->from_child);
682  close(agent->to_child);
683 
684  /* set the priority of the process to the job's priority */
685  if (nice(agent->owner->priority) == -1)
686  ERROR("unable to correctly set priority of agent process %d", agent->pid);
687 
688  /* if host is null, the agent will run locally to */
689  /* run the agent locally, use the commands that */
690  /* were parsed when the meta_agent was created */
691  if (strcmp(agent->host->address, LOCAL_HOST) == 0)
692  {
693  shell_parse(scheduler->sysconfigdir, agent->owner->user_id, agent->owner->group_id,
694  agent->type->raw_cmd, agent->owner->jq_cmd_args,
695  agent->owner->parent_id, &argc, &args);
696 
697  tmp = args[0];
698  args[0] = g_strdup_printf(AGENT_BINARY, scheduler->sysconfigdir,
699  AGENT_CONF, agent->type->name, tmp);
700 
701  strcpy(buffer, args[0]);
702  *strrchr(buffer, '/') = '\0';
703  if (chdir(buffer) != 0)
704  {
705  ERROR("unable to change working directory: %s\n", strerror(errno));
706  }
707 
708  execv(args[0], args);
709  }
710  /* otherwise the agent will be started using ssh */
711  /* if the agent is started using ssh we don't need */
712  /* to fully parse the arguments, just pass the run */
713  /* command as the last argument to the ssh command */
714  else
715  {
716  args = g_new0(char*, 5);
717  len = snprintf(buffer, sizeof(buffer), AGENT_BINARY " --userID=%d --groupID=%d --scheduler_start --jobId=%d",
718  agent->host->agent_dir, AGENT_CONF, agent->type->name, agent->type->raw_cmd,
719  agent->owner->user_id, agent->owner->group_id, agent->owner->parent_id);
720 
721  if (len>=sizeof(buffer)) {
722  *(buffer + sizeof(buffer) - 1) = '\0';
723  log_printf("ERROR %s.%d: JOB[%d.%s]: exec failed: truncated buffer: \"%s\"",
724  __FILE__, __LINE__, agent->owner->id, agent->owner->agent_type, buffer);
725 
726  exit(5);
727  }
728 
729  args[0] = "/usr/bin/ssh";
730  args[1] = agent->host->address;
731  args[2] = buffer;
732  args[3] = agent->owner->jq_cmd_args;
733  args[4] = NULL;
734  execv(args[0], args);
735  }
736 
737  /* If we reach here, the exec call has failed */
738  log_printf("ERROR %s.%d: JOB[%d.%s]: exec failed: pid = %d, errno = \"%s\"", __FILE__, __LINE__, agent->owner->id,
739  agent->owner->agent_type, getpid(), strerror(errno));
740  }
741  /* we are in the parent */
742  else
743  {
744  event_signal(agent_create_event, agent);
745  agent_listen(scheduler, agent);
746  }
747 
748  return NULL;
749 }
750 
751 /* ************************************************************************** */
752 /* **** Constructor Destructor ********************************************** */
753 /* ************************************************************************** */
754 
771 meta_agent_t* meta_agent_init(char* name, char* cmd, int max, int spc)
772 {
773  /* locals */
774  meta_agent_t* ma;
775 
776  /* test inputs */
777  if (!name || !cmd)
778  {
779  ERROR("invalid arguments passed to meta_agent_init()");
780  return NULL;
781  }
782 
783  /* confirm valid inputs */
784  if (strlen(name) > MAX_NAME || strlen(cmd) > MAX_CMD)
785  {
786  log_printf("ERROR failed to load %s meta agent", name);
787  return NULL;
788  }
789 
790  /* inputs are valid, create the meta_agent */
791  ma = g_new0(meta_agent_t, 1);
792 
793  strcpy(ma->name, name);
794  strcpy(ma->raw_cmd, cmd);
795  strcat(ma->raw_cmd, " --scheduler_start");
796  ma->max_run = max;
797  ma->run_count = 0;
798  ma->special = spc;
799  ma->version = NULL;
800  ma->valid = TRUE;
801 
802  return ma;
803 }
804 
812 {
813  TEST_NULV(ma);
814  g_free(ma->version);
815  g_free(ma);
816 }
817 
829 agent_t* agent_init(scheduler_t* scheduler, host_t* host, job_t* job)
830 {
831  /* local variables */
832  agent_t* agent;
833  int child_to_parent[2];
834  int parent_to_child[2];
835  agent_spawn_args* pass;
836 
837  /* check job input */
838  if (!job)
839  {
840  log_printf("ERROR %s.%d: NULL job passed to agent init\n", __FILE__, __LINE__);
841  log_printf("ERROR: no other information available\n");
842  return NULL;
843  }
844 
845  /* check that the agent type exists */
846  if (g_tree_lookup(scheduler->meta_agents, job->agent_type) == NULL)
847  {
848  log_printf("ERROR %s.%d: jq_pk %d jq_type %s does not match any module in mods-enabled\n", __FILE__, __LINE__,
849  job->id, job->agent_type);
850  job->message = NULL;
851  job_fail_event(scheduler, job);
852  job_remove_agent(job, scheduler->job_list, NULL);
853  return NULL;
854  }
855 
856  /* allocate memory and do trivial assignments */
857  agent = g_new(agent_t, 1);
858  agent->type = g_tree_lookup(scheduler->meta_agents, job->agent_type);
859  agent->status = AG_CREATED;
860 
861  /* make sure that there is a metaagent for the job */
862  if (agent->type == NULL)
863  {
864  ERROR("meta agent %s does not exist", job->agent_type);
865  return NULL;
866  }
867 
868  /* check if the agent is valid */
869  if (!agent->type->valid)
870  {
871  ERROR("agent %s has been invalidated by version information", job->agent_type);
872  return NULL;
873  }
874 
875  /* create the pipes between the child and the parent */
876  if (pipe(parent_to_child) != 0)
877  {
878  ERROR("JOB[%d.%s] failed to create parent to child pipe", job->id, job->agent_type);
879  g_free(agent);
880  return NULL;
881  }
882  if (pipe(child_to_parent) != 0)
883  {
884  ERROR("JOB[%d.%s] failed to create child to parent pipe", job->id, job->agent_type);
885  g_free(agent);
886  return NULL;
887  }
888 
889  /* set file identifiers to correctly talk to children */
890  agent->from_parent = parent_to_child[0];
891  agent->to_child = parent_to_child[1];
892  agent->from_child = child_to_parent[0];
893  agent->to_parent = child_to_parent[1];
894 
895  /* initialize other info */
896  agent->host = host;
897  agent->owner = job;
898  agent->updated = 0;
899  agent->n_updates = 0;
900  agent->data = NULL;
901  agent->return_code = -1;
902  agent->total_analyzed = 0;
903  agent->special = 0;
904 
905  /* open the relevant file pointers */
906  if ((agent->read = fdopen(agent->from_child, "r")) == NULL)
907  {
908  ERROR("JOB[%d.%s] failed to initialize read file", job->id, job->agent_type);
909  g_free(agent);
910  return NULL;
911  }
912  if ((agent->write = fdopen(agent->to_child, "w")) == NULL)
913  {
914  ERROR("JOB[%d.%s] failed to initialize write file", job->id, job->agent_type);
915  g_free(agent);
916  return NULL;
917  }
918 
919  /* increase the load on the host and count of running agents */
920  if (agent->owner->id > 0)
921  {
922  host_increase_load(agent->host);
924  }
925 
926  /* spawn the listen thread */
927  pass = g_new0(agent_spawn_args, 1);
928  pass->scheduler = scheduler;
929  pass->agent = agent;
930 
931 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
932  agent->thread = g_thread_new(agent->type->name, (GThreadFunc) agent_spawn, pass);
933 #else
934  agent->thread = g_thread_create((GThreadFunc)agent_spawn, pass, 1, NULL);
935 #endif
936 
937  return agent;
938 }
939 
951 void agent_destroy(agent_t* agent)
952 {
953  TEST_NULV(agent);
954 
955  /* close all of the files still open for this agent */
956  close(agent->from_child);
957  close(agent->to_child);
958  close(agent->from_parent);
959  close(agent->to_parent);
960  fclose(agent->write);
961  fclose(agent->read);
962 
963  /* release the child process */
964  g_free(agent);
965 }
966 
967 /* ************************************************************************** */
968 /* **** Events ************************************************************** */
969 /* ************************************************************************** */
970 
979 void agent_death_event(scheduler_t* scheduler, pid_t* pid)
980 {
981  agent_t* agent;
982  int status = pid[1];
983 
984  if ((agent = g_tree_lookup(scheduler->agents, &pid[0])) == NULL)
985  {
986  ERROR("invalid agent death event: pid[%d]", pid[0]);
987  return;
988  }
989 
990  if (agent->owner->id >= 0)
991  event_signal(database_update_event, NULL);
992 
993  if (write(agent->to_parent, "@@@1\n", 5) != 5)
994  AGENT_SEQUENTIAL_PRINT("write to agent unsuccessful: %s\n", strerror(errno));
995  g_thread_join(agent->thread);
996 
997  if (agent->return_code != 0)
998  {
999  if (WIFEXITED(status))
1000  {
1001  AGENT_CONCURRENT_PRINT("agent failed, code: %d\n", (status >> 8));
1002  }
1003  else if (WIFSIGNALED(status))
1004  {
1005  AGENT_CONCURRENT_PRINT("agent was killed by signal: %d.%s\n", WTERMSIG(status), strsignal(WTERMSIG(status)));
1006  if (WCOREDUMP(status))
1007  AGENT_CONCURRENT_PRINT("agent produced core dump\n");
1008  }
1009  else
1010  {
1011  AGENT_CONCURRENT_PRINT("agent failed, code: %d\n", agent->return_code);
1012  }
1013  AGENT_WARNING("agent closed unexpectedly, agent status was %s", agent_status_strings[agent->status]);
1014  agent_fail_event(scheduler, agent);
1015  }
1016 
1017  if (agent->status != AG_PAUSED && agent->status != AG_FAILED)
1018  agent_transition(agent, AG_PAUSED);
1019 
1020  job_update(scheduler, agent->owner);
1021  if (agent->status == AG_FAILED && agent->owner->id < 0)
1022  {
1023  log_printf("ERROR %s.%d: agent %s.%s has failed scheduler startup test\n", __FILE__, __LINE__, agent->host->name,
1024  agent->type->name);
1025  agent->type->valid = 0;
1026  }
1027 
1028  if (agent->owner->id < 0 && !agent->type->valid)
1029  AGENT_SEQUENTIAL_PRINT("agent failed startup test, removing from meta agents\n");
1030 
1031  AGENT_SEQUENTIAL_PRINT("successfully remove from the system\n");
1032  job_remove_agent(agent->owner, scheduler->job_list, agent);
1033  g_tree_remove(scheduler->agents, &agent->pid);
1034  g_free(pid);
1035 }
1036 
1048 void agent_create_event(scheduler_t* scheduler, agent_t* agent)
1049 {
1050  TEST_NULV(agent);
1051 
1052  AGENT_SEQUENTIAL_PRINT("agent successfully spawned\n");
1053  g_tree_insert(scheduler->agents, &agent->pid, agent);
1054  agent_transition(agent, AG_SPAWNED);
1055  job_add_agent(agent->owner, agent);
1056 }
1057 
1069 void agent_ready_event(scheduler_t* scheduler, agent_t* agent)
1070 {
1071  int ret;
1072 
1073  TEST_NULV(agent);
1074  if (agent->status == AG_SPAWNED)
1075  {
1076  agent_transition(agent, AG_RUNNING);
1077  AGENT_SEQUENTIAL_PRINT("agent successfully created\n");
1078  }
1079 
1080  if ((ret = job_is_open(scheduler, agent->owner)) == 0)
1081  {
1082  agent_transition(agent, AG_PAUSED);
1083  job_finish_agent(agent->owner, agent);
1084  job_update(scheduler, agent->owner);
1085  return;
1086  }
1087  else if (ret < 0)
1088  {
1089  agent_transition(agent, AG_FAILED);
1090  return;
1091  }
1092  else
1093  {
1094  agent->data = job_next(agent->owner);
1095  agent->updated = 1;
1096  }
1097 
1098  if (write(agent->to_parent, "@@@0\n", 5) != 5)
1099  {
1100  AGENT_ERROR("failed sending new data to agent");
1101  agent_kill(agent);
1102  }
1103 }
1104 
1114 void agent_update_event(scheduler_t* scheduler, void* unused)
1115 {
1116  g_tree_foreach(scheduler->agents, (GTraverseFunc) update, NULL);
1117 }
1118 
1129 void agent_fail_event(scheduler_t* scheduler, agent_t* agent)
1130 {
1131  TEST_NULV(agent);
1132  agent_transition(agent, AG_FAILED);
1133  job_fail_agent(agent->owner, agent);
1134  if (write(agent->to_parent, "@@@1\n", 5) != 5)
1135  AGENT_ERROR("Failed to kill agent thread cleanly");
1136 }
1137 
1145 void list_agents_event(scheduler_t* scheduler, GOutputStream* ostr)
1146 {
1147  g_tree_foreach(scheduler->meta_agents, (GTraverseFunc) agent_list, ostr);
1148  g_output_stream_write(ostr, "\nend\n", 5, NULL, NULL);
1149 }
1150 
1151 /* ************************************************************************** */
1152 /* **** Modifier Functions ************************************************** */
1153 /* ************************************************************************** */
1154 
1163 void agent_transition(agent_t* agent, agent_status new_status)
1164 {
1165  AGENT_SEQUENTIAL_PRINT("agent status change: %s -> %s\n", agent_status_strings[agent->status],
1166  agent_status_strings[new_status]);
1167 
1168  if (agent->owner->id > 0)
1169  {
1170  if (agent->status == AG_PAUSED)
1171  {
1172  host_increase_load(agent->host);
1174  }
1175  if (new_status == AG_PAUSED)
1176  {
1177  host_decrease_load(agent->host);
1179  }
1180  }
1181 
1182  agent->status = new_status;
1183 }
1184 
1191 void agent_pause(agent_t* agent)
1192 {
1193  kill(agent->pid, SIGSTOP);
1194  agent_transition(agent, AG_PAUSED);
1195 }
1196 
1205 {
1206  kill(agent->pid, SIGCONT);
1207  agent_transition(agent, AG_RUNNING);
1208 }
1209 
1220 void agent_print_status(agent_t* agent, GOutputStream* ostr)
1221 {
1222  gchar* status_str;
1223  char time_buf[64];
1224  struct tm* time_info;
1225 
1226  TEST_NULV(agent);
1227  TEST_NULV(ostr);
1228 
1229  strcpy(time_buf, "(none)");
1230  time_info = localtime(&agent->check_in);
1231  if (time_info)
1232  strftime(time_buf, sizeof(time_buf), "%F %T", localtime(&agent->check_in));
1233  status_str = g_strdup_printf("agent:%d host:%s type:%s status:%s time:%s\n", agent->pid, agent->host->name,
1234  agent->type->name, agent_status_strings[agent->status], time_buf);
1235 
1236  AGENT_SEQUENTIAL_PRINT("AGENT_STATUS: %s", status_str);
1237  g_output_stream_write(ostr, status_str, strlen(status_str), NULL, NULL);
1238  g_free(status_str);
1239  return;
1240 }
1241 
1250 void agent_kill(agent_t* agent)
1251 {
1252  AGENT_SEQUENTIAL_PRINT("KILL: sending SIGKILL to pid %d\n", agent->pid);
1254  kill(agent->pid, SIGKILL);
1255 }
1256 
1265 int aprintf(agent_t* agent, const char* fmt, ...)
1266 {
1267  va_list args;
1268  int rc;
1269  char* tmp;
1270 
1271  va_start(args, fmt);
1272  if (TVERB_AGENT)
1273  {
1274  tmp = g_strdup_vprintf(fmt, args);
1275  tmp[strlen(tmp) - 1] = '\0';
1276  AGENT_CONCURRENT_PRINT("sent to agent \"%s\"\n", tmp);
1277  rc = fprintf(agent->write, "%s\n", tmp);
1278  g_free(tmp);
1279  }
1280  else
1281  {
1282  rc = vfprintf(agent->write, fmt, args);
1283  }
1284  va_end(args);
1285  fflush(agent->write);
1286 
1287  return rc;
1288 }
1289 
1301 ssize_t agent_write(agent_t* agent, const void* buf, int count)
1302 {
1303  return write(agent->to_parent, buf, count);
1304 }
1305 
1306 /* ************************************************************************** */
1307 /* **** static functions and meta agents ************************************ */
1308 /* ************************************************************************** */
1309 
1317 void test_agents(scheduler_t* scheduler)
1318 {
1319  g_tree_foreach(scheduler->meta_agents, (GTraverseFunc) agent_test, scheduler);
1320 }
1321 
1328 void kill_agents(scheduler_t* scheduler)
1329 {
1330  g_tree_foreach(scheduler->agents, (GTraverseFunc) agent_kill_traverse, NULL);
1331 }
1332 
1343 int add_meta_agent(GTree* meta_agents, char* name, char* cmd, int max, int spc)
1344 {
1345  meta_agent_t* ma;
1346 
1347  if (name == NULL)
1348  return 0;
1349 
1350  if (g_tree_lookup(meta_agents, name) == NULL)
1351  {
1352  if ((ma = meta_agent_init(name, cmd, max, spc)) == NULL)
1353  return 0;
1354  g_tree_insert(meta_agents, ma->name, ma);
1355  return 1;
1356  }
1357 
1358  return 0;
1359 }
1360 
1368 int is_meta_special(meta_agent_t* ma, int special_type)
1369 {
1370  return (ma != NULL) && ((ma->special & special_type) != 0);
1371 }
1372 
1380 int is_agent_special(agent_t* agent, int special_type)
1381 {
1382  return (agent != NULL) && ((agent->special & special_type) != 0);
1383 }
1384 
1390 {
1391  ma->run_count++;
1392  V_AGENT("AGENT[%s] run increased to %d\n", ma->name, ma->run_count);
1393 }
1394 
1400 {
1401  ma->run_count--;
1402  V_AGENT("AGENT[%s] run decreased to %d\n", ma->name, ma->run_count);
1403 }
static int agent_list(char *name, meta_agent_t *ma, GOutputStream *ostr)
GTraverseFunc that writes the name of every valid agent to the output stream in alphabetical order, separated by spaces.
Definition: agent.c:217
void agent_ready_event(scheduler_t *scheduler, agent_t *agent)
Event created when an agent is ready for more data.
Definition: agent.c:1069
static int update(int *pid_ptr, agent_t *agent, gpointer unused)
Definition: agent.c:152
int aprintf(agent_t *agent, const char *fmt,...)
Definition: agent.c:1265
static int agent_close_fd(int *pid_ptr, agent_t *agent, agent_t *excepted)
This will close all of the agent's pipes.
Definition: agent.c:129
agent_t * agent_init(scheduler_t *scheduler, host_t *host, job_t *job)
Allocate and spawn a new agent.
Definition: agent.c:829
static int agent_test(const gchar *name, meta_agent_t *ma, scheduler_t *scheduler)
GTraverseFunc that tests the current agent on every host.
Definition: agent.c:238
ssize_t agent_write(agent_t *agent, const void *buf, int count)
Definition: agent.c:1301
#define AGENT_SEQUENTIAL_PRINT(...)
Definition: agent.c:92
#define AGENT_CONCURRENT_PRINT(...)
Definition: agent.c:97
#define AGENT_ERROR(...)
Definition: agent.c:71
static int agent_kill_traverse(int *pid, agent_t *agent, gpointer unused)
GTraverseFunc that kills all of the agents.
Definition: agent.c:202
void agent_update_event(scheduler_t *scheduler, void *unused)
Definition: agent.c:1114
void meta_agent_increase_count(meta_agent_t *ma)
Definition: agent.c:1389
void agent_destroy(agent_t *agent)
Frees the memory associated with an agent.
Definition: agent.c:951
#define AGENT_WARNING(...)
Definition: agent.c:85
void agent_fail_event(scheduler_t *scheduler, agent_t *agent)
Fails an agent.
Definition: agent.c:1129
meta_agent_t * meta_agent_init(char *name, char *cmd, int max, int spc)
Creates a new meta agent.
Definition: agent.c:771
void agent_kill(agent_t *agent)
Unclean kill of an agent.
Definition: agent.c:1250
void agent_pause(agent_t *agent)
Definition: agent.c:1191
int add_meta_agent(GTree *meta_agents, char *name, char *cmd, int max, int spc)
Definition: agent.c:1343
void agent_create_event(scheduler_t *scheduler, agent_t *agent)
Event created when a new agent has been created.
Definition: agent.c:1048
void agent_transition(agent_t *agent, agent_status new_status)
Definition: agent.c:1163
#define TEST_NULL(a, ret)
Test if parameter is NULL.
Definition: agent.c:57
static void agent_listen(scheduler_t *scheduler, agent_t *agent)
Definition: agent.c:264
void agent_unpause(agent_t *agent)
Definition: agent.c:1204
void test_agents(scheduler_t *scheduler)
Calls the agent test function for every type of agent.
Definition: agent.c:1317
void meta_agent_decrease_count(meta_agent_t *ma)
Definition: agent.c:1399
void kill_agents(scheduler_t *scheduler)
Call the agent_kill function for every agent within the system.
Definition: agent.c:1328
int is_agent_special(agent_t *agent, int special_type)
tests if a particular agent has a specific special flag set
Definition: agent.c:1380
void list_agents_event(scheduler_t *scheduler, GOutputStream *ostr)
Receive agent on interface.
Definition: agent.c:1145
#define TEST_NULV(a)
Test if parameter is NULL.
Definition: agent.c:47
int is_meta_special(meta_agent_t *ma, int special_type)
tests if a particular meta agent has a specific special flag set
Definition: agent.c:1368
static void shell_parse(char *confdir, int user_id, int group_id, char *input, char *jq_cmd_args, int jobId, int *argc, char ***argv)
Parses the shell command that is found in the configuration file.
Definition: agent.c:551
#define SELECT_STRING(passed)
Definition: agent.c:109
void agent_death_event(scheduler_t *scheduler, pid_t *pid)
Definition: agent.c:979
static void * agent_spawn(agent_spawn_args *pass)
Spawns a new agent using the command passed in using the meta agent.
Definition: agent.c:656
void meta_agent_destroy(meta_agent_t *ma)
Definition: agent.c:811
void agent_print_status(agent_t *agent, GOutputStream *ostr)
Prints the status of the agent to the output stream provided.
Definition: agent.c:1220
Header file with agent related operations.
#define MAX_CMD
the size of the agent's command buffer (arbitrary)
Definition: agent.h:26
#define SAG_NOKILL
This agent should not be killed when updating the agent.
Definition: agent.h:33
#define AGENT_STATUS_TYPES(apply)
Definition: agent.h:50
#define MAX_NAME
the size of the agent's name buffer (arbitrary)
Definition: agent.h:27
Event handling operations.
void host_decrease_load(host_t *host)
Decrease the number of running agents on a host by 1.
Definition: host.c:113
void host_increase_load(host_t *host)
Increase the number of running agents on a host by 1.
Definition: host.c:102
void job_fail_agent(job_t *job, void *agent)
Definition: job.c:493
void job_fail_event(scheduler_t *scheduler, job_t *job)
Event that causes a job to be marked as failed.
Definition: job.c:406
job_t * job_init(GTree *job_list, GSequence *job_queue, char *type, char *host, int id, int parent_id, int user_id, int group_id, int priority, char *jq_cmd_args)
Create a new job.
Definition: job.c:164
void job_update(scheduler_t *scheduler, job_t *job)
Definition: job.c:531
int job_is_open(scheduler_t *scheduler, job_t *job)
Tests to see if there is still data available for this job.
Definition: job.c:570
char * job_next(job_t *job)
Definition: job.c:610
void job_remove_agent(job_t *job, GTree *job_list, void *agent)
Definition: job.c:449
log_t * job_log(job_t *job)
Definition: job.c:637
void job_add_agent(job_t *job, void *agent)
Adds a new agent to the jobs list of agents.
Definition: job.c:434
void job_finish_agent(job_t *job, void *agent)
Definition: job.c:478
FUNCTION int max(int permGroup, int permPublic)
Get the maximum group privilege.
Definition: libfossagent.c:295
int jobId
The id of the job.
char buffer[2048]
The last thing received from the scheduler.
log_t * main_log
Definition: logging.c:33
Log related operations.
#define ERROR(...)
Definition: logging.h:79
#define THREAD_FATAL(file,...)
Definition: logging.h:71
start($application)
Start the application. Assumes the application is restartable via /etc/init.d/<script>....
Definition: pkgConfig.php:1214
void database_update_event(scheduler_t *scheduler, void *unused)
Checks the job queue for any new entries.
Definition: database.c:841
void database_job_processed(int j_id, int num)
Updates the number of items that a job queue entry has processed.
Definition: database.c:992
Header file for the scheduler.
#define AGENT_CONF
Agent conf location.
Definition: scheduler.h:123
#define AGENT_BINARY
Format to get agent binary.
Definition: scheduler.h:122
agent_t * agent
Reference to current agent state.
Definition: agent.c:634
scheduler_t * scheduler
Reference to current scheduler state.
Definition: agent.c:633
Definition: agent.h:100
time_t check_in
the time that the agent last generated anything
Definition: agent.h:108
int to_child
file identifier to print to the child
Definition: agent.h:114
agent_status status
the state of execution the agent is currently in
Definition: agent.h:106
meta_agent_t * type
the type of agent this is i.e. bucket, copyright...
Definition: agent.h:102
int from_child
file identifier to read from child
Definition: agent.h:115
gchar * data
the data that has been sent to the agent for analysis
Definition: agent.h:122
FILE * write
FILE* that abstracts the use of the to_child socket.
Definition: agent.h:118
gboolean alive
flag to tell the scheduler if the agent is still alive
Definition: agent.h:125
job_t * owner
the job that this agent is assigned to
Definition: agent.h:121
FILE * read
FILE* that abstracts the use of the from_child socket.
Definition: agent.h:117
uint32_t special
any special flags that the agent has set
Definition: agent.h:127
int to_parent
file identifier to print to the parent (child stdout)
Definition: agent.h:116
uint8_t n_updates
keeps track of the number of times the agent has updated
Definition: agent.h:109
uint64_t total_analyzed
the total number that this agent has analyzed
Definition: agent.h:124
host_t * host
the host that this agent will start on
Definition: agent.h:103
GThread * thread
the thread that communicates with this agent
Definition: agent.h:107
int from_parent
file identifier to read from the parent (child stdin)
Definition: agent.h:113
uint8_t return_code
what was returned by the agent when it disconnected
Definition: agent.h:126
pid_t pid
the pid of the process this agent is running in
Definition: agent.h:110
gboolean updated
boolean flag to indicate if the scheduler has updated the data
Definition: agent.h:123
Definition: host.h:26
char * agent_dir
The location on the host machine where the executables are.
Definition: host.h:29
char * address
The address of the host, used by ssh when starting a new agent.
Definition: host.h:28
char * name
The name of the host, used to store host internally to scheduler.
Definition: host.h:27
The job structure.
Definition: job.h:51
int32_t id
The identifier for this job.
Definition: job.h:73
job_status status
The current status for the job.
Definition: job.h:61
int32_t group_id
The id of the group that created the job.
Definition: job.h:75
gchar * message
Message that will be sent with job notification email.
Definition: job.h:69
gchar * jq_cmd_args
Command line arguments for this job.
Definition: job.h:63
int32_t user_id
The id of the user that created the job.
Definition: job.h:74
char * agent_type
The type of agent used to analyze the data.
Definition: job.h:53
int32_t priority
Importance of the job, maps directly to unix priority.
Definition: job.h:70
int32_t parent_id
The identifier for the parent of this job (its queue id)
Definition: job.h:72
Store the results of a regex match.
Definition: scanners.hpp:28
int valid
flag indicating if the meta_agent is valid
Definition: agent.h:89
char raw_cmd[MAX_CMD+1]
the raw command that will start the agent, used for ssh
Definition: agent.h:84
int max_run
the maximum number that can run at once, -1 if no limit
Definition: agent.h:85
char name[256]
the name associated with this agent i.e. nomos, copyright...
Definition: agent.h:83
int run_count
the count of agents in running state
Definition: agent.h:90
char * version
the version of the agent that is running on all hosts
Definition: agent.h:88
char * version_source
the machine that reported the version information
Definition: agent.h:87
int special
any special condition associated with the agent
Definition: agent.h:86
GTree * job_list
List of jobs that have been created.
Definition: scheduler.h:172
gchar * sysconfigdir
The system directory that contain fossology.conf.
Definition: scheduler.h:150
GList * host_queue
Round-robin queue for choosing which host to use next.
Definition: scheduler.h:161
GRegex * parse_agent_msg
Parses messages coming from the agents.
Definition: scheduler.h:186
GTree * meta_agents
List of all meta agents available to the scheduler.
Definition: scheduler.h:156
GTree * agents
List of any currently running agents.
Definition: scheduler.h:157
GSequence * job_queue
heap of jobs that still need to be started
Definition: scheduler.h:173