13 #include <libfossrepo.h>
23 #include <sys/types.h>
26 #include <sys/resource.h>
32 #define TEST_NULV(j) if(!j) { errno = EINVAL; ERROR("job passed is NULL, cannot proceed"); return; }
33 #define TEST_NULL(j, ret) if(!j) { errno = EINVAL; ERROR("job passed is NULL, cannot proceed"); return ret; }
34 #define MAX_SQL 512;JOB_STATUS_TYPES
44 #define SELECT_STRING(passed) MK_STRING_LIT(JOB_##passed),
45 const char* job_status_strings[] = { JOB_STATUS_TYPES(
SELECT_STRING) };
82 gchar* status_str = g_strdup_printf(
83 "job:%d status:%s type:%s, priority:%d running:%d finished:%d failed:%d\n",
85 job_status_strings[job->
status],
92 V_JOB(
"JOB_STATUS: %s", status_str);
93 g_output_stream_write(ostr, status_str, strlen(status_str), NULL, NULL);
113 V_JOB(
"JOB[%d]: job status changed: %s => %s\n",
114 job->
id, job_status_strings[job->
status], job_status_strings[new_status]);
135 static gint
job_compare(gconstpointer a, gconstpointer b, gpointer user_data)
137 return ((
job_t*)a)->priority - ((
job_t*)b)->priority;
165 char* type,
char* host,
int id,
int parent_id,
int user_id,
int group_id,
int priority,
char *jq_cmd_args)
175 job->
status = JB_CHECKEDOUT;
189 g_tree_insert(job_list, &job->
id, job);
190 if(
id >= 0) g_sequence_insert_sorted(job_queue, job,
job_compare, NULL);
210 g_mutex_lock(job->
lock);
211 g_mutex_unlock(job->
lock);
212 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
213 g_mutex_clear(job->
lock);
215 g_mutex_free(job->
lock);
268 const char end[] =
"end\n";
269 GError* error = NULL;
276 memset(buf,
'\0',
sizeof(buf));
277 sprintf(buf,
"scheduler:%d revision:%s daemon:%d jobs:%d log:%s port:%d verbose:%d\n",
282 g_output_stream_write(params->first, buf, strlen(buf), NULL, NULL);
287 job_t* stat = g_tree_lookup(scheduler->
job_list, &params->second);
294 sprintf(buf,
"ERROR: invalid job id = %d\n", params->second);
295 g_output_stream_write(params->first, buf, strlen(buf), NULL, NULL);
299 g_output_stream_write(params->first, end,
sizeof(end), NULL, NULL);
315 job_t* job = params->first;
319 if(params->first == NULL)
321 tmp_job.
id = params->second;
322 tmp_job.
status = JB_NOT_AVAILABLE;
345 job_t* job = params->first;
351 tmp_job.
id = params->second;
352 tmp_job.
status = JB_PAUSED;
360 if(job->
status != JB_PAUSED)
362 ERROR(
"attempt to restart job %d failed, job status was %s",
363 job->
id, job_status_strings[job->
status]);
390 ((
job_t*)params->first)->priority = params->second;
391 for(iter = ((
job_t*)params->first)->running_agents; iter; iter = iter->next)
392 setpriority(PRIO_PROCESS, ((
agent_t*)iter->data)->pid, params->second);
410 if(job->
status != JB_FAILED)
415 V_JOB(
"JOB[%d]: job failed, killing agents\n", job->
id);
459 V_JOB(
"JOB[%d]: job removed from system\n", job->
id);
462 ((
agent_t*)curr->data)->owner = NULL;
463 for(curr = job->
failed_agents; curr != NULL; curr = curr->next)
464 ((
agent_t*)curr->data)->owner = NULL;
466 ((
agent_t*)curr->data)->owner = NULL;
468 g_tree_remove(job_list, &job->
id);
513 job->
data = g_strdup(data);
539 if(((
agent_t*)iter->data)->status != AG_PAUSED)
542 if(job->
status != JB_PAUSED && job->
status != JB_COMPLETE && finished)
549 aprintf(iter->data,
"CLOSE\n");
578 if(job->
status == JB_CHECKEDOUT)
583 return (job->
idx == 0 && job->
data != NULL);
585 g_mutex_lock(job->
lock);
599 g_mutex_unlock(job->
lock);
621 g_mutex_lock(job->
lock);
626 g_mutex_unlock(job->
lock);
649 file_name = g_strdup_printf(
"%06d", job->
id);
654 ERROR(
"JOB[%d]: job unable to create log file: %s\n", job->
id, file_path);
660 V_JOB(
"JOB[%d]: job created log file:\n %s\n", job->
id, file_path);
683 job_t* retval = NULL;
684 GSequenceIter* beg = g_sequence_get_begin_iter(job_queue);
686 if(g_sequence_get_length(job_queue) != 0)
688 retval = g_sequence_get(beg);
689 g_sequence_remove(beg);
705 if(g_sequence_get_length(job_queue) == 0)
710 beg = g_sequence_get_begin_iter(job_queue);
711 return g_sequence_get(beg);
723 g_tree_foreach(job_list, (GTraverseFunc)
is_active, &count);
int aprintf(agent_t *agent, const char *fmt,...)
ssize_t agent_write(agent_t *agent, const void *buf, int count)
void agent_kill(agent_t *agent)
Unclean kill of an agent.
void agent_pause(agent_t *agent)
#define TEST_NULL(a, ret)
Test if parameter is NULL.
void agent_unpause(agent_t *agent)
#define TEST_NULV(a)
Test if parameter is NULL.
void agent_print_status(agent_t *agent, GOutputStream *ostr)
Prints the status of the agent to the output stream provided.
Header file with agent related operations.
int verbose
The verbose flag for the cli.
char * fo_config_get(fo_conf *conf, const char *group, const char *key, GError **error)
Gets an element based on its group name and key name. If the group or key is not found,...
void job_fail_agent(job_t *job, void *agent)
void job_fail_event(scheduler_t *scheduler, job_t *job)
Event that causes a job to be marked as failed.
static gint job_compare(gconstpointer a, gconstpointer b, gpointer user_data)
Used to compare two different jobs in the priority queue.
void job_restart_event(scheduler_t *scheduler, arg_int *params)
job_t * job_init(GTree *job_list, GSequence *job_queue, char *type, char *host, int id, int parent_id, int user_id, int group_id, int priority, char *jq_cmd_args)
Create a new job.
job_t * next_job(GSequence *job_queue)
Gets the next job from the job queue.
void job_update(scheduler_t *scheduler, job_t *job)
static int is_active(int *job_id, job_t *job, int *counter)
Tests if a job is active.
static void job_transition(scheduler_t *scheduler, job_t *job, job_status new_status)
int job_is_open(scheduler_t *scheduler, job_t *job)
Tests to see if there is still data available for this job.
void job_set_data(scheduler_t *scheduler, job_t *job, char *data, int sql)
char * job_next(job_t *job)
job_t * peek_job(GSequence *job_queue)
Gets the job that is at the top of the queue if there is one.
void job_remove_agent(job_t *job, GTree *job_list, void *agent)
void job_verbose_event(scheduler_t *scheduler, job_t *job)
void job_pause_event(scheduler_t *scheduler, arg_int *params)
Event to pause a job.
void job_status_event(scheduler_t *scheduler, arg_int *params)
Event to get the status of the scheduler or a specific job.
void job_destroy(job_t *job)
log_t * job_log(job_t *job)
void job_add_agent(job_t *job, void *agent)
Adds a new agent to the job's list of agents.
void job_priority_event(scheduler_t *scheduler, arg_int *params)
void job_finish_agent(job_t *job, void *agent)
static int job_sstatus(int *job_id, job_t *job, GOutputStream *ostr)
Prints the job's status to the output stream.
#define SELECT_STRING(passed)
uint32_t active_jobs(GTree *job_list)
Gets the number of jobs that are not paused.
FILE * fo_RepFwrite(char *Type, char *Filename)
Perform an fwrite. Also creates directories.
char * fo_RepMkPath(const char *Type, char *Filename)
Given a filename, construct the full path to the file.
log_t * log_new_FILE(FILE *log_file, gchar *log_name, gchar *pro_name, pid_t pro_pid)
Creates a log file structure based on an already created FILE*.
void log_destroy(log_t *log)
Free memory associated with the log file.
PGresult * database_exec(scheduler_t *scheduler, const char *sql)
Executes an sql statement for the scheduler.
void database_job_priority(scheduler_t *scheduler, job_t *job, int priority)
Changes the priority of a job queue entry in the database.
void database_update_job(scheduler_t *scheduler, job_t *job, job_status status)
Change the status of a job in the database.
void database_update_event(scheduler_t *scheduler, void *unused)
Checks the job queue for any new entries.
void database_job_log(int j_id, char *log_name)
Enters the name of the log file for a job into the database.
Header file for the scheduler.
#define SafePQclear(pgres)
int32_t id
The identifier for this job.
GMutex * lock
Lock to maintain data integrity.
job_status status
The current status for the job.
uint32_t idx
The current index into the sql results.
int32_t group_id
The id of the group that created the job.
char * required_host
If not NULL, this job must run on a specific host machine.
GList * finished_agents
The list of agents that have completed their tasks.
gchar * message
Message that will be sent with job notification email.
gchar * jq_cmd_args
Command line arguments for this job.
int32_t user_id
The id of the user that created the job.
GList * failed_agents
The list of agents that failed while working.
char * agent_type
The type of agent used to analyze the data.
gchar * data
The data associated with this job (jq_args)
int32_t priority
Importance of the job; maps directly to the Unix process priority.
int32_t parent_id
The identifier for the parent of this job (its queue id)
GList * running_agents
The list of agents assigned to this job that are still working.
PGresult * db_result
Results from the sql query (if any)
log_t * log
The log to print any agent logging messages to.
int32_t verbose
The verbose level for all of the agents in this job.
gchar * log_name
The name of the log file that will be printed to.
GTree * job_list
List of jobs that have been created.
gboolean s_pid
The pid of the scheduler process.
gboolean s_daemon
Is the scheduler being run as a daemon.
fo_conf * sysconfig
Configuration information loaded from the configuration file.
uint16_t i_port
The port that the scheduler is listening on.