FOSSology  4.4.0
Open Source License Compliance by Open Source Software
agent.c
Go to the documentation of this file.
1 /*
2  SPDX-FileCopyrightText: © 2010, 2011, 2012 Hewlett-Packard Development Company, L.P.
3  SPDX-FileCopyrightText: © 2015 Siemens AG
4 
5  SPDX-License-Identifier: GPL-2.0-only
6 */
7 
15 /* local includes */
16 #include <agent.h>
17 #include <database.h>
18 #include <event.h>
19 #include <host.h>
20 #include <job.h>
21 #include <logging.h>
22 #include <scheduler.h>
23 
24 /* library includes */
25 #include <limits.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <time.h>
30 
31 /* unix library includes */
32 #include <fcntl.h>
33 #include <limits.h>
34 #include <sys/types.h>
35 #include <sys/wait.h>
36 #include <unistd.h>
37 
38 /* other library includes */
39 #include <glib.h>
40 
/**
 * Test if parameter is NULL. If it is, set errno to EINVAL, log an error and
 * return from the calling (void) function.
 *
 * Wrapped in do { } while (0) so the macro is a single statement: the old
 * bare `if` form would steal the `else` of an enclosing unbraced `if`.
 */
#define TEST_NULV(a) do { if (!(a)) { \
    errno = EINVAL; ERROR("agent passed is NULL, cannot proceed"); return; } } while (0)

/**
 * Test if parameter is NULL. If it is, set errno to EINVAL, log an error and
 * return @p ret from the calling function.
 */
#define TEST_NULL(a, ret) do { if (!(a)) { \
    errno = EINVAL; ERROR("agent passed is NULL, cannot proceed"); return ret; } } while (0)
59 
/*
 * Logging helpers. All of them expect a variable named `agent` (agent_t*)
 * to be in scope at the point of use.
 */

/** Prints "JOB[id].type[pid.host]: " for `agent` to the main log. */
#define AGENT_CREDENTIAL \
  log_printf("JOB[%d].%s[%d.%s]: ", agent->owner->id, agent->type->name, \
      agent->pid, agent->host->name)

/** Prints "JOB[id].type[pid.host]: " for `agent` to the owning job's log. */
#define AGENT_LOG_CREDENTIAL \
  con_printf(job_log(agent->owner), "JOB[%d].%s[%d.%s]: ", \
      agent->owner->id, agent->type->name, agent->pid, agent->host->name)

/** Unconditionally logs an error (with file/line) for `agent` to the main log. */
#define AGENT_ERROR(...) do { \
  log_printf("ERROR: %s.%d: ", __FILE__, __LINE__); \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); \
  log_printf("\n"); } while (0)

/*
 * The conditional macros below keep their test INSIDE do { } while (0) so each
 * expands to exactly one statement. With the test outside, a call such as
 * `if (x) AGENT_WARNING("..."); else ...` would bind the `else` to the
 * macro's own `if`.
 */

/** Logs a note for `agent` to the main log when TEST_NOTIFY is enabled. */
#define AGENT_NOTIFY(...) do { if (TEST_NOTIFY) { \
  log_printf("NOTE: "); \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); \
  log_printf("\n"); } } while (0)

/** Logs a warning (with file/line) for `agent` when TEST_WARNING is enabled. */
#define AGENT_WARNING(...) do { if (TEST_WARNING) { \
  log_printf("WARNING %s.%d: ", __FILE__, __LINE__); \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); \
  log_printf("\n"); } } while (0)

/** Verbose-only (TVERB_AGENT) print to the main log; caller supplies the newline. */
#define AGENT_SEQUENTIAL_PRINT(...) do { if (TVERB_AGENT) { \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); } } while (0)

/** Prints to the owning job's log; caller supplies the newline. */
#define AGENT_CONCURRENT_PRINT(...) do { \
  AGENT_LOG_CREDENTIAL; \
  con_printf(job_log(agent->owner), __VA_ARGS__); } while (0)
100 
101 /* ************************************************************************** */
102 /* **** Data Types ********************************************************** */
103 /* ************************************************************************** */
104 
109 #define SELECT_STRING(passed) MK_STRING_LIT(AGENT_##passed),
110 const char* agent_status_strings[] =
112 #undef SELECT_STRING
113 
114 /* ************************************************************************** */
115 /* **** Local Functions ***************************************************** */
116 /* ************************************************************************** */
117 
129 static int agent_close_fd(int* pid_ptr, agent_t* agent, agent_t* excepted)
130 {
131  TEST_NULL(agent, 0);
132  if (agent != excepted)
133  {
134  close(agent->from_child);
135  close(agent->to_child);
136  fclose(agent->read);
137  fclose(agent->write);
138  }
139  return 0;
140 }
141 
152 static int update(int* pid_ptr, agent_t* agent, gpointer unused)
153 {
154  TEST_NULL(agent, 0);
155  int nokill = is_agent_special(agent, SAG_NOKILL) || is_meta_special(agent->type, SAG_NOKILL);
156 
157  if (agent->status == AG_SPAWNED || agent->status == AG_RUNNING || agent->status == AG_PAUSED)
158  {
159  /* check last checkin time */
160  if (time(NULL) - agent->check_in > CONF_agent_death_timer && !(agent->owner->status == JB_PAUSED) && !nokill)
161  {
162  AGENT_CONCURRENT_PRINT("no heartbeat for %d seconds\n", (time(NULL) - agent->check_in));
163  agent_kill(agent);
164  return 0;
165  }
166 
167  /* check items processed */
168  if (agent->status != AG_PAUSED && !agent->alive)
169  {
170  agent->n_updates++;
171  }
172  else
173  {
174  agent->n_updates = 0;
175  }
176  if (agent->n_updates > CONF_agent_update_number && !nokill)
177  {
178  AGENT_CONCURRENT_PRINT("agent has not set the alive flag in at least 10 minutes, killing\n");
179  agent_kill(agent);
180  return 0;
181  }
182 
183  AGENT_SEQUENTIAL_PRINT("agent updated correctly, processed %d items: %d\n", agent->total_analyzed,
184  agent->n_updates);
185  agent->alive = 0;
186  }
187 
188  return 0;
189 }
190 
202 static int agent_kill_traverse(int* pid, agent_t* agent, gpointer unused)
203 {
204  agent_kill(agent);
205  return FALSE;
206 }
207 
217 static int agent_list(char* name, meta_agent_t* ma, GOutputStream* ostr)
218 {
219  if (ma->valid)
220  {
221  g_output_stream_write(ostr, name, strlen(name), NULL, NULL);
222  g_output_stream_write(ostr, " ", 1, NULL, NULL);
223  }
224  return FALSE;
225 }
226 
238 static int agent_test(const gchar* name, meta_agent_t* ma, scheduler_t* scheduler)
239 {
240  static int32_t id_gen = -1;
241 
242  GList* iter;
243  host_t* host;
244  char *jq_cmd_args = 0;
245 
246  for (iter = scheduler->host_queue; iter != NULL; iter = iter->next)
247  {
248  host = (host_t*) iter->data;
249  V_AGENT("META_AGENT[%s] testing on HOST[%s]\n", ma->name, host->name);
250  job_t* job = job_init(scheduler->job_list, scheduler->job_queue, ma->name, host->name, id_gen--, 0, 0, 0, 0, jq_cmd_args);
251  agent_init(scheduler, host, job);
252  }
253 
254  return 0;
255 }
256 
264 static void agent_listen(scheduler_t* scheduler, agent_t* agent)
265 {
266  /* static locals */
267 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
268  static GMutex version_lock;
269 #else
270  static GStaticMutex version_lock = G_STATIC_MUTEX_INIT;
271 #endif
272 
273  /* locals */
274  char buffer[1024]; // buffer to store c strings read from agent
275  GMatchInfo* match; // regex match information
276  char* arg; // used during regex retrievals
277  int relevant; // used during special retrievals
278 
279  TEST_NULV(agent);
280 
291  if (fgets(buffer, sizeof(buffer), agent->read) == NULL)
292  {
293  AGENT_CONCURRENT_PRINT("pipe from child closed: %s\n", strerror(errno));
294  g_thread_exit(NULL);
295  }
296 
297  /* check to make sure "VERSION" was sent */
298  buffer[strlen(buffer) - 1] = '\0';
299  if (strncmp(buffer, "VERSION: ", 9) != 0)
300  {
301  if (strncmp(buffer, "@@@1", 4) == 0)
302  {
303  THREAD_FATAL(job_log(agent->owner), "agent crashed before sending version information");
304  }
305  else
306  {
307  agent->type->valid = 0;
308  agent_fail_event(scheduler, agent);
309  agent_kill(agent);
310  con_printf(main_log, "ERROR %s.%d: agent %s.%s has been invalidated, removing from agents\n", __FILE__, __LINE__,
311  agent->host->name, agent->type->name);
312  AGENT_CONCURRENT_PRINT("agent didn't send version information: \"%s\"\n", buffer);
313  return;
314  }
315  }
316 
317  /* check that the VERSION information is correct */
318 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
319  g_mutex_lock(&version_lock);
320 #else
321  g_static_mutex_lock(&version_lock);
322 #endif
323  strcpy(buffer, &buffer[9]);
324  if (agent->type->version == NULL && agent->type->valid)
325  {
326  agent->type->version_source = agent->host->name;
327  agent->type->version = g_strdup(buffer);
328  if (TVERB_AGENT)
329  con_printf(main_log, "META_AGENT[%s.%s] version is: \"%s\"\n", agent->host->name, agent->type->name,
330  agent->type->version);
331  }
332  else if (strcmp(agent->type->version, buffer) != 0)
333  {
334  con_printf(job_log(agent->owner), "ERROR %s.%d: META_DATA[%s] invalid agent spawn check\n", __FILE__, __LINE__,
335  agent->type->name);
336  con_printf(job_log(agent->owner), "ERROR: versions don't match: %s(%s) != received: %s(%s)\n",
337  agent->type->version_source, agent->type->version, agent->host->name, buffer);
338  agent->type->valid = 0;
339  agent_kill(agent);
340 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
341  g_mutex_unlock(&version_lock);
342 #else
343  g_static_mutex_unlock(&version_lock);
344 #endif
345  return;
346  }
347 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
348  g_mutex_unlock(&version_lock);
349 #else
350  g_static_mutex_unlock(&version_lock);
351 #endif
352 
362  while (1)
363  {
364  /* get message from agent */
365  if (fgets(buffer, sizeof(buffer), agent->read) == NULL)
366  g_thread_exit(NULL);
367 
368  buffer[strlen(buffer) - 1] = '\0';
369 
370  if (strlen(buffer) == 0)
371  continue;
372 
373  if (TVERB_AGENT && (TVERB_SPECIAL || strncmp(buffer, "SPECIAL", 7) != 0))
374  AGENT_CONCURRENT_PRINT("received: \"%s\"\n", buffer);
375 
383  if (strncmp(buffer, "BYE", 3) == 0)
384  {
385  if ((agent->return_code = atoi(&(buffer[4]))) != 0)
386  {
387  AGENT_CONCURRENT_PRINT("agent failed with error code %d\n", agent->return_code);
388  event_signal(agent_fail_event, agent);
389  }
390  break;
391  }
392 
399  if (strncmp(buffer, "@@@1", 4) == 0)
400  break;
401 
408  if (strncmp(buffer, "@@@0", 4) == 0 && agent->updated)
409  {
410  aprintf(agent, "%s\n", agent->data);
411  aprintf(agent, "END\n");
412  fflush(agent->write);
413  agent->updated = 0;
414  continue;
415  }
416 
417  /* agent just checked in */
418  agent->check_in = time(NULL);
419 
428  if (strncmp(buffer, "OK", 2) == 0)
429  {
430  if (agent->status != AG_PAUSED)
431  event_signal(agent_ready_event, agent);
432  }
433 
441  else if (strncmp(buffer, "HEART", 5) == 0)
442  {
443  g_regex_match(scheduler->parse_agent_msg, buffer, 0, &match);
444 
445  arg = g_match_info_fetch(match, 3);
446  agent->total_analyzed = atoi(arg);
447  g_free(arg);
448 
449  arg = g_match_info_fetch(match, 6);
450  agent->alive = (arg[0] == '1' || agent->alive);
451  g_free(arg);
452 
453  g_match_info_free(match);
454  match = NULL;
455 
457  }
458 
465  else if (strncmp(buffer, "EMAIL", 5) == 0)
466  {
467  agent->owner->message = g_strdup(buffer + 6);
468  }
469 
477  else if (strncmp(buffer, "SPECIAL", 7) == 0)
478  {
479  relevant = INT_MAX;
480 
481  g_regex_match(scheduler->parse_agent_msg, buffer, 0, &match);
482 
483  arg = g_match_info_fetch(match, 3);
484  relevant &= atoi(arg);
485  g_free(arg);
486 
487  arg = g_match_info_fetch(match, 6);
488  if (atoi(arg))
489  {
490  if (agent->special & relevant)
491  relevant = 0;
492  }
493  else
494  {
495  if (!(agent->special & relevant))
496  relevant = 0;
497  }
498  g_free(arg);
499 
500  g_match_info_free(match);
501 
502  agent->special ^= relevant;
503  }
504 
510  else if (strncmp(buffer, "GETSPECIAL", 10) == 0)
511  {
512  g_regex_match(scheduler->parse_agent_msg, buffer, 0, &match);
513 
514  arg = g_match_info_fetch(match, 3);
515  relevant = atoi(arg);
516  g_free(arg);
517 
518  if (agent->special & relevant)
519  aprintf(agent, "VALUE: 1\n");
520  else
521  aprintf(agent, "VALUE: 0\n");
522 
523  g_match_info_free(match);
524  }
525 
531  else if (!(TVERB_AGENT))
532  AGENT_CONCURRENT_PRINT("\"%s\"\n", buffer);
533  }
534 
535  if (TVERB_AGENT)
536  AGENT_CONCURRENT_PRINT("communication thread closing\n");
537 }
538 
551 static void shell_parse(char* confdir, int user_id, int group_id, char* input, char *jq_cmd_args, int jobId, int* argc, char*** argv)
552 {
553  char* begin;
554  char* curr;
555  int idx = 0;
556 #define MAX_CMD_ARGS 30
557 
558  *argv = g_new0(char*, MAX_CMD_ARGS);
559  begin = NULL;
560 
561  for (curr = input; *curr; curr++)
562  {
563  if (*curr == ' ')
564  {
565  if (begin == NULL)
566  continue;
567 
568  if (*begin == '"')
569  continue;
570 
571  *curr = '\0';
572  (*argv)[idx++] = g_strdup(begin);
573  begin = NULL;
574  }
575  else if (begin == NULL)
576  {
577  begin = curr;
578  }
579  else if (*begin == '"' && *curr == '"')
580  {
581  *begin = '\0';
582  *curr = '\0';
583 
584  (*argv)[idx++] = g_strdup(begin + 1);
585  begin = NULL;
586  }
587  if (idx > MAX_CMD_ARGS - 7)
588  break;
589  }
590 
591  (*argv)[idx++] = g_strdup_printf("--jobId=%d", jobId);
592  (*argv)[idx++] = g_strdup_printf("--config=%s", confdir);
593  (*argv)[idx++] = g_strdup_printf("--userID=%d", user_id);
594  (*argv)[idx++] = g_strdup_printf("--groupID=%d", group_id);
595  (*argv)[idx++] = "--scheduler_start";
596  if (jq_cmd_args)
597  (*argv)[idx++] = jq_cmd_args;
598  (*argc) = idx;
599 }
600 
604 typedef struct
605 {
609 
629 static void* agent_spawn(agent_spawn_args* pass)
630 {
631  /* locals */
632  scheduler_t* scheduler = pass->scheduler;
633  agent_t* agent = pass->agent;
634  gchar* tmp; // pointer to temporary string
635  gchar** args; // the arguments that will be passed to the child
636  int argc; // the number of arguments parsed
637  int len;
638  char buffer[2048]; // character buffer
639 
640  /* spawn the new process */
641  while ((agent->pid = fork()) < 0)
642  sleep(rand() % CONF_fork_backoff_time);
643 
644  /* we are in the child */
645  if (agent->pid == 0)
646  {
647  /* set the child's stdin and stdout to use the pipes */
648  dup2(agent->from_parent, fileno(stdin));
649  dup2(agent->to_parent, fileno(stdout));
650  dup2(agent->to_parent, fileno(stderr));
651 
652  /* close all the unnecessary file descriptors */
653  g_tree_foreach(scheduler->agents, (GTraverseFunc) agent_close_fd, agent);
654  close(agent->from_child);
655  close(agent->to_child);
656 
657  /* set the priority of the process to the job's priority */
658  if (nice(agent->owner->priority) == -1)
659  ERROR("unable to correctly set priority of agent process %d", agent->pid);
660 
661  /* if host is null, the agent will run locally to */
662  /* run the agent locally, use the commands that */
663  /* were parsed when the meta_agent was created */
664  if (strcmp(agent->host->address, LOCAL_HOST) == 0)
665  {
666  shell_parse(scheduler->sysconfigdir, agent->owner->user_id, agent->owner->group_id,
667  agent->type->raw_cmd, agent->owner->jq_cmd_args,
668  agent->owner->parent_id, &argc, &args);
669 
670  tmp = args[0];
671  args[0] = g_strdup_printf(AGENT_BINARY, scheduler->sysconfigdir,
672  AGENT_CONF, agent->type->name, tmp);
673 
674  strcpy(buffer, args[0]);
675  *strrchr(buffer, '/') = '\0';
676  if (chdir(buffer) != 0)
677  {
678  ERROR("unable to change working directory: %s\n", strerror(errno));
679  }
680 
681  execv(args[0], args);
682  }
683  /* otherwise the agent will be started using ssh */
684  /* if the agent is started using ssh we don't need */
685  /* to fully parse the arguments, just pass the run */
686  /* command as the last argument to the ssh command */
687  else
688  {
689  args = g_new0(char*, 5);
690  len = snprintf(buffer, sizeof(buffer), AGENT_BINARY " --userID=%d --groupID=%d --scheduler_start --jobId=%d",
691  agent->host->agent_dir, AGENT_CONF, agent->type->name, agent->type->raw_cmd,
692  agent->owner->user_id, agent->owner->group_id, agent->owner->parent_id);
693 
694  if (len>=sizeof(buffer)) {
695  *(buffer + sizeof(buffer) - 1) = '\0';
696  log_printf("ERROR %s.%d: JOB[%d.%s]: exec failed: truncated buffer: \"%s\"",
697  __FILE__, __LINE__, agent->owner->id, agent->owner->agent_type, buffer);
698 
699  exit(5);
700  }
701 
702  args[0] = "/usr/bin/ssh";
703  args[1] = agent->host->address;
704  args[2] = buffer;
705  args[3] = agent->owner->jq_cmd_args;
706  args[4] = NULL;
707  execv(args[0], args);
708  }
709 
710  /* If we reach here, the exec call has failed */
711  log_printf("ERROR %s.%d: JOB[%d.%s]: exec failed: pid = %d, errno = \"%s\"", __FILE__, __LINE__, agent->owner->id,
712  agent->owner->agent_type, getpid(), strerror(errno));
713  }
714  /* we are in the parent */
715  else
716  {
717  event_signal(agent_create_event, agent);
718  agent_listen(scheduler, agent);
719  }
720 
721  return NULL;
722 }
723 
724 /* ************************************************************************** */
725 /* **** Constructor Destructor ********************************************** */
726 /* ************************************************************************** */
727 
744 meta_agent_t* meta_agent_init(char* name, char* cmd, int max, int spc)
745 {
746  /* locals */
747  meta_agent_t* ma;
748 
749  /* test inputs */
750  if (!name || !cmd)
751  {
752  ERROR("invalid arguments passed to meta_agent_init()");
753  return NULL;
754  }
755 
756  /* confirm valid inputs */
757  if (strlen(name) > MAX_NAME || strlen(cmd) > MAX_CMD)
758  {
759  log_printf("ERROR failed to load %s meta agent", name);
760  return NULL;
761  }
762 
763  /* inputs are valid, create the meta_agent */
764  ma = g_new0(meta_agent_t, 1);
765 
766  strcpy(ma->name, name);
767  strcpy(ma->raw_cmd, cmd);
768  strcat(ma->raw_cmd, " --scheduler_start");
769  ma->max_run = max;
770  ma->run_count = 0;
771  ma->special = spc;
772  ma->version = NULL;
773  ma->valid = TRUE;
774 
775  return ma;
776 }
777 
785 {
786  TEST_NULV(ma);
787  g_free(ma->version);
788  g_free(ma);
789 }
790 
802 agent_t* agent_init(scheduler_t* scheduler, host_t* host, job_t* job)
803 {
804  /* local variables */
805  agent_t* agent;
806  int child_to_parent[2];
807  int parent_to_child[2];
808  agent_spawn_args* pass;
809 
810  /* check job input */
811  if (!job)
812  {
813  log_printf("ERROR %s.%d: NULL job passed to agent init\n", __FILE__, __LINE__);
814  log_printf("ERROR: no other information available\n");
815  return NULL;
816  }
817 
818  /* check that the agent type exists */
819  if (g_tree_lookup(scheduler->meta_agents, job->agent_type) == NULL)
820  {
821  log_printf("ERROR %s.%d: jq_pk %d jq_type %s does not match any module in mods-enabled\n", __FILE__, __LINE__,
822  job->id, job->agent_type);
823  job->message = NULL;
824  job_fail_event(scheduler, job);
825  job_remove_agent(job, scheduler->job_list, NULL);
826  return NULL;
827  }
828 
829  /* allocate memory and do trivial assignments */
830  agent = g_new(agent_t, 1);
831  agent->type = g_tree_lookup(scheduler->meta_agents, job->agent_type);
832  agent->status = AG_CREATED;
833 
834  /* make sure that there is a metaagent for the job */
835  if (agent->type == NULL)
836  {
837  ERROR("meta agent %s does not exist", job->agent_type);
838  return NULL;
839  }
840 
841  /* check if the agent is valid */
842  if (!agent->type->valid)
843  {
844  ERROR("agent %s has been invalidated by version information", job->agent_type);
845  return NULL;
846  }
847 
848  /* create the pipes between the child and the parent */
849  if (pipe(parent_to_child) != 0)
850  {
851  ERROR("JOB[%d.%s] failed to create parent to child pipe", job->id, job->agent_type);
852  g_free(agent);
853  return NULL;
854  }
855  if (pipe(child_to_parent) != 0)
856  {
857  ERROR("JOB[%d.%s] failed to create child to parent pipe", job->id, job->agent_type);
858  g_free(agent);
859  return NULL;
860  }
861 
862  /* set file identifiers to correctly talk to children */
863  agent->from_parent = parent_to_child[0];
864  agent->to_child = parent_to_child[1];
865  agent->from_child = child_to_parent[0];
866  agent->to_parent = child_to_parent[1];
867 
868  /* initialize other info */
869  agent->host = host;
870  agent->owner = job;
871  agent->updated = 0;
872  agent->n_updates = 0;
873  agent->data = NULL;
874  agent->return_code = -1;
875  agent->total_analyzed = 0;
876  agent->special = 0;
877 
878  /* open the relevant file pointers */
879  if ((agent->read = fdopen(agent->from_child, "r")) == NULL)
880  {
881  ERROR("JOB[%d.%s] failed to initialize read file", job->id, job->agent_type);
882  g_free(agent);
883  return NULL;
884  }
885  if ((agent->write = fdopen(agent->to_child, "w")) == NULL)
886  {
887  ERROR("JOB[%d.%s] failed to initialize write file", job->id, job->agent_type);
888  g_free(agent);
889  return NULL;
890  }
891 
892  /* increase the load on the host and count of running agents */
893  if (agent->owner->id > 0)
894  {
895  host_increase_load(agent->host);
897  }
898 
899  /* spawn the listen thread */
900  pass = g_new0(agent_spawn_args, 1);
901  pass->scheduler = scheduler;
902  pass->agent = agent;
903 
904 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
905  agent->thread = g_thread_new(agent->type->name, (GThreadFunc) agent_spawn, pass);
906 #else
907  agent->thread = g_thread_create((GThreadFunc)agent_spawn, pass, 1, NULL);
908 #endif
909 
910  return agent;
911 }
912 
924 void agent_destroy(agent_t* agent)
925 {
926  TEST_NULV(agent);
927 
928  /* close all of the files still open for this agent */
929  close(agent->from_child);
930  close(agent->to_child);
931  close(agent->from_parent);
932  close(agent->to_parent);
933  fclose(agent->write);
934  fclose(agent->read);
935 
936  /* release the child process */
937  g_free(agent);
938 }
939 
940 /* ************************************************************************** */
941 /* **** Events ************************************************************** */
942 /* ************************************************************************** */
943 
952 void agent_death_event(scheduler_t* scheduler, pid_t* pid)
953 {
954  agent_t* agent;
955  int status = pid[1];
956 
957  if ((agent = g_tree_lookup(scheduler->agents, &pid[0])) == NULL)
958  {
959  ERROR("invalid agent death event: pid[%d]", pid[0]);
960  return;
961  }
962 
963  if (agent->owner->id >= 0)
964  event_signal(database_update_event, NULL);
965 
966  if (write(agent->to_parent, "@@@1\n", 5) != 5)
967  AGENT_SEQUENTIAL_PRINT("write to agent unsuccessful: %s\n", strerror(errno));
968  g_thread_join(agent->thread);
969 
970  if (agent->return_code != 0)
971  {
972  if (WIFEXITED(status))
973  {
974  AGENT_CONCURRENT_PRINT("agent failed, code: %d\n", (status >> 8));
975  }
976  else if (WIFSIGNALED(status))
977  {
978  AGENT_CONCURRENT_PRINT("agent was killed by signal: %d.%s\n", WTERMSIG(status), strsignal(WTERMSIG(status)));
979  if (WCOREDUMP(status))
980  AGENT_CONCURRENT_PRINT("agent produced core dump\n");
981  }
982  else
983  {
984  AGENT_CONCURRENT_PRINT("agent failed, code: %d\n", agent->return_code);
985  }
986  AGENT_WARNING("agent closed unexpectedly, agent status was %s", agent_status_strings[agent->status]);
987  agent_fail_event(scheduler, agent);
988  }
989 
990  if (agent->status != AG_PAUSED && agent->status != AG_FAILED)
991  agent_transition(agent, AG_PAUSED);
992 
993  job_update(scheduler, agent->owner);
994  if (agent->status == AG_FAILED && agent->owner->id < 0)
995  {
996  log_printf("ERROR %s.%d: agent %s.%s has failed scheduler startup test\n", __FILE__, __LINE__, agent->host->name,
997  agent->type->name);
998  agent->type->valid = 0;
999  }
1000 
1001  if (agent->owner->id < 0 && !agent->type->valid)
1002  AGENT_SEQUENTIAL_PRINT("agent failed startup test, removing from meta agents\n");
1003 
1004  AGENT_SEQUENTIAL_PRINT("successfully remove from the system\n");
1005  job_remove_agent(agent->owner, scheduler->job_list, agent);
1006  g_tree_remove(scheduler->agents, &agent->pid);
1007  g_free(pid);
1008 }
1009 
1021 void agent_create_event(scheduler_t* scheduler, agent_t* agent)
1022 {
1023  TEST_NULV(agent);
1024 
1025  AGENT_SEQUENTIAL_PRINT("agent successfully spawned\n");
1026  g_tree_insert(scheduler->agents, &agent->pid, agent);
1027  agent_transition(agent, AG_SPAWNED);
1028  job_add_agent(agent->owner, agent);
1029 }
1030 
1042 void agent_ready_event(scheduler_t* scheduler, agent_t* agent)
1043 {
1044  int ret;
1045 
1046  TEST_NULV(agent);
1047  if (agent->status == AG_SPAWNED)
1048  {
1049  agent_transition(agent, AG_RUNNING);
1050  AGENT_SEQUENTIAL_PRINT("agent successfully created\n");
1051  }
1052 
1053  if ((ret = job_is_open(scheduler, agent->owner)) == 0)
1054  {
1055  agent_transition(agent, AG_PAUSED);
1056  job_finish_agent(agent->owner, agent);
1057  job_update(scheduler, agent->owner);
1058  return;
1059  }
1060  else if (ret < 0)
1061  {
1062  agent_transition(agent, AG_FAILED);
1063  return;
1064  }
1065  else
1066  {
1067  agent->data = job_next(agent->owner);
1068  agent->updated = 1;
1069  }
1070 
1071  if (write(agent->to_parent, "@@@0\n", 5) != 5)
1072  {
1073  AGENT_ERROR("failed sending new data to agent");
1074  agent_kill(agent);
1075  }
1076 }
1077 
1087 void agent_update_event(scheduler_t* scheduler, void* unused)
1088 {
1089  g_tree_foreach(scheduler->agents, (GTraverseFunc) update, NULL);
1090 }
1091 
1102 void agent_fail_event(scheduler_t* scheduler, agent_t* agent)
1103 {
1104  TEST_NULV(agent);
1105  agent_transition(agent, AG_FAILED);
1106  job_fail_agent(agent->owner, agent);
1107  if (write(agent->to_parent, "@@@1\n", 5) != 5)
1108  AGENT_ERROR("Failed to kill agent thread cleanly");
1109 }
1110 
1118 void list_agents_event(scheduler_t* scheduler, GOutputStream* ostr)
1119 {
1120  g_tree_foreach(scheduler->meta_agents, (GTraverseFunc) agent_list, ostr);
1121  g_output_stream_write(ostr, "\nend\n", 5, NULL, NULL);
1122 }
1123 
1124 /* ************************************************************************** */
1125 /* **** Modifier Functions ************************************************** */
1126 /* ************************************************************************** */
1127 
1136 void agent_transition(agent_t* agent, agent_status new_status)
1137 {
1138  AGENT_SEQUENTIAL_PRINT("agent status change: %s -> %s\n", agent_status_strings[agent->status],
1139  agent_status_strings[new_status]);
1140 
1141  if (agent->owner->id > 0)
1142  {
1143  if (agent->status == AG_PAUSED)
1144  {
1145  host_increase_load(agent->host);
1147  }
1148  if (new_status == AG_PAUSED)
1149  {
1150  host_decrease_load(agent->host);
1152  }
1153  }
1154 
1155  agent->status = new_status;
1156 }
1157 
1164 void agent_pause(agent_t* agent)
1165 {
1166  kill(agent->pid, SIGSTOP);
1167  agent_transition(agent, AG_PAUSED);
1168 }
1169 
1178 {
1179  kill(agent->pid, SIGCONT);
1180  agent_transition(agent, AG_RUNNING);
1181 }
1182 
1193 void agent_print_status(agent_t* agent, GOutputStream* ostr)
1194 {
1195  gchar* status_str;
1196  char time_buf[64];
1197  struct tm* time_info;
1198 
1199  TEST_NULV(agent);
1200  TEST_NULV(ostr);
1201 
1202  strcpy(time_buf, "(none)");
1203  time_info = localtime(&agent->check_in);
1204  if (time_info)
1205  strftime(time_buf, sizeof(time_buf), "%F %T", localtime(&agent->check_in));
1206  status_str = g_strdup_printf("agent:%d host:%s type:%s status:%s time:%s\n", agent->pid, agent->host->name,
1207  agent->type->name, agent_status_strings[agent->status], time_buf);
1208 
1209  AGENT_SEQUENTIAL_PRINT("AGENT_STATUS: %s", status_str);
1210  g_output_stream_write(ostr, status_str, strlen(status_str), NULL, NULL);
1211  g_free(status_str);
1212  return;
1213 }
1214 
1223 void agent_kill(agent_t* agent)
1224 {
1225  AGENT_SEQUENTIAL_PRINT("KILL: sending SIGKILL to pid %d\n", agent->pid);
1227  kill(agent->pid, SIGKILL);
1228 }
1229 
1238 int aprintf(agent_t* agent, const char* fmt, ...)
1239 {
1240  va_list args;
1241  int rc;
1242  char* tmp;
1243 
1244  va_start(args, fmt);
1245  if (TVERB_AGENT)
1246  {
1247  tmp = g_strdup_vprintf(fmt, args);
1248  tmp[strlen(tmp) - 1] = '\0';
1249  AGENT_CONCURRENT_PRINT("sent to agent \"%s\"\n", tmp);
1250  rc = fprintf(agent->write, "%s\n", tmp);
1251  g_free(tmp);
1252  }
1253  else
1254  {
1255  rc = vfprintf(agent->write, fmt, args);
1256  }
1257  va_end(args);
1258  fflush(agent->write);
1259 
1260  return rc;
1261 }
1262 
1274 ssize_t agent_write(agent_t* agent, const void* buf, int count)
1275 {
1276  return write(agent->to_parent, buf, count);
1277 }
1278 
1279 /* ************************************************************************** */
1280 /* **** static functions and meta agents ************************************ */
1281 /* ************************************************************************** */
1282 
1290 void test_agents(scheduler_t* scheduler)
1291 {
1292  g_tree_foreach(scheduler->meta_agents, (GTraverseFunc) agent_test, scheduler);
1293 }
1294 
1301 void kill_agents(scheduler_t* scheduler)
1302 {
1303  g_tree_foreach(scheduler->agents, (GTraverseFunc) agent_kill_traverse, NULL);
1304 }
1305 
1316 int add_meta_agent(GTree* meta_agents, char* name, char* cmd, int max, int spc)
1317 {
1318  meta_agent_t* ma;
1319 
1320  if (name == NULL)
1321  return 0;
1322 
1323  if (g_tree_lookup(meta_agents, name) == NULL)
1324  {
1325  if ((ma = meta_agent_init(name, cmd, max, spc)) == NULL)
1326  return 0;
1327  g_tree_insert(meta_agents, ma->name, ma);
1328  return 1;
1329  }
1330 
1331  return 0;
1332 }
1333 
1341 int is_meta_special(meta_agent_t* ma, int special_type)
1342 {
1343  return (ma != NULL) && ((ma->special & special_type) != 0);
1344 }
1345 
1353 int is_agent_special(agent_t* agent, int special_type)
1354 {
1355  return (agent != NULL) && ((agent->special & special_type) != 0);
1356 }
1357 
1363 {
1364  ma->run_count++;
1365  V_AGENT("AGENT[%s] run increased to %d\n", ma->name, ma->run_count);
1366 }
1367 
1373 {
1374  ma->run_count--;
1375  V_AGENT("AGENT[%s] run decreased to %d\n", ma->name, ma->run_count);
1376 }
static int agent_list(char *name, meta_agent_t *ma, GOutputStream *ostr)
GTraverseFunc that prints the name of every valid agent, in alphabetical order, separated by spaces.
Definition: agent.c:217
void agent_ready_event(scheduler_t *scheduler, agent_t *agent)
Event created when an agent is ready for more data.
Definition: agent.c:1042
static int update(int *pid_ptr, agent_t *agent, gpointer unused)
Definition: agent.c:152
int aprintf(agent_t *agent, const char *fmt,...)
Definition: agent.c:1238
static int agent_close_fd(int *pid_ptr, agent_t *agent, agent_t *excepted)
This will close all of the agent's pipes.
Definition: agent.c:129
agent_t * agent_init(scheduler_t *scheduler, host_t *host, job_t *job)
Allocate and spawn a new agent.
Definition: agent.c:802
static int agent_test(const gchar *name, meta_agent_t *ma, scheduler_t *scheduler)
GTraverseFunc that tests the current agent on every host.
Definition: agent.c:238
ssize_t agent_write(agent_t *agent, const void *buf, int count)
Definition: agent.c:1274
#define AGENT_SEQUENTIAL_PRINT(...)
Definition: agent.c:92
#define AGENT_CONCURRENT_PRINT(...)
Definition: agent.c:97
#define AGENT_ERROR(...)
Definition: agent.c:71
static int agent_kill_traverse(int *pid, agent_t *agent, gpointer unused)
GTraverseFunc that kills all of the agents.
Definition: agent.c:202
void agent_update_event(scheduler_t *scheduler, void *unused)
Definition: agent.c:1087
void meta_agent_increase_count(meta_agent_t *ma)
Definition: agent.c:1362
void agent_destroy(agent_t *agent)
Frees the memory associated with an agent.
Definition: agent.c:924
#define AGENT_WARNING(...)
Definition: agent.c:85
void agent_fail_event(scheduler_t *scheduler, agent_t *agent)
Fails an agent.
Definition: agent.c:1102
meta_agent_t * meta_agent_init(char *name, char *cmd, int max, int spc)
Creates a new meta agent.
Definition: agent.c:744
void agent_kill(agent_t *agent)
Unclean kill of an agent.
Definition: agent.c:1223
void agent_pause(agent_t *agent)
Definition: agent.c:1164
int add_meta_agent(GTree *meta_agents, char *name, char *cmd, int max, int spc)
Definition: agent.c:1316
void agent_create_event(scheduler_t *scheduler, agent_t *agent)
Event created when a new agent has been created.
Definition: agent.c:1021
void agent_transition(agent_t *agent, agent_status new_status)
Definition: agent.c:1136
#define TEST_NULL(a, ret)
Test if parameter is NULL.
Definition: agent.c:57
static void agent_listen(scheduler_t *scheduler, agent_t *agent)
Definition: agent.c:264
void agent_unpause(agent_t *agent)
Definition: agent.c:1177
void test_agents(scheduler_t *scheduler)
Calls the agent test function for every type of agent.
Definition: agent.c:1290
void meta_agent_decrease_count(meta_agent_t *ma)
Definition: agent.c:1372
void kill_agents(scheduler_t *scheduler)
Call the agent_kill function for every agent within the system.
Definition: agent.c:1301
int is_agent_special(agent_t *agent, int special_type)
tests if a particular agent has a specific special flag set
Definition: agent.c:1353
void list_agents_event(scheduler_t *scheduler, GOutputStream *ostr)
Writes the names of all valid agents to the interface output stream, followed by "end".
Definition: agent.c:1118
#define TEST_NULV(a)
Test if parameter is NULL.
Definition: agent.c:47
int is_meta_special(meta_agent_t *ma, int special_type)
tests if a particular meta agent has a specific special flag set
Definition: agent.c:1341
static void shell_parse(char *confdir, int user_id, int group_id, char *input, char *jq_cmd_args, int jobId, int *argc, char ***argv)
Parses the shell command that is found in the configuration file.
Definition: agent.c:551
#define SELECT_STRING(passed)
Definition: agent.c:109
void agent_death_event(scheduler_t *scheduler, pid_t *pid)
Definition: agent.c:952
static void * agent_spawn(agent_spawn_args *pass)
Spawns a new agent using the command passed in using the meta agent.
Definition: agent.c:629
void meta_agent_destroy(meta_agent_t *ma)
Definition: agent.c:784
void agent_print_status(agent_t *agent, GOutputStream *ostr)
Prints the status of the agent to the output stream provided.
Definition: agent.c:1193
Header file with agent related operations.
#define MAX_CMD
the size of the agent's command buffer (arbitrary)
Definition: agent.h:26
#define SAG_NOKILL
This agent should not be killed when updating the agent.
Definition: agent.h:33
#define AGENT_STATUS_TYPES(apply)
Definition: agent.h:50
#define MAX_NAME
the size of the agent's name buffer (arbitrary)
Definition: agent.h:27
Event handling operations.
void host_decrease_load(host_t *host)
Decrease the number of running agents on a host by 1.
Definition: host.c:113
void host_increase_load(host_t *host)
Increase the number of running agents on a host by 1.
Definition: host.c:102
void job_fail_agent(job_t *job, void *agent)
Definition: job.c:493
void job_fail_event(scheduler_t *scheduler, job_t *job)
Event that causes a job to be marked as failed.
Definition: job.c:406
job_t * job_init(GTree *job_list, GSequence *job_queue, char *type, char *host, int id, int parent_id, int user_id, int group_id, int priority, char *jq_cmd_args)
Create a new job.
Definition: job.c:164
void job_update(scheduler_t *scheduler, job_t *job)
Definition: job.c:531
int job_is_open(scheduler_t *scheduler, job_t *job)
Tests to see if there is still data available for this job.
Definition: job.c:570
char * job_next(job_t *job)
Definition: job.c:610
void job_remove_agent(job_t *job, GTree *job_list, void *agent)
Definition: job.c:449
log_t * job_log(job_t *job)
Definition: job.c:637
void job_add_agent(job_t *job, void *agent)
Adds a new agent to the jobs list of agents.
Definition: job.c:434
void job_finish_agent(job_t *job, void *agent)
Definition: job.c:478
FUNCTION int max(int permGroup, int permPublic)
Get the maximum group privilege.
Definition: libfossagent.c:295
int jobId
The id of the job.
char buffer[2048]
The last thing received from the scheduler.
log_t * main_log
Definition: logging.c:33
Log related operations.
#define ERROR(...)
Definition: logging.h:79
#define THREAD_FATAL(file,...)
Definition: logging.h:71
void database_update_event(scheduler_t *scheduler, void *unused)
Checks the job queue for any new entries.
Definition: database.c:841
void database_job_processed(int j_id, int num)
Updates the number of items that a job queue entry has processed.
Definition: database.c:992
Header file for the scheduler.
#define AGENT_CONF
Agent conf location.
Definition: scheduler.h:123
#define AGENT_BINARY
Format to get agent binary.
Definition: scheduler.h:122
agent_t * agent
Reference to current agent state.
Definition: agent.c:607
scheduler_t * scheduler
Reference to current scheduler state.
Definition: agent.c:606
Definition: agent.h:100
time_t check_in
the time that the agent last generated anything
Definition: agent.h:108
int to_child
file identifier to print to the child
Definition: agent.h:114
agent_status status
the state of execution the agent is currently in
Definition: agent.h:106
meta_agent_t * type
the type of agent this is i.e. bucket, copyright...
Definition: agent.h:102
int from_child
file identifier to read from child
Definition: agent.h:115
gchar * data
the data that has been sent to the agent for analysis
Definition: agent.h:122
FILE * write
FILE* that abstracts the use of the to_child socket.
Definition: agent.h:118
gboolean alive
flag to tell the scheduler if the agent is still alive
Definition: agent.h:125
job_t * owner
the job that this agent is assigned to
Definition: agent.h:121
FILE * read
FILE* that abstracts the use of the from_child socket.
Definition: agent.h:117
uint32_t special
any special flags that the agent has set
Definition: agent.h:127
int to_parent
file identifier to print to the parent (child stdout)
Definition: agent.h:116
uint8_t n_updates
keeps track of the number of times the agent has updated
Definition: agent.h:109
uint64_t total_analyzed
the total number that this agent has analyzed
Definition: agent.h:124
host_t * host
the host that this agent will start on
Definition: agent.h:103
GThread * thread
the thread that communicates with this agent
Definition: agent.h:107
int from_parent
file identifier to read from the parent (child stdin)
Definition: agent.h:113
uint8_t return_code
what was returned by the agent when it disconnected
Definition: agent.h:126
pid_t pid
the pid of the process this agent is running in
Definition: agent.h:110
gboolean updated
boolean flag to indicate if the scheduler has updated the data
Definition: agent.h:123
Definition: host.h:26
char * agent_dir
The location on the host machine where the executables are.
Definition: host.h:29
char * address
The address of the host, used by ssh when starting a new agent.
Definition: host.h:28
char * name
The name of the host, used to store host internally to scheduler.
Definition: host.h:27
The job structure.
Definition: job.h:51
int32_t id
The identifier for this job.
Definition: job.h:73
job_status status
The current status for the job.
Definition: job.h:61
int32_t group_id
The id of the group that created the job.
Definition: job.h:75
gchar * message
Message that will be sent with job notification email.
Definition: job.h:69
gchar * jq_cmd_args
Command line arguments for this job.
Definition: job.h:63
int32_t user_id
The id of the user that created the job.
Definition: job.h:74
char * agent_type
The type of agent used to analyze the data.
Definition: job.h:53
int32_t priority
Importance of the job, maps directly to unix priority.
Definition: job.h:70
int32_t parent_id
The identifier for the parent of this job (its queue id)
Definition: job.h:72
Store the results of a regex match.
Definition: scanners.hpp:28
int valid
flag indicating if the meta_agent is valid
Definition: agent.h:89
char raw_cmd[MAX_CMD+1]
the raw command that will start the agent, used for ssh
Definition: agent.h:84
int max_run
the maximum number that can run at once -1 if no limit
Definition: agent.h:85
char name[256]
the name associated with this agent i.e. nomos, copyright...
Definition: agent.h:83
int run_count
the count of agents in running state
Definition: agent.h:90
char * version
the version of the agent that is running on all hosts
Definition: agent.h:88
char * version_source
the machine that reported the version information
Definition: agent.h:87
int special
any special condition associated with the agent
Definition: agent.h:86
GTree * job_list
List of jobs that have been created.
Definition: scheduler.h:172
gchar * sysconfigdir
The system directory that contain fossology.conf.
Definition: scheduler.h:150
GList * host_queue
Round-robin queue for choosing which host use next.
Definition: scheduler.h:161
GRegex * parse_agent_msg
Parses messages coming from the agents.
Definition: scheduler.h:186
GTree * meta_agents
List of all meta agents available to the scheduler.
Definition: scheduler.h:156
GTree * agents
List of any currently running agents.
Definition: scheduler.h:157
GSequence * job_queue
heap of jobs that still need to be started
Definition: scheduler.h:173