FOSSology  4.5.1
Open Source License Compliance by Open Source Software
osadl_convertor.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # SPDX-FileCopyrightText: © 2024 Siemens AG
4 # SPDX-FileContributor: Gaurav Mishra <mishra.gaurav@siemens.com>
5 #
6 # SPDX-License-Identifier: GPL-2.0-only
7 
8 import argparse
9 import json
10 import logging
11 import textwrap
12 import time
13 from typing import Union, Optional
14 
15 import osadl_matrix
16 import psycopg2
17 import yaml
18 
# Apply the logging module's default configuration to the root logger.
logging.basicConfig()

# Logger used throughout this script; starts at INFO and is switched to
# DEBUG by the command-line entry point when --debug is given.
logger = logging.getLogger("osadl_convertor")
logger.setLevel(logging.INFO)
23 
24 
class MatrixItem:
    """
    Hold the information about a single compatibility rule.

    A rule names two licenses and/or two license types together with the
    compatibility result for that pair and a free-text comment.

    :ivar __first_license: First license of the rule.
    :ivar __second_license: Second license of the rule.
    :ivar __first_type: First type of the license.
    :ivar __second_type: Second type of the license.
    :ivar __result: Compatibility result of the rule.
    :ivar __comment: Comment on the rule.
    """

    def __init__(self):
        self.__first_license: Optional[str] = None
        self.__second_license: Optional[str] = None
        self.__first_type: Optional[str] = None
        self.__second_type: Optional[str] = None
        # A fresh rule is "not compatible" until a result is assigned.
        self.__result: Optional[
            Union[bool, osadl_matrix.OSADLCompatibility]
        ] = osadl_matrix.OSADLCompatibility.NO
        self.__comment: str = ""

    @property
    def first_license(self) -> Optional[str]:
        """
        Get name of the first license.
        """
        return self.__first_license

    @first_license.setter
    def first_license(self, first_license: str) -> None:
        """
        Set name of the first license.
        """
        self.__first_license = first_license

    @property
    def second_license(self) -> Optional[str]:
        """
        Get name of the second license.
        """
        return self.__second_license

    @second_license.setter
    def second_license(self, second_license: str) -> None:
        """
        Set name of the second license.
        """
        self.__second_license = second_license

    @property
    def first_type(self) -> Optional[str]:
        """
        Get type of the first license.
        """
        return self.__first_type

    @first_type.setter
    def first_type(self, first_type: Optional[str]) -> None:
        """
        Set type of the first license.
        """
        self.__first_type = first_type

    @property
    def second_type(self) -> Optional[str]:
        """
        Get type of the second license.
        """
        return self.__second_type

    @second_type.setter
    def second_type(self, second_type: Optional[str]) -> None:
        """
        Set type of the second license.
        """
        self.__second_type = second_type

    @property
    def result(self) -> bool:
        """
        Get result of the rule as boolean.

        A stored boolean is returned unchanged. A stored OSADLCompatibility
        value maps to True for YES and CHECKDEP and to False otherwise.
        """
        if isinstance(self.__result, bool):
            return self.__result
        return (self.__result == osadl_matrix.OSADLCompatibility.YES
                or self.__result == osadl_matrix.OSADLCompatibility.CHECKDEP)

    @result.setter
    def result(self,
               result: "Union[osadl_matrix.OSADLCompatibility, bool]") -> None:
        """
        Set result of the rule. It can be boolean or an object of
        OSADLCompatibility enum.
        """
        self.__result = result

    @property
    def comment(self) -> str:
        """
        Get comment on the rule.
        """
        return self.__comment

    @comment.setter
    def comment(self, comment: str) -> None:
        """
        Set comment on the rule.
        """
        self.__comment = comment

    def __eq__(self, other) -> bool:
        """
        Two rules are equal if:

        - Both name two licenses and the license pairs match in either
          order, or
        - Both name two license types, the type pairs match in either order
          and the boolean results are the same.

        Objects that are not MatrixItem are never equal to a rule.
        """
        if not isinstance(other, MatrixItem):
            return NotImplemented

        same_licenses = (
            self.first_license is not None
            and self.second_license is not None
            and other.first_license is not None
            and other.second_license is not None
            and (
                (self.first_license == other.first_license
                 and self.second_license == other.second_license)
                or (self.first_license == other.second_license
                    and self.second_license == other.first_license)
            )
        )
        same_types = (
            self.first_type is not None
            and self.second_type is not None
            and other.first_type is not None
            and other.second_type is not None
            and (
                (self.first_type == other.first_type
                 and self.second_type == other.second_type)
                or (self.first_type == other.second_type
                    and self.second_type == other.first_type)
            )
        )
        # NOTE(review): the result is only compared on the type branch. That
        # is how the original expression grouped ("a or b and c"), although
        # its documentation suggested the license branch should compare the
        # result as well. The grouping is kept so that duplicate detection
        # behaves as before - confirm the intent before tightening it.
        return same_licenses or (same_types and self.result == other.result)

    # Rules are mutable and compared by value, so they are not hashable.
    __hash__ = None

    def __repr__(self):
        return f"{self.__class__.__name__}(firstname={self.first_license}, " \
               f"secondname={self.second_license}, firsttype={self.first_type}, " \
               f"secondtype={self.second_type}, compatibility={self.result}, " \
               f"comment={self.comment})"
176 
177 
def compliance_representer(dumper: yaml.Dumper, data: MatrixItem) -> yaml.Node:
    """
    Turn a MatrixItem (rule) into the YAML map layout FOSSology understands.

    :param dumper: YAML dumper used to represent every key and value.
    :param data: Rule to represent.
    :return: Mapping node with the rule's fields in a fixed order.
    """
    # Key order is significant: it is the order the keys appear in the file.
    fields = (
        ("firstname", data.first_license),
        ("secondname", data.second_license),
        ("firsttype", data.first_type),
        ("secondtype", data.second_type),
        ("compatibility", data.result),
        ("comment", data.comment),
    )
    pairs = [
        (dumper.represent_data(key), dumper.represent_data(field_value))
        for key, field_value in fields
    ]
    return yaml.nodes.MappingNode(u"tag:yaml.org,2002:map", pairs)
198 
199 
class LicenseHandler:
    """
    Handle license information from FOSSology.

    All lookups read the ``license_ref`` table through a single database
    connection opened when the handler is created.
    """

    def __init__(self, host: str, port: str, user: str, password: str,
                 database: str):
        """
        Create connection to DB.

        :param host: Host of the database.
        :param port: Port of the database.
        :param user: User of the database.
        :param password: Password of the database.
        :param database: Name of the database.
        """
        self.__conn = psycopg2.connect(dbname=database, user=user,
                                       password=password, host=host,
                                       port=port)

    def get_license_type(self, license_name: str) -> Optional[str]:
        """
        Get the type of the license from DB.

        The short name is matched case-insensitively.

        :param license_name: Name of the license to get type for.
        :return: Type of the license if found, None otherwise.
        """
        # The with-block closes the cursor once the row has been fetched.
        with self.__conn.cursor() as cur:
            cur.execute("SELECT rf_licensetype FROM license_ref WHERE "
                        "lower(rf_shortname) = lower(%s);", (license_name,))
            resp = cur.fetchone()
        return None if resp is None else resp[0]

    def different_type_exists(self) -> bool:
        """
        Check if different type of licenses exists in DB.

        Check if the share of "Permissive" or None license type (default
        type) is below 80% of the total licenses in database.

        :return: True if the share is below 80%, False otherwise (including
                 when fewer than two distinct types are present).
        """
        with self.__conn.cursor() as cur:
            cur.execute("SELECT rf_licensetype, count(*) AS count "
                        "FROM license_ref GROUP BY rf_licensetype;")
            resp: list[tuple[Optional[str], int]] = cur.fetchall()
        if len(resp) < 2:
            return False
        total_count = 0
        permissive_count = 0
        for type_name, type_count in resp:
            total_count += type_count
            # A missing type and the literal text "None" both count as the
            # default type, exactly like "Permissive".
            if type_name in (None, "None", "Permissive"):
                permissive_count += type_count
        return (permissive_count / total_count) < 0.8

    def get_license_types(self) -> list[Optional[str]]:
        """
        Get list of different license types from database.

        :return: List of license types in DB; may contain None for licenses
                 stored without a type.
        """
        with self.__conn.cursor() as cur:
            cur.execute("SELECT DISTINCT rf_licensetype FROM license_ref;")
            resp: list[tuple[Optional[str]]] = cur.fetchall()
        return [row[0] for row in resp]

    def license_exists(self, license_name: str) -> bool:
        """
        Check if there is a license with the given name in database or not.

        The short name is matched case-insensitively.

        :param license_name: Name to check
        :return: True if exists, False otherwise
        """
        with self.__conn.cursor() as cur:
            cur.execute("SELECT 1 FROM license_ref WHERE "
                        "lower(rf_shortname) = lower(%s);", (license_name,))
            resp = cur.fetchone()
        if resp is not None:
            return resp[0] == 1
        return False
282 
283 
def remove_items(compatibility_matrix: list[MatrixItem],
                 first_type: Optional[str], second_type: Optional[str],
                 result: bool) -> list[MatrixItem]:
    """
    Drop the license rules that are covered by a given type pair and result.

    A rule is kept when:
      - it names no license at all (it is a type-only rule), or
      - its license types differ from the given pair in both orders, or
      - its result differs from the given result.

    :param compatibility_matrix: List of rules to filter from.
    :param first_type: First license type for removal.
    :param second_type: Second license type for removal.
    :param result: Result of rule for removal.
    :return: New list holding the rules that were kept, in their old order.
    """
    kept = []
    for rule in compatibility_matrix:
        if rule.first_license is None and rule.second_license is None:
            kept.append(rule)
            continue
        types_match = (
            (rule.first_type == first_type
             and rule.second_type == second_type)
            or (rule.first_type == second_type
                and rule.second_type == first_type)
        )
        if not types_match or rule.result != result:
            kept.append(rule)
    return kept
307 
def remove_type_for_license(compatibility_matrix: list[MatrixItem]) \
        -> list[MatrixItem]:
    """
    Clear the license types of every rule that names a first license. It
    should be called after remove_items().

    The rules are modified in place. The returned list puts the rules with a
    first license ahead of the rules without one; each group keeps its
    relative order.

    :param compatibility_matrix: List of rules.
    :return: List of rules with license types removed.
    """
    with_license = []
    type_only = []
    for rule in compatibility_matrix:
        if rule.first_license is None:
            type_only.append(rule)
        else:
            rule.first_type = None
            rule.second_type = None
            with_license.append(rule)
    return with_license + type_only
325 
326 
def reduce_matrix(license_handler: LicenseHandler,
                  compatibility_matrix: list[MatrixItem],
                  type_dict: dict[tuple[str, str, bool], int]):
    """
    Shrink the rule list by replacing license rules with type-level rules.

    When the database does not hold sufficiently different license types
    (see LicenseHandler.different_type_exists()), the license types are only
    stripped from the given rules and the same list is returned.

    Otherwise, for every ordered pair of license types the more frequent
    result is picked (False on a tie), the license rules agreeing with it
    are removed and one type-only rule carrying that result is appended.

    Requires dictionary of license type and result in following format:

    ```
    { ('license_type_1', 'license_type_2', result<bool>): count of occurrences }
    ```
    :param license_handler: Object of LicenseHandler
    :param compatibility_matrix: List of rules
    :param type_dict: Dictionary storing information about license type and
                      result counts.
    :return: Reduced list of rules.
    """
    if not license_handler.different_type_exists():
        for rule in compatibility_matrix:
            rule.first_type = None
            rule.second_type = None
        return compatibility_matrix

    known_types = license_handler.get_license_types()
    remaining = compatibility_matrix
    for type_a in known_types:
        for type_b in known_types:
            # Counts are stored under one key order only, so fall back to
            # the swapped key when the direct one is absent.
            votes = {
                outcome: type_dict.get(
                    (type_a, type_b, outcome),
                    type_dict.get((type_b, type_a, outcome), 0))
                for outcome in (True, False)
            }
            majority = votes[True] > votes[False]
            remaining = remove_items(remaining, type_a, type_b, majority)

            type_rule = MatrixItem()
            type_rule.first_type = type_a
            type_rule.second_type = type_b
            type_rule.result = majority
            type_rule.comment = f"{type_rule.first_type} -> " \
                                f"{type_rule.second_type} -> " \
                                f"{type_rule.result}"
            remaining.append(type_rule)
    return remove_type_for_license(remaining)
381 
382 
def save_yaml(location: str, compliance_matrix: list[MatrixItem]) -> None:
    """
    Write the rules to a YAML file and log how many were saved.

    The document has a "default" key set to False and a "rules" key holding
    the rules, each represented by compliance_representer().

    :param location: Path of the file to write.
    :param compliance_matrix: List of rules to store.
    """
    document = {
        "default": False,
        "rules": compliance_matrix,
    }
    with open(location, "w") as save_file:
        yaml.add_representer(MatrixItem, compliance_representer)
        yaml.dump(document, save_file)
    logger.info(f"Saved {len(compliance_matrix)} rules in {location}.")
391 
392 
 # Build the list of rules and the per-type result counts from the OSADL JSON.
393 def convert_json_to_matrix(license_handler: LicenseHandler, json_loc: str) \
394  -> tuple[list[MatrixItem], dict[tuple[str, str, bool], int]]:
395  """
396  Convert the OSADL matrix JSON from library into list of rules. The rules
397  are made sure to be not duplicated. The type of license is also added to
398  the rules.
399  :param license_handler: LicenseHandler object
400  :param json_loc: Location of OSADL JSON
401  :return: List of rules and type dictionary for reduce_matrix()
402  """
403  matrix: Union[dict[str, dict[str, str]], None] = None
404  compatibility_matrix: list[MatrixItem] = []
 # Number of occurrences per (license type, license type, result) key.
405  type_dict: dict[tuple[str, str, bool], int] = dict()
406  with open(json_loc, "r") as jsoninput:
407  matrix = json.load(jsoninput)
408  if matrix is None:
409  raise Exception("Unable to read JSON")
 # Outer keys are first-license names; each value maps second-license names
 # to a textual result.
410  for first_license, comp_list in matrix.items():
 # Skip the "timestamp"/"timeformat" entries and any license that
 # license_handler does not find in the database.
411  if first_license in ["timestamp", "timeformat"] or not \
412  license_handler.license_exists(first_license):
413  continue
414  for second_license, result in comp_list.items():
415  if not license_handler.license_exists(second_license):
416  continue
417  row = MatrixItem()
418  row.first_license = first_license
419  row.second_license = second_license
 # The enum value is stored, but row.result reads back as a bool, so the
 # comment text and the type_dict keys below carry True/False.
420  row.result = osadl_matrix.OSADLCompatibility.from_text(result)
421  row.comment = f"{first_license} -> {second_license} -> {row.result}"
422  row.first_type = license_handler.get_license_type(row.first_license)
423  row.second_type = license_handler.get_license_type(row.second_license)
 # Membership test uses MatrixItem.__eq__ to detect an equivalent rule.
424  if row not in compatibility_matrix:
425  logger.debug(row.comment)
426  compatibility_matrix.append(row)
 # NOTE(review): the indentation of this listing is lost, so it cannot be
 # told from here whether the counting below runs only for newly added rows
 # or for every row - confirm against the original file before editing.
427  updated = False
 # Count under the swapped key when only that key order is already present,
 # so a type pair is tallied under a single key regardless of order.
428  if (row.first_type, row.second_type, row.result) not in type_dict:
429  if (row.second_type, row.first_type, row.result) in type_dict:
430  type_dict[(row.second_type, row.first_type, row.result)] += 1
431  updated = True
 # Otherwise create or increment the entry for the direct key order.
432  if not updated:
433  type_dict[(row.first_type, row.second_type, row.result)] = \
434  type_dict.get((row.first_type, row.second_type, row.result), 0) + 1
435  return compatibility_matrix, type_dict
436 
437 
def main(parsed_args):
    """
    Run the whole conversion and log how long it took.

    Connects to the database, converts the OSADL matrix JSON shipped with
    the osadl_matrix package into rules, reduces them and stores the result
    as YAML.

    :param parsed_args: Parsed command-line arguments providing host, port,
                        user, password, database and yaml.
    """
    started_at = time.time()
    handler = LicenseHandler(parsed_args.host, parsed_args.port,
                             parsed_args.user, parsed_args.password,
                             parsed_args.database)
    rules, type_counts = convert_json_to_matrix(
        handler, osadl_matrix.OSADL_MATRIX_JSON)

    # The reduction step is timed separately, in whole milliseconds.
    reduce_started_ms = int(round(time.time() * 1000))
    reduced_rules = reduce_matrix(handler, rules, type_counts)
    reduce_elapsed_ms = int(round(time.time() * 1000)) - reduce_started_ms

    save_yaml(parsed_args.yaml, reduced_rules)
    elapsed_seconds = time.time() - started_at
    logger.info(f"Took {reduce_elapsed_ms:.2f} ms for reducing list.")
    logger.info(f"Took {elapsed_seconds:.2f} seconds for processing.")
452 
453 
if __name__ == "__main__":
    # Command-line entry point: parse the options, adjust verbosity, convert.
    parser = argparse.ArgumentParser(
        description=textwrap.dedent("""
        Convert OSADL matrix to FOSSology's compatibility YAML
        """)
    )
    # String options, registered in the order they appear in the help text.
    string_options = (
        ("--user", "Database username", {"default": "fossy"}),
        ("--password", "Database password", {"required": True}),
        ("--database", "Database name", {"default": "fossology"}),
        ("--host", "Database host", {"default": "localhost"}),
        ("--port", "Database port", {"default": "5432"}),
        ("--yaml", "Location to store result file", {"required": True}),
    )
    for flag, help_text, extra in string_options:
        parser.add_argument(flag, type=str, help=help_text, **extra)
    parser.add_argument(
        "-d", "--debug", action="store_true", default=False,
        help="Increase verbosity",
    )
    args = parser.parse_args()
    if args.debug:
        logger.setLevel(logging.DEBUG)
    main(args)
bool license_exists(self, str license_name)
def __init__(self, str host, str port, str user, str password, str database)
list[Optional[str]] get_license_types(self)
Optional[str] get_license_type(self, str license_name)
Optional[str] first_license(self)
Optional[str] second_type(self)
None first_license(self, str first_license)
None result(self, Union[osadl_matrix.OSADLCompatibility, bool] result)
Optional[str] first_type(self)
Optional[str] second_license(self)
None second_license(self, str second_license)
None second_type(self, Optional[str] second_type)
None first_type(self, Optional[str] first_type)