10 import multiprocessing
11 from subprocess
import Popen, PIPE
12 from typing
import List, Set, Union
14 from .CliOptions
import CliOptions
19 Store scan results from agents.
21 :ivar file: File location
22 :ivar path: Actual location of file
23 :ivar result: License list for file
27 result: Set[str] =
None
29 def __init__(self, file: str, path: str, result: Set[str]):
37 Handle all the data from different scanners.
39 :ivar nomos_path: path to nomos bin
40 :ivar copyright_path: path to copyright bin
41 :ivar keyword_path: path to keyword bin
42 :ivar ojo_path: path to ojo bin
43 :ivar cli_options: CliOptions object
45 nomos_path: str =
'/bin/nomossa'
46 copyright_path: str =
'/bin/copyright'
47 keyword_path: str =
'/bin/keyword'
48 ojo_path: str =
'/bin/ojo'
52 Initialize the cli_options
54 :param cli_options: CliOptions object to use
55 :type cli_options: CliOptions
57 self.cli_options: CliOptions = cli_options
61 Check if the path is allow listed
63 The function uses fnmatch to check whether the path matches any exclude pattern of the allow list.
65 :param path: path to check
66 :return: True if the path matches an exclude pattern of the allow list, False otherwise
68 path_is_excluded =
False
69 for pattern
in self.cli_options.allowlist[
'exclude']:
70 if fnmatch.fnmatchcase(path, pattern):
71 path_is_excluded =
True
73 return path_is_excluded
77 Normalize the given path to repository root
79 :param path: path to normalize
80 :return: Normalized path
82 return path.replace(f
"{self.cli_options.diff_dir}/",
'')
86 Get the raw results from nomos scanner
88 :return: raw json from nomos
90 nomossa_process = Popen([self.nomos_path,
"-J",
"-l",
"-d",
91 self.cli_options.diff_dir,
"-n",
92 str(multiprocessing.cpu_count() - 1)], stdout=PIPE)
93 result = nomossa_process.communicate()[0]
94 return json.loads(result.decode(
'UTF-8').strip())
98 Get the raw results from ojo scanner
100 :return: raw json from ojo
102 ojo_process = Popen([self.ojo_path,
"-J",
"-d", self.cli_options.diff_dir],
104 result = ojo_process.communicate()[0]
105 return json.loads(result.decode(
'UTF-8').strip())
109 Get the raw results from copyright scanner
111 :return: raw json from copyright
113 copyright_process = Popen([self.copyright_path,
"-J",
"-d",
114 self.cli_options.diff_dir], stdout=PIPE)
115 result = copyright_process.communicate()[0]
116 return json.loads(result.decode(
'UTF-8').strip())
120 Get the raw results from keyword scanner
122 :return: raw json from keyword
124 keyword_process = Popen([self.keyword_path,
"-J",
"-d",
125 self.cli_options.diff_dir], stdout=PIPE)
126 result = keyword_process.communicate()[0]
127 return json.loads(result.decode(
'UTF-8').strip())
130 -> Union[List[ScanResult], bool]:
132 Get the formatted results from copyright scanner
134 :param all_results: Whether to return all results, including those for excluded files
135 :type all_results: bool
136 :return: list of findings
137 :rtype: list[ScanResult]
140 copyright_list =
list()
141 for result
in copyright_results:
143 if self.cli_options.repo
is True and all_results
is False and \
146 if result[
'results']
is not None and result[
'results'] !=
"Unable to " \
149 for finding
in result[
'results']:
150 if finding
is not None and finding[
'type'] ==
"statement":
151 content = finding[
'content'].strip()
153 contents.add(content)
154 if len(contents) > 0:
155 copyright_list.append(
ScanResult(path, result[
'file'], contents))
156 if len(copyright_list) > 0:
157 return copyright_list
162 Get the formatted results from keyword scanner
164 :return: list of findings
167 keyword_list =
list()
168 for result
in keyword_results:
170 if self.cli_options.repo
is True and self.
is_excluded_pathis_excluded_path(path)
is \
173 if result[
'results']
is not None and result[
'results'] !=
"Unable to " \
176 for finding
in result[
'results']:
177 if finding
is not None:
178 content = finding[
'content'].strip()
180 contents.add(content)
181 if len(contents) > 0:
182 keyword_list.append(
ScanResult(path, result[
'file'], contents))
183 if len(keyword_list) > 0:
189 Get the formatted results from nomos scanner
191 :return: list of findings
195 for result
in nomos_result[
'results']:
198 for scan_license
in result[
'licenses']:
199 if scan_license !=
'No_license_found':
200 licenses.add(scan_license.strip())
201 if len(licenses) > 0:
202 scan_result.append(
ScanResult(path, result[
'file'], licenses))
207 Get the formatted results from ojo scanner
209 :return: list of findings
213 for result
in ojo_result:
215 if result[
'results']
is not None and result[
'results'] !=
'Unable to ' \
218 for finding
in result[
'results']:
219 if finding[
'license']
is not None:
220 licenses.add(finding[
'license'].strip())
221 if len(licenses) > 0:
222 scan_result.append(
ScanResult(path, result[
'file'], licenses))
226 ojo_licenses: List[ScanResult]) -> List[ScanResult]:
228 Merge the results from nomos and ojo based on file name
230 :param nomos_licenses: formatted result from nomos
231 :param ojo_licenses: formatted result from ojo
233 :return: merged list of scanner findings
235 for ojo_entry
in ojo_licenses:
236 for nomos_entry
in nomos_licenses:
237 if ojo_entry.file == nomos_entry.file:
238 nomos_entry.result.update(ojo_entry.result)
241 nomos_licenses.append(ojo_entry)
242 return nomos_licenses
247 Get results where license check failed.
249 :param scan_results: Scan result from ojo/nomos
250 :return: List of results containing only the licenses that are not allowed
253 for row
in scan_results:
254 if self.cli_options.repo
is True and self.
is_excluded_pathis_excluded_path(row.file) \
257 license_set = row.result
258 failed_licenses = set([lic
for lic
in license_set
if lic
not in
259 self.cli_options.allowlist[
'licenses']])
260 if len(failed_licenses) > 0:
261 final_results.append(
ScanResult(row.file, row.path, failed_licenses))
265 copyright_results: List[ScanResult]) \
268 Get copyrights from files which are not allow listed.
270 :param copyright_results: Copyright results from copyright agent
271 :return: List of scan results where copyrights found.
274 row
for row
in copyright_results
if self.cli_options.repo
is True and
281 Get the formatted list of license scanner findings
283 The list contains the merged result of nomos/ojo scanner based on
286 :return: merged list of scanner findings
288 failed_licenses =
None
291 if self.cli_options.nomos:
293 if self.cli_options.ojo
is False:
295 if self.cli_options.ojo:
297 if self.cli_options.nomos
is False:
302 if len(failed_licenses) > 0:
303 return failed_licenses
308 Get scan results from nomos and ojo scanners (whichever is selected).
310 :return: List of scan results
315 if self.cli_options.nomos:
317 if self.cli_options.ojo:
320 if self.cli_options.nomos
and self.cli_options.ojo:
322 elif self.cli_options.nomos:
323 return nomos_licenses
List[ScanResult] get_non_allow_listed_copyrights(self, List[ScanResult] copyright_results)
List[ScanResult] get_scanner_results(self)
List[ScanResult] __merge_nomos_ojo(self, List[ScanResult] nomos_licenses, List[ScanResult] ojo_licenses)
Union[List[ScanResult], bool] results_are_allow_listed(self)
Union[List[ScanResult], bool] get_copyright_list(self, bool all_results=False)
dict __get_keyword_results(self)
List[ScanResult] __get_license_nomos(self)
str __normalize_path(self, str path)
Union[List[ScanResult], bool] get_keyword_list(self)
dict __get_copyright_results(self)
def __init__(self, CliOptions cli_options)
dict __get_nomos_result(self)
List[ScanResult] __get_license_ojo(self)
dict __get_ojo_result(self)
List[ScanResult] get_non_allow_listed_results(self, List[ScanResult] scan_results)
bool is_excluded_path(self, str path)
list_t type structure used to keep various lists. (e.g. there are multiple lists).