FOSSology  4.5.1
Open Source License Compliance by Open Source Software
RepoSetup.py
1 #!/usr/bin/env python3
2 
3 # SPDX-FileCopyrightText: © 2023 Siemens AG
4 # SPDX-FileContributor: Gaurav Mishra <mishra.gaurav@siemens.com>
5 
6 # SPDX-License-Identifier: GPL-2.0-only
7 
8 import fnmatch
9 import json
10 import os
11 import re
12 import ssl
13 import urllib.request
14 from tempfile import TemporaryDirectory
15 from typing import Union
16 
17 from .ApiConfig import ApiConfig, Runner
18 from .CliOptions import CliOptions
19 
20 
21 class RepoSetup:
22  """
23  Setup temp_dir using the diff or current MR
24 
25  :ivar temp_dir: Temporary directory location for storing MR changes.
26  :ivar allowlist: Allow list from JSON
27  :ivar api_config: ApiConfig
28  :ivar cli_options: CliOptions object
29  """
30 
31  def __init__(self, cli_options: CliOptions, api_config: ApiConfig):
32  """
33  Create a temp dir
34 
35  :param cli_options: CliOptions object to get allow list from
36  :param api_config: API configuration for the CI
37  """
38  self.temp_dir: Union[TemporaryDirectory[str], TemporaryDirectory[bytes]] \
39  = TemporaryDirectory()
40  self.allowlist: dict[str, list[str]] = cli_options.allowlist
41  self.api_config: ApiConfig = api_config
42  self.cli_options: CliOptions = cli_options
43 
44  def __del__(self):
45  """
46  Clean the created temp dir
47  """
48  self.temp_dir.cleanup()
49 
50  def __is_excluded_path(self, path: str) -> bool:
51  """
52  Check if the path is allow listed
53 
54  The function used fnmatch to check if the path is in allow list or not.
55 
56  :param path: path to check
57  :return: True if the path is in allow list, False otherwise
58  """
59  path_is_excluded = False
60  for pattern in self.allowlist['exclude']:
61  if fnmatch.fnmatchcase(path, pattern):
62  path_is_excluded = True
63  break
64  return path_is_excluded
65 
66  def get_diff_dir(self) -> str:
67  """
68  Populate temp dir using the gitlab API `merge_requests`
69 
70  :return: temp dir path
71  """
72 
73  context = ssl.create_default_context()
74  context.check_hostname = False
75  context.verify_mode = ssl.CERT_NONE
76 
77  if self.api_config.running_on == Runner.GITLAB:
78  headers = {'Private-Token': self.api_config.api_token}
79  path_key = "new_path"
80  change_key = "diff"
81  api_req_url = f"{self.api_config.api_url}/projects/" \
82  f"{self.api_config.project_id}/merge_requests/" \
83  f"{self.api_config.mr_iid}/changes"
84  if self.cli_options.differential and self.cli_options.tags != ('',''):
85  tags = (self.cli_options.tags)
86  api_req_url = f"{self.api_config.api_url}/projects/" \
87  f"{self.api_config.project_id}/repository/compare?" + \
88  f"from={tags[0]}&to={tags[1]}"
89 
90  elif self.api_config.running_on == Runner.GITHUB:
91  headers = {
92  "Authorization": f"Bearer {self.api_config.api_token}",
93  "X-GitHub-Api-Version": "2022-11-28",
94  "Accept": "application/vnd.github+json"
95  }
96  api_req_url = f"{self.api_config.api_url}/repos/" \
97  f"{self.api_config.github_repo_slug}/pulls/" + \
98  f"{self.api_config.github_pull_request}/files"
99  if self.cli_options.differential and self.cli_options.tags != ('',''):
100  tags = self.cli_options.tags
101  api_req_url = f"{self.api_config.api_url}/repos/" \
102  f"{self.api_config.github_repo_slug}/compare/" + \
103  f"{tags[0]}...{tags[1]}"
104  path_key = "filename"
105  change_key = "patch"
106 
107  else:
108  api_req_url = "https://api.github.com/repos/" \
109  f"{self.api_config.travis_repo_slug}/pulls/" \
110  f"{self.api_config.travis_pull_request}/files"
111  headers = {}
112  path_key = "filename"
113  change_key = "patch"
114 
115  req = urllib.request.Request(api_req_url, headers=headers)
116  try:
117  with urllib.request.urlopen(req, context=context) as response:
118  change_response = response.read()
119  except Exception as e:
120  print(f"Unable to get URL {api_req_url}")
121  raise e
122 
123  change_response = json.loads(change_response)
124  if self.api_config.running_on == Runner.GITLAB:
125  if self.cli_options.differential and self.cli_options.tags != ('',''):
126  changes = change_response['diffs']
127  else:
128  changes = change_response['changes']
129  elif self.api_config.running_on == Runner.GITHUB:
130  if self.cli_options.differential and self.cli_options.tags != ('',''):
131  changes = change_response['files']
132  else:
133  changes = change_response
134  else:
135  changes = change_response
136 
137  for change in changes:
138  if path_key in change and change_key in change:
139  path_to_be_excluded = self.__is_excluded_path__is_excluded_path(change[path_key])
140  if path_to_be_excluded is False:
141  curr_file = os.path.join(self.temp_dir.name, change[path_key])
142  curr_dir = os.path.dirname(curr_file)
143  if curr_dir != self.temp_dir.name:
144  os.makedirs(name=curr_dir, exist_ok=True)
145  curr_file = open(file=curr_file, mode='w+', encoding='UTF-8')
146  print(change[change_key],file=curr_file)
147  return self.temp_dir.name
bool __is_excluded_path(self, str path)
Definition: RepoSetup.py:50
def __init__(self, CliOptions cli_options, ApiConfig api_config)
Definition: RepoSetup.py:31