FOSSology  4.4.0
Open Source License Compliance by Open Source Software
interface.c
Go to the documentation of this file.
1 /*
2  SPDX-FileCopyrightText: © 2010, 2011, 2012 Hewlett-Packard Development Company, L.P.
3 
4  SPDX-License-Identifier: GPL-2.0-only
5 */
11 /* local includes */
12 #include <agent.h>
13 #include <database.h>
14 #include <event.h>
15 #include <interface.h>
16 #include <job.h>
17 #include <logging.h>
18 #include <scheduler.h>
19 
20 /* std library includes */
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <string.h>
24 #include <limits.h>
25 
26 /* unix library includes */
27 #include <fcntl.h>
28 #include <pthread.h>
29 #include <sys/stat.h>
30 #include <sys/types.h>
31 #include <unistd.h>
32 
33 /* glib includes */
34 #include <glib.h>
35 #include <gio/gio.h>
36 
37 #define FIELD_WIDTH 10
38 #define BUFFER_SIZE 1024
39 
40 #define netw g_output_stream_write
41 
42 #define PROXY_PROTOCOL "socks5"
43 #define PROXY_DEFAULT_PORT 1080
44 
45 /* ************************************************************************** */
46 /* **** Data Types ********************************************************** */
47 /* ************************************************************************** */
48 
53 typedef struct interface_connection
54 {
55  GSocketConnection* conn;
56  GInputStream* istr;
57  GOutputStream* ostr;
59 
60 /* ************************************************************************** */
61 /* **** Local Functions ***************************************************** */
62 /* ************************************************************************** */
63 
72  GSocketConnection* conn, GThreadPool* threads)
73 {
74  interface_connection* inter = g_new0(interface_connection, 1);
75 
76  inter->conn = conn;
77  inter->istr = g_io_stream_get_input_stream((GIOStream*)inter->conn);
78  inter->ostr = g_io_stream_get_output_stream((GIOStream*)inter->conn);
79  g_thread_pool_push(threads, inter, NULL);
80 
81  return inter;
82 }
83 
93 {
94  g_object_unref(inter->conn);
95  g_free(inter);
96 }
97 
127 {
128  GMatchInfo* regex_match;
129  job_t* job;
130  char buffer[BUFFER_SIZE];
131  char org[sizeof(buffer)];
132  char* arg1, * arg2, * arg3;
133  char* cmd;
134  arg_int* params;
135  int i;
136 
137  memset(buffer, '\0', sizeof(buffer));
138 
139  while(g_input_stream_read(conn->istr, buffer, sizeof(buffer), scheduler->cancel, NULL) > 0)
140  {
141  V_INTERFACE("INTERFACE: received \"%s\"\n", buffer);
142  /* convert all characters before first ' ' to lower case */
143  memcpy(org, buffer, sizeof(buffer));
144  for(cmd = buffer; *cmd; cmd++)
145  *cmd = g_ascii_tolower(*cmd);
146  g_regex_match(scheduler->parse_interface_cmd, buffer, 0, &regex_match);
147  cmd = g_match_info_fetch(regex_match, 1);
148 
149  if(cmd == NULL)
150  {
151  g_output_stream_write(conn->ostr, "Invalid command: \"", 18, NULL, NULL);
152  g_output_stream_write(conn->ostr, buffer, strlen(buffer), NULL, NULL);
153  g_output_stream_write(conn->ostr, "\"\n", 2, NULL, NULL);
154  g_match_info_free(regex_match);
155  WARNING("INTERFACE: invalid command: \"%s\"", buffer);
156  continue;
157  }
158 
159  /* acknowledge that you have received the command */
160  V_INTERFACE("INTERFACE: send \"received\"\n");
161  g_output_stream_write(conn->ostr, "received\n", 9, NULL, NULL);
162 
163  /* command: "close"
164  *
165  * The interface has chosen to close the connection. Return the command
166  * in acknowledgment of the command and end this thread.
167  */
168  if(strcmp(cmd, "close") == 0)
169  {
170  g_output_stream_write(conn->ostr, "CLOSE\n", 6, NULL, NULL);
171  V_INTERFACE("INTERFACE: closing connection to user interface\n");
172 
173  g_match_info_free(regex_match);
174  g_free(cmd);
175  return;
176  }
177 
178  /* command: "stop"
179  *
180  * The interface has instructed the scheduler to shut down gracefully. The
181  * scheduler will wait for all currently executing agents to finish
182  * running, then exit the vent loop.
183  */
184  else if(strcmp(cmd, "stop") == 0)
185  {
186  g_output_stream_write(conn->ostr, "CLOSE\n", 6, NULL, NULL);
187  V_INTERFACE("INTERFACE: shutting down scheduler gracefully\n");
188  event_signal(scheduler_close_event, (void*)0);
189 
190  g_match_info_free(regex_match);
191  g_free(cmd);
192  return;
193  }
194 
195  /* command: "die"
196  *
197  * The interface has instructed the scheduler to shut down. The scheduler
198  * should acknowledge the command and proceed to kill all current executing
199  * agents and exit the event loop
200  */
201  else if(strcmp(cmd, "die") == 0)
202  {
203  g_output_stream_write(conn->ostr, "CLOSE\n", 6, NULL, NULL);
204  V_INTERFACE("INTERFACE: killing the scheduler\n");
205  event_signal(scheduler_close_event, (void*)1);
206 
207  g_match_info_free(regex_match);
208  g_free(cmd);
209  return;
210  }
211 
212  /* command: "load"
213  *
214  * The interface has requested information about the load that the different
215  * hosts are under. The scheduler should respond with the status of all the
216  * hosts.
217  */
218  else if(strcmp(cmd, "load") == 0)
219  {
220  print_host_load(scheduler->host_list, conn->ostr);
221  }
222 
223  /* command: "kill <job_id> <"message">"
224  *
225  * The interface has instructed the scheduler to kill and fail a particular
226  * job. Both arguments are required for this command.
227  *
228  * job_id: The jq_pk for the job that needs to be killed
229  * message: A message that will be in the email notification and the
230  * jq_endtext field of the job queue
231  */
232  else if(strcmp(cmd, "kill") == 0)
233  {
234  arg1 = g_match_info_fetch(regex_match, 3);
235  arg2 = g_match_info_fetch(regex_match, 8);
236 
237  if(arg1)
238  i = atoi(arg1);
239  if(arg1 == NULL || arg2 == NULL || strlen(arg1) == 0 || strlen(arg2) == 0)
240  {
241  g_free(cmd);
242  cmd = g_strdup_printf("Invalid kill command: \"%s\"\n", buffer);
243  g_output_stream_write(conn->ostr, cmd, strlen(cmd), NULL, NULL);
244  }
245  else if((job = g_tree_lookup(scheduler->job_list, &i)) == NULL)
246  {
247  arg3 = g_strdup_printf(jobsql_failed, arg2, i);
248  event_signal(database_exec_event, arg3);
249  }
250  else
251  {
252  if(job->message)
253  g_free(job->message);
254  job->message = strdup(((arg2 == NULL) ? "no message" : arg2));
255  event_signal(job_fail_event, job);
256  }
257 
258  g_free(arg1);
259  g_free(arg2);
260  }
261 
262  /* command: "pause <job_id>"
263  *
264  * The interface has instructed the scheduler to pause a job. This is used
265  * to free up resources on a particular host. The argument is required and
266  * is the jq_pk for the job that needs to be paused.
267  */
268  else if(strcmp(cmd, "pause") == 0)
269  {
270  arg1 = g_match_info_fetch(regex_match, 3);
271 
272  if(arg1 == NULL || strlen(arg1) == 0)
273  {
274  arg1 = g_strdup_printf("Invalid pause command: \"%s\"\n", buffer);
275  WARNING("received invalid pause command: %s", buffer);
276  g_output_stream_write(conn->ostr, arg1, strlen(arg1), NULL, NULL);
277  g_free(arg1);
278  }
279  else
280  {
281  params = g_new0(arg_int, 1);
282  params->second = atoi(arg1);
283  params->first = g_tree_lookup(scheduler->job_list, &params->second);
284  event_signal(job_pause_event, params);
285  g_free(arg1);
286  }
287  }
288 
289  /* command: "reload"
290  *
291  * The scheduler should reload its configuration information. This should
292  * be used if a change to an agent or fossology.conf has been made since
293  * the scheduler started running.
294  */
295  else if(strcmp(cmd, "reload") == 0)
296  {
297  event_signal(scheduler_config_event, NULL);
298  }
299 
300  /* command: "agents"
301  *
302  * The interface has requested a list of agents that the scheduler is able
303  * to run correctly.
304  */
305  else if(strcmp(cmd, "agents") == 0)
306  {
307  event_signal(list_agents_event, conn->ostr);
308  }
309 
310  /* command: "status [job_id]"
311  *
312  * fetches the status of the a particular job or the scheduler. The
313  * argument is not required for this command.
314  *
315  * with job_id:
316  * print job status followed by status of agent belonging to the job
317  * without job_id:
318  * print scheduler statsu followed by status of every job
319  */
320  else if(strcmp(cmd, "status") == 0)
321  {
322  arg1 = g_match_info_fetch(regex_match, 3);
323 
324  params = g_new0(arg_int, 1);
325  params->first = conn->ostr;
326  params->second = (arg1 == NULL) ? 0 : atoi(arg1);
327  event_signal(job_status_event, params);
328 
329  g_free(arg1);
330  }
331 
332  /* command: "restart <job_id>"
333  *
334  * The interface has instructed the scheduler to restart a job that has been
335  * paused. The argument for this command is required and is the jq_pk for
336  * the job that should be restarted.
337  */
338  else if(strcmp(cmd, "restart") == 0)
339  {
340  arg1 = g_match_info_fetch(regex_match, 3);
341 
342  if(arg1 == NULL)
343  {
344  arg1 = g_strdup(buffer);
345  WARNING("received invalid restart command: %s", buffer);
346  snprintf(buffer, sizeof(buffer) - 1,
347  "ERROR: Invalid restart command: %s\n", arg1);
348  g_output_stream_write(conn->ostr, buffer, strlen(buffer), NULL, NULL);
349  g_free(arg1);
350  }
351  else
352  {
353  params = g_new0(arg_int, 1);
354  params->second = atoi(arg1);
355  params->first = g_tree_lookup(scheduler->job_list, &params->second);
356  event_signal(job_restart_event, params);
357  g_free(arg1);
358  }
359  }
360 
361  /* command: "verbose <job_id|level> [level]"
362  *
363  * The interface has either requested a change in a verbose level, or it
364  * has requested the current verbose level. This command can have no
365  * arguments, 1 argument or 2 arguments.
366  *
367  * no arguments: respond with the verbose level of the scheduler
368  * 1 argument: change the verbose level of the scheduler to the argument
369  * 2 arguments: change the verbose level of the job with the jq_pk of the
370  * first arguement to the second argument
371  */
372  else if(strcmp(cmd, "verbose") == 0)
373  {
374  arg1 = g_match_info_fetch(regex_match, 3);
375  arg2 = g_match_info_fetch(regex_match, 5);
376 
377  if(arg1 == NULL)
378  {
379  if(verbose < 8)
380  {
381  sprintf(buffer, "level: %d\n", verbose);
382  }
383  else
384  {
385  strcpy(buffer, "mask: h d i e s a j\nmask: ");
386  for(i = 1; i < 0x10000; i <<= 1)
387  strcat(buffer, i & verbose ? "1 " : "0 ");
388  strcat(buffer, "\n");
389  }
390  g_output_stream_write(conn->ostr, buffer, strlen(buffer), NULL, NULL);
391  }
392  else if(arg2 == NULL)
393  {
394  verbose = atoi(arg1);
395  g_free(arg1);
396  }
397  else
398  {
399  i = atoi(arg1);
400  if((job = g_tree_lookup(scheduler->job_list, &i)) == NULL)
401  {
402  g_free(cmd);
403  cmd = g_strdup_printf("Invalid verbose command: \"%s\"\n", buffer);
404  g_output_stream_write(conn->ostr, cmd, strlen(cmd), NULL, NULL);
405  }
406  else
407  {
408  job->verbose = atoi(arg2);
409  event_signal(job_verbose_event, job);
410  }
411 
412  g_free(arg1);
413  g_free(arg2);
414  }
415  }
416 
417  /* command: "priority <job_id> <level>"
418  *
419  * Scheduler should change the priority of a job. This will change the
420  * systems priority of the relevant job and change the priority of the job
421  * in the database to match. Both arguments are required for this command.
422  */
423  else if(strcmp(cmd, "priority") == 0)
424  {
425  arg1 = g_match_info_fetch(regex_match, 3);
426  arg2 = g_match_info_fetch(regex_match, 5);
427 
428  if(arg1 != NULL && arg2 != NULL)
429  {
430  i = atoi(arg1);
431 
432  params = g_new0(arg_int, 1);
433  params->first = g_tree_lookup(scheduler->job_list, &i);
434  params->second = atoi(arg2);
435  event_signal(job_priority_event, params);
436  g_free(arg1);
437  g_free(arg2);
438  }
439  else
440  {
441  if(arg1) g_free(arg1);
442  if(arg2) g_free(arg2);
443 
444  arg1 = g_strdup(buffer);
445  WARNING("Invalid priority command: %s\n", buffer);
446  snprintf(buffer, sizeof(buffer) - 1,
447  "ERROR: Invalid priority command: %s\n", arg1);
448  g_output_stream_write(conn->ostr, buffer, strlen(buffer), NULL, NULL);
449  g_free(arg1);
450  }
451  }
452 
453  /* command: "database"
454  *
455  * The scheduler should check the database. This will normaly be sent by
456  * the ui when a new job has been queue and must be run.
457  */
458  else if(strcmp(cmd, "database") == 0)
459  {
460  event_signal(database_update_event, NULL);
461  }
462 
463  /* command: unknown
464  *
465  * The command sent does not match any of the known commands, log an error
466  * and inform the interface that this wasn't a command.
467  */
468  else
469  {
470  g_output_stream_write(conn->ostr, "Invalid command: \"", 18, NULL, NULL);
471  g_output_stream_write(conn->ostr, buffer, strlen(buffer), NULL, NULL);
472  g_output_stream_write(conn->ostr, "\"\n", 2, NULL, NULL);
473  con_printf(main_log, "ERROR %s.%d: Interface received invalid command: %s\n", __FILE__, __LINE__, cmd);
474  }
475 
476  g_match_info_free(regex_match);
477  g_free(cmd);
478  memset(buffer, '\0', sizeof(buffer));
479  }
480 
482  return;
483 }
484 
495 {
496  GSocketListener* server_socket;
497  GSocketConnection* new_connection;
498  GError* error = NULL;
499 
500  /* validate new thread */
501  if(scheduler->i_terminate || !scheduler->i_created)
502  {
503  ERROR("Could not create server socket thread\n");
504  return (void*)0;
505  }
506 
507  /* create the server socket to listen for connections on */
508  server_socket = g_socket_listener_new();
509  if(server_socket == NULL)
510  FATAL("could not create the server socket");
511 
512  g_socket_listener_add_inet_port(server_socket, scheduler->i_port, NULL, &error);
513  if(error)
514  FATAL("[port:%d]: %s", scheduler->i_port, error->message);
515  scheduler->cancel = g_cancellable_new();
516 
517  V_INTERFACE("INTERFACE: listening port is %d\n", scheduler->i_port);
518 
519  /* wait for new connections */
520  for(;;)
521  {
522  new_connection = g_socket_listener_accept(server_socket, NULL,
523  scheduler->cancel, &error);
524 
525  if(scheduler->i_terminate)
526  break;
527  V_INTERFACE("INTERFACE: new interface connection\n");
528  if(error)
529  FATAL("INTERFACE closing for %s", error->message);
530 
531  interface_conn_init(new_connection, scheduler->workers);
532  }
533 
534  V_INTERFACE("INTERFACE: socket listening thread closing\n");
535  g_socket_listener_close(server_socket);
536  g_object_unref(server_socket);
537  return (void*)1;
538 }
539 
540 /* ************************************************************************** */
541 /* **** Constructor Destructor ********************************************** */
542 /* ************************************************************************** */
543 
554 void interface_init(scheduler_t* scheduler)
555 {
556  if(!scheduler->i_created)
557  {
558  scheduler->i_created = 1;
559  scheduler->i_terminate = 0;
560 
561  scheduler->cancel = NULL;
562  scheduler->workers = g_thread_pool_new((GFunc)interface_thread,
563  scheduler, CONF_interface_nthreads, FALSE, NULL);
564 
565 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
566  scheduler->server = g_thread_new("interface",
567  (GThreadFunc)interface_listen_thread, scheduler);
568 #else
569  scheduler->server = g_thread_create((GThreadFunc)interface_listen_thread,
570  scheduler, TRUE, NULL);
571 #endif
572 
573  while(scheduler->cancel == NULL)
574  usleep(100);
575  }
576  else
577  {
578  WARNING("Multiple attempts made to initialize the interface");
579  }
580 }
581 
589 {
590  /* only destroy the interface if it has been created */
591  if(scheduler->i_created)
592  {
593  scheduler->i_terminate = 1;
594  scheduler->i_created = 0;
595 
596  g_cancellable_cancel(scheduler->cancel);
597  g_thread_join(scheduler->server);
598  g_thread_pool_free(scheduler->workers, FALSE, TRUE);
599 
600  scheduler->server = NULL;
601  scheduler->cancel = NULL;
602  scheduler->workers = NULL;
603  }
604  else
605  {
606  WARNING("Attempt to destroy the interface without initializing it");
607  }
608 }
void list_agents_event(scheduler_t *scheduler, GOutputStream *ostr)
Receive agent on interface.
Definition: agent.c:1118
Header file with agent related operations.
Event handling operations.
int verbose
The verbose flag for the cli.
Definition: fo_cli.c:38
#define BUFFER_SIZE
Maximum buffer length.
Definition: fossconfig.c:101
void print_host_load(GTree *host_list, GOutputStream *ostr)
Prints the host information to ostr.
Definition: host.c:174
static void interface_conn_destroy(interface_connection *inter)
Free the memory associated with an interface connection.
Definition: interface.c:92
static interface_connection * interface_conn_init(GSocketConnection *conn, GThreadPool *threads)
Definition: interface.c:71
void interface_thread(interface_connection *conn, scheduler_t *scheduler)
Function that will run the thread associated with a particular interface instance.
Definition: interface.c:126
struct interface_connection interface_connection
void interface_destroy(scheduler_t *scheduler)
Closes the server socket and thread pool that service UI connections.
Definition: interface.c:588
void * interface_listen_thread(scheduler_t *scheduler)
Function that will listen for new connections to the server sockets.
Definition: interface.c:494
void interface_init(scheduler_t *scheduler)
Create the interface thread and thread pool that handle UI connections.
Definition: interface.c:554
void job_fail_event(scheduler_t *scheduler, job_t *job)
Event that causes a job to be marked as failed.
Definition: job.c:406
void job_restart_event(scheduler_t *scheduler, arg_int *params)
Definition: job.c:342
void job_verbose_event(scheduler_t *scheduler, job_t *job)
Definition: job.c:244
void job_pause_event(scheduler_t *scheduler, arg_int *params)
Event to pause a job.
Definition: job.c:312
void job_status_event(scheduler_t *scheduler, arg_int *params)
Event to get the status of the scheduler or a specific job.
Definition: job.c:266
void job_priority_event(scheduler_t *scheduler, arg_int *params)
Definition: job.c:385
char buffer[2048]
The last thing received from the scheduler.
log_t * main_log
Definition: logging.c:33
Log related operations.
#define ERROR(...)
Definition: logging.h:79
#define FATAL(...)
Definition: logging.h:63
void database_update_event(scheduler_t *scheduler, void *unused)
Checks the job queue for any new entries.
Definition: database.c:841
void database_exec_event(scheduler_t *scheduler, char *sql)
Definition: database.c:827
void scheduler_config_event(scheduler_t *scheduler, void *unused)
Load both the fossology configuration and all the agent configurations.
Definition: scheduler.c:991
void scheduler_close_event(scheduler_t *scheduler, void *killed)
Sets the closing flag and possibly kills all currently running agents.
Definition: scheduler.c:1014
Header file for the scheduler.
const char * jobsql_failed
Definition: event.h:46
GOutputStream * ostr
Stream to write to the interface.
Definition: interface.c:57
GInputStream * istr
Stream to read from the interface.
Definition: interface.c:56
GSocketConnection * conn
The socket that is our connection.
Definition: interface.c:55
The job structure.
Definition: job.h:51
gchar * message
Message that will be sent with job notification email.
Definition: job.h:69
int32_t verbose
The verbose level for all of the agents in this job.
Definition: job.h:71
GThread * server
Thread that is listening to the server socket.
Definition: scheduler.h:167
GTree * job_list
List of jobs that have been created.
Definition: scheduler.h:172
GRegex * parse_interface_cmd
Parses the commands received by the interface.
Definition: scheduler.h:188
GTree * host_list
List of all hosts available to the scheduler.
Definition: scheduler.h:160
GThreadPool * workers
Threads to handle incoming network communication.
Definition: scheduler.h:168
gboolean i_terminate
Has the interface been terminated.
Definition: scheduler.h:165
GCancellable * cancel
Used to stop the listening thread when it is running.
Definition: scheduler.h:169
gboolean i_created
Has the interface been created.
Definition: scheduler.h:164
uint16_t i_port
The port that the scheduler is listening on.
Definition: scheduler.h:166