FOSSology  4.5.1
Open Source License Compliance by Open Source Software
fossologyscanner.py
1 #!/usr/bin/env python3
2 
3 # SPDX-FileCopyrightText: © 2020,2023,2025 Siemens AG
4 # SPDX-FileCopyrightText: © anupam.ghosh@siemens.com
5 # SPDX-FileCopyrightText: © mishra.gaurav@siemens.com
6 
7 # SPDX-License-Identifier: GPL-2.0-only
8 
9 import argparse
10 import json
11 import logging
12 import os
13 import sys
14 import textwrap
15 from typing import IO
16 
17 # Configure logging
18 logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
19 
20 from FoScanner.ApiConfig import (ApiConfig, Runner)
21 from FoScanner.CliOptions import (CliOptions, ReportFormat)
22 from FoScanner.FormatResults import FormatResult
23 from FoScanner.Packages import Packages
24 from FoScanner.RepoSetup import RepoSetup
25 from FoScanner.Scanners import (Scanners, ScanResult)
26 from FoScanner.SpdxReport import SpdxReport
27 from FoScanner.Utils import (
28  validate_keyword_conf_file, copy_keyword_file_to_destination
29 )
30 from ScanDeps.Downloader import Downloader
31 from ScanDeps.Parsers import Parser, PythonParser, NPMParser
32 
33 
def get_api_config() -> ApiConfig:
    """
    Set the API configuration based on the CI the job is running on.

    The CI system is detected from environment variables, in this order:
    GitLab CI (``GITLAB_CI`` is set), Travis CI (``TRAVIS`` equals ``'true'``)
    and GitHub Actions (``GITHUB_ACTIONS`` equals ``'true'``). If none of them
    matches, the default-constructed ApiConfig is returned unchanged.

    :return: ApiConfig object
    """
    env = os.environ
    api_config = ApiConfig()
    if 'GITLAB_CI' in env:
        api_config.running_on = Runner.GITLAB
        api_config.api_url = env.get('CI_API_V4_URL', '')
        api_config.project_id = env.get('CI_PROJECT_ID', '')
        api_config.mr_iid = env.get('CI_MERGE_REQUEST_IID', '')
        api_config.api_token = env.get('API_TOKEN', '')
        api_config.project_name = env.get('CI_PROJECT_NAME', '')
        api_config.project_desc = env.get('CI_PROJECT_DESCRIPTION', '').strip()
        if not api_config.project_desc:
            # A blank description is stored as None rather than ''.
            api_config.project_desc = None
        api_config.project_orig = env.get('CI_PROJECT_NAMESPACE', '')
        api_config.project_url = env.get('CI_PROJECT_URL', '')
    elif env.get('TRAVIS') == 'true':
        api_config.running_on = Runner.TRAVIS
        api_config.travis_repo_slug = env.get('TRAVIS_REPO_SLUG', '')
        api_config.travis_pull_request = env.get('TRAVIS_PULL_REQUEST', '')
        if api_config.travis_repo_slug:
            slug_parts = api_config.travis_repo_slug.split("/")
            api_config.project_name = slug_parts[-1]
            # Drop only the final component (the repository name). Slicing
            # with [:-2] would also drop the owner and leave this empty for
            # a regular "owner/repo" slug.
            api_config.project_orig = "/".join(slug_parts[:-1])
            api_config.project_url = (
                f"https://github.com/{api_config.travis_repo_slug}"
            )
    elif env.get('GITHUB_ACTIONS') == 'true':
        api_config.running_on = Runner.GITHUB
        api_config.api_url = env.get('GITHUB_API', 'https://api.github.com')
        api_config.api_token = env.get('GITHUB_TOKEN', '')
        api_config.github_repo_slug = env.get('GITHUB_REPOSITORY', '')
        api_config.github_pull_request = env.get('GITHUB_PULL_REQUEST', '')
        if api_config.github_repo_slug:
            api_config.project_name = api_config.github_repo_slug.split("/")[-1]
        # NOTE(review): GITHUB_REPO_OWNER / GITHUB_REPO_URL are read
        # independently of the slug; presumably the workflow exports them --
        # confirm against the action definition.
        api_config.project_orig = env.get('GITHUB_REPO_OWNER', '')
        api_config.project_url = env.get('GITHUB_REPO_URL', '')
    return api_config
72 
73 
def get_allow_list(path: str = '') -> dict:
    """
    Load and decode the allow list JSON file.

    Without ``path`` the file is looked up in the current directory:
    ``allowlist.json`` by default, or the deprecated ``whitelist.json`` when
    that file exists (a deprecation warning is logged in that case).

    :param path: path to allowlist file. Default=''
    :return: decoded content of the allowlist file
    """
    if path:
        file_name = path
        logging.info(f"Reading allowlist.json file from the path: '{file_name}'")
    else:
        legacy_present = os.path.exists('whitelist.json')
        file_name = 'whitelist.json' if legacy_present else 'allowlist.json'
        if legacy_present:
            logging.warning(
                "Name 'whitelist.json' is deprecated. "
                "Please use 'allowlist.json' instead."
            )
        logging.info(f"Reading {file_name} file...")

    with open(file_name, 'r', encoding='utf-8') as allow_file:
        return json.load(allow_file)
96 
97 
def _line_sort_key(line: str) -> tuple:
    """Sort key placing numeric line numbers first, in numeric order."""
    try:
        return (0, int(line), line)
    except (TypeError, ValueError):
        return (1, 0, str(line))


def print_results(
    name: str, failed_results: "list[ScanResult]",
    scan_results_with_line_number: list[dict[str, set[str]]],
    result_file: IO
):
    """
    Print the formatted scanner results to the log and to ``result_file``.

    For every failed file the file name is printed, followed by one line per
    finding. When a finding is present in ``scan_results_with_line_number``
    its line numbers are appended in ascending order.

    :param name: Name of the scanner
    :param failed_results: formatted scanner results to be printed
    :param scan_results_with_line_number: List of words mapped to their line
                                          numbers
    :param result_file: File to write results to
    """
    # Flatten the per-file mappings into one lookup table. A word appearing
    # in several mappings keeps the line numbers of the last one.
    line_number_map: dict[str, set[str]] = {}
    for word_lines in scan_results_with_line_number:
        if word_lines:
            line_number_map.update(word_lines)

    for files in failed_results:
        logging.info(f"File: {files.file}")
        result_file.write(f"File: {files.file}\n")

        plural_name = "s" if len(files.result) > 1 else ""
        logging.info(f"{name}{plural_name}:")
        result_file.write(f"{name}{plural_name}:\n")

        for result_item in files.result:
            if isinstance(result_item, dict):
                # Fall back to the raw entry so that a dict without either
                # key is still reported instead of printing "None".
                scanned_word = (
                    result_item.get('content')
                    or result_item.get('license')
                    or str(result_item)
                )
            else:
                scanned_word = str(result_item)

            if scanned_word in line_number_map:
                lines = line_number_map[scanned_word]
                plural_lines = "s" if len(lines) > 1 else ""
                # Sets have no defined order; sort so reports are stable
                # between runs and read in ascending line order.
                lines_str = ", ".join(sorted(lines, key=_line_sort_key))
                formatted_output = (
                    f"{scanned_word} at line{plural_lines} {lines_str}"
                )
            else:
                formatted_output = scanned_word

            logging.info(f"\t{formatted_output}")
            result_file.write(f"\t{formatted_output}\n")
142 
def print_log_message(
    filename: str,
    failed_list: "bool | list[ScanResult]",
    check_value: bool, failure_text: str,
    acceptance_text: str, scan_type: str,
    return_val: int, scan_results_with_line_number: list[dict[str, set[str]]]
) -> int:
    """
    Common helper function to print scan results.

    The report file is always (re)written. A scan counts as failed when
    ``failed_list`` is a bool different from ``check_value`` or a non-empty
    list; any other value is treated as "no failures".

    :param filename: File where results are to be stored.
    :param failed_list: Failed scan results.
    :param check_value: Boolean value which failed_list should have.
    :param failure_text: Message to print in case of failures.
    :param acceptance_text: Message to print in case of no failures.
    :param scan_type: Type of scan to print.
    :param return_val: Return value for program
    :param scan_results_with_line_number: List of words mapped to their line
                                          numbers
    :return: New return value (bit 2 for "License", 4 for "Copyright",
             8 for "Keyword" failures OR-ed into ``return_val``)
    """
    failure_flags = {"License": 2, "Copyright": 4, "Keyword": 8}

    with open(filename, 'w', encoding='utf-8') as report_file:
        has_failures = False
        if isinstance(failed_list, bool):
            has_failures = (failed_list != check_value)
        elif isinstance(failed_list, list):
            has_failures = bool(failed_list)

        if has_failures:
            logging.error(f"\u2718 {failure_text}:")  # Cross mark
            report_file.write(f"{failure_text}:\n")
            # A bare boolean carries no per-file details and cannot be
            # iterated, so only lists are handed to print_results().
            if isinstance(failed_list, list):
                print_results(
                    scan_type, failed_list, scan_results_with_line_number,
                    report_file
                )
            return_val |= failure_flags.get(scan_type, 0)
        else:
            logging.info(f"\u2714 {acceptance_text}")  # Check mark
            report_file.write(f"{acceptance_text}\n")

    logging.info("")
    return return_val
189 
190 
def _format_results_with_line_numbers(
    scanner: Scanners, format_results: FormatResult, result_type: str, key: str
) -> list[dict[str, set[str]]]:
    """
    Generic function to format scanner results with line numbers.

    :param scanner: Scanner object
    :param format_results: FormatResult object
    :param result_type: Type of results to retrieve ('keyword', 'copyright',
                        'license'); any other value yields an empty list
    :param key: The key within the scan result dictionary to use for the word
                (the callers in this file pass 'content' for copyright and
                keyword scans and 'license' for license scans)
    :return: List of dicts with the word as key and its line numbers as value;
             files without any match are left out
    """
    if result_type == 'keyword':
        scan_results = scanner.get_keyword_results()
    elif result_type == 'copyright':
        scan_results = scanner.get_copyright_results()
    elif result_type == 'license':
        # license_results can be True/None or a list, ensure it's a list
        license_res = scanner.results_are_allow_listed(whole=True)
        scan_results = license_res if isinstance(license_res, list) else []
    else:
        return []

    formatted_list_of_line_numbers = []
    for scan_result_item in scan_results:
        if scan_result_item is None:
            # No file to inspect; reading `.path` below would fail.
            continue

        list_of_scan_results = (
            list(scan_result_item.result)
            if scan_result_item and scan_result_item.result
            else []
        )

        words_with_line_numbers = format_results.find_word_line_numbers(
            scan_result_item.path, list_of_scan_results, key=key
        )
        if words_with_line_numbers:
            formatted_list_of_line_numbers.append(words_with_line_numbers)
    return formatted_list_of_line_numbers
230 
231 
def text_report(
    cli_options: CliOptions, result_dir: str, return_val: int,
    scanner: Scanners, format_results: FormatResult
) -> int:
    """
    Produce the plain text reports by delegating to perform_scans().

    :param cli_options: CLI options
    :param result_dir: Result directory location
    :param return_val: Return value of program
    :param scanner: Scanner object
    :param format_results: FormatResult object
    :return: Program's return value
    """
    updated_return_val = perform_scans(
        cli_options=cli_options,
        format_results=format_results,
        result_dir=result_dir,
        return_val=return_val,
        scanner=scanner,
    )
    return updated_return_val
249 
250 
def perform_scans(
    cli_options: CliOptions, format_results: FormatResult, result_dir: str,
    return_val: int, scanner: Scanners
) -> int:
    """
    Run every scan enabled in ``cli_options`` and write its text report.

    License results go to ``licenses.txt`` (when nomos or ojo is enabled),
    copyright results to ``copyrights.txt`` and keyword results to
    ``keywords.txt``, all inside ``result_dir``.

    :param cli_options: CLI options
    :param format_results: FormatResult object
    :param result_dir: Result directory location
    :param return_val: Return value of program so far
    :param scanner: Scanner object
    :return: ``return_val`` with the failure bits of the failed scans set
    """
    if cli_options.nomos or cli_options.ojo:
        logging.info("Scanning for licenses...")
        scanner.set_scanner_results(whole=True)
        scan_results_with_line_number = _format_results_with_line_numbers(
            scanner=scanner, format_results=format_results,
            result_type='license', key='license'
        )
        failed_licenses = scanner.results_are_allow_listed()
        return_val = print_log_message(
            f"{result_dir}/licenses.txt", failed_licenses, True,
            "Following licenses found which are not allow listed",
            "No license violation found", "License", return_val,
            scan_results_with_line_number
        )
    if cli_options.copyright:
        logging.info("Scanning for copyrights...")
        scanner.set_copyright_list(all_results=True, whole=True)
        failed_copyrights = scanner.get_non_allow_listed_copyrights()
        scan_results_with_line_number = _format_results_with_line_numbers(
            scanner=scanner, format_results=format_results,
            result_type='copyright', key='content'
        )
        return_val = print_log_message(
            f"{result_dir}/copyrights.txt",
            failed_copyrights, False, "Following copyrights found",
            "No copyright violation found", "Copyright", return_val,
            scan_results_with_line_number
        )
    if cli_options.keyword:
        logging.info("Scanning keywords...")
        scanner.set_keyword_list(whole=True)
        scan_results_with_line_number = _format_results_with_line_numbers(
            scanner=scanner, format_results=format_results,
            result_type='keyword', key='content'
        )
        # print_log_message() forwards this list to print_results(), which
        # reads `.file` and `.result` from every entry. The scan result
        # objects themselves must therefore be passed on, not values
        # extracted from them.
        keyword_results = [
            r for r in scanner.get_keyword_results() if r and r.result
        ]

        return_val = print_log_message(
            f"{result_dir}/keywords.txt",
            keyword_results, False, "Following keywords found",
            "No keyword violation found", "Keyword", return_val,
            scan_results_with_line_number
        )
    return return_val
299 
300 
def bom_report(
    cli_options: CliOptions, result_dir: str, return_val: int,
    scanner: Scanners, format_results: FormatResult
) -> int:
    """
    Run the scanners and store the findings as an SPDX SBOM.

    The SBOM is written to ``<result_dir>/sbom_<suffix>`` where the suffix
    depends on the requested report format. A RuntimeError raised while
    writing is logged and sets bit 1 of the return value.

    :param cli_options: CLI options
    :param result_dir: Result directory location
    :param return_val: Return value
    :param scanner: Scanner object
    :param format_results: FormatResult object
    :return: Program's return value
    """
    # The report object is created before the scans are run.
    report_obj = SpdxReport(cli_options, scanner)
    return_val = perform_scans(
        cli_options, format_results, result_dir, return_val, scanner
    )
    logging.info("Finalizing reports...")
    report_obj.finalize_document()

    suffix_by_format = (
        (ReportFormat.SPDX_JSON, "spdx.json"),
        (ReportFormat.SPDX_RDF, "spdx.rdf"),
        (ReportFormat.SPDX_TAG_VALUE, "spdx.spdx"),
        (ReportFormat.SPDX_YAML, "spdx.yaml"),
    )
    suffix = ""
    for known_format, known_suffix in suffix_by_format:
        if cli_options.report_format == known_format:
            suffix = known_suffix
            break
    report_name = f"{result_dir}/sbom_" + suffix

    logging.info(f"Validating and writing report to file {report_name}...")
    try:
        report_obj.write_report(report_name)
    except RuntimeError as e:
        logging.error(f"Failed to write SBOM report: {e}")
        return_val |= 1
    else:
        logging.info(f"\u2714 Saved SBOM as {report_name}")

    return return_val
341 
342 
def get_scan_packages(api_config: ApiConfig) -> Packages:
    """
    Create the Packages object describing the project being scanned.

    :param api_config: API configuration holding the project details
    :return: Packages object with ``parent_package`` filled in
    """
    packages = Packages()
    packages.parent_package = dict(
        name=api_config.project_name,
        description=api_config.project_desc,
        author=api_config.project_orig,
        url=api_config.project_url,
    )
    return packages
353 
354 
def main(parsed_args):
    """
    Main: configure the scanner from the CLI arguments and CI environment,
    optionally download dependencies from an SBOM, then run the scans and
    write the reports into the ``results`` directory.

    :param parsed_args: arguments as returned by the argument parser
    :return: 0 for success, error code on failure.
    """
    api_config = get_api_config()
    cli_options = CliOptions()
    cli_options.update_args(parsed_args)
    save_dir = 'pkg_downloads'
    scan_packages = get_scan_packages(api_config)

    # Load the allow list; every failure is logged and the scan continues
    # without one.
    try:
        cli_options.allowlist = get_allow_list(
            path=cli_options.allowlist_path or ''
        )
    except FileNotFoundError:
        logging.warning("Unable to find allowlist.json in current dir. "
                        "Continuing without it.")
    except json.JSONDecodeError:
        logging.error("Error parsing allowlist.json. Please ensure it's valid JSON."
                      " Continuing without it.")
    except Exception as e:
        logging.error(f"An unexpected error occurred while reading allowlist: {e}."
                      " Continuing without it.")

    # Install a user supplied keyword configuration, if it validates.
    if cli_options.keyword and cli_options.keyword_conf_file_path:
        conf_source = cli_options.keyword_conf_file_path
        conf_target = '/usr/local/share/fossology/keyword/agent/keyword.conf'
        is_valid, message = validate_keyword_conf_file(conf_source)
        if not is_valid:
            logging.error(f"Could not validate keyword file: {message}")
        else:
            logging.info(f"Validation of keyword file successful: {message}")
            copy_keyword_file_to_destination(conf_source, conf_target)

    # Resolve and download the dependencies listed in the SBOM.
    wants_dependencies = cli_options.scan_only_deps or cli_options.repo
    if wants_dependencies and cli_options.sbom_path:
        cli_options.parser = Parser(cli_options.sbom_path)
        sbom_parser = cli_options.parser
        sbom_parser.classify_components(save_dir)

        if sbom_parser.python_components:
            PythonParser().parse_components(sbom_parser)

        if sbom_parser.npm_components:
            NPMParser().parse_components(sbom_parser)

        for comp in sbom_parser.unsupported_components or ():
            logging.warning(
                f"The purl {comp.get('purl', 'N/A')} is not supported. "
                "Package will not be downloaded."
            )

        scan_packages.dependencies = sbom_parser.parsed_components

        try:
            Downloader().download_concurrently(sbom_parser)
        except Exception as e:
            logging.error(
                f"Something went wrong while downloading the dependencies: {e}")

    # Decide which directory gets scanned.
    if cli_options.scan_dir:
        cli_options.diff_dir = cli_options.dir_path
    elif not (cli_options.repo or cli_options.scan_only_deps):
        cli_options.diff_dir = RepoSetup(cli_options, api_config).get_diff_dir()

    scanner = Scanners(cli_options, scan_packages)

    # Populate tmp dir in unified diff format
    format_results = FormatResult(cli_options)
    format_results.process_files(scanner.cli_options.diff_dir)

    # Create result dir
    result_dir = "results"
    os.makedirs(name=result_dir, exist_ok=True)

    logging.info("Preparing scan reports...")
    if cli_options.report_format == ReportFormat.TEXT:
        build_report = text_report
    else:
        build_report = bom_report
    return build_report(cli_options, result_dir, 0, scanner, format_results)
451 
452 
if __name__ == "__main__":
    # Command line interface: positional operations plus optional settings,
    # registered in the order they appear in the help output.
    parser = argparse.ArgumentParser(
        description=textwrap.dedent("""fossology scanner designed for CI""")
    )
    argument_specs = (
        (("operation",), dict(
            type=str, help="Operations to run.", nargs='*',
            choices=[
                "nomos", "copyright", "keyword", "ojo", "repo", "differential",
                "scan-only-deps", "scan-dir"
            ],
        )),
        (("--tags",), dict(
            type=str, nargs=2,
            help="Tags for differential scan. Required if 'differential' is specified.",
        )),
        (("--report",), dict(
            type=str, help="Type of report to generate. Default 'TEXT'.",
            choices=[member.name for member in ReportFormat],
            default=ReportFormat.TEXT.name,
        )),
        (('--keyword-conf',), dict(
            type=str, help='Path to the keyword configuration file. '
                           'Use only when keyword argument is true',
        )),
        (('--dir-path',), dict(
            type=str, help='Path to directory for scanning.',
        )),
        (("--allowlist-path",), dict(
            type=str,
            help="Pass allowlist.json to allowlist dependencies.",
        )),
        (("--sbom-path",), dict(
            type=str,
            help="Path to SBOM file for downloading dependencies.",
        )),
    )
    for flags, options in argument_specs:
        parser.add_argument(*flags, **options)

    args = parser.parse_args()
    sys.exit(main(args))
Store the options sent through the CLI.
list_t type structure used to keep various lists. (e.g. there are multiple lists).
Definition: nomos.h:308