10 import multiprocessing
12 from subprocess
import Popen, PIPE
13 from typing
import Any
15 from .CliOptions
import CliOptions
16 from .Packages
import Packages
21 Store scan results from agents.
23 :ivar file: File location
24 :ivar path: Actual location of file
25 :ivar result: License list for file
29 result: set[str] =
None
31 def __init__(self, file: str, path: str, result: set[str]):
39 Store scan results from agents with result as a list of dictionaries.
41 :ivar file: File location
42 :ivar path: Actual location of file
43 :ivar result: License list for file as a list of dictionaries
47 result: list[dict] =
None
49 def __init__(self, file: str, path: str, result: list[dict]):
57 Handle all the data from different scanners.
59 :ivar nomos_path: path to nomos bin
60 :ivar copyright_path: path to copyright bin
61 :ivar keyword_path: path to keyword bin
62 :ivar ojo_path: path to ojo bin
63 :ivar cli_options: CliOptions object
65 nomos_path: str =
'/bin/nomossa'
66 copyright_path: str =
'/bin/copyright'
67 keyword_path: str =
'/bin/keyword'
68 ojo_path: str =
'/bin/ojo'
70 def __init__(self, cli_options: CliOptions, scan_packages: Packages):
72 Initialize the cli_options
74 :param cli_options: CliOptions object to use
75 :type cli_options: CliOptions
76 :param scan_packages: ScanPackages for references
77 :type scan_packages: Packages
79 self.cli_options: CliOptions = cli_options
80 self.scan_packages: Packages = scan_packages
82 self.cli_options.allowlist.get(
'licenses', [])
def get_scan_packages(self) -> Packages:
    """
    Return the Packages object stored on this instance.

    :return: the ``scan_packages`` attribute
    """
    return self.scan_packages
def is_excluded_path(self, path: str) -> bool:
    """
    Check if the path is allow listed.

    The function uses ``fnmatch.fnmatchcase`` to compare the path with every
    pattern stored under the ``exclude`` key of the allow list, so the match
    is case-sensitive.

    :param path: path to check
    :return: True if the path is in allow list, False otherwise
    """
    patterns = self.cli_options.allowlist.get('exclude', [])
    return any(fnmatch.fnmatchcase(path, pattern) for pattern in patterns)
def __normalize_path(self, path: str, against: str) -> str:
    """
    Normalize the given path against the given directory.

    Everything up to and including the first occurrence of ``against``
    (with a trailing path separator) is removed from ``path``. When
    ``against`` does not occur in ``path`` the path is returned unchanged.

    :param path: path to normalize
    :param against: directory to normalize against
    :return: Normalized path
    """
    # Match on a full directory component, not on a bare name prefix.
    if not against.endswith(os.sep):
        against += os.sep

    start_index_of_prefix = path.find(against)
    if start_index_of_prefix == -1:
        return path

    relative_path_start_index = start_index_of_prefix + len(against)
    return path[relative_path_start_index:]
def _execute_scanner_command(
    self, scanner_path: str, dir_to_scan: str, extra_args: list[str] = None
) -> dict:
    """
    Helper to execute a scanner command and return its JSON output.

    The command line is ``scanner_path -J -d dir_to_scan`` followed by
    ``extra_args``. Every failure (missing executable, non-zero exit code,
    undecodable JSON, anything else) is reported with ``print`` and results
    in an empty dictionary instead of an exception.

    :param scanner_path: path of the scanner executable
    :param dir_to_scan: directory handed to the scanner with ``-d``
    :param extra_args: additional command line arguments, if any
    :return: decoded JSON output of the scanner (whatever ``json.loads``
             yields for it), or an empty dictionary when there is no output
             or the run failed
    """
    command = [scanner_path, "-J", "-d", dir_to_scan]
    if extra_args:
        command.extend(extra_args)

    try:
        # stderr must be piped as well, otherwise communicate() returns None
        # for it and the error message below can never show the real cause.
        process = Popen(
            command, stdout=PIPE, stderr=PIPE, text=True, encoding='UTF-8'
        )
        stdout, stderr = process.communicate()

        if process.returncode != 0:
            msg = (f"Scanner {scanner_path} exited with error code "
                   f"{process.returncode}. Stderr: {stderr}")
            # Handled by the generic handler below, which prints the message.
            raise RuntimeError(msg)

        if not stdout.strip():
            return {}

        return json.loads(stdout.strip())
    except FileNotFoundError:
        print(f"Error: Scanner executable not found at {scanner_path}")
    except json.JSONDecodeError:
        print(f"Error: Failed to decode JSON from scanner {scanner_path} output.")
        print(f"Raw output: {stdout}")
    except Exception as e:
        print(f"An unexpected error occurred while running {scanner_path}: {e}")
    return {}
def __get_nomos_result(self, dir_to_scan: str) -> dict:
    """
    Get the raw results from nomos scanner

    :param dir_to_scan: directory handed to the scanner
    :return: raw json from nomos
    """
    # Leave one CPU free, but never pass 0 on a single-core machine.
    # NOTE(review): "-n" presumably sets the number of nomos workers; confirm
    # against the nomos command line help.
    worker_count = max(1, multiprocessing.cpu_count() - 1)
    extra_args = ["-S", "-l", "-n", str(worker_count)]
    return self._execute_scanner_command(
        self.nomos_path, dir_to_scan, extra_args
    )
169 Get the raw results from ojo scanner
171 :return: raw json from ojo
177 Get the raw results from copyright scanner
179 :return: raw json from copyright
185 Get the raw results from keyword scanner
187 :return: raw json from keyword
def _process_single_scanner_package(
    self, component: dict, is_parent: bool, scanner_func: callable,
    result_key: str, whole: bool = False, all_results: bool = False
) -> list[ScanResult] | list[ScanResultList]:
    """
    Generalized function to process results from a single scanner for a given
    component. Set `result_key` to 'results' for copyrights and 'licenses' for
    licenses.

    :param component: package dictionary; ``download_dir`` and ``base_dir``
                      are read from it when ``is_parent`` is False
    :param is_parent: scan ``cli_options.diff_dir`` instead of the
                      component's own directory
    :param scanner_func: callable taking the directory to scan and returning
                         the raw scanner output (a dictionary with a
                         ``results`` list, or a list)
    :param result_key: key of every result entry which holds the findings
    :param whole: collect the complete finding dictionaries
                  (``ScanResultList``) instead of a set of strings
                  (``ScanResult``)
    :param all_results: do not drop excluded paths in repo mode
    :return: one entry per file which has at least one finding
    """
    if is_parent:
        dir_to_scan = self.cli_options.diff_dir
    else:
        dir_to_scan = os.path.join(
            component['download_dir'], component['base_dir']
        )

    raw_results = scanner_func(dir_to_scan)
    processed_list: list[ScanResult] | list[ScanResultList] = []
    raw_results_list = []

    if isinstance(raw_results, dict):
        if 'results' in raw_results:
            raw_results_list = raw_results['results']
    elif isinstance(raw_results, list):
        raw_results_list = raw_results

    if not raw_results_list:
        return processed_list

    for result_entry in raw_results_list:
        # Skip entries without a file, without findings or which the scanner
        # could not read.
        if (
            'file' not in result_entry
            or result_key not in result_entry
            or result_entry.get(result_key) == "Unable to read file"
        ):
            continue

        file_path = self.__normalize_path(result_entry['file'], dir_to_scan)

        if (
            self.cli_options.repo
            and not all_results
            and self.is_excluded_path(file_path)
        ):
            continue

        findings_list = result_entry.get(result_key, None)
        if findings_list is None:
            continue

        current_findings = [] if whole else set()
        for finding in findings_list:
            if whole:
                if (
                    result_key == 'results'
                    and 'type' in finding
                    and finding['type'] == 'statement'
                    and finding.get('content')
                ):
                    current_findings.append(finding)
                elif (
                    result_key == 'licenses'
                    and finding.get('license') != "No_license_found"
                ):
                    current_findings.append(finding)
                continue

            # For 'results' only findings of type 'statement' are kept.
            if (
                result_key == 'results'
                and 'type' in finding
                and finding['type'] != 'statement'
            ):
                continue

            content = finding.get('content') or finding.get('license')
            # A finding may carry neither key; calling strip() on None would
            # abort the processing of the whole package.
            if not isinstance(content, str):
                continue
            content = content.strip()
            if content and content != "No_license_found":
                current_findings.add(content)

        if not current_findings:
            continue

        if whole:
            processed_list.append(
                ScanResultList(file_path, result_entry['file'], current_findings)
            )
        else:
            processed_list.append(
                ScanResult(file_path, result_entry['file'], current_findings)
            )

    return processed_list
def set_copyright_list(
    self, all_results: bool = False, whole: bool = False
) -> None:
    """
    Set the formatted results from copyright scanner for the components.

    The processed findings are stored under the key ``COPYRIGHT_RESULT`` of
    the parent package (skipped when ``cli_options.scan_only_deps`` is set)
    and of every dependency.

    :param all_results: forwarded to ``_process_single_scanner_package``
    :param whole: forwarded to ``_process_single_scanner_package``
    """
    if not self.cli_options.scan_only_deps:
        self.scan_packages.parent_package['COPYRIGHT_RESULT'] = \
            self._process_single_scanner_package(
                component=self.scan_packages.parent_package, is_parent=True,
                scanner_func=self.__get_copyright_results,
                result_key='results',
                whole=whole, all_results=all_results
            )

    for component in self.scan_packages.dependencies.values():
        component['COPYRIGHT_RESULT'] = self._process_single_scanner_package(
            component=component, is_parent=False,
            scanner_func=self.__get_copyright_results, result_key='results',
            whole=whole, all_results=all_results
        )
308 Get the formatted results from keyword scanner
310 if not self.cli_options.scan_only_deps:
311 self.scan_packages.parent_package[
313 component=self.scan_packages.parent_package, is_parent=
True,
317 for purl
in self.scan_packages.dependencies.keys():
318 component = self.scan_packages.dependencies[purl]
320 component=component, is_parent=
False,
def __set_license_nomos(self, whole: bool = False) -> None:
    """
    Update the packages with formatted results of nomos scanner

    The processed findings are stored under the key ``NOMOS_RESULT`` of the
    parent package (skipped when ``cli_options.scan_only_deps`` is set) and
    of every dependency.

    :param whole: forwarded to ``_process_single_scanner_package``
    """
    if not self.cli_options.scan_only_deps:
        self.scan_packages.parent_package['NOMOS_RESULT'] = \
            self._process_single_scanner_package(
                component=self.scan_packages.parent_package, is_parent=True,
                scanner_func=self.__get_nomos_result, result_key='licenses',
                whole=whole
            )

    for component in self.scan_packages.dependencies.values():
        component['NOMOS_RESULT'] = self._process_single_scanner_package(
            component=component, is_parent=False,
            scanner_func=self.__get_nomos_result, result_key='licenses',
            whole=whole
        )
def __set_license_ojo(self, whole: bool = False) -> None:
    """
    Update the packages with formatted results of ojo scanner

    The processed findings are stored under the key ``OJO_RESULT`` of the
    parent package (skipped when ``cli_options.scan_only_deps`` is set) and
    of every dependency.

    :param whole: forwarded to ``_process_single_scanner_package``
    """
    if not self.cli_options.scan_only_deps:
        self.scan_packages.parent_package['OJO_RESULT'] = \
            self._process_single_scanner_package(
                component=self.scan_packages.parent_package, is_parent=True,
                scanner_func=self.__get_ojo_result, result_key='licenses',
                whole=whole
            )

    for component in self.scan_packages.dependencies.values():
        component['OJO_RESULT'] = self._process_single_scanner_package(
            component=component, is_parent=False,
            scanner_func=self.__get_ojo_result, result_key='licenses',
            whole=whole
        )
def __merge_nomos_ojo(
    self, nomos_licenses: list[ScanResult], ojo_licenses: list[ScanResult]
) -> list[ScanResult]:
    """
    Merge the results from nomos and ojo based on file name

    ``nomos_licenses`` is modified in place: for a file known to both
    scanners the ojo findings are added to the ``result`` set of the nomos
    entry, an ojo entry for a new file is appended to the list.

    :param nomos_licenses: results of nomos, extended in place
    :param ojo_licenses: results of ojo
    :return: ``nomos_licenses`` holding the merged results
    """
    nomos_dict = {entry.file: entry for entry in nomos_licenses}

    for ojo_entry in ojo_licenses:
        if ojo_entry.file in nomos_dict:
            nomos_dict[ojo_entry.file].result.update(ojo_entry.result)
        else:
            # Register the appended entry too, so a file reported more than
            # once by ojo is merged instead of listed twice.
            nomos_dict[ojo_entry.file] = ojo_entry
            nomos_licenses.append(ojo_entry)
    return nomos_licenses
376 self, scan_results: list[ScanResult] =
None,
377 scan_results_whole: list[ScanResultList] =
None, whole: bool =
False
378 ) -> list[ScanResult] | list[ScanResultList]:
380 Get results where license check failed.
383 if whole
and scan_results_whole
is not None:
384 for row
in scan_results_whole:
385 if self.cli_options.repo
and self.
is_excluded_pathis_excluded_path(row.file):
389 failed_licenses_list = [
390 lic
for lic
in row.result
if
393 if failed_licenses_list:
394 final_results.append(
397 elif not whole
and scan_results
is not None:
398 for row
in scan_results:
399 if self.cli_options.repo
and self.
is_excluded_pathis_excluded_path(row.file):
404 lic
for lic
in row.result
if
408 final_results.append(
ScanResult(row.file, row.path, failed_licenses))
def get_non_allow_listed_copyrights(self) -> list[ScanResult]:
    """
    Get copyrights from files which are not allow listed.

    :return: entries of ``get_copyright_results()`` whose ``file`` is not
             matched by ``is_excluded_path``
    """
    copyright_results = self.get_copyright_results()
    # NOTE(review): outside of repo mode the condition is never true, so the
    # result is always empty - confirm this is what callers expect.
    return [
        row for row in copyright_results
        if self.cli_options.repo and not self.is_excluded_path(row.file)
    ]
def get_copyright_results(self) -> list[ScanResultList]:
    """
    Get list of copyright scan results from the package list.

    :return: new list with the ``COPYRIGHT_RESULT`` entries of the parent
             package followed by those of every dependency
    """
    copyright_results = []
    copyright_results.extend(
        self.scan_packages.parent_package.get('COPYRIGHT_RESULT', [])
    )
    for dep in self.scan_packages.dependencies.values():
        copyright_results.extend(dep.get('COPYRIGHT_RESULT', []))
    return copyright_results
def get_keyword_results(self) -> list[ScanResultList]:
    """
    Get list of keywords scan results from the package list.

    :return: new list with the ``KEYWORD_RESULT`` entries of the parent
             package followed by those of every dependency
    """
    keyword_results = []
    keyword_results.extend(
        self.scan_packages.parent_package.get('KEYWORD_RESULT', [])
    )
    for dep in self.scan_packages.dependencies.values():
        keyword_results.extend(dep.get('KEYWORD_RESULT', []))
    return keyword_results
def get_license_results(self) -> list[ScanResultList]:
    """
    Get list of license scan results from the package list.

    :return: new list with the ``SCANNER_RESULTS`` entries of the parent
             package followed by those of every dependency
    """
    scanner_results = []
    scanner_results.extend(
        self.scan_packages.parent_package.get('SCANNER_RESULTS', [])
    )
    for dep in self.scan_packages.dependencies.values():
        scanner_results.extend(dep.get('SCANNER_RESULTS', []))
    return scanner_results
456 self, whole: bool =
False
457 ) -> list[ScanResult] | list[ScanResultList]:
459 Get the formatted list of license scanner findings
461 The list contains the merged result of nomos/ojo scanner based on
467 scan_results_whole=scanner_results, whole=
True
471 return failed_licenses
476 item.file, item.path,
477 {res[
'license']
for res
in item.result
if 'license' in res}
478 )
for item
in failed_licenses
483 Set the key `SCANNER_RESULTS` for all components in scan_packages using
484 nomos and ojo scanners (whichever is selected).
486 if self.cli_options.nomos:
488 if self.cli_options.ojo:
491 if self.cli_options.nomos
and self.cli_options.ojo:
494 self.scan_packages.parent_package[
495 'SCANNER_RESULTS'] = self.scan_packages.parent_package.get(
497 ) + self.scan_packages.parent_package.get(
'OJO_RESULT', [])
499 self.scan_packages.parent_package[
501 self.scan_packages.parent_package.get(
'NOMOS_RESULT', []),
502 self.scan_packages.parent_package.get(
'OJO_RESULT', [])
504 for purl
in self.scan_packages.dependencies.keys():
505 component = self.scan_packages.dependencies[purl]
508 component[
'SCANNER_RESULTS'] = component.get(
515 component.get(
'NOMOS_RESULT', []), component.get(
'OJO_RESULT', [])
518 scanner_key =
'NOMOS_RESULT' if self.cli_options.nomos
else 'OJO_RESULT'
520 self.scan_packages.parent_package[
521 'SCANNER_RESULTS'] = self.scan_packages.parent_package.get(
524 for purl
in self.scan_packages.dependencies.keys():
525 component = self.scan_packages.dependencies[purl]
526 component[
'SCANNER_RESULTS'] = component.get(scanner_key, [])
list[ScanResultList] get_keyword_results(self)
def __init__(self, CliOptions cli_options, Packages scan_packages)
list[ScanResultList] get_license_results(self)
dict __get_keyword_results(self, str dir_to_scan)
None set_scanner_results(self, bool whole=False)
None set_copyright_list(self, bool all_results=False, bool whole=False)
dict __get_ojo_result(self, str dir_to_scan)
dict __get_copyright_results(self, str dir_to_scan)
list[ScanResult]|list[ScanResultList] _process_single_scanner_package(self, dict component, bool is_parent, callable scanner_func, str result_key, bool whole=False, bool all_results=False)
dict _execute_scanner_command(self, str scanner_path, str dir_to_scan, list[str] extra_args=None)
list[ScanResult]|list[ScanResultList] results_are_allow_listed(self, bool whole=False)
str __normalize_path(self, str path, str against)
dict __get_nomos_result(self, str dir_to_scan)
None set_keyword_list(self, bool whole=False)
list[ScanResult] __merge_nomos_ojo(self, list[ScanResult] nomos_licenses, list[ScanResult] ojo_licenses)
None __set_license_nomos(self, bool whole=False)
list[ScanResult]|list[ScanResultList] get_non_allow_listed_results(self, list[ScanResult] scan_results=None, list[ScanResultList] scan_results_whole=None, bool whole=False)
None __set_license_ojo(self, bool whole=False)
list[ScanResultList] get_copyright_results(self)
list[ScanResult] get_non_allow_listed_copyrights(self)
bool is_excluded_path(self, str path)