34 #include <sys/types.h>
/*
 * Test if a parameter is NULL (void-returning variant).
 *
 * If (a) evaluates to NULL/zero: sets errno to EINVAL, reports the problem
 * through ERROR() and executes `return;` in the enclosing function, so it may
 * only be used inside functions returning void.
 *
 * The body is wrapped in do { } while(0) so `TEST_NULV(x);` is a single
 * statement (safe as the body of an unbraced if/else), and the argument is
 * parenthesised so compound expressions are negated as a whole.
 */
#define TEST_NULV(a) do { \
    if(!(a)) { \
      errno = EINVAL; \
      ERROR("agent passed is NULL, cannot proceed"); \
      return; \
    } \
  } while(0)
/*
 * Test if a parameter is NULL (value-returning variant).
 *
 * If (a) evaluates to NULL/zero: sets errno to EINVAL, reports the problem
 * through ERROR() and executes `return ret;` in the enclosing function.
 *
 * The body is wrapped in do { } while(0) so `TEST_NULL(x, r);` is a single
 * statement (safe as the body of an unbraced if/else), and the argument is
 * parenthesised so compound expressions are negated as a whole.
 *
 * a   - pointer/expression to test
 * ret - value returned from the enclosing function when (a) is NULL
 */
#define TEST_NULL(a, ret) do { \
    if(!(a)) { \
      errno = EINVAL; \
      ERROR("agent passed is NULL, cannot proceed"); \
      return ret; \
    } \
  } while(0)
61 #define AGENT_CREDENTIAL do { \
62 if(agent && agent->owner) \
63 log_printf("JOB[%d].%s[%d.%s]: ", agent->owner->id, \
64 agent->type ? agent->type->name : "?", \
65 agent->pid, agent->host ? agent->host->name : "?"); \
67 log_printf("AGENT[%s][pid=%d][host=%s]: ", \
68 agent && agent->type ? agent->type->name : "?", \
69 agent ? agent->pid : -1, \
70 agent && agent->host ? agent->host->name : "?"); \
74 #define AGENT_LOG_CREDENTIAL do { \
75 if(agent && agent->owner) \
76 con_printf(job_log(agent->owner), "JOB[%d].%s[%d.%s]: ", \
77 agent->owner->id, agent->type ? agent->type->name : "?", \
78 agent->pid, agent->host ? agent->host->name : "?"); \
82 #define AGENT_ERROR(...) do { \
83 log_printf("ERROR: %s.%d: ", __FILE__, __LINE__); \
85 log_printf(__VA_ARGS__); \
86 log_printf("\n"); } while(0)
89 #define AGENT_NOTIFY(...) if(TEST_NOTIFY) do { \
90 log_printf("NOTE: "); \
92 log_printf(__VA_ARGS__); \
93 log_printf("\n"); } while(0)
96 #define AGENT_WARNING(...) if(TEST_WARNING) do { \
97 log_printf("WARNING %s.%d: ", __FILE__, __LINE__); \
99 log_printf(__VA_ARGS__); \
100 log_printf("\n"); } while(0)
103 #define AGENT_SEQUENTIAL_PRINT(...) if(TVERB_AGENT) do { \
105 log_printf(__VA_ARGS__); } while(0)
/*
 * Print a message about the current agent, prefixed with its credentials
 * (via AGENT_LOG_CREDENTIAL). Expects a variable named `agent` to be in
 * scope at the call site (presumably an agent_t*).
 *
 * When the agent is owned by a job, the message is written to that job's
 * log with con_printf(job_log(agent->owner), ...). Ownerless agents are
 * handled explicitly in several places of the scheduler; for them the
 * message now falls back to log_printf() instead of handing a NULL owner to
 * job_log(), mirroring the `agent && agent->owner` guard used by the
 * credential macros.
 */
#define AGENT_CONCURRENT_PRINT(...) do { \
  AGENT_LOG_CREDENTIAL; \
  if(agent && agent->owner) \
    con_printf(job_log(agent->owner), __VA_ARGS__); \
  else \
    log_printf(__VA_ARGS__); \
  } while(0)
/*
 * X-macro helper: turns one status name into
 * MK_STRING_LIT(AGENT_<name>) followed by a comma, so a list of statuses
 * (presumably AGENT_STATUS_TYPES) can be expanded directly into an array
 * initializer such as agent_status_strings[]. MK_STRING_LIT is defined
 * elsewhere and is assumed to stringize its argument.
 */
#define SELECT_STRING(status_name) \
  MK_STRING_LIT(AGENT_##status_name),
121 const char* agent_status_strings[] =
143 if (agent != excepted)
167 if (agent->
owner == NULL)
169 log_printf(
"ERROR %s.%d: Agent pid %d has no owner; killing to prevent NULL deref\n", __FILE__, __LINE__, agent->
pid);
175 if (agent->
status == AG_SPAWNED || agent->
status == AG_RUNNING || agent->
status == AG_PAUSED)
178 if (time(NULL) - agent->
check_in > CONF_agent_death_timer && !(agent->
owner->
status == JB_PAUSED) && !nokill)
194 if (agent->
n_updates > CONF_agent_update_number && !nokill)
239 g_output_stream_write(ostr, name, strlen(name), NULL, NULL);
240 g_output_stream_write(ostr,
" ", 1, NULL, NULL);
258 static int32_t id_gen = -1;
262 char *jq_cmd_args = 0;
264 for (iter = scheduler->
host_queue; iter != NULL; iter = iter->next)
266 host = (
host_t*) iter->data;
267 V_AGENT(
"META_AGENT[%s] testing on HOST[%s]\n", ma->
name, host->
name);
277 #if GLIB_CHECK_VERSION(2, 32, 0)
278 static GMutex version_lock;
280 static GStaticMutex version_lock = G_STATIC_MUTEX_INIT;
292 if (ma == NULL)
return;
293 #if GLIB_CHECK_VERSION(2, 32, 0)
294 g_mutex_lock(&version_lock);
296 g_static_mutex_lock(&version_lock);
300 log_printf(
"NOTE: version refresh: resetting cached version for agent "
301 "type \"%s\" (was \"%s\")\n", ma->
name, ma->
version);
306 #if GLIB_CHECK_VERSION(2, 32, 0)
307 g_mutex_unlock(&version_lock);
309 g_static_mutex_unlock(&version_lock);
360 if (strncmp(
buffer,
"VERSION: ", 9) != 0)
362 if (strncmp(
buffer,
"@@@1", 4) == 0)
371 con_printf(
main_log,
"ERROR %s.%d: agent %s.%s has been invalidated, removing from agents\n", __FILE__, __LINE__,
379 #if GLIB_CHECK_VERSION(2, 32, 0)
380 g_mutex_lock(&version_lock);
382 g_static_mutex_lock(&version_lock);
385 const char* version_str =
buffer + 9;
394 else if (strcmp(agent->
type->
version, version_str) != 0)
401 "ERROR %s.%d: META_DATA[%s] invalid agent spawn check (startup)\n",
402 __FILE__, __LINE__, agent->
type->
name);
404 "ERROR: versions don't match: %s(\"%s\") != received: %s(\"%s\")\n",
410 kill(-agent->
pid, SIGKILL);
411 #if GLIB_CHECK_VERSION(2, 32, 0)
412 g_mutex_unlock(&version_lock);
414 g_static_mutex_unlock(&version_lock);
421 "WARNING %s.%d: META_AGENT[%s] version changed: \"%s\" -> \"%s\" "
422 "(was reported by: %s, now: %s). Adopting new version; agent will respawn.\n",
423 __FILE__, __LINE__, agent->
type->
name,
433 kill(-agent->
pid, SIGKILL);
434 #if GLIB_CHECK_VERSION(2, 32, 0)
435 g_mutex_unlock(&version_lock);
437 g_static_mutex_unlock(&version_lock);
441 #if GLIB_CHECK_VERSION(2, 32, 0)
442 g_mutex_unlock(&version_lock);
444 g_static_mutex_unlock(&version_lock);
467 if (TVERB_AGENT && (TVERB_SPECIAL || strncmp(
buffer,
"SPECIAL", 7) != 0))
477 if (strncmp(
buffer,
"BYE", 3) == 0)
485 fail_arg->
pid = agent->
pid;
486 fail_arg->
agent = agent;
498 if (strncmp(
buffer,
"@@@1", 4) == 0)
511 fflush(agent->
write);
527 if (strncmp(
buffer,
"OK", 2) == 0)
529 if (agent->
status != AG_PAUSED)
540 else if (strncmp(
buffer,
"HEART", 5) == 0)
544 arg = g_match_info_fetch(
match, 3);
548 arg = g_match_info_fetch(
match, 6);
549 agent->
alive = (arg[0] ==
'1' || agent->
alive);
552 g_match_info_free(
match);
564 else if (strncmp(
buffer,
"EMAIL", 5) == 0)
577 else if (strncmp(
buffer,
"SPECIAL", 7) == 0)
583 arg = g_match_info_fetch(
match, 3);
584 relevant &= atoi(arg);
587 arg = g_match_info_fetch(
match, 6);
595 if (!(agent->
special & relevant))
600 g_match_info_free(
match);
610 else if (strncmp(
buffer,
"GETSPECIAL", 10) == 0)
614 arg = g_match_info_fetch(
match, 3);
615 relevant = atoi(arg);
623 g_match_info_free(
match);
631 else if (!(TVERB_AGENT))
651 static void shell_parse(
char* confdir,
int user_id,
int group_id,
char* input,
char *jq_cmd_args,
int jobId,
int* argc,
char*** argv)
656 #define MAX_CMD_ARGS 30
658 *argv = g_new0(
char*, MAX_CMD_ARGS);
661 for (curr = input; *curr; curr++)
672 (*argv)[idx++] = g_strdup(begin);
675 else if (begin == NULL)
679 else if (*begin ==
'"' && *curr ==
'"')
684 (*argv)[idx++] = g_strdup(begin + 1);
687 if (idx > MAX_CMD_ARGS - 7)
691 (*argv)[idx++] = g_strdup_printf(
"--jobId=%d",
jobId);
692 (*argv)[idx++] = g_strdup_printf(
"--config=%s", confdir);
693 (*argv)[idx++] = g_strdup_printf(
"--userID=%d", user_id);
694 (*argv)[idx++] = g_strdup_printf(
"--groupID=%d", group_id);
695 (*argv)[idx++] =
"--scheduler_start";
698 const char *
start = jq_cmd_args;
699 const char *current = jq_cmd_args;
700 gboolean in_quotes = FALSE;
702 while (*current !=
'\0')
704 if (*current ==
'\'' || *current ==
'"')
705 in_quotes = !in_quotes;
706 else if (*current ==
' ' && !in_quotes)
710 int len = current -
start;
711 char *arg = g_strndup(
start, len);
712 (*argv)[idx++] = arg;
721 char *arg = g_strndup(
start, current -
start);
722 (*argv)[idx++] = arg;
769 if (agent->
owner == NULL)
771 log_printf(
"ERROR %s.%d: Agent spawn requested but agent has no owner; aborting spawn.\n", __FILE__, __LINE__);
781 agent->
status = AG_FAILED;
785 while ((agent->
pid = fork()) < 0)
786 sleep(rand() % CONF_fork_backoff_time);
806 ERROR(
"unable to correctly set priority of agent process %d", agent->
pid);
822 *strrchr(
buffer,
'/') =
'\0';
825 ERROR(
"unable to change working directory: %s\n", strerror(errno));
828 execv(args[0], args);
836 args = g_new0(
char*, 5);
841 if (len>=
sizeof(
buffer)) {
843 log_printf(
"ERROR %s.%d: JOB[%d.%s]: exec failed: truncated buffer: \"%s\"",
849 args[0] =
"/usr/bin/ssh";
854 execv(args[0], args);
858 log_printf(
"ERROR %s.%d: JOB[%d.%s]: exec failed: pid = %d, errno = \"%s\"", __FILE__, __LINE__, agent->
owner->
id,
865 setpgid(agent->
pid, agent->
pid);
902 ERROR(
"invalid arguments passed to meta_agent_init()");
909 log_printf(
"ERROR failed to load %s meta agent", name);
916 strcpy(ma->
name, name);
918 strcat(ma->
raw_cmd,
" --scheduler_start");
956 int child_to_parent[2];
957 int parent_to_child[2];
963 log_printf(
"ERROR %s.%d: NULL job passed to agent init\n", __FILE__, __LINE__);
964 log_printf(
"ERROR: no other information available\n");
972 log_printf(
"ERROR %s.%d: jq_pk %d jq_type %s does not match any module in mods-enabled\n", __FILE__, __LINE__,
983 agent->
status = AG_CREATED;
988 ERROR(
"agent %s has been invalidated by version information", job->
agent_type);
992 job->
message = g_strdup(
"agent type has been invalidated");
999 if (pipe(parent_to_child) != 0)
1001 ERROR(
"JOB[%d.%s] failed to create parent to child pipe", job->
id, job->
agent_type);
1005 if (pipe(child_to_parent) != 0)
1007 ERROR(
"JOB[%d.%s] failed to create child to parent pipe", job->
id, job->
agent_type);
1008 close(parent_to_child[0]);
1009 close(parent_to_child[1]);
1016 agent->
to_child = parent_to_child[1];
1045 fclose(agent->
read);
1065 pass->scheduler = scheduler;
1066 pass->agent = agent;
1068 #if GLIB_CHECK_VERSION(2, 32, 0)
1097 fclose(agent->
write);
1101 fclose(agent->
read);
1123 int status = pid[1];
1125 if ((agent = g_tree_lookup(scheduler->
agents, &pid[0])) == NULL)
1133 ERROR(
"invalid agent death event: pid[%d]", pid[0]);
1141 if (agent->
owner == NULL)
1143 log_printf(
"ERROR %s.%d: agent_death_event for ownerless agent pid %d - cleaning up\n",
1144 __FILE__, __LINE__, (
int)pid[0]);
1145 if (write(agent->
to_parent,
"@@@1\n", 5) != 5)
1149 g_thread_join(agent->
thread);
1158 g_tree_remove(scheduler->
agents, &agent->
pid);
1168 if (write(agent->
to_parent,
"@@@1\n", 5) != 5)
1172 g_thread_join(agent->
thread);
1178 if (WIFEXITED(status))
1182 else if (WIFSIGNALED(status))
1184 AGENT_CONCURRENT_PRINT(
"agent was killed by signal: %d.%s\n", WTERMSIG(status), strsignal(WTERMSIG(status)));
1185 if (WCOREDUMP(status))
1194 AGENT_WARNING(
"agent closed unexpectedly, agent status was %s", agent_status_strings[agent->
status]);
1202 if (agent->
status == AG_FAILED)
1210 else if (agent->
status != AG_PAUSED)
1219 if (agent->
owner != NULL &&
1237 else if (agent->
owner != NULL)
1243 log_printf(
"ERROR %s.%d: agent %s.%s has failed scheduler startup test\n", __FILE__, __LINE__, agent->
host->
name,
1253 g_tree_remove(scheduler->
agents, &agent->
pid);
1273 g_tree_insert(scheduler->
agents, &agent->
pid, agent);
1296 if (agent->
owner == NULL)
1298 ERROR(
"Agent ready event received but agent has no owner. Terminating agent to prevent scheduler crash.");
1303 if (agent->
status == AG_SPAWNED)
1333 if (write(agent->
to_parent,
"@@@0\n", 5) != 5)
1355 if (agent == NULL || agent->
owner == NULL)
1364 if (agent->
status != AG_SPAWNED && agent->
status != AG_RUNNING)
1369 if (ctx->
now - agent->
check_in <= (time_t)(3 * CONF_agent_death_timer))
1374 ctx->
list = g_list_prepend(ctx->
list, agent);
1392 g_tree_foreach(scheduler->
agents, (GTraverseFunc)
update, NULL);
1399 for (iter = zctx.
list; iter != NULL; iter = iter->next)
1403 log_printf(
"WARNING %s.%d: JOB[%d].%s[%d.%s]: agent unresponsive for %ld s"
1404 " in a FAILED job - forcing death-event cleanup\n",
1415 pid_t* pids = g_new0(pid_t, 3);
1416 pids[0] = zombie->
pid;
1421 g_list_free(zctx.
list);
1439 if (agent->
owner == NULL)
1441 AGENT_ERROR(
"agent_fail_event called on ownerless agent (pid=%d), killing cleanly", agent->
pid);
1443 if (write(agent->
to_parent,
"@@@1\n", 5) != 5)
1445 AGENT_ERROR(
"Failed to kill ownerless agent thread cleanly");
1452 if (write(agent->
to_parent,
"@@@1\n", 5) != 5)
1454 AGENT_ERROR(
"Failed to kill agent thread cleanly");
1475 if (agent != NULL && agent == fa->
agent && agent->
status != AG_FAILED)
1493 g_output_stream_write(ostr,
"\nend\n", 5, NULL, NULL);
1511 agent_status_strings[new_status]);
1515 if (agent->
status == AG_PAUSED)
1520 if (new_status == AG_PAUSED)
1527 agent->
status = new_status;
1538 kill(agent->
pid, SIGSTOP);
1551 kill(agent->
pid, SIGCONT);
1569 struct tm* time_info;
1574 strcpy(time_buf,
"(none)");
1575 time_info = localtime(&agent->
check_in);
1577 strftime(time_buf,
sizeof(time_buf),
"%F %T", localtime(&agent->
check_in));
1578 status_str = g_strdup_printf(
"agent:%d host:%s type:%s status:%s time:%s\n", agent->
pid, agent->
host->
name,
1579 agent->
type->
name, agent_status_strings[agent->
status], time_buf);
1582 g_output_stream_write(ostr, status_str, strlen(status_str), NULL, NULL);
1606 kill(-agent->
pid, SIGKILL);
1623 va_start(args, fmt);
1626 tmp = g_strdup_vprintf(fmt, args);
1627 tmp[strlen(tmp) - 1] =
'\0';
1629 rc = fprintf(agent->
write,
"%s\n", tmp);
1634 rc = vfprintf(agent->
write, fmt, args);
1637 fflush(agent->
write);
1655 return write(agent->
to_parent, buf, count);
1702 if (g_tree_lookup(meta_agents, name) == NULL)
1706 g_tree_insert(meta_agents, ma->
name, ma);
1722 return (ma != NULL) && ((ma->
special & special_type) != 0);
1734 return (agent != NULL) && ((agent->
special & special_type) != 0);
1744 V_AGENT(
"AGENT[%s] run increased to %d\n", ma->
name, ma->
run_count);
1754 V_AGENT(
"AGENT[%s] run decreased to %d\n", ma->
name, ma->
run_count);
static int agent_list(char *name, meta_agent_t *ma, GOutputStream *ostr)
GTraverseFunc that prints the name of every agent in alphabetical order, separated by spaces.
void agent_ready_event(scheduler_t *scheduler, agent_t *agent)
Event created when an agent is ready for more data.
static int update(int *pid_ptr, agent_t *agent, gpointer unused)
int aprintf(agent_t *agent, const char *fmt,...)
agent_t * agent_init(scheduler_t *scheduler, host_t *host, job_t *job)
Allocate and spawn a new agent.
static gboolean agent_close_fd(int *pid_ptr, agent_t *agent, agent_t *excepted)
This will close all of the agent's pipes.
static int agent_test(const gchar *name, meta_agent_t *ma, scheduler_t *scheduler)
GTraverseFunc that tests the current agent on every host.
static gboolean collect_zombie_agents(int *pid_ptr, agent_t *agent, zombie_ctx *ctx)
GTraverseFunc: collect agents in FAILED jobs silent for >3×agent_death_timer. Targets D-state process...
ssize_t agent_write(agent_t *agent, const void *buf, int count)
#define AGENT_SEQUENTIAL_PRINT(...)
#define AGENT_CONCURRENT_PRINT(...)
static int agent_kill_traverse(int *pid, agent_t *agent, gpointer unused)
GTraverseFunc that kills all of the agents.
void agent_meta_version_reset(meta_agent_t *ma)
Reset a meta_agent's cached version fields under version_lock.
void agent_update_event(scheduler_t *scheduler, void *unused)
void meta_agent_increase_count(meta_agent_t *ma)
void agent_destroy(agent_t *agent)
Frees the memory associated with an agent.
#define AGENT_WARNING(...)
void agent_fail_event(scheduler_t *scheduler, agent_t *agent)
Fails an agent.
meta_agent_t * meta_agent_init(char *name, char *cmd, int max, int spc)
Creates a new meta agent.
void agent_kill(agent_t *agent)
Unclean kill of an agent whose job should fail.
void agent_pause(agent_t *agent)
int add_meta_agent(GTree *meta_agents, char *name, char *cmd, int max, int spc)
void agent_create_event(scheduler_t *scheduler, agent_t *agent)
Event created when a new agent has been created.
void agent_transition(agent_t *agent, agent_status new_status)
#define TEST_NULL(a, ret)
Test if parameter is NULL.
static void agent_listen(scheduler_t *scheduler, agent_t *agent)
void agent_unpause(agent_t *agent)
void test_agents(scheduler_t *scheduler)
Calls the agent test function for every type of agent.
void meta_agent_decrease_count(meta_agent_t *ma)
void kill_agents(scheduler_t *scheduler)
Call the agent_kill function for every agent within the system.
int is_agent_special(agent_t *agent, int special_type)
tests if a particular agent has a specific special flag set
void list_agents_event(scheduler_t *scheduler, GOutputStream *ostr)
Receive agent on interface.
void agent_fail_event_pid(scheduler_t *scheduler, void *arg)
Deferred agent_fail_event() that is safe against a freed agent.
#define TEST_NULV(a)
Test if parameter is NULL.
int is_meta_special(meta_agent_t *ma, int special_type)
tests if a particular meta agent has a specific special flag set
static void shell_parse(char *confdir, int user_id, int group_id, char *input, char *jq_cmd_args, int jobId, int *argc, char ***argv)
Parses the shell command that is found in the configuration file.
#define SELECT_STRING(passed)
void agent_death_event(scheduler_t *scheduler, pid_t *pid)
static void * agent_spawn(agent_spawn_args *pass)
Spawns a new agent using the command passed in using the meta agent.
void meta_agent_destroy(meta_agent_t *ma)
void agent_print_status(agent_t *agent, GOutputStream *ostr)
Prints the status of the agent to the output stream provided.
Header file with agent related operations.
#define MAX_CMD
the size of the agent's command buffer (arbitrary)
#define SAG_NOKILL
This agent should not be killed when updating the agent.
#define AGENT_STATUS_TYPES(apply)
#define MAX_NAME
the size of the agent's name buffer (arbitrary)
Event handling operations.
void host_decrease_load(host_t *host)
Decrease the number of running agents on a host by 1.
void host_increase_load(host_t *host)
Increase the number of running agents on a host by 1.
void job_fail_agent(job_t *job, void *agent)
void job_fail_event(scheduler_t *scheduler, job_t *job)
Events that causes a job to be marked a failed.
job_t * job_init(GTree *job_list, GSequence *job_queue, char *type, char *host, int id, int parent_id, int user_id, int group_id, int priority, char *jq_cmd_args)
Create a new job.
void job_update(scheduler_t *scheduler, job_t *job)
int job_is_open(scheduler_t *scheduler, job_t *job)
Tests to see if there is still data available for this job.
char * job_next(job_t *job)
void job_remove_agent(job_t *job, GTree *job_list, void *agent)
gint job_compare(gconstpointer a, gconstpointer b, gpointer user_data)
Used to compare two different jobs in the priority queue.
log_t * job_log(job_t *job)
void job_add_agent(job_t *job, void *agent)
Adds a new agent to the jobs list of agents.
void job_finish_agent(job_t *job, void *agent)
FUNCTION int max(int permGroup, int permPublic)
Get the maximum group privilege.
int jobId
The id of the job.
char buffer[2048]
The last thing received from the scheduler.
#define THREAD_FATAL(file,...)
start($application)
Start the application. Assumes the application is restartable via /etc/init.d/<script>....
void database_update_event(scheduler_t *scheduler, void *unused)
Checks the job queue for any new entries.
void database_job_processed(int j_id, int num)
Updates the number of items that a job queue entry has processed.
Header file for the scheduler.
#define AGENT_CONF
Agent conf location.
#define AGENT_BINARY
Format to get agent binary.
agent_t * agent
Reference to current agent state.
scheduler_t * scheduler
Reference to current scheduler state.
time_t check_in
the time that the agent last generated anything
int to_child
file identifier to print to the child
agent_status status
the state of execution the agent is currently in
meta_agent_t * type
the type of agent this is i.e. bucket, copyright...
int from_child
file identifier to read from child
gboolean accounted
TRUE if this agent holds host load + run_count (owner->id > 0 at init); used to release the slot if t...
gchar * data
the data that has been sent to the agent for analysis
FILE * write
FILE* that abstracts the use of the to_child socket.
gboolean alive
flag to tell the scheduler if the agent is still alive
job_t * owner
the job that this agent is assigned to
FILE * read
FILE* that abstracts the use of the from_child socket.
uint32_t special
any special flags that the agent has set
int to_parent
file identifier to print to the parent (child stdout)
uint8_t n_updates
keeps track of the number of times the agent has updated
uint64_t total_analyzed
the total number that this agent has analyzed
host_t * host
the host that this agent will start on
GThread * thread
the thread that communicates with this agent
int from_parent
file identifier to read from the parent (child stdin)
uint8_t return_code
what was returned by the agent when it disconnected
pid_t pid
the pid of the process this agent is running in
gboolean updated
boolean flag to indicate if the scheduler has updated the data
Argument for agent_fail_event_pid().
agent_t * agent
expected agent pointer, only compared
pid_t pid
pid of the agent that reported failure
char * agent_dir
The location on the host machine where the executables are.
char * address
The address of the host, used by ssh when starting a new agent.
char * name
The name of the host, used to store host internally to scheduler.
int32_t id
The identifier for this job.
job_status status
The current status for the job.
int32_t group_id
The id of the group that created the job.
gchar * message
Message that will be sent with job notification email.
gchar * jq_cmd_args
Command line arguments for this job.
time_t checkedout_at
Timestamp when job entered JB_CHECKEDOUT (for stale detection grace period)
int32_t user_id
The id of the user that created the job.
char * agent_type
The type of agent used to analyze the data.
int32_t priority
Importance of the job, maps directory to unix priority.
int32_t parent_id
The identifier for the parent of this job (its queue id)
GList * running_agents
The list of agents assigned to this job that are still working.
Store the results of a regex match.
GTree * job_list
List of jobs that have been created.
gchar * sysconfigdir
The system directory that contain fossology.conf.
GList * host_queue
Round-robin queue for choosing which host use next.
GRegex * parse_agent_msg
Parses messages coming from the agents.
GTree * meta_agents
List of all meta agents available to the scheduler.
GTree * agents
List of any currently running agents.
GSequence * job_queue
heap of jobs that still need to be started
Context passed to collect_zombie_agents().
time_t now
Current-time snapshot for the traversal.
GList * list
Accumulates zombie agent_t pointers.