Coverage for ckanext/udc/licenses/utils.py: 45%

77 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2026-01-19 23:48 +0000

1from __future__ import annotations 

2from ckan.common import _ 

3from ckan.model.license import License as CKANLicense, DefaultLicense 

4from ckan import model 

5from ckan.lib.redis import connect_to_redis, is_redis_available 

6from typing import Any, Callable, Collection, KeysView, Optional, Union, cast 

7 

8from ckanext.udc.licenses.model import CustomLicense 

9 

10_CUSTOM_LICENSE_SET_KEY = "udc:licenses:custom_ids" 

11_LICENSE_VERSION_KEY = "udc:licenses:version" 

12_local_license_version = None 

13 

def create_custom_license(_id, _url, _title):
    """Build a CKAN license object for one custom license.

    A throwaway ``DefaultLicense`` subclass is defined per call, carrying the
    given id and url as class attributes, and an instance of it is wrapped in
    ``CKANLicense``.

    :param _id: license id exposed as the ``id`` attribute.
    :param _url: license URL exposed as the ``url`` attribute.
    :param _title: untranslated title; passed through ``_()`` on every
        access of ``title``.
    :returns: a ``CKANLicense`` wrapping the generated license instance.
    """

    class AnonymousCustomLicense(DefaultLicense):
        # Captured from the enclosing call's arguments.
        id, url = _id, _url

        @property
        def title(self):
            # Translate lazily, at access time rather than at creation time.
            return _(_title)

    license_data = AnonymousCustomLicense()
    return CKANLicense(license_data)

23 

24 

def _get_redis_conn():
    """Return a Redis connection, or ``None`` when one cannot be obtained.

    ``None`` is returned both when ``is_redis_available()`` is false and when
    ``connect_to_redis()`` raises; callers treat Redis as optional.
    """
    conn = None
    if is_redis_available():
        try:
            conn = connect_to_redis()
        except Exception:
            # Best effort: any connection failure means "no Redis".
            conn = None
    return conn

32 

33 

def _get_custom_license_ids():
    """Return a list with the ``id`` of every ``CustomLicense`` row."""
    rows = model.Session.query(CustomLicense)
    return [row.id for row in rows]

36 

37 

def refresh_custom_licenses():
    """
    Sync custom licenses from DB into the in-process license register.

    Redis records which license ids were added by this extension, so that
    ones since deleted from the DB can be dropped from the register. When no
    Redis connection is available nothing is removed; the current custom
    licenses are still inserted or updated.

    Steps:
      1. Read the previously synced ids from Redis (if reachable).
      2. Drop register entries whose id was synced before but is no longer
         in the DB.
      3. Replace or append one register entry per ``CustomLicense`` row.
      4. Store the current set of ids back in Redis.
    """
    license_register = model.Package.get_license_register()
    custom_ids = set(_get_custom_license_ids())

    redis_conn = _get_redis_conn()
    previous_custom_ids = set()
    if redis_conn:
        # redis-py returns members as bytes unless the client was created
        # with decode_responses=True. Normalise to str so the ids compare
        # equal to the str ids coming from the DB and the register;
        # otherwise no stale id would ever match and nothing is removed.
        previous_custom_ids = {
            member.decode("utf-8") if isinstance(member, bytes) else member
            for member in redis_conn.smembers(_CUSTOM_LICENSE_SET_KEY)
        }

    # Remove deleted custom licenses
    # NOTE(review): the Redis set is shared between processes; once one
    # process has rewritten it below, another process holding a stale
    # register no longer sees the deleted ids here -- confirm whether that
    # case is handled elsewhere.
    stale_ids = previous_custom_ids - custom_ids
    if stale_ids:
        license_register.licenses = [
            lic for lic in license_register.licenses if lic.id not in stale_ids
        ]

    # Insert/update current custom licenses
    licenses = license_register.licenses
    # Position of the first register entry for each id, so an update does
    # not need to rescan the whole list.
    index_by_id = {}
    for idx, lic in enumerate(licenses):
        index_by_id.setdefault(lic.id, idx)

    for custom in model.Session.query(CustomLicense):
        new_license = create_custom_license(custom.id, custom.url, custom.title)
        idx = index_by_id.get(custom.id)
        if idx is None:
            index_by_id[custom.id] = len(licenses)
            licenses.append(new_license)
        else:
            licenses[idx] = new_license

    if redis_conn:
        # Replace the stored id set: delete and re-add are queued on one
        # pipeline and sent together.
        pipe = redis_conn.pipeline()
        pipe.delete(_CUSTOM_LICENSE_SET_KEY)
        if custom_ids:
            # SADD requires at least one member, hence the guard.
            pipe.sadd(_CUSTOM_LICENSE_SET_KEY, *custom_ids)
        pipe.execute()

76 

77 

def bump_license_version():
    """Increment the license version counter in Redis.

    Does nothing when no Redis connection can be obtained.
    """
    conn = _get_redis_conn()
    if not conn:
        return
    conn.incr(_LICENSE_VERSION_KEY)

82 

83 

def refresh_license_register_if_needed():
    """Re-sync custom licenses when the Redis version counter has changed.

    Compares the value stored under the version key with the value this
    process saw last; on a mismatch, ``refresh_custom_licenses()`` runs and
    the new value is remembered. Without a Redis connection this is a no-op.
    """
    global _local_license_version

    conn = _get_redis_conn()
    if not conn:
        return

    current_version = conn.get(_LICENSE_VERSION_KEY)
    if current_version == _local_license_version:
        # Register is already in step with the shared counter.
        return

    refresh_custom_licenses()
    # Recorded only after the refresh has completed without raising.
    _local_license_version = current_version

93 

94 

def license_options_details(
    existing_license_id: Optional[str] = None
) -> list[tuple[str, str, str]]:
    '''Returns [(l.id, l.title, l.url), ...] for the licenses configured to be
    offered. Always includes the existing_license_id, if supplied.

    Registered licenses are listed in order of their title. When
    ``existing_license_id`` is given but not registered, it is placed first
    as ``(id, id, '')`` so the current value stays selectable.

    :param existing_license_id: id of the license currently set on the
        object being edited, if any.
    :returns: list of ``(id, translated title, url or '')`` tuples.
    '''
    refresh_license_register_if_needed()
    register = model.Package.get_license_register()
    sorted_licenses = sorted(register.values(), key=lambda x: x.title)
    known_ids = [lic.id for lic in sorted_licenses]

    result = []
    if existing_license_id and existing_license_id not in known_ids:
        # Built directly rather than via ``register[...]``: looking up an
        # unregistered id in the register raises KeyError instead of
        # returning a falsy value.
        result.append((existing_license_id, existing_license_id, ''))

    # Iterate the license objects themselves; no per-id register lookup.
    result.extend(
        (lic.id, _(lic.title), lic.url or '') for lic in sorted_licenses
    )
    return result