34 #include <sys/types.h>
47 #define TEST_NULV(a) if(!a) { \
48 errno = EINVAL; ERROR("agent passed is NULL, cannot proceed"); return; }
57 #define TEST_NULL(a, ret) if(!a) { \
58 errno = EINVAL; ERROR("agent passed is NULL, cannot proceed"); return ret; }
61 #define AGENT_CREDENTIAL \
62 log_printf("JOB[%d].%s[%d.%s]: ", agent->owner->id, agent->type->name, \
63 agent->pid, agent->host->name)
66 #define AGENT_LOG_CREDENTIAL \
67 con_printf(job_log(agent->owner), "JOB[%d].%s[%d.%s]: ", \
68 agent->owner->id, agent->type->name, agent->pid, agent->host->name)
71 #define AGENT_ERROR(...) do { \
72 log_printf("ERROR: %s.%d: ", __FILE__, __LINE__); \
74 log_printf(__VA_ARGS__); \
75 log_printf("\n"); } while(0)
78 #define AGENT_NOTIFY(...) if(TEST_NOTIFY) do { \
79 log_printf("NOTE: "); \
81 log_printf(__VA_ARGS__); \
82 log_printf("\n"); } while(0)
85 #define AGENT_WARNING(...) if(TEST_WARNING) do { \
86 log_printf("WARNING %s.%d: ", __FILE__, __LINE__); \
88 log_printf(__VA_ARGS__); \
89 log_printf("\n"); } while(0)
92 #define AGENT_SEQUENTIAL_PRINT(...) if(TVERB_AGENT) do { \
94 log_printf(__VA_ARGS__); } while(0)
97 #define AGENT_CONCURRENT_PRINT(...) do { \
98 AGENT_LOG_CREDENTIAL; \
99 con_printf(job_log(agent->owner), __VA_ARGS__); } while(0)
109 #define SELECT_STRING(passed) MK_STRING_LIT(AGENT_##passed),
110 const char* agent_status_strings[] =
132 if (agent != excepted)
137 fclose(agent->
write);
157 if (agent->
status == AG_SPAWNED || agent->
status == AG_RUNNING || agent->
status == AG_PAUSED)
160 if (time(NULL) - agent->
check_in > CONF_agent_death_timer && !(agent->
owner->
status == JB_PAUSED) && !nokill)
176 if (agent->
n_updates > CONF_agent_update_number && !nokill)
221 g_output_stream_write(ostr, name, strlen(name), NULL, NULL);
222 g_output_stream_write(ostr,
" ", 1, NULL, NULL);
240 static int32_t id_gen = -1;
244 char *jq_cmd_args = 0;
246 for (iter = scheduler->
host_queue; iter != NULL; iter = iter->next)
248 host = (
host_t*) iter->data;
249 V_AGENT(
"META_AGENT[%s] testing on HOST[%s]\n", ma->
name, host->
name);
267 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
268 static GMutex version_lock;
270 static GStaticMutex version_lock = G_STATIC_MUTEX_INIT;
299 if (strncmp(
buffer,
"VERSION: ", 9) != 0)
301 if (strncmp(
buffer,
"@@@1", 4) == 0)
310 con_printf(
main_log,
"ERROR %s.%d: agent %s.%s has been invalidated, removing from agents\n", __FILE__, __LINE__,
318 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
319 g_mutex_lock(&version_lock);
321 g_static_mutex_lock(&version_lock);
334 con_printf(
job_log(agent->
owner),
"ERROR %s.%d: META_DATA[%s] invalid agent spawn check\n", __FILE__, __LINE__,
336 con_printf(
job_log(agent->
owner),
"ERROR: versions don't match: %s(%s) != received: %s(%s)\n",
340 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
341 g_mutex_unlock(&version_lock);
343 g_static_mutex_unlock(&version_lock);
347 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
348 g_mutex_unlock(&version_lock);
350 g_static_mutex_unlock(&version_lock);
373 if (TVERB_AGENT && (TVERB_SPECIAL || strncmp(
buffer,
"SPECIAL", 7) != 0))
383 if (strncmp(
buffer,
"BYE", 3) == 0)
399 if (strncmp(
buffer,
"@@@1", 4) == 0)
412 fflush(agent->
write);
428 if (strncmp(
buffer,
"OK", 2) == 0)
430 if (agent->
status != AG_PAUSED)
441 else if (strncmp(
buffer,
"HEART", 5) == 0)
445 arg = g_match_info_fetch(
match, 3);
449 arg = g_match_info_fetch(
match, 6);
450 agent->
alive = (arg[0] ==
'1' || agent->
alive);
453 g_match_info_free(
match);
465 else if (strncmp(
buffer,
"EMAIL", 5) == 0)
477 else if (strncmp(
buffer,
"SPECIAL", 7) == 0)
483 arg = g_match_info_fetch(
match, 3);
484 relevant &= atoi(arg);
487 arg = g_match_info_fetch(
match, 6);
495 if (!(agent->
special & relevant))
500 g_match_info_free(
match);
510 else if (strncmp(
buffer,
"GETSPECIAL", 10) == 0)
514 arg = g_match_info_fetch(
match, 3);
515 relevant = atoi(arg);
523 g_match_info_free(
match);
531 else if (!(TVERB_AGENT))
551 static void shell_parse(
char* confdir,
int user_id,
int group_id,
char* input,
char *jq_cmd_args,
int jobId,
int* argc,
char*** argv)
556 #define MAX_CMD_ARGS 30
558 *argv = g_new0(
char*, MAX_CMD_ARGS);
561 for (curr = input; *curr; curr++)
572 (*argv)[idx++] = g_strdup(begin);
575 else if (begin == NULL)
579 else if (*begin ==
'"' && *curr ==
'"')
584 (*argv)[idx++] = g_strdup(begin + 1);
587 if (idx > MAX_CMD_ARGS - 7)
591 (*argv)[idx++] = g_strdup_printf(
"--jobId=%d",
jobId);
592 (*argv)[idx++] = g_strdup_printf(
"--config=%s", confdir);
593 (*argv)[idx++] = g_strdup_printf(
"--userID=%d", user_id);
594 (*argv)[idx++] = g_strdup_printf(
"--groupID=%d", group_id);
595 (*argv)[idx++] =
"--scheduler_start";
597 (*argv)[idx++] = jq_cmd_args;
641 while ((agent->
pid = fork()) < 0)
642 sleep(rand() % CONF_fork_backoff_time);
659 ERROR(
"unable to correctly set priority of agent process %d", agent->
pid);
675 *strrchr(
buffer,
'/') =
'\0';
678 ERROR(
"unable to change working directory: %s\n", strerror(errno));
681 execv(args[0], args);
689 args = g_new0(
char*, 5);
694 if (len>=
sizeof(
buffer)) {
696 log_printf(
"ERROR %s.%d: JOB[%d.%s]: exec failed: truncated buffer: \"%s\"",
702 args[0] =
"/usr/bin/ssh";
707 execv(args[0], args);
711 log_printf(
"ERROR %s.%d: JOB[%d.%s]: exec failed: pid = %d, errno = \"%s\"", __FILE__, __LINE__, agent->
owner->
id,
752 ERROR(
"invalid arguments passed to meta_agent_init()");
759 log_printf(
"ERROR failed to load %s meta agent", name);
766 strcpy(ma->
name, name);
768 strcat(ma->
raw_cmd,
" --scheduler_start");
806 int child_to_parent[2];
807 int parent_to_child[2];
813 log_printf(
"ERROR %s.%d: NULL job passed to agent init\n", __FILE__, __LINE__);
814 log_printf(
"ERROR: no other information available\n");
821 log_printf(
"ERROR %s.%d: jq_pk %d jq_type %s does not match any module in mods-enabled\n", __FILE__, __LINE__,
832 agent->
status = AG_CREATED;
835 if (agent->
type == NULL)
844 ERROR(
"agent %s has been invalidated by version information", job->
agent_type);
849 if (pipe(parent_to_child) != 0)
851 ERROR(
"JOB[%d.%s] failed to create parent to child pipe", job->
id, job->
agent_type);
855 if (pipe(child_to_parent) != 0)
857 ERROR(
"JOB[%d.%s] failed to create child to parent pipe", job->
id, job->
agent_type);
864 agent->
to_child = parent_to_child[1];
901 pass->scheduler = scheduler;
904 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
933 fclose(agent->
write);
957 if ((agent = g_tree_lookup(scheduler->
agents, &pid[0])) == NULL)
959 ERROR(
"invalid agent death event: pid[%d]", pid[0]);
966 if (write(agent->
to_parent,
"@@@1\n", 5) != 5)
968 g_thread_join(agent->
thread);
972 if (WIFEXITED(status))
976 else if (WIFSIGNALED(status))
978 AGENT_CONCURRENT_PRINT(
"agent was killed by signal: %d.%s\n", WTERMSIG(status), strsignal(WTERMSIG(status)));
979 if (WCOREDUMP(status))
986 AGENT_WARNING(
"agent closed unexpectedly, agent status was %s", agent_status_strings[agent->
status]);
990 if (agent->
status != AG_PAUSED && agent->
status != AG_FAILED)
996 log_printf(
"ERROR %s.%d: agent %s.%s has failed scheduler startup test\n", __FILE__, __LINE__, agent->
host->
name,
1006 g_tree_remove(scheduler->
agents, &agent->
pid);
1026 g_tree_insert(scheduler->
agents, &agent->
pid, agent);
1047 if (agent->
status == AG_SPAWNED)
1071 if (write(agent->
to_parent,
"@@@0\n", 5) != 5)
1089 g_tree_foreach(scheduler->
agents, (GTraverseFunc)
update, NULL);
1107 if (write(agent->
to_parent,
"@@@1\n", 5) != 5)
1108 AGENT_ERROR(
"Failed to kill agent thread cleanly");
1121 g_output_stream_write(ostr,
"\nend\n", 5, NULL, NULL);
1139 agent_status_strings[new_status]);
1143 if (agent->
status == AG_PAUSED)
1148 if (new_status == AG_PAUSED)
1155 agent->
status = new_status;
1166 kill(agent->
pid, SIGSTOP);
1179 kill(agent->
pid, SIGCONT);
1197 struct tm* time_info;
1202 strcpy(time_buf,
"(none)");
1203 time_info = localtime(&agent->
check_in);
1205 strftime(time_buf,
sizeof(time_buf),
"%F %T", localtime(&agent->
check_in));
1206 status_str = g_strdup_printf(
"agent:%d host:%s type:%s status:%s time:%s\n", agent->
pid, agent->
host->
name,
1207 agent->
type->
name, agent_status_strings[agent->
status], time_buf);
1210 g_output_stream_write(ostr, status_str, strlen(status_str), NULL, NULL);
1227 kill(agent->
pid, SIGKILL);
1244 va_start(args, fmt);
1247 tmp = g_strdup_vprintf(fmt, args);
1248 tmp[strlen(tmp) - 1] =
'\0';
1250 rc = fprintf(agent->
write,
"%s\n", tmp);
1255 rc = vfprintf(agent->
write, fmt, args);
1258 fflush(agent->
write);
1276 return write(agent->
to_parent, buf, count);
1323 if (g_tree_lookup(meta_agents, name) == NULL)
1327 g_tree_insert(meta_agents, ma->
name, ma);
1343 return (ma != NULL) && ((ma->
special & special_type) != 0);
1355 return (agent != NULL) && ((agent->
special & special_type) != 0);
1365 V_AGENT(
"AGENT[%s] run increased to %d\n", ma->
name, ma->
run_count);
1375 V_AGENT(
"AGENT[%s] run decreased to %d\n", ma->
name, ma->
run_count);
static int agent_list(char *name, meta_agent_t *ma, GOutputStream *ostr)
GTraverseFunction that will print the name of every agent in alphabetical order separated by spaces.
void agent_ready_event(scheduler_t *scheduler, agent_t *agent)
Event created when an agent is ready for more data.
static int update(int *pid_ptr, agent_t *agent, gpointer unused)
int aprintf(agent_t *agent, const char *fmt,...)
static int agent_close_fd(int *pid_ptr, agent_t *agent, agent_t *excepted)
This will close all of the agent's pipes.
agent_t * agent_init(scheduler_t *scheduler, host_t *host, job_t *job)
Allocate and spawn a new agent.
static int agent_test(const gchar *name, meta_agent_t *ma, scheduler_t *scheduler)
GTraversalFunction that tests the current agent on every host.
ssize_t agent_write(agent_t *agent, const void *buf, int count)
#define AGENT_SEQUENTIAL_PRINT(...)
#define AGENT_CONCURRENT_PRINT(...)
static int agent_kill_traverse(int *pid, agent_t *agent, gpointer unused)
GTraversalFunction that kills all of the agents.
void agent_update_event(scheduler_t *scheduler, void *unused)
void meta_agent_increase_count(meta_agent_t *ma)
void agent_destroy(agent_t *agent)
Frees the memory associated with an agent.
#define AGENT_WARNING(...)
void agent_fail_event(scheduler_t *scheduler, agent_t *agent)
Fails an agent.
meta_agent_t * meta_agent_init(char *name, char *cmd, int max, int spc)
Creates a new meta agent.
void agent_kill(agent_t *agent)
Unclean kill of an agent.
void agent_pause(agent_t *agent)
int add_meta_agent(GTree *meta_agents, char *name, char *cmd, int max, int spc)
void agent_create_event(scheduler_t *scheduler, agent_t *agent)
Event created when a new agent has been created.
void agent_transition(agent_t *agent, agent_status new_status)
#define TEST_NULL(a, ret)
Test if paramater is NULL.
static void agent_listen(scheduler_t *scheduler, agent_t *agent)
void agent_unpause(agent_t *agent)
void test_agents(scheduler_t *scheduler)
Calls the agent test function for every type of agent.
void meta_agent_decrease_count(meta_agent_t *ma)
void kill_agents(scheduler_t *scheduler)
Call the agent_kill function for every agent within the system.
int is_agent_special(agent_t *agent, int special_type)
tests if a particular agent has a specific special flag set
void list_agents_event(scheduler_t *scheduler, GOutputStream *ostr)
Receive agent on interface.
#define TEST_NULV(a)
Test if paramater is NULL.
int is_meta_special(meta_agent_t *ma, int special_type)
tests if a particular meta agent has a specific special flag set
static void shell_parse(char *confdir, int user_id, int group_id, char *input, char *jq_cmd_args, int jobId, int *argc, char ***argv)
Parses the shell command that is found in the configuration file.
#define SELECT_STRING(passed)
void agent_death_event(scheduler_t *scheduler, pid_t *pid)
static void * agent_spawn(agent_spawn_args *pass)
Spawns a new agent using the command passed in using the meta agent.
void meta_agent_destroy(meta_agent_t *ma)
void agent_print_status(agent_t *agent, GOutputStream *ostr)
Prints the status of the agent to the output stream provided.
Header file with agent related operations.
#define MAX_CMD
the size of the agent's command buffer (arbitrary)
#define SAG_NOKILL
This agent should not be killed when updating the agent.
#define AGENT_STATUS_TYPES(apply)
#define MAX_NAME
the size of the agent's name buffer (arbitrary)
Event handling operations.
void host_decrease_load(host_t *host)
Decrease the number of running agents on a host by 1.
void host_increase_load(host_t *host)
Increase the number of running agents on a host by 1.
void job_fail_agent(job_t *job, void *agent)
void job_fail_event(scheduler_t *scheduler, job_t *job)
Events that causes a job to be marked a failed.
job_t * job_init(GTree *job_list, GSequence *job_queue, char *type, char *host, int id, int parent_id, int user_id, int group_id, int priority, char *jq_cmd_args)
Create a new job.
void job_update(scheduler_t *scheduler, job_t *job)
int job_is_open(scheduler_t *scheduler, job_t *job)
Tests to see if there is still data available for this job.
char * job_next(job_t *job)
void job_remove_agent(job_t *job, GTree *job_list, void *agent)
log_t * job_log(job_t *job)
void job_add_agent(job_t *job, void *agent)
Adds a new agent to the jobs list of agents.
void job_finish_agent(job_t *job, void *agent)
FUNCTION int max(int permGroup, int permPublic)
Get the maximum group privilege.
int jobId
The id of the job.
char buffer[2048]
The last thing received from the scheduler.
#define THREAD_FATAL(file,...)
void database_update_event(scheduler_t *scheduler, void *unused)
Checks the job queue for any new entries.
void database_job_processed(int j_id, int num)
Updates the number of items that a job queue entry has processed.
Header file for the scheduler.
#define AGENT_CONF
Agent conf location.
#define AGENT_BINARY
Format to get agent binary.
agent_t * agent
Reference to current agent state.
scheduler_t * scheduler
Reference to current scheduler state.
time_t check_in
the time that the agent last generated anything
int to_child
file identifier to print to the child
agent_status status
the state of execution the agent is currently in
meta_agent_t * type
the type of agent this is i.e. bucket, copyright...
int from_child
file identifier to read from child
gchar * data
the data that has been sent to the agent for analysis
FILE * write
FILE* that abstracts the use of the to_child socket.
gboolean alive
flag to tell the scheduler if the agent is still alive
job_t * owner
the job that this agent is assigned to
FILE * read
FILE* that abstracts the use of the from_child socket.
uint32_t special
any special flags that the agent has set
int to_parent
file identifier to print to the parent (child stdout)
uint8_t n_updates
keeps track of the number of times the agent has updated
uint64_t total_analyzed
the total number that this agent has analyzed
host_t * host
the host that this agent will start on
GThread * thread
the thread that communicates with this agent
int from_parent
file identifier to read from the parent (child stdin)
uint8_t return_code
what was returned by the agent when it disconnected
pid_t pid
the pid of the process this agent is running in
gboolean updated
boolean flag to indicate if the scheduler has updated the data
char * agent_dir
The location on the host machine where the executables are.
char * address
The address of the host, used by ssh when starting a new agent.
char * name
The name of the host, used to store host internally to scheduler.
int32_t id
The identifier for this job.
job_status status
The current status for the job.
int32_t group_id
The id of the group that created the job.
gchar * message
Message that will be sent with job notification email.
gchar * jq_cmd_args
Command line arguments for this job.
int32_t user_id
The id of the user that created the job.
char * agent_type
The type of agent used to analyze the data.
int32_t priority
Importance of the job, maps directory to unix priority.
int32_t parent_id
The identifier for the parent of this job (its queue id)
Store the results of a regex match.
GTree * job_list
List of jobs that have been created.
gchar * sysconfigdir
The system directory that contain fossology.conf.
GList * host_queue
Round-robin queue for choosing which host use next.
GRegex * parse_agent_msg
Parses messages coming from the agents.
GTree * meta_agents
List of all meta agents available to the scheduler.
GTree * agents
List of any currently running agents.
GSequence * job_queue
heap of jobs that still need to be started