34 #include <sys/types.h>
/**
 * Test if parameter is NULL, for use inside functions that return void.
 *
 * When the argument evaluates to false, errno is set to EINVAL, the problem
 * is reported through ERROR() and the enclosing function returns.
 *
 * The argument is parenthesized in the expansion so that a compound
 * expression (e.g. `a || b`) is negated as a whole instead of only its
 * first operand. The plain `if` form is kept so that existing call sites
 * continue to expand exactly as before.
 *
 * @param a  the pointer (or expression) to test
 */
#define TEST_NULV(a) if(!(a)) { \
  errno = EINVAL; ERROR("agent passed is NULL, cannot proceed"); return; }
/**
 * Test if parameter is NULL, for use inside functions that return a value.
 *
 * When the argument evaluates to false, errno is set to EINVAL, the problem
 * is reported through ERROR() and the enclosing function returns `ret`.
 *
 * The argument is parenthesized in the expansion so that a compound
 * expression (e.g. `a || b`) is negated as a whole instead of only its
 * first operand. The plain `if` form is kept so that existing call sites
 * continue to expand exactly as before.
 *
 * @param a    the pointer (or expression) to test
 * @param ret  the value returned from the enclosing function on failure
 */
#define TEST_NULL(a, ret) if(!(a)) { \
  errno = EINVAL; ERROR("agent passed is NULL, cannot proceed"); return ret; }
61 #define AGENT_CREDENTIAL do { \
62 if(agent && agent->owner) \
63 log_printf("JOB[%d].%s[%d.%s]: ", agent->owner->id, \
64 agent->type ? agent->type->name : "?", \
65 agent->pid, agent->host ? agent->host->name : "?"); \
67 log_printf("AGENT[%s][pid=%d][host=%s]: ", \
68 agent && agent->type ? agent->type->name : "?", \
69 agent ? agent->pid : -1, \
70 agent && agent->host ? agent->host->name : "?"); \
74 #define AGENT_LOG_CREDENTIAL do { \
75 if(agent && agent->owner) \
76 con_printf(job_log(agent->owner), "JOB[%d].%s[%d.%s]: ", \
77 agent->owner->id, agent->type ? agent->type->name : "?", \
78 agent->pid, agent->host ? agent->host->name : "?"); \
82 #define AGENT_ERROR(...) do { \
83 log_printf("ERROR: %s.%d: ", __FILE__, __LINE__); \
85 log_printf(__VA_ARGS__); \
86 log_printf("\n"); } while(0)
89 #define AGENT_NOTIFY(...) if(TEST_NOTIFY) do { \
90 log_printf("NOTE: "); \
92 log_printf(__VA_ARGS__); \
93 log_printf("\n"); } while(0)
96 #define AGENT_WARNING(...) if(TEST_WARNING) do { \
97 log_printf("WARNING %s.%d: ", __FILE__, __LINE__); \
99 log_printf(__VA_ARGS__); \
100 log_printf("\n"); } while(0)
103 #define AGENT_SEQUENTIAL_PRINT(...) if(TVERB_AGENT) do { \
105 log_printf(__VA_ARGS__); } while(0)
/**
 * Write a formatted message to the log of the job that owns the agent.
 *
 * Expects a variable named `agent` in the calling scope. The expansion first
 * emits AGENT_LOG_CREDENTIAL and then passes the printf-style arguments to
 * con_printf() using the log returned by job_log(agent->owner).
 */
#define AGENT_CONCURRENT_PRINT(...)                    \
  do {                                                 \
    AGENT_LOG_CREDENTIAL;                              \
    con_printf(job_log(agent->owner), __VA_ARGS__);    \
  } while(0)
120 #define SELECT_STRING(passed) MK_STRING_LIT(AGENT_##passed),
121 const char* agent_status_strings[] =
143 if (agent != excepted)
148 fclose(agent->
write);
166 if (agent->
owner == NULL)
168 log_printf(
"ERROR %s.%d: Agent pid %d has no owner; killing to prevent NULL deref\n", __FILE__, __LINE__, agent->
pid);
174 if (agent->
status == AG_SPAWNED || agent->
status == AG_RUNNING || agent->
status == AG_PAUSED)
177 if (time(NULL) - agent->
check_in > CONF_agent_death_timer && !(agent->
owner->
status == JB_PAUSED) && !nokill)
193 if (agent->
n_updates > CONF_agent_update_number && !nokill)
238 g_output_stream_write(ostr, name, strlen(name), NULL, NULL);
239 g_output_stream_write(ostr,
" ", 1, NULL, NULL);
257 static int32_t id_gen = -1;
261 char *jq_cmd_args = 0;
263 for (iter = scheduler->
host_queue; iter != NULL; iter = iter->next)
265 host = (
host_t*) iter->data;
266 V_AGENT(
"META_AGENT[%s] testing on HOST[%s]\n", ma->
name, host->
name);
284 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
285 static GMutex version_lock;
287 static GStaticMutex version_lock = G_STATIC_MUTEX_INIT;
316 if (strncmp(
buffer,
"VERSION: ", 9) != 0)
318 if (strncmp(
buffer,
"@@@1", 4) == 0)
327 con_printf(
main_log,
"ERROR %s.%d: agent %s.%s has been invalidated, removing from agents\n", __FILE__, __LINE__,
335 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
336 g_mutex_lock(&version_lock);
338 g_static_mutex_lock(&version_lock);
351 con_printf(
job_log(agent->
owner),
"ERROR %s.%d: META_DATA[%s] invalid agent spawn check\n", __FILE__, __LINE__,
353 con_printf(
job_log(agent->
owner),
"ERROR: versions don't match: %s(%s) != received: %s(%s)\n",
357 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
358 g_mutex_unlock(&version_lock);
360 g_static_mutex_unlock(&version_lock);
364 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
365 g_mutex_unlock(&version_lock);
367 g_static_mutex_unlock(&version_lock);
390 if (TVERB_AGENT && (TVERB_SPECIAL || strncmp(
buffer,
"SPECIAL", 7) != 0))
400 if (strncmp(
buffer,
"BYE", 3) == 0)
416 if (strncmp(
buffer,
"@@@1", 4) == 0)
429 fflush(agent->
write);
445 if (strncmp(
buffer,
"OK", 2) == 0)
447 if (agent->
status != AG_PAUSED)
458 else if (strncmp(
buffer,
"HEART", 5) == 0)
462 arg = g_match_info_fetch(
match, 3);
466 arg = g_match_info_fetch(
match, 6);
467 agent->
alive = (arg[0] ==
'1' || agent->
alive);
470 g_match_info_free(
match);
482 else if (strncmp(
buffer,
"EMAIL", 5) == 0)
494 else if (strncmp(
buffer,
"SPECIAL", 7) == 0)
500 arg = g_match_info_fetch(
match, 3);
501 relevant &= atoi(arg);
504 arg = g_match_info_fetch(
match, 6);
512 if (!(agent->
special & relevant))
517 g_match_info_free(
match);
527 else if (strncmp(
buffer,
"GETSPECIAL", 10) == 0)
531 arg = g_match_info_fetch(
match, 3);
532 relevant = atoi(arg);
540 g_match_info_free(
match);
548 else if (!(TVERB_AGENT))
568 static void shell_parse(
char* confdir,
int user_id,
int group_id,
char* input,
char *jq_cmd_args,
int jobId,
int* argc,
char*** argv)
573 #define MAX_CMD_ARGS 30
575 *argv = g_new0(
char*, MAX_CMD_ARGS);
578 for (curr = input; *curr; curr++)
589 (*argv)[idx++] = g_strdup(begin);
592 else if (begin == NULL)
596 else if (*begin ==
'"' && *curr ==
'"')
601 (*argv)[idx++] = g_strdup(begin + 1);
604 if (idx > MAX_CMD_ARGS - 7)
608 (*argv)[idx++] = g_strdup_printf(
"--jobId=%d",
jobId);
609 (*argv)[idx++] = g_strdup_printf(
"--config=%s", confdir);
610 (*argv)[idx++] = g_strdup_printf(
"--userID=%d", user_id);
611 (*argv)[idx++] = g_strdup_printf(
"--groupID=%d", group_id);
612 (*argv)[idx++] =
"--scheduler_start";
615 const char *
start = jq_cmd_args;
616 const char *current = jq_cmd_args;
617 gboolean in_quotes = FALSE;
619 while (*current !=
'\0')
621 if (*current ==
'\'' || *current ==
'"')
622 in_quotes = !in_quotes;
623 else if (*current ==
' ' && !in_quotes)
627 int len = current -
start;
628 char *arg = g_strndup(
start, len);
629 (*argv)[idx++] = arg;
638 char *arg = g_strndup(
start, current -
start);
639 (*argv)[idx++] = arg;
685 if (agent->
owner == NULL)
687 log_printf(
"ERROR %s.%d: Agent spawn requested but agent has no owner; aborting spawn.\n", __FILE__, __LINE__);
698 agent->
status = AG_FAILED;
703 while ((agent->
pid = fork()) < 0)
704 sleep(rand() % CONF_fork_backoff_time);
721 ERROR(
"unable to correctly set priority of agent process %d", agent->
pid);
737 *strrchr(
buffer,
'/') =
'\0';
740 ERROR(
"unable to change working directory: %s\n", strerror(errno));
743 execv(args[0], args);
751 args = g_new0(
char*, 5);
756 if (len>=
sizeof(
buffer)) {
758 log_printf(
"ERROR %s.%d: JOB[%d.%s]: exec failed: truncated buffer: \"%s\"",
764 args[0] =
"/usr/bin/ssh";
769 execv(args[0], args);
773 log_printf(
"ERROR %s.%d: JOB[%d.%s]: exec failed: pid = %d, errno = \"%s\"", __FILE__, __LINE__, agent->
owner->
id,
814 ERROR(
"invalid arguments passed to meta_agent_init()");
821 log_printf(
"ERROR failed to load %s meta agent", name);
828 strcpy(ma->
name, name);
830 strcat(ma->
raw_cmd,
" --scheduler_start");
868 int child_to_parent[2];
869 int parent_to_child[2];
875 log_printf(
"ERROR %s.%d: NULL job passed to agent init\n", __FILE__, __LINE__);
876 log_printf(
"ERROR: no other information available\n");
883 log_printf(
"ERROR %s.%d: jq_pk %d jq_type %s does not match any module in mods-enabled\n", __FILE__, __LINE__,
894 agent->
status = AG_CREATED;
897 if (agent->
type == NULL)
906 ERROR(
"agent %s has been invalidated by version information", job->
agent_type);
911 if (pipe(parent_to_child) != 0)
913 ERROR(
"JOB[%d.%s] failed to create parent to child pipe", job->
id, job->
agent_type);
917 if (pipe(child_to_parent) != 0)
919 ERROR(
"JOB[%d.%s] failed to create child to parent pipe", job->
id, job->
agent_type);
926 agent->
to_child = parent_to_child[1];
963 pass->scheduler = scheduler;
966 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
995 fclose(agent->
write);
1017 int status = pid[1];
1019 if ((agent = g_tree_lookup(scheduler->
agents, &pid[0])) == NULL)
1021 ERROR(
"invalid agent death event: pid[%d]", pid[0]);
1028 if (write(agent->
to_parent,
"@@@1\n", 5) != 5)
1030 g_thread_join(agent->
thread);
1034 if (WIFEXITED(status))
1038 else if (WIFSIGNALED(status))
1040 AGENT_CONCURRENT_PRINT(
"agent was killed by signal: %d.%s\n", WTERMSIG(status), strsignal(WTERMSIG(status)));
1041 if (WCOREDUMP(status))
1048 AGENT_WARNING(
"agent closed unexpectedly, agent status was %s", agent_status_strings[agent->
status]);
1052 if (agent->
status != AG_PAUSED && agent->
status != AG_FAILED)
1058 log_printf(
"ERROR %s.%d: agent %s.%s has failed scheduler startup test\n", __FILE__, __LINE__, agent->
host->
name,
1068 g_tree_remove(scheduler->
agents, &agent->
pid);
1088 g_tree_insert(scheduler->
agents, &agent->
pid, agent);
1111 if (agent->
owner == NULL)
1113 ERROR(
"Agent ready event received but agent has no owner. Terminating agent to prevent scheduler crash.");
1118 if (agent->
status == AG_SPAWNED)
1142 if (write(agent->
to_parent,
"@@@0\n", 5) != 5)
1160 g_tree_foreach(scheduler->
agents, (GTraverseFunc)
update, NULL);
1178 if (write(agent->
to_parent,
"@@@1\n", 5) != 5)
1179 AGENT_ERROR(
"Failed to kill agent thread cleanly");
1192 g_output_stream_write(ostr,
"\nend\n", 5, NULL, NULL);
1210 agent_status_strings[new_status]);
1214 if (agent->
status == AG_PAUSED)
1219 if (new_status == AG_PAUSED)
1226 agent->
status = new_status;
1237 kill(agent->
pid, SIGSTOP);
1250 kill(agent->
pid, SIGCONT);
1268 struct tm* time_info;
1273 strcpy(time_buf,
"(none)");
1274 time_info = localtime(&agent->
check_in);
1276 strftime(time_buf,
sizeof(time_buf),
"%F %T", localtime(&agent->
check_in));
1277 status_str = g_strdup_printf(
"agent:%d host:%s type:%s status:%s time:%s\n", agent->
pid, agent->
host->
name,
1278 agent->
type->
name, agent_status_strings[agent->
status], time_buf);
1281 g_output_stream_write(ostr, status_str, strlen(status_str), NULL, NULL);
1298 kill(agent->
pid, SIGKILL);
1315 va_start(args, fmt);
1318 tmp = g_strdup_vprintf(fmt, args);
1319 tmp[strlen(tmp) - 1] =
'\0';
1321 rc = fprintf(agent->
write,
"%s\n", tmp);
1326 rc = vfprintf(agent->
write, fmt, args);
1329 fflush(agent->
write);
1347 return write(agent->
to_parent, buf, count);
1394 if (g_tree_lookup(meta_agents, name) == NULL)
1398 g_tree_insert(meta_agents, ma->
name, ma);
1414 return (ma != NULL) && ((ma->
special & special_type) != 0);
1426 return (agent != NULL) && ((agent->
special & special_type) != 0);
1436 V_AGENT(
"AGENT[%s] run increased to %d\n", ma->
name, ma->
run_count);
1446 V_AGENT(
"AGENT[%s] run decreased to %d\n", ma->
name, ma->
run_count);
static int agent_list(char *name, meta_agent_t *ma, GOutputStream *ostr)
GTraverseFunc that prints the name of every agent in alphabetical order, separated by spaces.
void agent_ready_event(scheduler_t *scheduler, agent_t *agent)
Event created when an agent is ready for more data.
static int update(int *pid_ptr, agent_t *agent, gpointer unused)
int aprintf(agent_t *agent, const char *fmt,...)
static int agent_close_fd(int *pid_ptr, agent_t *agent, agent_t *excepted)
This will close all of the agent's pipes.
agent_t * agent_init(scheduler_t *scheduler, host_t *host, job_t *job)
Allocate and spawn a new agent.
static int agent_test(const gchar *name, meta_agent_t *ma, scheduler_t *scheduler)
GTraverseFunc that tests the current agent on every host.
ssize_t agent_write(agent_t *agent, const void *buf, int count)
#define AGENT_SEQUENTIAL_PRINT(...)
#define AGENT_CONCURRENT_PRINT(...)
static int agent_kill_traverse(int *pid, agent_t *agent, gpointer unused)
GTraverseFunc that kills all of the agents.
void agent_update_event(scheduler_t *scheduler, void *unused)
void meta_agent_increase_count(meta_agent_t *ma)
void agent_destroy(agent_t *agent)
Frees the memory associated with an agent.
#define AGENT_WARNING(...)
void agent_fail_event(scheduler_t *scheduler, agent_t *agent)
Fails an agent.
meta_agent_t * meta_agent_init(char *name, char *cmd, int max, int spc)
Creates a new meta agent.
void agent_kill(agent_t *agent)
Unclean kill of an agent.
void agent_pause(agent_t *agent)
int add_meta_agent(GTree *meta_agents, char *name, char *cmd, int max, int spc)
void agent_create_event(scheduler_t *scheduler, agent_t *agent)
Event created when a new agent has been created.
void agent_transition(agent_t *agent, agent_status new_status)
#define TEST_NULL(a, ret)
Test if parameter is NULL.
static void agent_listen(scheduler_t *scheduler, agent_t *agent)
void agent_unpause(agent_t *agent)
void test_agents(scheduler_t *scheduler)
Calls the agent test function for every type of agent.
void meta_agent_decrease_count(meta_agent_t *ma)
void kill_agents(scheduler_t *scheduler)
Call the agent_kill function for every agent within the system.
int is_agent_special(agent_t *agent, int special_type)
tests if a particular agent has a specific special flag set
void list_agents_event(scheduler_t *scheduler, GOutputStream *ostr)
Receive agent on interface.
#define TEST_NULV(a)
Test if parameter is NULL.
int is_meta_special(meta_agent_t *ma, int special_type)
tests if a particular meta agent has a specific special flag set
static void shell_parse(char *confdir, int user_id, int group_id, char *input, char *jq_cmd_args, int jobId, int *argc, char ***argv)
Parses the shell command that is found in the configuration file.
#define SELECT_STRING(passed)
void agent_death_event(scheduler_t *scheduler, pid_t *pid)
static void * agent_spawn(agent_spawn_args *pass)
Spawns a new agent using the command passed in using the meta agent.
void meta_agent_destroy(meta_agent_t *ma)
void agent_print_status(agent_t *agent, GOutputStream *ostr)
Prints the status of the agent to the output stream provided.
Header file with agent related operations.
#define MAX_CMD
the size of the agent's command buffer (arbitrary)
#define SAG_NOKILL
This agent should not be killed when updating the agent.
#define AGENT_STATUS_TYPES(apply)
#define MAX_NAME
the size of the agent's name buffer (arbitrary)
Event handling operations.
void host_decrease_load(host_t *host)
Decrease the number of running agents on a host by 1.
void host_increase_load(host_t *host)
Increase the number of running agents on a host by 1.
void job_fail_agent(job_t *job, void *agent)
void job_fail_event(scheduler_t *scheduler, job_t *job)
Event that causes a job to be marked as failed.
job_t * job_init(GTree *job_list, GSequence *job_queue, char *type, char *host, int id, int parent_id, int user_id, int group_id, int priority, char *jq_cmd_args)
Create a new job.
void job_update(scheduler_t *scheduler, job_t *job)
int job_is_open(scheduler_t *scheduler, job_t *job)
Tests to see if there is still data available for this job.
char * job_next(job_t *job)
void job_remove_agent(job_t *job, GTree *job_list, void *agent)
log_t * job_log(job_t *job)
void job_add_agent(job_t *job, void *agent)
Adds a new agent to the jobs list of agents.
void job_finish_agent(job_t *job, void *agent)
FUNCTION int max(int permGroup, int permPublic)
Get the maximum group privilege.
int jobId
The id of the job.
char buffer[2048]
The last thing received from the scheduler.
#define THREAD_FATAL(file,...)
start($application)
Start the application. Assumes the application is restartable via /etc/init.d/<script>....
void database_update_event(scheduler_t *scheduler, void *unused)
Checks the job queue for any new entries.
void database_job_processed(int j_id, int num)
Updates the number of items that a job queue entry has processed.
Header file for the scheduler.
#define AGENT_CONF
Agent conf location.
#define AGENT_BINARY
Format to get agent binary.
agent_t * agent
Reference to current agent state.
scheduler_t * scheduler
Reference to current scheduler state.
time_t check_in
the time that the agent last generated anything
int to_child
file identifier to print to the child
agent_status status
the state of execution the agent is currently in
meta_agent_t * type
the type of agent this is i.e. bucket, copyright...
int from_child
file identifier to read from child
gchar * data
the data that has been sent to the agent for analysis
FILE * write
FILE* that abstracts the use of the to_child socket.
gboolean alive
flag to tell the scheduler if the agent is still alive
job_t * owner
the job that this agent is assigned to
FILE * read
FILE* that abstracts the use of the from_child socket.
uint32_t special
any special flags that the agent has set
int to_parent
file identifier to print to the parent (child stdout)
uint8_t n_updates
keeps track of the number of times the agent has updated
uint64_t total_analyzed
the total number that this agent has analyzed
host_t * host
the host that this agent will start on
GThread * thread
the thread that communicates with this agent
int from_parent
file identifier to read from the parent (child stdin)
uint8_t return_code
what was returned by the agent when it disconnected
pid_t pid
the pid of the process this agent is running in
gboolean updated
boolean flag to indicate if the scheduler has updated the data
char * agent_dir
The location on the host machine where the executables are.
char * address
The address of the host, used by ssh when starting a new agent.
char * name
The name of the host, used to store host internally to scheduler.
int32_t id
The identifier for this job.
job_status status
The current status for the job.
int32_t group_id
The id of the group that created the job.
gchar * message
Message that will be sent with job notification email.
gchar * jq_cmd_args
Command line arguments for this job.
int32_t user_id
The id of the user that created the job.
char * agent_type
The type of agent used to analyze the data.
int32_t priority
Importance of the job, maps directly to Unix priority.
int32_t parent_id
The identifier for the parent of this job (its queue id)
Store the results of a regex match.
GTree * job_list
List of jobs that have been created.
gchar * sysconfigdir
The system directory that contain fossology.conf.
GList * host_queue
Round-robin queue for choosing which host use next.
GRegex * parse_agent_msg
Parses messages coming from the agents.
GTree * meta_agents
List of all meta agents available to the scheduler.
GTree * agents
List of any currently running agents.
GSequence * job_queue
heap of jobs that still need to be started