10 import multiprocessing
11 from subprocess
import Popen, PIPE
12 from typing
import List, Set, Union
14 from .CliOptions
import CliOptions
19 Store scan results from agents.
21 :ivar file: File location
22 :ivar path: Actual location of file
23 :ivar result: License list for file
27 result: Set[str] =
None
29 def __init__(self, file: str, path: str, result: Set[str]):
37 Store scan results from agents with result as a list of dictionaries.
39 :ivar file: File location
40 :ivar path: Actual location of file
41 :ivar result: License list for file as a list of dictionaries
45 result: List[dict] =
None
47 def __init__(self, file: str, path: str, result: List[dict]):
55 Handle all the data from different scanners.
57 :ivar nomos_path: path to nomos bin
58 :ivar copyright_path: path to copyright bin
59 :ivar keyword_path: path to keyword bin
60 :ivar ojo_path: path to ojo bin
61 :ivar cli_options: CliOptions object
63 nomos_path: str =
'/bin/nomossa'
64 copyright_path: str =
'/bin/copyright'
65 keyword_path: str =
'/bin/keyword'
66 ojo_path: str =
'/bin/ojo'
70 Initialize the cli_options
72 :param cli_options: CliOptions object to use
73 :type cli_options: CliOptions
75 self.cli_options: CliOptions = cli_options
79 Check if the path is excluded by the allow list
81 The function uses fnmatch to check whether the path matches any pattern in the 'exclude' entries of the allow list.
83 :param path: path to check
84 :return: True if the path matches an exclude pattern, False otherwise
86 path_is_excluded =
False
87 for pattern
in self.cli_options.allowlist[
'exclude']:
88 if fnmatch.fnmatchcase(path, pattern):
89 path_is_excluded =
True
91 return path_is_excluded
95 Normalize the given path to repository root
97 :param path: path to normalize
98 :return: Normalized path
100 return path.replace(f
"{self.cli_options.diff_dir}/",
'')
104 Get the raw results from nomos scanner
106 :return: raw json from nomos
108 nomossa_process = Popen([self.nomos_path,
"-S",
"-J",
"-l",
"-d",
109 self.cli_options.diff_dir,
"-n",
110 str(multiprocessing.cpu_count() - 1)], stdout=PIPE)
111 result = nomossa_process.communicate()[0]
112 return json.loads(result.decode(
'UTF-8').strip())
116 Get the raw results from ojo scanner
118 :return: raw json from ojo
120 ojo_process = Popen([self.ojo_path,
"-J",
"-d", self.cli_options.diff_dir],
122 result = ojo_process.communicate()[0]
123 return json.loads(result.decode(
'UTF-8').strip())
127 Get the raw results from copyright scanner
129 :return: raw json from copyright
131 copyright_process = Popen([self.copyright_path,
"-J",
"-d",
132 self.cli_options.diff_dir], stdout=PIPE)
133 result = copyright_process.communicate()[0]
134 return json.loads(result.decode(
'UTF-8').strip())
138 Get the raw results from keyword scanner
140 :return: raw json from keyword
142 keyword_process = Popen([self.keyword_path,
"-J",
"-d",
143 self.cli_options.diff_dir], stdout=PIPE)
144 result = keyword_process.communicate()[0]
145 return json.loads(result.decode(
'UTF-8').strip())
148 -> Union[List[ScanResult],List[ScanResultList],bool]:
150 Get the formatted results from copyright scanner
152 :param all_results: Get all results, even for excluded files?
153 :type all_results: bool
154 :param whole: return whole content from scanner
155 :return: list of findings
156 :rtype: List[ScanResult] | List[ScanResultList] | bool
159 copyright_list =
list()
160 for result
in copyright_results:
162 if self.cli_options.repo
is True and all_results
is False and \
165 if result[
'results']
is not None and result[
'results'] !=
"Unable to " \
168 json_copyright_info =
list()
169 for finding
in result[
'results']:
171 if finding
is not None and finding[
'type'] ==
"statement" and finding[
'content'] !=
"":
172 json_copyright_info.append(finding)
174 if finding
is not None and finding[
'type'] ==
"statement":
175 content = finding[
'content'].strip()
177 contents.add(content)
178 if whole
and len(json_copyright_info) > 0:
179 copyright_list.append(
ScanResultList(path, result[
'file'], json_copyright_info))
180 elif not whole
and len(contents) > 0:
181 copyright_list.append(
ScanResult(path, result[
'file'], contents))
182 if len(copyright_list) > 0:
183 return copyright_list
186 def get_keyword_list(self, whole:bool =
False) -> Union[List[ScanResult],List[ScanResultList],bool]:
188 Get the formatted results from keyword scanner
190 :param whole: return whole content from scanner
191 :return: List of findings
192 :rtype: List[ScanResult] | List[ScanResultList] | bool
195 keyword_list =
list()
196 for result
in keyword_results:
198 if self.cli_options.repo
is True and self.
is_excluded_path(path)
is \
201 if result[
'results']
is not None and result[
'results'] !=
"Unable to " \
204 json_keyword_info =
list()
205 for finding
in result[
'results']:
207 if finding
is not None and finding[
'content'] !=
"":
208 json_keyword_info.append(finding)
210 if finding
is not None:
211 content = finding[
'content'].strip()
213 contents.add(content)
214 if whole
and len(json_keyword_info) > 0:
215 keyword_list.append(
ScanResultList(path, result[
'file'], json_keyword_info))
216 elif not whole
and len(contents) > 0:
217 keyword_list.append(
ScanResult(path, result[
'file'], contents))
218 if len(keyword_list) > 0:
224 Get the formatted results from nomos scanner
226 :param whole: return whole content from scanner
227 :return: list of findings
228 :rtype: List[ScanResult] | List[ScanResultList]
233 for result
in nomos_result[
'results']:
236 json_license_info =
list()
237 for scan_license
in result[
'licenses']:
239 if scan_license[
'license'] !=
"No_license_found":
240 json_license_info.append(scan_license)
242 if scan_license[
'license'] !=
'No_license_found':
243 licenses.add(scan_license[
'license'])
244 if whole
and len(json_license_info) > 0:
245 scan_result.append(
ScanResultList(path,result[
'file'], json_license_info))
246 elif not whole
and len(licenses) > 0:
247 scan_result.append(
ScanResult(path, result[
'file'], licenses))
250 def __get_license_ojo(self, whole:bool =
False) -> Union[List[ScanResult],List[ScanResultList]]:
252 Get the formatted results from ojo scanner
254 :param whole: return whole content from scanner
255 :return: list of findings
256 :rtype: List[ScanResult] | List[ScanResultList]
260 for result
in ojo_result:
262 if result[
'results']
is not None and result[
'results'] !=
'Unable to ' \
265 json_license_info =
list()
266 for finding
in result[
'results']:
268 if finding[
'license']
is not None:
269 json_license_info.append(finding)
271 if finding[
'license']
is not None:
272 licenses.add(finding[
'license'].strip())
273 if len(licenses) > 0:
274 scan_result.append(
ScanResult(path, result[
'file'], licenses))
275 elif len(json_license_info) > 0:
276 scan_result.append(
ScanResultList(path, result[
'file'], json_license_info))
280 ojo_licenses: List[ScanResult]) -> List[ScanResult]:
282 Merge the results from nomos and ojo based on file name
284 :param nomos_licenses: formatted result from nomos
285 :param ojo_licenses: formatted result from ojo
287 :return: merged list of scanner findings
289 for ojo_entry
in ojo_licenses:
290 for nomos_entry
in nomos_licenses:
291 if ojo_entry.file == nomos_entry.file:
292 nomos_entry.result.update(ojo_entry.result)
295 nomos_licenses.append(ojo_entry)
296 return nomos_licenses
299 scan_results_whole: List[ScanResultList]=
None, \
300 whole:bool =
False) \
301 -> Union[List[ScanResult],List[ScanResultList]]:
303 Get results where license check failed.
305 :param scan_results: Scan result from ojo/nomos
306 :param scan_results_whole: Whole scan result from ojo/nomos
307 :param whole: return whole content from scanner
309 :return: List of results with only not allowed licenses
310 :rtype: List[ScanResult] | List[ScanResultList]
313 if whole
and scan_results_whole
is not None:
314 for row
in scan_results_whole:
315 if self.cli_options.repo
is True and self.
is_excluded_path(row.file) \
318 license_info_list = row.result
319 failed_licenses_list =
list([lic
for lic
in license_info_list
if lic[
'license']
not in
320 self.cli_options.allowlist[
'licenses']])
321 if len(failed_licenses_list) > 0:
322 final_results.append(
ScanResultList(row.file, row.path, failed_licenses_list))
323 elif not whole
and scan_results
is not None:
324 for row
in scan_results:
325 if self.cli_options.repo
is True and self.
is_excluded_path(row.file) \
328 license_set = row.result
329 failed_licenses = set([lic
for lic
in license_set
if lic
not in
330 self.cli_options.allowlist[
'licenses']])
331 if len(failed_licenses) > 0:
332 final_results.append(
ScanResult(row.file, row.path, failed_licenses))
336 copyright_results: List[ScanResult]) \
339 Get copyrights from files which are not allow listed.
341 :param copyright_results: Copyright results from copyright agent
342 :return: List of scan results where copyrights found.
345 row
for row
in copyright_results
if self.cli_options.repo
is True and
351 -> Union[List[ScanResult],List[ScanResultList],bool]:
353 Get the formatted list of license scanner findings
355 The list contains the merged result of nomos/ojo scanner based on
358 :param whole: return whole content from scanner
359 :return: merged list of scanner findings
360 :rtype: List[ScanResult] | List[ScanResultList] | bool
362 failed_licenses =
None
365 if self.cli_options.nomos:
370 if self.cli_options.ojo
is False:
373 scan_results_whole=nomos_licenses, whole=
True)
376 if self.cli_options.ojo:
381 if self.cli_options.nomos
is False:
384 scan_results_whole=ojo_licenses, whole=
True)
390 scan_results_whole=nomos_licenses + ojo_licenses, whole=
True)
393 scan_results=self.
__merge_nomos_ojo(nomos_licenses, ojo_licenses))
394 if len(failed_licenses) > 0:
395 return failed_licenses
399 -> Union[List[ScanResult],List[ScanResultList]]:
401 Get scan results from nomos and ojo scanners (whichever is selected).
403 :param whole: return whole content from scanner
404 :return: List of scan results
405 :rtype: List[ScanResult] | List[ScanResultList]
410 if self.cli_options.nomos:
415 if self.cli_options.ojo:
421 if self.cli_options.nomos
and self.cli_options.ojo:
423 return nomos_licenses + ojo_licenses
426 elif self.cli_options.nomos:
427 return nomos_licenses
Union[List[ScanResult], List[ScanResultList], bool] get_copyright_list(self, bool all_results=False, bool whole=False)
List[ScanResult] get_non_allow_listed_copyrights(self, List[ScanResult] copyright_results)
Union[List[ScanResult], List[ScanResultList]] get_scanner_results(self, bool whole=False)
List[ScanResult] __merge_nomos_ojo(self, List[ScanResult] nomos_licenses, List[ScanResult] ojo_licenses)
Union[List[ScanResult], List[ScanResultList]] __get_license_ojo(self, bool whole=False)
Union[List[ScanResult], List[ScanResultList], bool] get_keyword_list(self, bool whole=False)
Union[List[ScanResult], List[ScanResultList]] get_non_allow_listed_results(self, List[ScanResult] scan_results=None, List[ScanResultList] scan_results_whole=None, bool whole=False)
dict __get_keyword_results(self)
str __normalize_path(self, str path)
dict __get_copyright_results(self)
def __init__(self, CliOptions cli_options)
Union[List[ScanResult], List[ScanResultList]] __get_license_nomos(self, bool whole=False)
dict __get_nomos_result(self)
dict __get_ojo_result(self)
bool is_excluded_path(self, str path)
Union[List[ScanResult], List[ScanResultList], bool] results_are_allow_listed(self, bool whole=False)
list_t type structure used to keep various lists. (e.g. there are multiple lists).