Coverage for ckanext/udc/helpers.py: 67%

163 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2026-01-19 23:48 +0000

1from __future__ import annotations 

2 

3import json 

4import traceback 

5import re 

6from collections import OrderedDict 

7from typing import Any, Callable, Collection, KeysView, Optional, Union, cast 

8 

9from ckan.types import Schema, Context 

10from ckan.common import _ 

11import ckan 

12import ckan.plugins as plugins 

13import ckan.logic as logic 

14import ckan.model as model 

15import ckan.plugins.toolkit as tk 

16from ckan.plugins.toolkit import (chained_action, side_effect_free, chained_helper) 

17import ckan.lib.helpers as h 

18from ckan.common import current_user, _ 

19 

20from .graph.logic import onUpdateCatalogue, onDeleteCatalogue, get_catalogue_graph 

21from ckanext.udc.file_format.logic import before_package_update as before_package_update_for_file_format 

22 

23import logging 

24import json 

25import chalk 

26 

27log = logging.getLogger(__name__) 

28 

29 

30import time 

31 

# Register a chained action that runs when `config_option_update(...)` is
# triggered, i.e. when the config is saved from the settings page.
# The UDC plugin is reloaded so that the maturity model stays up to date.
# NOTE(review): `side_effect_free` is applied to an action that updates
# config -- confirm this is intentional.
@side_effect_free
@chained_action
def config_option_update(original_action, context, data_dict):
    """Reload the UDC plugin config, then run the wrapped action.

    If ``data_dict`` carries a ``"ckanext.udc.config"`` entry, it is parsed
    as JSON and handed to the ``udc`` plugin's ``reload_config``. A failure
    while reloading is logged with its traceback and does not prevent the
    wrapped action from running.

    :param original_action: the next action in the chain.
    :param context: action context, passed through unchanged.
    :param data_dict: config options being saved.
    :returns: whatever ``original_action(context, data_dict)`` returns.
    """
    try:
        if "ckanext.udc.config" in data_dict:
            log.info("config_option_update: Update UDC Config")
            plugins.get_plugin('udc').reload_config(
                json.loads(data_dict["ckanext.udc.config"]))
    except Exception:
        # Best effort: a bad UDC config must not block saving the rest of
        # the settings, but the failure has to be visible in the logs.
        log.exception("config_option_update: failed to reload UDC config")

    return original_action(context, data_dict)

47 

# NOTE(review): `side_effect_free` is applied to an action that updates a
# package -- confirm this is intentional.
@side_effect_free
@chained_action
def package_update(original_action, context, data_dict):
    """Update a package, then push the result to the knowledge graph.

    The custom file-format hook runs first, then the wrapped action. Unless
    the ``udc`` plugin has ``disable_graphdb`` set, ``onUpdateCatalogue`` is
    called with the updated package.

    :param original_action: the next action in the chain.
    :param context: action context, passed through unchanged.
    :param data_dict: package data submitted for the update.
    :returns: the result of ``original_action``.
    :raises logic.ValidationError: if the knowledge graph step fails; the
        original exception is attached as the cause.
    """
    # Pre-process custom file format
    before_package_update_for_file_format(context, data_dict)

    result = original_action(context, data_dict)
    try:
        if not plugins.get_plugin('udc').disable_graphdb:
            onUpdateCatalogue(context, result)
    except Exception as e:
        # Log with traceback; the user only sees the short message below.
        log.exception("package_update: failed to update the knowledge graph")
        raise logic.ValidationError([_("Error occurred in updating the knowledge graph, please contact administrator:\n") + str(e)]) from e
    return result

63 

# NOTE(review): `side_effect_free` is applied to an action that deletes a
# package -- confirm this is intentional.
@side_effect_free
@chained_action
def package_delete(original_action, context, data_dict):
    """Delete a package, then remove it from the knowledge graph.

    The wrapped action runs first. Unless the ``udc`` plugin has
    ``disable_graphdb`` set, ``onDeleteCatalogue`` is then called with the
    same ``data_dict``.

    :param original_action: the next action in the chain.
    :param context: action context, passed through unchanged.
    :param data_dict: data identifying the package to delete.
    :returns: the result of ``original_action``.
    :raises logic.ValidationError: if the knowledge graph step fails; the
        original exception is attached as the cause.
    """
    log.debug("Package Delete: %s", data_dict)
    result = original_action(context, data_dict)
    try:
        if not plugins.get_plugin('udc').disable_graphdb:
            onDeleteCatalogue(context, data_dict)
    except Exception as e:
        # Log with traceback; the user only sees the short message below.
        log.exception("package_delete: failed to update the knowledge graph")
        raise logic.ValidationError([_("Error occurred in updating the knowledge graph, please contact administrator:\n") + str(e)]) from e
    return result

77 

78 

# Register a chained helper for humanize_entity_type() to change labels.
@chained_helper
def humanize_entity_type(next_helper: Callable[..., Any],
                         entity_type: str, object_type: str, purpose: str):
    """Return catalogue-specific labels, deferring to the next helper otherwise.

    For the ``("package", "catalogue")`` pair, a translated label is returned
    for each known ``purpose``. Any other combination, or an unknown purpose,
    is answered by ``next_helper``.
    """
    if entity_type == "package" and object_type == "catalogue":
        # Each label is wrapped in a lambda so that `_()` is only called for
        # the purpose that actually matches.
        catalogue_labels = {
            "main nav": lambda: _("Catalogue"),
            "search placeholder": lambda: _("Search Catalogue Entries"),
            # It is unclear where this purpose value is used.
            "search_placeholder": lambda: _("Catalogue Entry"),
            "create title": lambda: _("Create Catalogue Entry"),
            "create label": lambda: _("Create Catalogue Entry"),
            "add link": lambda: _("Add Catalogue Entry"),
            "no description": lambda: _("There is no description for this catalogue entry"),
            "view label": lambda: _("View Catalogue Entry"),
        }
        make_label = catalogue_labels.get(purpose)
        if make_label is not None:
            return make_label()

    return next_helper(entity_type, object_type, purpose)

108 

109 

def render_markdown(data: str, **kwargs):
    """Render markdown through CKAN's helper with a configurable HTML default.

    ``allow_html`` defaults to the boolean value of the
    ``ckanext.udc.render_markdown_allow_html`` config option (``False`` when
    unset); an explicit ``allow_html`` keyword from the caller wins.
    """
    configured = tk.config.get("ckanext.udc.render_markdown_allow_html", False)
    options = {"allow_html": tk.asbool(configured)}
    # Caller-supplied keywords override the configured default.
    options.update(kwargs)
    return h.render_markdown(data, **options)

114 

115 

def get_default_facet_titles():
    """Build the ordered facet-name -> title mapping for catalogue search.

    Facets returned by ``h.facets()`` get a built-in title when one is known
    and otherwise use their own name. Every ``IFacets`` plugin may then
    adjust the mapping through ``dataset_facets``.
    """
    # Label resolution copied from ckan.views.dataset.search
    organization_title = h.humanize_entity_type(
        'organization', h.default_group_type('organization'), 'facet label')
    if not organization_title:
        organization_title = _('Organizations')

    group_title = h.humanize_entity_type(
        'group', h.default_group_type('group'), 'facet label')
    if not group_title:
        group_title = _('Groups')

    known_titles = {
        'organization': organization_title,
        'groups': group_title,
        'tags': _('Tags'),
        'res_format': _('Formats'),
        'license_id': _('Licenses'),
    }

    titles: dict[str, str] = OrderedDict(
        (name, known_titles.get(name, name)) for name in h.facets()
    )

    # Let facet plugins rename, add or drop titles.
    for implementation in plugins.PluginImplementations(plugins.IFacets):
        titles = implementation.dataset_facets(titles, "catalogue")
    return titles

148 

def process_facets_fields(facets_fields: dict):
    """For search page displaying search filters.

    Groups the active filters by display field name. The ``extras_`` prefix
    or ``_ngram`` suffix is stripped from the key to get the display name.
    Keys starting with ``filter-logic`` are not filters themselves; they
    only select the "and"/"or" logic of the matching field.

    Supported value shapes per field:

    - ``list``: each item becomes one value entry.
    - ``dict`` with ``"values"``: each item becomes one entry; when ``"fts"``
      is truthy the display text is ``Search for "<item>"``.
    - any other ``dict``: read as a range with optional ``"min"``/``"max"``.

    :param facets_fields: mapping of filter key to its value(s).
    :returns: ``{field_name: {"logic": "or" | "and", "values": [...]}}``
        where each value has ``ori_field``, ``ori_value`` and ``value``.
    """
    results = {}
    for field, field_value in facets_fields.items():
        if field.startswith("filter-logic"):
            continue

        if field.startswith("extras_"):
            field_name = field[7:]
        elif field.endswith("_ngram"):
            field_name = field[:-6]
        else:
            field_name = field

        entry = results.setdefault(field_name, {"logic": "or", "values": []})
        collected = entry["values"]

        if isinstance(field_value, list):
            collected.extend(
                {"ori_field": field, "ori_value": item, "value": item}
                for item in field_value
            )
            # NOTE(review): list-valued fields skip the filter-logic check
            # below, as in the original code -- confirm this is intended.
            continue

        if isinstance(field_value, dict) and 'values' in field_value:
            is_fts = field_value.get('fts', False)
            for item in field_value['values']:
                collected.append({
                    "ori_field": field,
                    "ori_value": item,
                    "value": f'Search for "{item}"' if is_fts else item,
                })
        elif isinstance(field_value, dict):
            # Date or number ranges
            range_min = field_value.get('min')
            range_max = field_value.get('max')

            if range_min:
                collected.append({
                    "ori_field": "min_" + field_name,
                    "ori_value": range_min,
                    "value": f"From: {range_min}",
                })
            if range_max:
                collected.append({
                    "ori_field": "max_" + field_name,
                    "ori_value": range_max,
                    "value": f"To: {range_max}",
                })

        # A missing, None or empty filter-logic value keeps the default "or".
        logic_flag = facets_fields.get("filter-logic-" + field)
        if logic_flag and logic_flag[0] == "and":
            entry["logic"] = "and"

    return results

209 

def get_maturity_percentages(config, pkg_dict):
    """Return the filled-in percentage of each maturity level.

    :param config: iterable of levels; each level is a mapping with a
        ``"fields"`` list of field dicts. A field is identified either by
        ``"ckanField"`` or by ``"name"`` together with ``"label"``.
    :param pkg_dict: package data the fields are looked up in.
    :returns: one string per level, e.g. ``["67%", "100%"]``. A level with
        no countable fields yields ``"0%"``.
    """
    percentages = []
    for level in config:
        num_not_empty = 0
        total_size = 0
        for field in level["fields"]:
            ckan_field = field.get("ckanField")
            if ckan_field:
                # Skip custom_fields
                if ckan_field == 'custom_fields':
                    continue
                # organization_and_visibility is always filled; it counts as
                # two fields (the second total is added after the branches).
                if ckan_field == 'organization_and_visibility':
                    num_not_empty += 2
                    total_size += 1
                # `description` is stored as `notes`
                elif ckan_field == 'description' and pkg_dict.get("notes"):
                    num_not_empty += 1
                # `source` is stored as `url`
                elif ckan_field == 'source' and pkg_dict.get("url"):
                    num_not_empty += 1
                elif pkg_dict.get(ckan_field):
                    num_not_empty += 1
            elif field.get("name") and field.get("label") and pkg_dict.get(field["name"]):
                num_not_empty += 1

            total_size += 1

        if total_size == 0:
            # Nothing to measure (empty level or only custom_fields); avoid
            # dividing by zero.
            percentages.append("0%")
        else:
            percentages.append(str(round(num_not_empty / total_size * 100)) + "%")

    return percentages

240 

241 

def get_system_info(name: str):
    """Return the result of ``model.system_info.get_system_info`` for ``name``."""
    lookup = model.system_info.get_system_info
    return lookup(name)

244 

245 

def udc_json_attr(value):
    """Return a JSON string safe to embed in an HTML attribute.

    - A string is assumed to already be JSON (or plain text) and is returned
      as-is; Jinja's autoescape will handle HTML encoding.
    - ``None`` becomes the empty string.
    - Anything else (dict, list, number, ...) is serialized with
      ``json.dumps`` without escaping non-ASCII characters. A value that
      cannot be serialized yields the empty string.
    """
    if isinstance(value, str):
        return value
    if value is None:
        return ""

    try:
        encoded = json.dumps(value, ensure_ascii=False)
    except Exception:
        # Best effort: an unserializable value renders as an empty attribute.
        return ""
    return encoded