Coverage for ckanext/udc/licenses/utils.py: 45%
77 statements
« prev ^ index » next coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
1from __future__ import annotations
2from ckan.common import _
3from ckan.model.license import License as CKANLicense, DefaultLicense
4from ckan import model
5from ckan.lib.redis import connect_to_redis, is_redis_available
6from typing import Any, Callable, Collection, KeysView, Optional, Union, cast
8from ckanext.udc.licenses.model import CustomLicense
10_CUSTOM_LICENSE_SET_KEY = "udc:licenses:custom_ids"
11_LICENSE_VERSION_KEY = "udc:licenses:version"
12_local_license_version = None
14def create_custom_license(_id, _url, _title):
15 class AnonymousCustomLicense(DefaultLicense):
16 id = _id
17 url = _url
19 @property
20 def title(self):
21 return _(_title)
22 return CKANLicense(AnonymousCustomLicense())
25def _get_redis_conn():
26 if not is_redis_available():
27 return None
28 try:
29 return connect_to_redis()
30 except Exception:
31 return None
34def _get_custom_license_ids():
35 return [c.id for c in model.Session.query(CustomLicense)]
38def refresh_custom_licenses():
39 """
40 Sync custom licenses from DB into the in-process license register.
41 Uses Redis to track which licenses were added by this extension so we can
42 safely remove deleted ones across processes.
43 """
44 license_register = model.Package.get_license_register()
45 custom_ids = set(_get_custom_license_ids())
47 redis_conn = _get_redis_conn()
48 previous_custom_ids = set()
49 if redis_conn:
50 previous_custom_ids = set(redis_conn.smembers(_CUSTOM_LICENSE_SET_KEY))
52 # Remove deleted custom licenses
53 if previous_custom_ids:
54 license_register.licenses = [
55 l for l in license_register.licenses if l.id not in (previous_custom_ids - custom_ids)
56 ]
58 # Insert/update current custom licenses
59 existing_by_id = {l.id: l for l in license_register.licenses}
60 for custom in model.Session.query(CustomLicense):
61 new_license = create_custom_license(custom.id, custom.url, custom.title)
62 if custom.id in existing_by_id:
63 for idx, existing in enumerate(license_register.licenses):
64 if existing.id == custom.id:
65 license_register.licenses[idx] = new_license
66 break
67 else:
68 license_register.licenses.append(new_license)
70 if redis_conn:
71 pipe = redis_conn.pipeline()
72 pipe.delete(_CUSTOM_LICENSE_SET_KEY)
73 if custom_ids:
74 pipe.sadd(_CUSTOM_LICENSE_SET_KEY, *custom_ids)
75 pipe.execute()
78def bump_license_version():
79 redis_conn = _get_redis_conn()
80 if redis_conn:
81 redis_conn.incr(_LICENSE_VERSION_KEY)
84def refresh_license_register_if_needed():
85 global _local_license_version
86 redis_conn = _get_redis_conn()
87 if not redis_conn:
88 return
89 version = redis_conn.get(_LICENSE_VERSION_KEY)
90 if version != _local_license_version:
91 refresh_custom_licenses()
92 _local_license_version = version
95def license_options_details(
96 existing_license_id: Optional[tuple[str, str]] = None
97) -> list[tuple[str, str, str]]:
98 '''Returns [(l.id, l.title, l.url), ...] for the licenses configured to be
99 offered. Always includes the existing_license_id, if supplied.
100 '''
101 refresh_license_register_if_needed()
102 result = []
103 register = model.Package.get_license_register()
104 sorted_licenses = sorted(register.values(), key=lambda x: x.title)
105 license_ids = [license.id for license in sorted_licenses]
106 if existing_license_id and existing_license_id not in license_ids:
107 license_ids.insert(0, existing_license_id)
109 for license_id in license_ids:
110 license = register[license_id]
111 if license:
112 result.append((license.id, _(license.title), license.url or ''))
113 else:
114 result.append((license_id, license_id, ''))
115 return result