FOSSology  4.5.1
Open Source License Compliance by Open Source Software
Scanners.py
1 #!/usr/bin/env python3
2 
3 # SPDX-FileCopyrightText: © 2023,2025 Siemens AG
4 # SPDX-FileContributor: Gaurav Mishra <mishra.gaurav@siemens.com>
5 
6 # SPDX-License-Identifier: GPL-2.0-only
7 
8 import fnmatch
9 import json
10 import multiprocessing
11 import os
12 from subprocess import Popen, PIPE
13 from typing import Any
14 
15 from .CliOptions import CliOptions
16 from .Packages import Packages
17 
18 
class ScanResult:
    """
    Store scan results from agents.

    :ivar file: File location
    :ivar path: Actual location of file
    :ivar result: License list for file
    """
    file: str = None
    path: str = None
    result: set[str] = None

    def __init__(self, file: str, path: str, result: set[str]):
        """
        Create a scan result for a single file.

        :param file: File location
        :param path: Actual location of file
        :param result: Set of findings for the file
        """
        # Assign to the documented attribute names; other code reads
        # ``entry.file`` / ``entry.path`` / ``entry.result`` directly.
        self.file = file
        self.path = path
        self.result = result
35 
36 
class ScanResultList:
    """
    Store scan results from agents with result as a list of dictionaries.

    :ivar file: File location
    :ivar path: Actual location of file
    :ivar result: License list for file as a list of dictionaries
    """
    file: str = None
    path: str = None
    result: list[dict] = None

    def __init__(self, file: str, path: str, result: list[dict]):
        """
        Create a scan result holding the scanner's full finding objects.

        :param file: File location
        :param path: Actual location of file
        :param result: List of finding dictionaries for the file
        """
        # Assign to the documented attribute names; other code reads
        # ``row.file`` / ``row.path`` / ``row.result`` directly.
        self.file = file
        self.path = path
        self.result = result
53 
54 
class Scanners:
    """
    Handle all the data from different scanners.

    :ivar nomos_path: path to nomos bin
    :ivar copyright_path: path to copyright bin
    :ivar keyword_path: path to keyword bin
    :ivar ojo_path: path to ojo bin
    :ivar cli_options: CliOptions object
    :ivar scan_packages: Packages object whose components receive the results
    """
    nomos_path: str = '/bin/nomossa'
    copyright_path: str = '/bin/copyright'
    keyword_path: str = '/bin/keyword'
    ojo_path: str = '/bin/ojo'

    def __init__(self, cli_options: CliOptions, scan_packages: Packages):
        """
        Initialize the cli_options

        :param cli_options: CliOptions object to use
        :type cli_options: CliOptions
        :param scan_packages: ScanPackages for references
        :type scan_packages: Packages
        """
        self.cli_options: CliOptions = cli_options
        self.scan_packages: Packages = scan_packages
        # Built once so that allowlist membership checks are O(1).
        self._allowlist_licenses_set = set(
            self.cli_options.allowlist.get('licenses', [])
        )

    def get_scan_packages(self) -> Packages:
        """
        :return: The Packages object this scanner writes its results into
        """
        return self.scan_packages

    def is_excluded_path(self, path: str) -> bool:
        """
        Check if the path matches one of the allowlist ``exclude`` patterns.

        Matching is done with the case-sensitive ``fnmatch.fnmatchcase``.

        :param path: path to check
        :return: True if the path matches an exclude pattern, False otherwise
        """
        return any(
            fnmatch.fnmatchcase(path, pattern)
            for pattern in self.cli_options.allowlist.get('exclude', [])
        )

    def __normalize_path(self, path: str, against: str) -> str:
        """
        Normalize the given path against the given directory.

        Everything up to and including the first occurrence of ``against``
        (with a trailing separator) is removed from ``path``.

        :param path: path to normalize
        :param against: directory to normalize against
        :return: Normalized path, or ``path`` unchanged if ``against`` does not
                 occur in it
        """
        if not against.endswith(os.sep):
            against += os.sep
        prefix_start = path.find(against)
        if prefix_start == -1:
            return path
        return path[prefix_start + len(against):]

    def _execute_scanner_command(
        self, scanner_path: str, dir_to_scan: str,
        extra_args: list[str] | None = None
    ) -> dict:
        """
        Helper to execute a scanner command and return its JSON output.

        The scanner is invoked as ``<scanner_path> -J -d <dir_to_scan>``
        followed by ``extra_args``.

        :param scanner_path: path to the scanner executable
        :param dir_to_scan: directory handed to the scanner via ``-d``
        :param extra_args: additional command line arguments, if any
        :return: Decoded JSON output, or an empty dict if the scanner printed
                 nothing
        :raises FileNotFoundError: if the scanner executable does not exist
        :raises RuntimeError: if the scanner exits with a non-zero code
        :raises json.JSONDecodeError: if the output is not valid JSON
        """
        command = [scanner_path, "-J", "-d", dir_to_scan]
        if extra_args:
            command.extend(extra_args)

        try:
            # stderr is captured as well so that it can be reported when the
            # scanner fails; text=True gives decoded str output.
            with Popen(
                command, stdout=PIPE, stderr=PIPE, text=True, encoding='UTF-8'
            ) as process:
                stdout, stderr = process.communicate()
        except FileNotFoundError:
            print(f"Error: Scanner executable not found at {scanner_path}")
            raise
        except Exception as e:
            print(
                f"An unexpected error occurred while running {scanner_path}: {e}"
            )
            raise

        if process.returncode != 0:
            msg = (f"Scanner {scanner_path} exited with error code "
                   f"{process.returncode}. Stderr: {stderr}")
            print(msg)
            raise RuntimeError(msg)

        output = stdout.strip()
        if not output:
            # Nothing printed: treat as "no results" instead of a JSON error.
            return {}

        try:
            return json.loads(output)
        except json.JSONDecodeError:
            print(
                f"Error: Failed to decode JSON from scanner {scanner_path} output."
            )
            print(f"Raw output: {stdout}")
            raise

    def __get_nomos_result(self, dir_to_scan: str) -> dict:
        """
        Get the raw results from nomos scanner

        :param dir_to_scan: directory to scan
        :return: raw json from nomos
        """
        # Leave one core free, but never ask for fewer than one worker
        # (cpu_count() - 1 would be 0 on a single-core machine).
        worker_count = max(1, multiprocessing.cpu_count() - 1)
        extra_args = ["-S", "-l", "-n", str(worker_count)]
        return self._execute_scanner_command(
            self.nomos_path, dir_to_scan, extra_args
        )

    def __get_ojo_result(self, dir_to_scan: str) -> dict:
        """
        Get the raw results from ojo scanner

        :param dir_to_scan: directory to scan
        :return: raw json from ojo
        """
        return self._execute_scanner_command(self.ojo_path, dir_to_scan)

    def __get_copyright_results(self, dir_to_scan: str) -> dict:
        """
        Get the raw results from copyright scanner

        :param dir_to_scan: directory to scan
        :return: raw json from copyright
        """
        return self._execute_scanner_command(self.copyright_path, dir_to_scan)

    def __get_keyword_results(self, dir_to_scan: str) -> dict:
        """
        Get the raw results from keyword scanner

        :param dir_to_scan: directory to scan
        :return: raw json from keyword
        """
        return self._execute_scanner_command(self.keyword_path, dir_to_scan)

    def _process_single_scanner_package(
        self, component: dict, is_parent: bool, scanner_func: callable,
        result_key: str, whole: bool = False, all_results: bool = False
    ) -> list[ScanResult] | list[ScanResultList]:
        """
        Generalized function to process results from a single scanner for a
        given component. Set `result_key` to 'results' for copyrights and
        'licenses' for license scanning.

        :param component: package dictionary; ``download_dir`` and
                          ``base_dir`` are read when ``is_parent`` is False
        :param is_parent: scan ``cli_options.diff_dir`` instead of the
                          component's own directory
        :param scanner_func: callable taking the directory to scan and
                             returning the scanner's decoded JSON
        :param result_key: key of the findings list in every file entry
        :param whole: return ScanResultList objects carrying the complete
                      finding dictionaries instead of ScanResult string sets
        :param all_results: do not apply the allowlist ``exclude`` patterns
        :return: One entry per file that has at least one reportable finding
        """
        if is_parent:
            dir_to_scan = self.cli_options.diff_dir
        else:
            dir_to_scan = os.path.join(
                component['download_dir'], component['base_dir']
            )

        raw_results = scanner_func(dir_to_scan)
        # Scanner output is either {'results': [...]} or a bare list.
        if isinstance(raw_results, dict):
            file_entries = raw_results.get('results') or []
        elif isinstance(raw_results, list):
            file_entries = raw_results
        else:
            file_entries = []

        processed_list: list[ScanResult] | list[ScanResultList] = []
        for file_entry in file_entries:
            if 'file' not in file_entry or result_key not in file_entry:
                continue
            findings_list = file_entry[result_key]
            if findings_list is None or findings_list == "Unable to read file":
                continue

            file_path = self.__normalize_path(file_entry['file'], dir_to_scan)
            if (
                self.cli_options.repo and not all_results
                and self.is_excluded_path(file_path)
            ):
                continue

            if whole:
                # Keep the complete finding dictionaries.
                kept = []
                for finding in findings_list:
                    if finding is None:
                        continue
                    if result_key == 'results':
                        if (
                            finding.get('type') == 'statement'
                            and finding.get('content')
                        ):
                            kept.append(finding)
                    elif (
                        result_key == 'licenses'
                        and finding.get('license') != "No_license_found"
                    ):
                        kept.append(finding)
                if kept:
                    processed_list.append(
                        ScanResultList(file_path, file_entry['file'], kept)
                    )
            else:
                # Reduce every finding to its stripped text.
                texts = set()
                for finding in findings_list:
                    if finding is None:
                        continue
                    if (
                        result_key == 'results'
                        and finding.get('type', 'statement') != 'statement'
                    ):
                        continue
                    content = finding.get('content') or finding.get('license')
                    if not content:
                        # Neither 'content' nor 'license' present: nothing to
                        # report (previously crashed on None.strip()).
                        continue
                    content = content.strip()
                    if content and content != "No_license_found":
                        texts.add(content)
                if texts:
                    processed_list.append(
                        ScanResult(file_path, file_entry['file'], texts)
                    )

        return processed_list

    def _store_scanner_results(
        self, target_key: str, scanner_func: callable, result_key: str,
        whole: bool = False, all_results: bool = False
    ) -> None:
        """
        Run one scanner over the parent package (unless only dependencies are
        scanned) and over every dependency, storing the processed results
        under ``target_key`` of each package dictionary.

        :param target_key: key to store the processed results under
        :param scanner_func: callable producing the raw scanner JSON
        :param result_key: key of the findings list in the scanner output
        :param whole: store ScanResultList instead of ScanResult entries
        :param all_results: do not apply the allowlist ``exclude`` patterns
        """
        if not self.cli_options.scan_only_deps:
            parent = self.scan_packages.parent_package
            parent[target_key] = self._process_single_scanner_package(
                component=parent, is_parent=True, scanner_func=scanner_func,
                result_key=result_key, whole=whole, all_results=all_results
            )
        for component in self.scan_packages.dependencies.values():
            component[target_key] = self._process_single_scanner_package(
                component=component, is_parent=False,
                scanner_func=scanner_func, result_key=result_key,
                whole=whole, all_results=all_results
            )

    def set_copyright_list(
        self, all_results: bool = False, whole: bool = False
    ) -> None:
        """
        Set the formatted results from copyright scanner for the components.

        Results are stored under the ``COPYRIGHT_RESULT`` key.

        :param all_results: do not apply the allowlist ``exclude`` patterns
        :param whole: store the complete finding dictionaries
        """
        self._store_scanner_results(
            'COPYRIGHT_RESULT', self.__get_copyright_results, 'results',
            whole=whole, all_results=all_results
        )

    def set_keyword_list(self, whole: bool = False) -> None:
        """
        Set the formatted results from keyword scanner for the components.

        Results are stored under the ``KEYWORD_RESULT`` key.

        :param whole: store the complete finding dictionaries
        """
        self._store_scanner_results(
            'KEYWORD_RESULT', self.__get_keyword_results, 'results',
            whole=whole
        )

    def __set_license_nomos(self, whole: bool = False) -> None:
        """
        Update the packages with formatted results of nomos scanner

        Results are stored under the ``NOMOS_RESULT`` key.

        :param whole: store the complete finding dictionaries
        """
        self._store_scanner_results(
            'NOMOS_RESULT', self.__get_nomos_result, 'licenses', whole=whole
        )

    def __set_license_ojo(self, whole: bool = False) -> None:
        """
        Update the packages with formatted results of ojo scanner

        Results are stored under the ``OJO_RESULT`` key.

        :param whole: store the complete finding dictionaries
        """
        self._store_scanner_results(
            'OJO_RESULT', self.__get_ojo_result, 'licenses', whole=whole
        )

    def __merge_nomos_ojo(
        self, nomos_licenses: list[ScanResult], ojo_licenses: list[ScanResult]
    ) -> list[ScanResult]:
        """
        Merge the results from nomos and ojo based on file name

        The merge happens in place: ``nomos_licenses`` and the result sets of
        its entries are modified and the same list object is returned.

        :param nomos_licenses: nomos results, extended in place
        :param ojo_licenses: ojo results to merge in
        :return: ``nomos_licenses`` after merging
        """
        nomos_by_file = {entry.file: entry for entry in nomos_licenses}

        for ojo_entry in ojo_licenses:
            nomos_entry = nomos_by_file.get(ojo_entry.file)
            if nomos_entry is not None:
                nomos_entry.result.update(ojo_entry.result)
            else:
                # No nomos entry for this file: keep the ojo entry as is.
                nomos_licenses.append(ojo_entry)
        return nomos_licenses

    def get_non_allow_listed_results(
        self, scan_results: list[ScanResult] = None,
        scan_results_whole: list[ScanResultList] = None, whole: bool = False
    ) -> list[ScanResult] | list[ScanResultList]:
        """
        Get results where license check failed.

        :param scan_results: results with a set of license names; used when
                             ``whole`` is False
        :param scan_results_whole: results with a list of license
                                   dictionaries; used when ``whole`` is True
        :param whole: select which of the two inputs is filtered
        :return: New result objects holding only the licenses that are not in
                 the license allowlist; files without such licenses are omitted
        """
        allowed = self._allowlist_licenses_set
        final_results = []
        if whole and scan_results_whole is not None:
            for row in scan_results_whole:
                if self.cli_options.repo and self.is_excluded_path(row.file):
                    continue
                failed_list = [
                    lic for lic in row.result
                    if lic.get('license') not in allowed
                ]
                if failed_list:
                    final_results.append(
                        ScanResultList(row.file, row.path, failed_list)
                    )
        elif not whole and scan_results is not None:
            for row in scan_results:
                if self.cli_options.repo and self.is_excluded_path(row.file):
                    continue
                failed_set = {lic for lic in row.result if lic not in allowed}
                if failed_set:
                    final_results.append(
                        ScanResult(row.file, row.path, failed_set)
                    )
        return final_results

    def get_non_allow_listed_copyrights(self) -> list[ScanResult]:
        """
        Get copyrights from files which are not allow listed.

        :return: Copyright results whose file does not match an ``exclude``
                 pattern
        """
        # NOTE(review): when cli_options.repo is falsy this returns an empty
        # list, whereas get_non_allow_listed_results() only applies the
        # exclude patterns in repo mode and keeps everything otherwise.
        # Behaviour is kept as is - confirm whether this is intended.
        return [
            row for row in self.get_copyright_results()
            if self.cli_options.repo and not self.is_excluded_path(row.file)
        ]

    def _collect_results(self, key: str) -> list:
        """
        Concatenate the lists stored under ``key`` of the parent package and
        of every dependency (missing keys count as empty).

        :param key: package dictionary key to collect
        :return: New list with the parent's entries first
        """
        collected = list(self.scan_packages.parent_package.get(key, []))
        for dep in self.scan_packages.dependencies.values():
            collected.extend(dep.get(key, []))
        return collected

    def get_copyright_results(self) -> list[ScanResultList]:
        """
        Get list of copyright scan results from the package list.
        """
        return self._collect_results('COPYRIGHT_RESULT')

    def get_keyword_results(self) -> list[ScanResultList]:
        """
        Get list of keywords scan results from the package list.
        """
        return self._collect_results('KEYWORD_RESULT')

    def get_license_results(self) -> list[ScanResultList]:
        """
        Get list of license scan results from the package list.
        """
        return self._collect_results('SCANNER_RESULTS')

    def results_are_allow_listed(
        self, whole: bool = False
    ) -> list[ScanResult] | list[ScanResultList]:
        """
        Get the formatted list of license scanner findings

        The list contains the merged result of nomos/ojo scanner based on
        cli_options passed

        The stored ``SCANNER_RESULTS`` are always filtered as lists of
        license dictionaries, so they must have been produced by
        ``set_scanner_results(whole=True)``.

        :param whole: return ScanResultList objects; otherwise the failed
                      licenses are reduced to ScanResult sets of license names
        :return: Findings whose licenses are not in the allowlist
        """
        failed_licenses = self.get_non_allow_listed_results(
            scan_results_whole=self.get_license_results(), whole=True
        )
        if whole:
            return failed_licenses

        # Convert ScanResultList to ScanResult for non-whole output
        return [
            ScanResult(
                item.file, item.path,
                {res['license'] for res in item.result if 'license' in res}
            ) for item in failed_licenses
        ]

    def set_scanner_results(self, whole: bool = False) -> None:
        """
        Set the key `SCANNER_RESULTS` for all components in scan_packages using
        nomos and ojo scanners (whichever is selected).

        With both scanners selected the results are concatenated
        (``whole=True``) or merged per file (``whole=False``); otherwise the
        single scanner's results are used as they are.

        :param whole: store the complete finding dictionaries
        """
        if self.cli_options.nomos:
            self.__set_license_nomos(whole)
        if self.cli_options.ojo:
            self.__set_license_ojo(whole)

        use_both = self.cli_options.nomos and self.cli_options.ojo
        single_key = 'NOMOS_RESULT' if self.cli_options.nomos else 'OJO_RESULT'

        # Parent package first, then every dependency.
        components = [self.scan_packages.parent_package]
        components.extend(self.scan_packages.dependencies.values())

        for component in components:
            if not use_both:
                combined = component.get(single_key, [])
            elif whole:
                # Concatenate lists for whole results
                combined = (
                    component.get('NOMOS_RESULT', [])
                    + component.get('OJO_RESULT', [])
                )
            else:
                combined = self.__merge_nomos_ojo(
                    component.get('NOMOS_RESULT', []),
                    component.get('OJO_RESULT', [])
                )
            component['SCANNER_RESULTS'] = combined
list[ScanResultList] get_keyword_results(self)
Definition: Scanners.py:431
def __init__(self, CliOptions cli_options, Packages scan_packages)
Definition: Scanners.py:70
list[ScanResultList] get_license_results(self)
Definition: Scanners.py:443
dict __get_keyword_results(self, str dir_to_scan)
Definition: Scanners.py:183
None set_scanner_results(self, bool whole=False)
Definition: Scanners.py:481
None set_copyright_list(self, bool all_results=False, bool whole=False)
Definition: Scanners.py:287
dict __get_ojo_result(self, str dir_to_scan)
Definition: Scanners.py:167
dict __get_copyright_results(self, str dir_to_scan)
Definition: Scanners.py:175
list[ScanResult]|list[ScanResultList] _process_single_scanner_package(self, dict component, bool is_parent, callable scanner_func, str result_key, bool whole=False, bool all_results=False)
Definition: Scanners.py:194
dict _execute_scanner_command(self, str scanner_path, str dir_to_scan, list[str] extra_args=None)
Definition: Scanners.py:121
list[ScanResult]|list[ScanResultList] results_are_allow_listed(self, bool whole=False)
Definition: Scanners.py:457
str __normalize_path(self, str path, str against)
Definition: Scanners.py:102
dict __get_nomos_result(self, str dir_to_scan)
Definition: Scanners.py:156
None set_keyword_list(self, bool whole=False)
Definition: Scanners.py:306
list[ScanResult] __merge_nomos_ojo(self, list[ScanResult] nomos_licenses, list[ScanResult] ojo_licenses)
Definition: Scanners.py:361
None __set_license_nomos(self, bool whole=False)
Definition: Scanners.py:325
list[ScanResult]|list[ScanResultList] get_non_allow_listed_results(self, list[ScanResult] scan_results=None, list[ScanResultList] scan_results_whole=None, bool whole=False)
Definition: Scanners.py:378
None __set_license_ojo(self, bool whole=False)
Definition: Scanners.py:342
list[ScanResultList] get_copyright_results(self)
Definition: Scanners.py:419
list[ScanResult] get_non_allow_listed_copyrights(self)
Definition: Scanners.py:411
bool is_excluded_path(self, str path)
Definition: Scanners.py:88