12 #include <libfossrepo.h>
17 #include <interface.h>
31 #include <sys/types.h>
44 #define TEST_ERROR(error, ...) \
47 log_printf("ERROR %s.%d: %s\n", \
48 __FILE__, __LINE__, error->message); \
49 log_printf("ERROR %s.%d: ", __FILE__, __LINE__); \
50 log_printf(__VA_ARGS__); \
52 g_clear_error(&error); \
/*
 * Expands to the definition of one configuration variable:
 *     type CONF_<name> = val;
 * The l_op and w_op arguments are accepted but not used by this expansion.
 * NOTE(review): presumably applied through CONF_VARIABLES_TYPES(apply) -
 * confirm against the header that defines that list.
 */
62 #define SELECT_DECLS(type, name, l_op, w_op, val) type CONF_##name = val;
/*
 * One bit per signal of interest. The signal handler ORs the matching bit
 * into sigmask; the code that later processes signals reads the mask back
 * (clearing it with __sync_fetch_and_and(&sigmask, 0)) and tests these bits.
 */
70 #define MASK_SIGCHLD (1 << 0)
71 #define MASK_SIGALRM (1 << 1)
72 #define MASK_SIGTERM (1 << 2)
73 #define MASK_SIGQUIT (1 << 3)
74 #define MASK_SIGHUP (1 << 4)
114 case SIGCHLD: __sync_fetch_and_or(&sigmask, MASK_SIGCHLD);
break;
115 case SIGTERM: __sync_fetch_and_or(&sigmask, MASK_SIGTERM);
break;
116 case SIGQUIT: __sync_fetch_and_or(&sigmask, MASK_SIGQUIT);
break;
117 case SIGHUP: __sync_fetch_and_or(&sigmask, MASK_SIGHUP);
break;
119 case SIGCHLD: sigmask |= MASK_SIGCHLD;
break;
120 case SIGALRM: sigmask |= MASK_SIGALRM;
break;
121 case SIGTERM: sigmask |= MASK_SIGTERM;
break;
122 case SIGQUIT: sigmask |= MASK_SIGQUIT;
break;
123 case SIGHUP: sigmask |= MASK_SIGHUP ;
break;
145 static time_t last_update = 0;
152 mask = __sync_fetch_and_and(&sigmask, 0);
160 last_update = time(NULL);
168 if(mask & MASK_SIGCHLD)
175 while((n = waitpid(-1, &status, WNOHANG)) > 0)
177 V_SCHED(
"SIGNALS: received sigchld for pid %d\n", n);
178 pass = g_new0(pid_t, 3);
190 if(mask & MASK_SIGTERM)
192 V_SCHED(
"SIGNALS: Scheduler received terminate signal, shutting down gracefully\n");
202 if(mask & MASK_SIGQUIT)
204 V_SCHED(
"SIGNALS: Scheduler received quit signal, shutting down scheduler\n");
213 if(mask & MASK_SIGHUP)
215 V_SCHED(
"SIGNALS: Scheduler received SGIHUP, reloading configuration data\n");
226 if((time(NULL) - last_update) > CONF_agent_update_interval )
228 V_SPECIAL(
"SIGNALS: Performing agent and database update\n");
231 last_update = time(NULL);
254 ret->
s_pid = getpid();
297 "([A-Z]+):([ \t]+)(\\d+)(([ \t]+)(\\d))?",
314 "\\$([A-Z_]*)(\\.([a-zA-Z_]*)\\.([a-zA-Z_]*))?",
336 "(\\w+)(\\s+(-?\\d+))?(\\s+((-?\\d+)|(\"(.*)\")))?",
337 0, G_REGEX_MATCH_NEWLINE_LF, NULL);
380 if(scheduler->
workers) g_thread_pool_free(scheduler->
workers, FALSE, TRUE);
392 g_tree_unref(scheduler->
agents);
450 if(j == NULL || j->
status != JB_CHECKEDOUT || j->
id < 0)
469 ctx->
list = g_list_prepend(ctx->
list, j);
484 static time_t last_stale_check = 0;
485 time_t now = time(NULL);
494 if((now - last_stale_check) < CONF_agent_update_interval)
499 last_stale_check = now;
502 if(g_tree_nnodes(scheduler->
job_list) == 0)
516 for(GList* iter = ctx.
list; iter != NULL; iter = iter->next)
520 log_printf(
"WARNING %s.%d: JOB[%d] type:%s is CHECKEDOUT with no agents"
521 " - failing as stale\n",
525 j->
message = g_strdup(
"job checked out but no agent was ever started (stale)");
530 GSequenceIter* sit = g_sequence_get_begin_iter(scheduler->
job_queue);
531 while(!g_sequence_iter_is_end(sit))
533 if(g_sequence_get(sit) == j)
535 g_sequence_remove(sit);
538 sit = g_sequence_iter_next(sit);
546 g_list_free(ctx.
list);
567 static job_t* job = NULL;
568 static host_t* host = NULL;
569 static int lockout = 0;
572 int n_agents = g_tree_nnodes(scheduler->
agents);
576 if(scheduler->
s_startup && n_agents == 0)
583 if(
closing && n_agents == 0 && n_jobs == 0)
589 if(lockout && n_agents == 0 && n_jobs == 0)
592 if(job == NULL && !lockout)
596 GList* skipped_jobs = NULL;
604 V_SCHED(
"JOB_INIT: Unable to run agent %s due to max_run limit.\n",
609 skipped_jobs = g_list_prepend(skipped_jobs, job);
617 host = g_tree_lookup(scheduler->
host_list, LOCAL_HOST);
623 skipped_jobs = g_list_prepend(skipped_jobs, job);
639 skipped_jobs = g_list_prepend(skipped_jobs, job);
645 job->
message = g_strdup(
"ERROR: jq_host not in the agent list!");
659 time_t now = time(NULL);
660 GSequenceIter* sit = g_sequence_get_begin_iter(scheduler->
job_queue);
661 while(!g_sequence_iter_is_end(sit))
663 ((
job_t*)g_sequence_get(sit))->checkedout_at = now;
664 sit = g_sequence_iter_next(sit);
676 V_SCHED(
"JOB_INIT: exclusive, postponing initialization\n");
680 V_SCHED(
"Starting JOB[%d].%s\n", job->
id, job->
agent_type);
686 for(GList* sl = skipped_jobs; sl != NULL; sl = sl->next)
690 g_list_free(skipped_jobs);
693 if(job != NULL && n_agents == 0 && n_jobs == 0)
/*
 * Names used to look up the run-as group and user in the configuration:
 * GU_HEADER is the group name and GU_GROUP / GU_USER are the key names
 * passed to fo_config_get().
 */
720 #define GU_HEADER "DIRECTORIES"
721 #define GU_GROUP "PROJECTGROUP"
722 #define GU_USER "PROJECTUSER"
740 fo_config_get (config, GU_HEADER, GU_GROUP, NULL) : PROJECT_GROUP;
743 fo_config_get (config, GU_HEADER, GU_USER, NULL) : PROJECT_USER;
746 grp = getgrnam(group);
749 fprintf(stderr,
"FATAL %s.%d: could not find group \"%s\"\n",
750 __FILE__, __LINE__, group);
751 fprintf(stderr,
"FATAL set_usr_grp() aborting due to error: %s\n",
757 setgroups(1, &(grp->gr_gid));
758 if((setgid(grp->gr_gid) != 0) || (setegid(grp->gr_gid) != 0))
760 fprintf(stderr,
"FATAL %s.%d: %s must be run as root or %s\n",
761 __FILE__, __LINE__, process_name, user);
762 fprintf(stderr,
"FATAL Set group '%s' aborting due to error: %s\n",
763 group, strerror(errno));
768 pwd = getpwnam(user);
771 fprintf(stderr,
"FATAL %s.%d: user '%s' not found\n",
772 __FILE__, __LINE__, user);
777 if((setuid(pwd->pw_uid) != 0) || (seteuid(pwd->pw_uid) != 0))
779 fprintf(stderr,
"FATAL %s.%d: %s must run this as %s\n",
780 __FILE__, __LINE__, process_name, user);
781 fprintf(stderr,
"FATAL SETUID aborting due to error: %s\n",
797 gchar f_name[FILENAME_MAX];
802 pid_t s_pid = getpid();
804 if((dp = opendir(
"/proc/")) == NULL)
806 fprintf(stderr,
"ERROR %s.%d: Could not open /proc/ file system\n",
811 while((ep = readdir(dp)) != NULL)
815 snprintf(f_name,
sizeof(f_name),
"/proc/%s/cmdline", ep->d_name);
816 if((file = fopen(f_name,
"rt")))
818 if(fgets(f_name,
sizeof(f_name), file) != NULL &&
819 strstr(f_name,
"fo_scheduler") && s_pid != atoi(ep->d_name))
821 NOTIFY(
"KILL: send signal to process %s\n", ep->d_name);
823 kill(atoi(ep->d_name), SIGQUIT);
825 kill(atoi(ep->d_name), SIGTERM);
887 GList** ret = (GList**)data;
889 *ret = g_list_append(*ret, key);
906 for(iter = keys; iter != NULL; iter = iter->next)
907 g_tree_remove(tree, iter->data);
927 uint32_t special = 0;
932 GError* error = NULL;
936 if((dp = opendir(dirname)) == NULL)
938 FATAL(
"Could not open agent config directory: %s", dirname);
944 while((ep = readdir(dp)) != NULL)
946 if(ep->d_name[0] !=
'.')
948 dirname = g_strdup_printf(
"%s/%s/%s/%s.conf",
954 V_SCHED(
"CONFIG: Could not find %s\n", dirname);
955 g_clear_error(&error);
959 V_SCHED(
"CONFIG: loading config file %s\n", dirname);
963 log_printf(
"ERROR: %s must have a \"default\" group\n", dirname);
964 log_printf(
"ERROR: cause by %s.%d\n", __FILE__, __LINE__);
971 TEST_ERROR(error,
"%s: the special key should be of type list", dirname);
972 for(i = 0; i <
max; i++)
975 TEST_ERROR(error,
"%s: failed to load element %d of special list",
979 if(strncmp(cmd,
"EXCLUSIVE", 9) == 0)
981 else if(strncmp(cmd,
"NOEMAIL", 7) == 0)
983 else if(strncmp(cmd,
"NOKILL", 6) == 0)
985 else if(strncmp(cmd,
"LOCAL", 6) == 0)
987 else if(strlen(cmd) != 0)
988 WARNING(
"%s: Invalid special type for agent %s: %s",
994 TEST_ERROR(error,
"%s: the default group must have a command key", dirname);
996 TEST_ERROR(error,
"%s: the default group must have a max key", dirname);
1000 V_SCHED(
"CONFIG: could not create meta agent using %s\n", ep->d_name);
1002 else if(TVERB_SCHED)
1004 log_printf(
"CONFIG: added new agent\n");
1005 log_printf(
" name = %s\n", name);
1006 log_printf(
" command = %s\n", cmd);
1007 log_printf(
" max = %d\n",
max);
1008 log_printf(
" special = %d\n", special);
1036 int32_t special = 0;
1038 gchar dirbuf[FILENAME_MAX];
1039 GError* error = NULL;
1048 tmp = g_strdup_printf(
"%s/fossology.conf", scheduler->
sysconfigdir);
1050 if(error)
FATAL(
"%s", error->message);
1057 if(scheduler->
i_port == 0)
1059 "FOSSOLOGY",
"port", &error));
1077 for(i = 0; i < special; i++)
1082 WARNING(
"%s\n", error->message);
1083 g_clear_error(&error);
1087 sscanf(tmp,
"%s %s %d", addbuf, dirbuf, &
max);
1092 log_printf(
"CONFIG: added new host\n");
1093 log_printf(
" name = %s\n", keys[i]);
1094 log_printf(
" address = %s\n", addbuf);
1095 log_printf(
" directory = %s\n", dirbuf);
1096 log_printf(
" max = %d\n",
max);
1102 ERROR(
"configuration file failed repository validation");
1103 ERROR(
"The offending line: \"%s\"", tmp);
1109 tmp = g_strdup_printf(
"%s/VERSION", scheduler->
sysconfigdir);
1111 if(error)
FATAL(
"%s", error->message);
1118 GError* ver_err = NULL;
1119 const gchar* new_ver =
fo_config_get(version,
"BUILD",
"COMMIT_HASH", &ver_err);
1122 g_clear_error(&ver_err);
1123 new_ver =
"unknown";
1129 log_printf(
"NOTE: scheduler version initialised to \"%s\"\n",
1135 log_printf(
"NOTE: scheduler version changed: \"%s\" -> \"%s\"\n",
/*
 * Per-variable loader for the "SCHEDULER" group of scheduler->sysconfig.
 * When the group has a key called <name>, CONF_<name> is assigned the value
 * returned by fo_config_get() after conversion through l_op. The V_SPECIAL
 * call is a separate statement, not part of the if, so the current value of
 * CONF_<name> is reported whether or not the key was present; w_op supplies
 * the printf conversion via MK_STRING_LIT. The type and val arguments are
 * not used by this expansion. The macro is undefined again right after use.
 */
1162 #define SELECT_CONF_INIT(type, name, l_op, w_op, val) \
1163 if(fo_config_has_key(scheduler->sysconfig, "SCHEDULER", #name)) \
1164 CONF_##name = l_op(fo_config_get(scheduler->sysconfig, "SCHEDULER", #name, NULL)); \
1165 V_SPECIAL("CONFIG: %s == " MK_STRING_LIT(w_op) "\n", #name, CONF_##name );
1167 #undef SELECT_CONF_INIT
1184 if((ret = daemon(0, 0)) != 0)
1187 scheduler->
s_pid = getpid();
1281 if (agent->
status != AG_SPAWNED)
1289 log_printf(
"NOTE: version refresh: sending SIGKILL to agent pid %d type \"%s\"\n",
1291 kill(-agent->
pid, SIGKILL);
1309 ctx.scheduler = scheduler;
1312 log_printf(
"NOTE: scheduler_version_refresh: killing all running agents "
1313 "for version update\n");
1315 g_tree_foreach(scheduler->
agents,
1318 log_printf(
"NOTE: scheduler_version_refresh: sent SIGKILL to %d agent(s); "
1319 "they will respawn automatically\n", ctx.count);
1330 int len = strlen(str);
1333 for(i = 0; i < len; i++)
1334 if(!isdigit(str[i]))
1349 return strcmp((
char*)a, (
char*)b);
1361 gint
int_compare(gconstpointer a, gconstpointer b, gpointer user_data)
1363 return *(
int*)a - *(
int*)b;
agent_t * agent_init(scheduler_t *scheduler, host_t *host, job_t *job)
Allocate and spawn a new agent.
void agent_meta_version_reset(meta_agent_t *ma)
Reset a meta_agent's cached version fields under version_lock.
void agent_update_event(scheduler_t *scheduler, void *unused)
void agent_destroy(agent_t *agent)
Frees the memory associated with an agent.
int add_meta_agent(GTree *meta_agents, char *name, char *cmd, int max, int spc)
void test_agents(scheduler_t *scheduler)
Calls the agent test function for every type of agent.
void kill_agents(scheduler_t *scheduler)
Call the agent_kill function for every agent within the system.
int is_meta_special(meta_agent_t *ma, int special_type)
tests if a particular meta agent has a specific special flag set
void agent_death_event(scheduler_t *scheduler, pid_t *pid)
void meta_agent_destroy(meta_agent_t *ma)
Header file with agent related operations.
#define SAG_NOEMAIL
This agent should not send notification emails.
#define SAG_NOKILL
This agent should not be killed when updating the agent.
#define SAG_EXCLUSIVE
This agent must not run at the same time as any other agent.
#define SAG_LOCAL
This agent should only run on localhost.
void event_loop_terminate()
Stops the event loop from executing.
void event_loop_destroy()
Frees any memory associated with the event queue.
Event handling operations.
char * fo_config_get_list(fo_conf *conf, char *group, char *key, int idx, GError **error)
int fo_config_list_length(fo_conf *conf, char *group, char *key, GError **error)
Gets the length of the list associated with a particular list key.
void fo_config_free(fo_conf *conf)
Frees the memory associated with the internal configuration data structures.
fo_conf * fo_config_load(char *rawname, GError **error)
Load the configuration information from the provided file.
char * fo_config_get(fo_conf *conf, const char *group, const char *key, GError **error)
Gets an element based on its group name and key name. If the group or key is not found,...
int fo_config_has_group(fo_conf *conf, char *group)
Checks if the currently parsed configuration file has a specific group.
void fo_config_join(fo_conf *dst, fo_conf *src, GError **error)
Takes all groups and key from a fo_conf and adds them to another.
char ** fo_config_key_set(fo_conf *conf, char *group, int *length)
Gets the set of key names for a particular group.
int fo_config_has_key(fo_conf *conf, char *group, char *key)
Checks if a specific group in the currently parsed configuration file has a specific key.
FOSSology library to read config file.
@ fo_missing_file
File is missing.
void host_insert(host_t *host, scheduler_t *scheduler)
Inserts a new host into the scheduler structure.
host_t * host_init(char *name, char *address, char *agent_dir, int max)
Creates a new host, and adds it to the host list.
void host_destroy(host_t *host)
Frees and uninitializes any memory associated with the host struct.
host_t * get_host(GList **queue, uint8_t num)
void job_fail_event(scheduler_t *scheduler, job_t *job)
Event that causes a job to be marked as failed.
job_t * next_job(GSequence *job_queue)
Gets the next job from the job queue.
job_t * peek_job(GSequence *job_queue)
Gets the job that is at the top of the queue if there is one.
void job_remove_agent(job_t *job, GTree *job_list, void *agent)
gint job_compare(gconstpointer a, gconstpointer b, gpointer user_data)
Used to compare two different jobs in the priority queue.
void job_destroy(job_t *job)
uint32_t active_jobs(GTree *job_list)
Gets the number of jobs that are not paused.
FUNCTION int max(int permGroup, int permPublic)
Get the maximum group privilege.
char * fo_RepValidate(fo_conf *config)
validates the repository configuration information.
log_t * log_new(gchar *log_name, gchar *pro_name, pid_t pro_pid)
Creates a new log.
void log_destroy(log_t *log)
Free memory associated with the log file.
void database_init(scheduler_t *scheduler)
void database_update_event(scheduler_t *scheduler, void *unused)
Checks the job queue for any new entries.
void email_init(scheduler_t *scheduler)
Loads information about the email that will be sent for job notifications.
int verbose
The verbose level.
void scheduler_config_event(scheduler_t *scheduler, void *unused)
Load both the fossology configuration and all the agent configurations.
int closing
Set if scheduler is shutting down.
void scheduler_agent_config(scheduler_t *scheduler)
Loads a particular agent's configuration file.
int scheduler_daemonize(scheduler_t *scheduler)
Daemonizes the scheduler.
GThread * main_thread
Pointer to the main thread.
void scheduler_version_refresh(scheduler_t *scheduler, void *unused)
Event run when the scheduler's own version changed.
gint string_is_num(gchar *str)
Checks if a string is entirely composed of numeric characters.
void g_tree_clear(GTree *tree)
Clears the contents of a GTree.
static gboolean isMaxLimitReached(meta_agent_t *agent)
Check if the current agent's max limit is respected.
void scheduler_close_event(scheduler_t *scheduler, void *killed)
Sets the closing flag and possibly kills all currently running agents.
void scheduler_test_agents(scheduler_t *scheduler, void *unused)
Event used when the scheduler tests the agents.
static gboolean g_tree_collect(gpointer key, gpointer value, gpointer data)
GTraverseFunc used by g_tree_clear to collect all the keys in a tree.
void scheduler_update(scheduler_t *scheduler)
Update function called after every event.
void scheduler_sig_handle(int signo)
Handles any signals sent to the scheduler that are not SIGCHLD.
void scheduler_clear_config(scheduler_t *scheduler)
Clears any information that is loaded when loading the configuration.
scheduler_t * scheduler_init(gchar *sysconfigdir, log_t *log)
Create a new scheduler object.
gint int_compare(gconstpointer a, gconstpointer b, gpointer user_data)
void scheduler_foss_config(scheduler_t *scheduler)
Loads the configuration data from fossology.conf.
gint string_compare(gconstpointer a, gconstpointer b, gpointer user_data)
static void reap_stale_jobs(scheduler_t *scheduler)
Reap CHECKEDOUT jobs that never had an agent spawned.
void set_usr_grp(gchar *process_name, fo_conf *config)
static gboolean collect_stale_jobs(gpointer key, gpointer val, gpointer data)
GTraverseFunc: collect CHECKEDOUT jobs with no agents that have been waiting longer than CONF_agent_u...
static gboolean version_refresh_kill_agent(int *pid_ptr, agent_t *agent, version_refresh_ctx *ctx)
GTraverseFunc: respawn a not-yet-working agent on the new binary.
#define TEST_ERROR(error,...)
int kill_scheduler(int force)
Kills all other running schedulers.
void scheduler_destroy(scheduler_t *scheduler)
Free any memory associated with a scheduler_t.
void scheduler_signal(scheduler_t *scheduler)
Function that handles certain signals being delivered to the scheduler.
Header file for the scheduler.
#define SELECT_DECLS(type, name, l_op, w_op, val)
#define CONF_VARIABLES_TYPES(apply)
#define AGENT_CONF
Agent conf location.
agent_status status
the state of execution the agent is currently in
meta_agent_t * type
the type of agent this is i.e. bucket, copyright...
uint8_t return_code
what was returned by the agent when it disconnected
pid_t pid
the pid of the process this agent is running in
int running
The number of agents currently running on this host.
int max
The max number of agents that can run on this host.
int32_t id
The identifier for this job.
job_status status
The current status for the job.
char * required_host
If not NULL, this job must run on a specific host machine.
GList * finished_agents
The list of agents that have completed their tasks.
gchar * message
Message that will be sent with job notification email.
time_t checkedout_at
Timestamp when job entered JB_CHECKEDOUT (for stale detection grace period)
GList * failed_agents
The list of agents that failed while working.
char * agent_type
The type of agent used to analyze the data.
gchar * data
The data associated with this job (jq_args)
GList * running_agents
The list of agents assigned to this job that are still working.
gboolean s_pause
Has the scheduler been paused.
GThread * server
Thread that is listening to the server socket.
GTree * job_list
List of jobs that have been created.
gchar * email_header
The beginning of the email message.
gchar * email_command
The command that sends emails, usually mailx.
GRegex * parse_interface_cmd
Parses the commands received by the interface.
gchar * sysconfigdir
The system directory that contain fossology.conf.
PGconn * db_conn
The database connection.
gboolean s_startup
Has the scheduler finished startup tests.
GList * host_queue
Round-robin queue for choosing which host to use next.
GTree * host_list
List of all hosts available to the scheduler.
log_t * main_log
The main log file for the scheduler.
gboolean s_pid
The pid of the scheduler process.
GRegex * parse_agent_msg
Parses messages coming from the agents.
gchar * logdir
The directory to put the log file in.
GTree * meta_agents
List of all meta agents available to the scheduler.
GThreadPool * workers
Threads to handle incoming network communication.
gboolean default_footer
Is the footer the default footer.
gchar * process_name
The name of the scheduler process.
gboolean i_terminate
Has the interface been terminated.
GTree * agents
List of any currently running agents.
gboolean logcmdline
Was the log file set by the command line.
gchar * email_subject
The subject to be used for emails.
GCancellable * cancel
Used to stop the listening thread when it is running.
GRegex * parse_db_email
Parses database email text.
gboolean i_created
Has the interface been created.
gchar * email_footer
The end of the email message.
GSequence * job_queue
heap of jobs that still need to be started
gchar * scheduler_version
The version string read from VERSION file at last (re)load.
gboolean s_daemon
Is the scheduler being run as a daemon.
gboolean default_header
Is the header the default header.
gchar * host_url
The url that is used to get to the FOSSology instance.
fo_conf * sysconfig
Configuration information loaded from the configuration file.
uint16_t i_port
The port that the scheduler is listening on.
Context for collect_stale_jobs traversal: carries current time so each node can apply a per-job grace...
GList * list
Accumulates stale job_t pointers.
time_t now
Current time snapshot for the traversal.
Context structure for the version-refresh tree traversal.