Coverage for ckanext/udc/validator.py: 12%
94 statements
« prev ^ index » next coverage.py v7.7.1, created at 2026-03-30 22:15 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2026-03-30 22:15 +0000
1from __future__ import annotations
3import json
4import re
6import ckan.plugins.toolkit as tk
# CKAN Config Validator

# Values accepted for a field's `ckanField` key by `udc_config_validator`.
SUPPORTED_CKAN_FIELDS = [
    "title",
    "description",
    "tags",
    "license_id",
    "organization_and_visibility",
    "source",
    "version",
    "author",
    "author_email",
    "maintainer",
    "maintainer_email",
    "portal_type",
    "custom_fields",
]

# CKAN fields that every valid config must map somewhere in its maturity model.
REQUIRED_CKAN_FIELDS = [
    "title",
    "organization_and_visibility",
]

# Values accepted for a custom field's optional `type` key.
SUPPORTED_FIELD_TYPES = [
    "text",
    "date",
    "datetime",
    "time",
    "number",
    "single_select",
    "multiple_select",
    # Relationship / versioning helpers handled specially by the UI/schema
    "single_dataset",
    "multiple_datasets",
]
43def _is_localized_text(val):
44 """
45 Accept either:
46 - a plain string, or
47 - a dict of locale -> string (must include 'en'; 'fr' optional)
48 """
49 if isinstance(val, str):
50 return True
51 if isinstance(val, dict):
52 if "en" not in val:
53 return False
54 # all provided locale values must be strings (allow empty)
55 return all(isinstance(v, str) for v in val.values())
56 return False
59def _validate_localized_field(field_obj, key, field_path):
60 """
61 If key exists in field_obj, ensure it's a valid localized text (string or {en:..., fr:...}).
62 """
63 if key in field_obj and not _is_localized_text(field_obj[key]):
64 raise tk.Invalid(
65 f"Malformed UDC Config: `{field_path}.{key}` must be a string or an object like "
66 f'{{"en": "...", "fr": "..."}} with string values (and must include "en").'
67 )
def _validate_select_options(field, field_path):
    """Validate `options` / `optionsFromQuery` of a select-type field; raise `tk.Invalid` on error."""
    if "options" in field:
        if not isinstance(field["options"], list):
            raise tk.Invalid(
                f"Malformed UDC Config: `{field_path}.options` must be a list."
            )
        for opt in field["options"]:
            if not isinstance(opt, dict):
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}.options[]` items must be objects."
                )
            if "value" not in opt:
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}.options[]` missing `value`."
                )
            if "text" not in opt:
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}.options[]` missing `text`."
                )
            # Allow string or localized dict for text
            if not _is_localized_text(opt["text"]):
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}.options[].text` must be a string or localized object."
                )
    if "optionsFromQuery" in field:
        ofq = field["optionsFromQuery"]
        if not isinstance(ofq, dict):
            raise tk.Invalid(
                f"Malformed UDC Config: `{field_path}.optionsFromQuery` must be an object."
            )
        for k in ("text", "value", "query"):
            if k not in ofq or not isinstance(ofq[k], str):
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}.optionsFromQuery.{k}` must be a string."
                )


def _validate_ckan_field(field, field_path, used_fields):
    """Validate a field mapped to a CKAN field and record it in `used_fields`."""
    ckan_field = field["ckanField"]
    # List membership works by equality, so an unhashable value is rejected
    # here before it could reach the `used_fields` set.
    if ckan_field not in SUPPORTED_CKAN_FIELDS:
        raise tk.Invalid(
            f"Malformed UDC Config: The provided CKAN field `{ckan_field}` is not supported."
        )
    if ckan_field in used_fields:
        raise tk.Invalid(
            f"Malformed UDC Config: The provided CKAN field `{ckan_field}` is duplicated."
        )
    used_fields.add(ckan_field)

    # Optional bilingual texts for CKAN fields too
    _validate_localized_field(field, "label", field_path)
    _validate_localized_field(field, "short_description", field_path)
    _validate_localized_field(field, "long_description", field_path)


def _validate_custom_field(field, field_path, used_fields):
    """Validate a custom (non-CKAN) field and record its name in `used_fields`."""
    if not ("name" in field and "label" in field):
        raise tk.Invalid(
            "Malformed UDC Config: `name` and `label` is required for custom field."
        )

    # name must be a string made only of word characters. `fullmatch` is used
    # because `^\w+$` with `re.match` also accepts a trailing newline.
    name = field["name"]
    if not isinstance(name, str) or re.fullmatch(r"\w+", name) is None:
        raise tk.Invalid(
            f"Malformed UDC Config: The provided field name `{name}` is not alpha-numeric."
        )

    # label must support bilingual (string or {en, fr})
    _validate_localized_field(field, "label", field_path)

    # Optional bilingual descriptions
    _validate_localized_field(field, "short_description", field_path)
    _validate_localized_field(field, "long_description", field_path)

    # type (if present) must be supported
    field_type = field.get("type")
    if field_type is not None and field_type not in SUPPORTED_FIELD_TYPES:
        raise tk.Invalid(
            f"Malformed UDC Config: The provided field type `{field_type}` is not supported."
        )

    # duplicates
    if name in used_fields:
        raise tk.Invalid(
            f"Malformed UDC Config: The provided field `{name}` is duplicated."
        )
    used_fields.add(name)

    # Structural checks for select options (allow bilingual `text`).
    # NOTE(review): applied to custom fields only; the reviewed text had lost
    # its indentation, so confirm this nesting against the original file.
    if field_type in ("single_select", "multiple_select"):
        _validate_select_options(field, field_path)

    # Boolean sanity check
    if "enable_filter_logic_toggle" in field and not isinstance(
        field["enable_filter_logic_toggle"], bool
    ):
        raise tk.Invalid(
            "Malformed UDC Config: `enable_filter_logic_toggle` must be a boolean."
        )


def udc_config_validator(config_str):
    """
    Check whether the UDC config is valid.

    The config must be a JSON object with a `maturity_model` list. Each level
    is an object with `title`, `name` and a `fields` list; each field is
    either mapped to a supported CKAN field (`ckanField`) or is a custom
    field with a `name` and `label`. Field identifiers must be unique across
    all levels, and every entry of `REQUIRED_CKAN_FIELDS` must be mapped.

    Malformed shapes (non-object config, level or field; non-string name)
    are reported as `tk.Invalid` rather than escaping as `TypeError`.

    :param config_str: the config as a JSON string
    :returns: the original `config_str`, unchanged
    :raises tk.Invalid: when the config is not valid
    """
    try:
        config = json.loads(config_str)
    except Exception:
        raise tk.Invalid("UDC Config: Malformed JSON Format.")

    # Valid JSON may still be a list, number, string or null.
    if not isinstance(config, dict):
        raise tk.Invalid(
            f"UDC Config: Expecting a JSON Object but got `{type(config).__name__}`"
        )

    if "maturity_model" not in config:
        raise tk.Invalid("UDC Config: Missing `maturity_model` key.")

    levels = config["maturity_model"]
    if not isinstance(levels, list):
        raise tk.Invalid(
            f"UDC Config: Expecting `maturity_model` to be a JSON List but got `{type(levels).__name__}`"
        )

    # CKAN field names and custom field names share one namespace.
    used_fields = set()

    for level_idx, level in enumerate(levels):
        if not isinstance(level, dict):
            raise tk.Invalid(
                f"Malformed UDC Config: `maturity_model[{level_idx}]` must be an object."
            )
        if not ("title" in level and "name" in level and "fields" in level):
            raise tk.Invalid(
                'Malformed UDC Config: "title", "name" and "fields" are required for each level.'
            )

        if not isinstance(level["fields"], list):
            raise tk.Invalid(
                f"Malformed UDC Config: `fields` in level `{level.get('name','?')}` must be a list."
            )

        for field_idx, field in enumerate(level["fields"]):
            field_path = f"maturity_model[{level_idx}].fields[{field_idx}]"
            if not isinstance(field, dict):
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}` must be an object."
                )

            if "ckanField" in field:
                _validate_ckan_field(field, field_path, used_fields)
            else:
                _validate_custom_field(field, field_path, used_fields)

    # Check required CKAN fields
    for field_name in REQUIRED_CKAN_FIELDS:
        if field_name not in used_fields:
            raise tk.Invalid(
                f"Malformed UDC Config: Missing the required CKAN field `{field_name}`."
            )

    return config_str
def udc_mapping_validator(mapping_str):
    """
    Check whether the UDC mapping is valid.

    The mapping must be a JSON object with non-empty `namespaces` and
    `mappings` entries.

    :param mapping_str: the mapping as a JSON string
    :returns: the original `mapping_str`, unchanged
    :raises tk.Invalid: when the mapping is not valid
    """
    try:
        mapping = json.loads(mapping_str)
    except Exception:
        # `Exception`, not a bare `except`, so KeyboardInterrupt and
        # SystemExit are not reported as malformed JSON.
        raise tk.Invalid("UDC Mapping: Malformed JSON Format.")
    if not isinstance(mapping, dict):
        raise tk.Invalid(
            f"UDC Mapping: Expecting a JSON Object but got `{type(mapping).__name__}`"
        )

    # `.get` + truthiness: a missing key and an empty value are both rejected.
    if not mapping.get("namespaces"):
        raise tk.Invalid("UDC Mapping: Missing namespaces field.")
    if not mapping.get("mappings"):
        raise tk.Invalid("UDC Mapping: Missing mappings field.")
    return mapping_str