Coverage for ckanext/udc/validator.py: 12%
94 statements
« prev ^ index » next coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
1from __future__ import annotations
3import json
4import re
6import ckan.plugins.toolkit as tk
9# CKAN Config Validator
10SUPPORTED_CKAN_FIELDS = [
11 "title",
12 "description",
13 "tags",
14 "license_id",
15 "organization_and_visibility",
16 "source",
17 "version",
18 "author",
19 "author_email",
20 "maintainer",
21 "maintainer_email",
22 "custom_fields",
23]
24REQUIRED_CKAN_FIELDS = [
25 "title",
26 "organization_and_visibility",
27]
28SUPPORTED_FIELD_TYPES = [
29 "text",
30 "date",
31 "datetime",
32 "time",
33 "number",
34 "single_select",
35 "multiple_select",
36 # Relationship / versioning helpers handled specially by the UI/schema
37 "single_dataset",
38 "multiple_datasets",
39]
42def _is_localized_text(val):
43 """
44 Accept either:
45 - a plain string, or
46 - a dict of locale -> string (must include 'en'; 'fr' optional)
47 """
48 if isinstance(val, str):
49 return True
50 if isinstance(val, dict):
51 if "en" not in val:
52 return False
53 # all provided locale values must be strings (allow empty)
54 return all(isinstance(v, str) for v in val.values())
55 return False
58def _validate_localized_field(field_obj, key, field_path):
59 """
60 If key exists in field_obj, ensure it's a valid localized text (string or {en:..., fr:...}).
61 """
62 if key in field_obj and not _is_localized_text(field_obj[key]):
63 raise tk.Invalid(
64 f"Malformed UDC Config: `{field_path}.{key}` must be a string or an object like "
65 f'{{"en": "...", "fr": "..."}} with string values (and must include "en").'
66 )
69def udc_config_validator(config_str):
70 """
71 Check whether the UDC config is valid.
72 Raise a `tk.Invalid` Error when config is not valid, otherwise return the original config string.
73 """
74 try:
75 config = json.loads(config_str)
76 except Exception:
77 raise tk.Invalid("UDC Config: Malformed JSON Format.")
79 if "maturity_model" not in config:
80 raise tk.Invalid("UDC Config: Missing `maturity_model` key.")
82 if not isinstance(config["maturity_model"], list):
83 raise tk.Invalid(
84 f"UDC Config: Expecting `maturity_model` to be a JSON List but got `{config['maturity_model'].__class__.__name__}`"
85 )
87 used_fields = set()
89 for level_idx, level in enumerate(config["maturity_model"], start=1):
90 if not ("title" in level and "name" in level and "fields" in level):
91 raise tk.Invalid(
92 'Malformed UDC Config: "title", "name" and "fields" are required for each level.'
93 )
95 if not isinstance(level["fields"], list):
96 raise tk.Invalid(
97 f"Malformed UDC Config: `fields` in level `{level.get('name','?')}` must be a list."
98 )
100 for field_idx, field in enumerate(level["fields"], start=1):
101 field_path = f"maturity_model[{level_idx-1}].fields[{field_idx-1}]"
103 # CKAN mapped field
104 if "ckanField" in field:
105 if field["ckanField"] not in SUPPORTED_CKAN_FIELDS:
106 raise tk.Invalid(
107 f"Malformed UDC Config: The provided CKAN field `{field['ckanField']}` is not supported."
108 )
109 if field["ckanField"] in used_fields:
110 raise tk.Invalid(
111 f"Malformed UDC Config: The provided CKAN field `{field['ckanField']}` is duplicated."
112 )
113 used_fields.add(field["ckanField"])
115 # Optional bilingual texts for CKAN fields too
116 _validate_localized_field(field, "label", field_path)
117 _validate_localized_field(field, "short_description", field_path)
118 _validate_localized_field(field, "long_description", field_path)
120 # Custom field
121 else:
122 if not ("name" in field and "label" in field):
123 raise tk.Invalid(
124 "Malformed UDC Config: `name` and `label` is required for custom field."
125 )
127 # name must be alphanumeric/underscore
128 if re.match(r"^\w+$", field["name"]) is None:
129 raise tk.Invalid(
130 f"Malformed UDC Config: The provided field name `{field['name']}` is not alpha-numeric."
131 )
133 # label must support bilingual (string or {en, fr})
134 _validate_localized_field(field, "label", field_path)
136 # Optional bilingual descriptions
137 _validate_localized_field(field, "short_description", field_path)
138 _validate_localized_field(field, "long_description", field_path)
140 # type (if present) must be supported
141 if (
142 field.get("type") is not None
143 and field["type"] not in SUPPORTED_FIELD_TYPES
144 ):
145 raise tk.Invalid(
146 f"Malformed UDC Config: The provided field type `{field['type']}` is not supported."
147 )
149 # duplicates
150 if field["name"] in used_fields:
151 raise tk.Invalid(
152 f"Malformed UDC Config: The provided field `{field['name']}` is duplicated."
153 )
154 used_fields.add(field["name"])
156 # Non-blocking checks for select options (allow bilingual `text`)
157 if field.get("type") in ("single_select", "multiple_select"):
158 if "options" in field:
159 if not isinstance(field["options"], list):
160 raise tk.Invalid(
161 f"Malformed UDC Config: `{field_path}.options` must be a list."
162 )
163 for opt in field["options"]:
164 if not isinstance(opt, dict):
165 raise tk.Invalid(
166 f"Malformed UDC Config: `{field_path}.options[]` items must be objects."
167 )
168 if "value" not in opt:
169 raise tk.Invalid(
170 f"Malformed UDC Config: `{field_path}.options[]` missing `value`."
171 )
172 if "text" not in opt:
173 raise tk.Invalid(
174 f"Malformed UDC Config: `{field_path}.options[]` missing `text`."
175 )
176 # Allow string or localized dict for text
177 if not (_is_localized_text(opt["text"])):
178 raise tk.Invalid(
179 f"Malformed UDC Config: `{field_path}.options[].text` must be a string or localized object."
180 )
181 if "optionsFromQuery" in field:
182 ofq = field["optionsFromQuery"]
183 if not isinstance(ofq, dict):
184 raise tk.Invalid(
185 f"Malformed UDC Config: `{field_path}.optionsFromQuery` must be an object."
186 )
187 for k in ("text", "value", "query"):
188 if k not in ofq or not isinstance(ofq[k], str):
189 raise tk.Invalid(
190 f"Malformed UDC Config: `{field_path}.optionsFromQuery.{k}` must be a string."
191 )
193 # Boolean sanity check
194 if "enable_filter_logic_toggle" in field and not isinstance(
195 field["enable_filter_logic_toggle"], bool
196 ):
197 raise tk.Invalid(
198 "Malformed UDC Config: `enable_filter_logic_toggle` must be a boolean."
199 )
201 # Check required CKAN fields
202 for field_name in REQUIRED_CKAN_FIELDS:
203 if field_name not in used_fields:
204 raise tk.Invalid(
205 f"Malformed UDC Config: Missing the required CKAN field `{field_name}`."
206 )
208 return config_str
211def udc_mapping_validator(mapping_str):
212 try:
213 mapping = json.loads(mapping_str)
214 except:
215 raise tk.Invalid("UDC Mapping: Malformed JSON Format.")
216 if not isinstance(mapping, dict):
217 raise tk.Invalid(
218 f"UDC Mapping: Expecting a JSON Object but got `{mapping.__class__.__name__}`"
219 )
221 if not mapping.get("namespaces"):
222 raise tk.Invalid("UDC Mapping: Missing namespaces field.")
223 if not mapping.get("mappings"):
224 raise tk.Invalid("UDC Mapping: Missing mappings field.")
225 return mapping_str