Coverage for ckanext/udc/validator.py: 12%

94 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2026-03-30 22:15 +0000

1from __future__ import annotations 

2 

3import json 

4import re 

5 

6import ckan.plugins.toolkit as tk 

7 

8 

# CKAN Config Validator

# Values accepted for the `ckanField` key of a field entry in the UDC config.
SUPPORTED_CKAN_FIELDS = [
    "title", "description", "tags", "license_id",
    "organization_and_visibility",
    "source", "version",
    "author", "author_email",
    "maintainer", "maintainer_email",
    "portal_type", "custom_fields",
]

# Names that must have been declared by some field entry of the maturity
# model; the config validator rejects a config in which any of them is absent.
REQUIRED_CKAN_FIELDS = ["title", "organization_and_visibility"]

# Values accepted for the optional `type` key of a custom field.
SUPPORTED_FIELD_TYPES = [
    "text", "date", "datetime", "time", "number",
    "single_select", "multiple_select",
    # Relationship / versioning helpers handled specially by the UI/schema
    "single_dataset", "multiple_datasets",
]

41 

42 

43def _is_localized_text(val): 

44 """ 

45 Accept either: 

46 - a plain string, or 

47 - a dict of locale -> string (must include 'en'; 'fr' optional) 

48 """ 

49 if isinstance(val, str): 

50 return True 

51 if isinstance(val, dict): 

52 if "en" not in val: 

53 return False 

54 # all provided locale values must be strings (allow empty) 

55 return all(isinstance(v, str) for v in val.values()) 

56 return False 

57 

58 

59def _validate_localized_field(field_obj, key, field_path): 

60 """ 

61 If key exists in field_obj, ensure it's a valid localized text (string or {en:..., fr:...}). 

62 """ 

63 if key in field_obj and not _is_localized_text(field_obj[key]): 

64 raise tk.Invalid( 

65 f"Malformed UDC Config: `{field_path}.{key}` must be a string or an object like " 

66 f'{{"en": "...", "fr": "..."}} with string values (and must include "en").' 

67 ) 

68 

69 

def udc_config_validator(config_str):
    """
    Check whether the UDC config is valid.

    The config must be a JSON object with a `maturity_model` list. Each level
    of that list must be an object with `title`, `name` and a `fields` list,
    and each field entry must be an object that is either a CKAN-mapped field
    (has `ckanField`) or a custom field (has `name` and `label`).

    :param config_str: the UDC config serialized as JSON.
    :returns: the original `config_str`, unchanged, when the config is valid.
    :raises tk.Invalid: when the JSON cannot be parsed, when the structure is
        malformed, when a field is unsupported or duplicated, or when one of
        the names in `REQUIRED_CKAN_FIELDS` was not declared.
    """
    try:
        config = json.loads(config_str)
    except Exception as err:
        raise tk.Invalid("UDC Config: Malformed JSON Format.") from err

    # Valid JSON is not necessarily an object (`null`, `5`, `[...]`); reject
    # those explicitly instead of letting the key lookups below raise TypeError.
    if not isinstance(config, dict):
        raise tk.Invalid(
            f"UDC Config: Expecting a JSON Object but got `{config.__class__.__name__}`"
        )

    if "maturity_model" not in config:
        raise tk.Invalid("UDC Config: Missing `maturity_model` key.")

    maturity_model = config["maturity_model"]
    if not isinstance(maturity_model, list):
        raise tk.Invalid(
            f"UDC Config: Expecting `maturity_model` to be a JSON List but got `{maturity_model.__class__.__name__}`"
        )

    # CKAN field names and custom field names share a single namespace for
    # duplicate detection and for the required-field check at the end.
    used_fields = set()

    for level_idx, level in enumerate(maturity_model):
        if not isinstance(level, dict):
            raise tk.Invalid(
                f"Malformed UDC Config: `maturity_model[{level_idx}]` must be an object."
            )
        if not ("title" in level and "name" in level and "fields" in level):
            raise tk.Invalid(
                'Malformed UDC Config: "title", "name" and "fields" are required for each level.'
            )

        if not isinstance(level["fields"], list):
            raise tk.Invalid(
                f"Malformed UDC Config: `fields` in level `{level.get('name','?')}` must be a list."
            )

        for field_idx, field in enumerate(level["fields"]):
            field_path = f"maturity_model[{level_idx}].fields[{field_idx}]"

            if not isinstance(field, dict):
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}` must be an object."
                )

            if "ckanField" in field:
                _validate_ckan_field(field, field_path, used_fields)
            else:
                _validate_custom_field(field, field_path, used_fields)

            # Boolean sanity check.
            # NOTE(review): applied to every field entry (CKAN-mapped or
            # custom); confirm this matches the intended scope.
            if "enable_filter_logic_toggle" in field and not isinstance(
                field["enable_filter_logic_toggle"], bool
            ):
                raise tk.Invalid(
                    "Malformed UDC Config: `enable_filter_logic_toggle` must be a boolean."
                )

    # Check required CKAN fields
    for field_name in REQUIRED_CKAN_FIELDS:
        if field_name not in used_fields:
            raise tk.Invalid(
                f"Malformed UDC Config: Missing the required CKAN field `{field_name}`."
            )

    return config_str


def _validate_ckan_field(field, field_path, used_fields):
    """Validate a CKAN-mapped field entry and record it in `used_fields`."""
    ckan_field = field["ckanField"]
    if ckan_field not in SUPPORTED_CKAN_FIELDS:
        raise tk.Invalid(
            f"Malformed UDC Config: The provided CKAN field `{ckan_field}` is not supported."
        )
    if ckan_field in used_fields:
        raise tk.Invalid(
            f"Malformed UDC Config: The provided CKAN field `{ckan_field}` is duplicated."
        )
    used_fields.add(ckan_field)

    # Optional bilingual texts for CKAN fields too
    for key in ("label", "short_description", "long_description"):
        _validate_localized_field(field, key, field_path)


def _validate_custom_field(field, field_path, used_fields):
    """Validate a custom (non-CKAN) field entry and record its name in `used_fields`."""
    if not ("name" in field and "label" in field):
        raise tk.Invalid(
            "Malformed UDC Config: `name` and `label` is required for custom field."
        )

    # The name must be a non-empty string of word characters. `fullmatch` is
    # used because `$` in `re.match(r"^\w+$", ...)` would also accept a
    # trailing newline, and a non-string name would raise TypeError.
    name = field["name"]
    if not isinstance(name, str) or re.fullmatch(r"\w+", name) is None:
        raise tk.Invalid(
            f"Malformed UDC Config: The provided field name `{name}` is not alpha-numeric."
        )

    # label must support bilingual (string or {en, fr}); descriptions are optional
    for key in ("label", "short_description", "long_description"):
        _validate_localized_field(field, key, field_path)

    # type (if present) must be supported
    field_type = field.get("type")
    if field_type is not None and field_type not in SUPPORTED_FIELD_TYPES:
        raise tk.Invalid(
            f"Malformed UDC Config: The provided field type `{field_type}` is not supported."
        )

    # duplicates
    if name in used_fields:
        raise tk.Invalid(
            f"Malformed UDC Config: The provided field `{name}` is duplicated."
        )
    used_fields.add(name)

    # NOTE(review): select-option checks are scoped to custom fields, the only
    # place where `type` is validated; confirm against the original layout.
    if field_type in ("single_select", "multiple_select"):
        _validate_select_options(field, field_path)


def _validate_select_options(field, field_path):
    """Validate the optional `options` / `optionsFromQuery` keys of a select field."""
    if "options" in field:
        if not isinstance(field["options"], list):
            raise tk.Invalid(
                f"Malformed UDC Config: `{field_path}.options` must be a list."
            )
        for opt in field["options"]:
            if not isinstance(opt, dict):
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}.options[]` items must be objects."
                )
            if "value" not in opt:
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}.options[]` missing `value`."
                )
            if "text" not in opt:
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}.options[]` missing `text`."
                )
            # Allow string or localized dict for text
            if not _is_localized_text(opt["text"]):
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}.options[].text` must be a string or localized object."
                )

    if "optionsFromQuery" in field:
        ofq = field["optionsFromQuery"]
        if not isinstance(ofq, dict):
            raise tk.Invalid(
                f"Malformed UDC Config: `{field_path}.optionsFromQuery` must be an object."
            )
        for k in ("text", "value", "query"):
            if k not in ofq or not isinstance(ofq[k], str):
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}.optionsFromQuery.{k}` must be a string."
                )

210 

211 

def udc_mapping_validator(mapping_str):
    """
    Check whether the UDC mapping is valid.

    The mapping must be a JSON object whose `namespaces` and `mappings`
    entries are both present and non-empty (a falsy value such as `{}`,
    `[]`, `""` or `null` is treated the same as a missing key).

    :param mapping_str: the UDC mapping serialized as JSON.
    :returns: the original `mapping_str`, unchanged, when the mapping is valid.
    :raises tk.Invalid: when the JSON cannot be parsed, is not an object, or
        lacks one of the required entries.
    """
    try:
        mapping = json.loads(mapping_str)
    # `Exception` rather than a bare `except:` so that KeyboardInterrupt and
    # SystemExit are not converted into a validation error.
    except Exception as err:
        raise tk.Invalid("UDC Mapping: Malformed JSON Format.") from err

    if not isinstance(mapping, dict):
        raise tk.Invalid(
            f"UDC Mapping: Expecting a JSON Object but got `{mapping.__class__.__name__}`"
        )

    # Checked in this order so the reported key matches the first one missing.
    for required_key in ("namespaces", "mappings"):
        if not mapping.get(required_key):
            raise tk.Invalid(f"UDC Mapping: Missing {required_key} field.")

    return mapping_str