Coverage for ckanext/udc/helpers.py: 67%
163 statements
« prev ^ index » next coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
1from __future__ import annotations
3import json
4import traceback
5import re
6from collections import OrderedDict
7from typing import Any, Callable, Collection, KeysView, Optional, Union, cast
9from ckan.types import Schema, Context
10from ckan.common import _
11import ckan
12import ckan.plugins as plugins
13import ckan.logic as logic
14import ckan.model as model
15import ckan.plugins.toolkit as tk
16from ckan.plugins.toolkit import (chained_action, side_effect_free, chained_helper)
17import ckan.lib.helpers as h
18from ckan.common import current_user, _
20from .graph.logic import onUpdateCatalogue, onDeleteCatalogue, get_catalogue_graph
21from ckanext.udc.file_format.logic import before_package_update as before_package_update_for_file_format
23import logging
24import json
25import chalk
27log = logging.getLogger(__name__)
30import time
32# Register a chained action after `config_option_update(...)` is triggered, i.e. config is saved from the settings page.
33# We need to reload the UDC plugin to make sure the maturity model is up to date.
34@side_effect_free
35@chained_action
36def config_option_update(original_action, context, data_dict):
37 try:
38 # Call our plugin to update the config
39 log.info("config_option_update: Update UDC Config")
40 plugins.get_plugin('udc').reload_config(
41 json.loads(data_dict["ckanext.udc.config"]))
42 except:
43 log.error
45 res = original_action(context, data_dict)
46 return res
48@side_effect_free
49@chained_action
50def package_update(original_action, context, data_dict):
51 # Pre-process custom file format
52 before_package_update_for_file_format(context, data_dict)
54 result = original_action(context, data_dict)
55 try:
56 if not plugins.get_plugin('udc').disable_graphdb:
57 onUpdateCatalogue(context, result)
58 except Exception as e:
59 log.error(e)
60 print(e)
61 raise logic.ValidationError([_("Error occurred in updating the knowledge graph, please contact administrator:\n") + str(e)])
62 return result
64@side_effect_free
65@chained_action
66def package_delete(original_action, context, data_dict):
67 print(f"Package Delete: ", data_dict)
68 result = original_action(context, data_dict)
69 try:
70 if not plugins.get_plugin('udc').disable_graphdb:
71 onDeleteCatalogue(context, data_dict)
72 except Exception as e:
73 log.error(e)
74 print(e)
75 raise logic.ValidationError([_("Error occurred in updating the knowledge graph, please contact administrator:\n") + str(e)])
76 return result
79# Register a chained helpers for humanize_entity_type() to change labels.
80@chained_helper
81def humanize_entity_type(next_helper: Callable[..., Any],
82 entity_type: str, object_type: str, purpose: str):
84 if (entity_type, object_type) == ("package", "catalogue"):
85 if purpose == "main nav":
86 return _("Catalogue")
87 elif purpose == "search placeholder":
88 return _("Search Catalogue Entries")
89 elif purpose == "search_placeholder":
90 # Don't know where is this used.
91 return _("Catalogue Entry")
92 elif purpose == "create title":
93 return _("Create Catalogue Entry")
94 elif purpose == "create label":
95 return _("Create Catalogue Entry")
96 elif purpose == "add link":
97 return _("Add Catalogue Entry")
98 elif purpose == "no description":
99 return _("There is no description for this catalogue entry")
100 elif purpose == "view label":
101 return _("View Catalogue Entry")
104 original_text = next_helper(entity_type, object_type, purpose)
105 # print(entity_type, object_type, purpose, original_text)
107 return original_text
110def render_markdown(data: str, **kwargs):
111 allow_html = tk.asbool(tk.config.get("ckanext.udc.render_markdown_allow_html", False))
112 kwargs.setdefault("allow_html", allow_html)
113 return h.render_markdown(data, **kwargs)
116def get_default_facet_titles():
117 facets: dict[str, str] = OrderedDict()
119 # Copied from ckan.views.dataset.search
120 org_label = h.humanize_entity_type(
121 u'organization',
122 h.default_group_type(u'organization'),
123 u'facet label') or _(u'Organizations')
125 group_label = h.humanize_entity_type(
126 u'group',
127 h.default_group_type(u'group'),
128 u'facet label') or _(u'Groups')
130 default_facet_titles = {
131 u'organization': org_label,
132 u'groups': group_label,
133 u'tags': _(u'Tags'),
134 u'res_format': _(u'Formats'),
135 u'license_id': _(u'Licenses'),
136 }
138 for facet in h.facets():
139 if facet in default_facet_titles:
140 facets[facet] = default_facet_titles[facet]
141 else:
142 facets[facet] = facet
144 # Facet titles
145 for plugin in plugins.PluginImplementations(plugins.IFacets):
146 facets = plugin.dataset_facets(facets, "catalogue")
147 return facets
149def process_facets_fields(facets_fields: dict):
150 """For search page displaying search filters"""
151 print("facets_fields", facets_fields)
152 results = {}
153 for field in facets_fields:
154 if field.startswith("filter-logic"):
155 continue
157 if field.startswith("extras_"):
158 field_name = field[7:]
159 elif field.endswith("_ngram"):
160 field_name = field[:-6]
161 else:
162 field_name = field
164 if field_name not in results:
165 results[field_name] = {"logic": "or", "values": []}
167 field_value = facets_fields[field]
168 if isinstance(field_value, list):
169 for item in field_value:
170 results[field_name]["values"].append({
171 "ori_field": field,
172 "ori_value": item,
173 "value": item,
174 })
175 continue
177 if isinstance(field_value, dict) and 'values' in field_value:
178 values = field_value['values']
179 is_fts = field_value.get('fts', False)
180 for item in values:
181 results[field_name]["values"].append({
182 "ori_field": field,
183 "ori_value": item,
184 "value": f'Search for "{item}"' if is_fts else item,
185 })
186 elif isinstance(field_value, dict):
187 # Date or number ranges
188 min = field_value.get('min')
189 max = field_value.get('max')
191 if min:
192 results[field_name]["values"].append({
193 "ori_field": "min_" + field_name,
194 "ori_value": min,
195 "value": f"From: {min}",
196 })
197 if max:
198 results[field_name]["values"].append({
199 "ori_field": "max_" + field_name,
200 "ori_value": max,
201 "value": f"To: {max}",
202 })
205 if "filter-logic-" + field in facets_fields and facets_fields["filter-logic-" + field][0] == "and":
206 results[field_name]["logic"] = "and"
208 return results
210def get_maturity_percentages(config, pkg_dict):
211 percentages = []
212 for idx, level in enumerate(config):
213 num_not_empty = 0
214 total_size = 0
215 for field in level["fields"]:
216 if field.get("ckanField"):
217 # Skip custom_fields
218 if field.get("ckanField") in ['custom_fields']:
219 continue
220 # organization_and_visibility is always filled
221 if field.get("ckanField") == 'organization_and_visibility':
222 num_not_empty += 2
223 total_size += 1
224 # `description` is stored as `notes`
225 elif field.get("ckanField") == 'description' and pkg_dict.get("notes"):
226 num_not_empty += 1
227 # `source` is stored as `url`
228 elif field.get("ckanField") == 'source' and pkg_dict.get("url"):
229 num_not_empty += 1
230 elif pkg_dict.get(field["ckanField"]):
231 num_not_empty += 1
232 else:
233 if field.get("name") and field.get("label") and pkg_dict.get(field["name"]):
234 num_not_empty += 1
236 total_size += 1
237 percentages.append(str(round(num_not_empty / total_size * 100)) + "%")
239 return percentages
242def get_system_info(name: str):
243 return model.system_info.get_system_info(name)
246def udc_json_attr(value):
247 """Return a JSON string safe to embed in an HTML attribute.
249 - If value is already a string, assume it is JSON (or plain text) and
250 return it as-is; Jinja's autoescape will handle HTML encoding.
251 - If value is a dict/list/etc, json.dumps it to a compact string.
252 """
254 if value is None:
255 return ""
256 if isinstance(value, str):
257 return value
258 try:
259 return json.dumps(value, ensure_ascii=False)
260 except Exception:
261 return ""