Coverage for ckanext/udc/validator.py: 12%

94 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2026-01-19 23:48 +0000

1from __future__ import annotations 

2 

3import json 

4import re 

5 

6import ckan.plugins.toolkit as tk 

7 

8 

# CKAN Config Validator

# Values accepted for a maturity-model field's `ckanField` key.
SUPPORTED_CKAN_FIELDS = [
    "title", "description", "tags", "license_id",
    "organization_and_visibility",
    "source", "version",
    "author", "author_email",
    "maintainer", "maintainer_email",
    "custom_fields",
]

# CKAN fields that must be mapped somewhere in the maturity model.
REQUIRED_CKAN_FIELDS = ["title", "organization_and_visibility"]

# Values accepted for a custom field's optional `type` key.
SUPPORTED_FIELD_TYPES = [
    "text",
    "date", "datetime", "time",
    "number",
    "single_select", "multiple_select",
    # Relationship / versioning helpers handled specially by the UI/schema
    "single_dataset", "multiple_datasets",
]

40 

41 

42def _is_localized_text(val): 

43 """ 

44 Accept either: 

45 - a plain string, or 

46 - a dict of locale -> string (must include 'en'; 'fr' optional) 

47 """ 

48 if isinstance(val, str): 

49 return True 

50 if isinstance(val, dict): 

51 if "en" not in val: 

52 return False 

53 # all provided locale values must be strings (allow empty) 

54 return all(isinstance(v, str) for v in val.values()) 

55 return False 

56 

57 

58def _validate_localized_field(field_obj, key, field_path): 

59 """ 

60 If key exists in field_obj, ensure it's a valid localized text (string or {en:..., fr:...}). 

61 """ 

62 if key in field_obj and not _is_localized_text(field_obj[key]): 

63 raise tk.Invalid( 

64 f"Malformed UDC Config: `{field_path}.{key}` must be a string or an object like " 

65 f'{{"en": "...", "fr": "..."}} with string values (and must include "en").' 

66 ) 

67 

68 

def udc_config_validator(config_str):
    """
    Check whether the UDC config is valid.

    The config must be a JSON object with a `maturity_model` list. Each level
    needs `title`, `name` and a `fields` list. Each field is either mapped to
    a CKAN field (`ckanField`) or is a custom field (`name` + `label`).
    Field identifiers must be unique across all levels, and every entry of
    `REQUIRED_CKAN_FIELDS` must be mapped.

    Args:
        config_str: The UDC config as a JSON string.

    Returns:
        The original `config_str`, unchanged, when the config is valid.

    Raises:
        tk.Invalid: When the JSON cannot be parsed or the config is malformed.
    """
    try:
        config = json.loads(config_str)
    except (TypeError, ValueError, RecursionError) as err:
        # JSONDecodeError is a ValueError; TypeError covers non-string input.
        raise tk.Invalid("UDC Config: Malformed JSON Format.") from err

    # A JSON string would turn the `in` test below into a substring match and
    # a JSON number would make it raise TypeError, so reject non-objects first.
    if not isinstance(config, dict):
        raise tk.Invalid(
            f"UDC Config: Expecting a JSON Object but got `{config.__class__.__name__}`"
        )

    if "maturity_model" not in config:
        raise tk.Invalid("UDC Config: Missing `maturity_model` key.")

    maturity_model = config["maturity_model"]
    if not isinstance(maturity_model, list):
        raise tk.Invalid(
            f"UDC Config: Expecting `maturity_model` to be a JSON List but got `{maturity_model.__class__.__name__}`"
        )

    # CKAN field names and custom field names share one namespace.
    used_fields = set()

    for level_idx, level in enumerate(maturity_model):
        if not isinstance(level, dict):
            raise tk.Invalid(
                f"Malformed UDC Config: `maturity_model[{level_idx}]` must be an object."
            )

        if not ("title" in level and "name" in level and "fields" in level):
            raise tk.Invalid(
                'Malformed UDC Config: "title", "name" and "fields" are required for each level.'
            )

        if not isinstance(level["fields"], list):
            raise tk.Invalid(
                f"Malformed UDC Config: `fields` in level `{level.get('name','?')}` must be a list."
            )

        for field_idx, field in enumerate(level["fields"]):
            field_path = f"maturity_model[{level_idx}].fields[{field_idx}]"

            if not isinstance(field, dict):
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}` must be an object."
                )

            # CKAN mapped field
            if "ckanField" in field:
                ckan_field = field["ckanField"]
                if ckan_field not in SUPPORTED_CKAN_FIELDS:
                    raise tk.Invalid(
                        f"Malformed UDC Config: The provided CKAN field `{ckan_field}` is not supported."
                    )
                if ckan_field in used_fields:
                    raise tk.Invalid(
                        f"Malformed UDC Config: The provided CKAN field `{ckan_field}` is duplicated."
                    )
                used_fields.add(ckan_field)

                # Optional bilingual texts for CKAN fields too
                _validate_localized_field(field, "label", field_path)
                _validate_localized_field(field, "short_description", field_path)
                _validate_localized_field(field, "long_description", field_path)

            # Custom field
            else:
                if not ("name" in field and "label" in field):
                    raise tk.Invalid(
                        "Malformed UDC Config: `name` and `label` is required for custom field."
                    )

                # name must be a string of word characters only; fullmatch
                # (unlike `^...$` with match) also rejects a trailing newline.
                name = field["name"]
                if not isinstance(name, str) or re.fullmatch(r"\w+", name) is None:
                    raise tk.Invalid(
                        f"Malformed UDC Config: The provided field name `{name}` is not alpha-numeric."
                    )

                # label must support bilingual (string or {en, fr})
                _validate_localized_field(field, "label", field_path)

                # Optional bilingual descriptions
                _validate_localized_field(field, "short_description", field_path)
                _validate_localized_field(field, "long_description", field_path)

                # type (if present) must be supported
                field_type = field.get("type")
                if field_type is not None and field_type not in SUPPORTED_FIELD_TYPES:
                    raise tk.Invalid(
                        f"Malformed UDC Config: The provided field type `{field_type}` is not supported."
                    )

                # duplicates
                if name in used_fields:
                    raise tk.Invalid(
                        f"Malformed UDC Config: The provided field `{name}` is duplicated."
                    )
                used_fields.add(name)

                # NOTE(review): nesting was reconstructed from a de-indented
                # listing; confirm `optionsFromQuery` is only meant to be
                # checked for select-type fields.
                if field_type in ("single_select", "multiple_select"):
                    _validate_udc_select_options(field, field_path)

            # Boolean sanity check
            # NOTE(review): applied to every field here; confirm whether the
            # original limited it to custom fields.
            if "enable_filter_logic_toggle" in field and not isinstance(
                field["enable_filter_logic_toggle"], bool
            ):
                raise tk.Invalid(
                    "Malformed UDC Config: `enable_filter_logic_toggle` must be a boolean."
                )

    # Check required CKAN fields
    for field_name in REQUIRED_CKAN_FIELDS:
        if field_name not in used_fields:
            raise tk.Invalid(
                f"Malformed UDC Config: Missing the required CKAN field `{field_name}`."
            )

    return config_str


def _validate_udc_select_options(field, field_path):
    """
    Validate the optional `options` and `optionsFromQuery` keys of a select field.

    `options` must be a list of objects, each with `value` and a `text` that is
    a string or localized object. `optionsFromQuery` must be an object whose
    `text`, `value` and `query` entries are strings. Raises `tk.Invalid` on the
    first violation.
    """
    if "options" in field:
        options = field["options"]
        if not isinstance(options, list):
            raise tk.Invalid(
                f"Malformed UDC Config: `{field_path}.options` must be a list."
            )
        for opt in options:
            if not isinstance(opt, dict):
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}.options[]` items must be objects."
                )
            if "value" not in opt:
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}.options[]` missing `value`."
                )
            if "text" not in opt:
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}.options[]` missing `text`."
                )
            # Allow string or localized dict for text
            if not _is_localized_text(opt["text"]):
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}.options[].text` must be a string or localized object."
                )

    if "optionsFromQuery" in field:
        ofq = field["optionsFromQuery"]
        if not isinstance(ofq, dict):
            raise tk.Invalid(
                f"Malformed UDC Config: `{field_path}.optionsFromQuery` must be an object."
            )
        for k in ("text", "value", "query"):
            if k not in ofq or not isinstance(ofq[k], str):
                raise tk.Invalid(
                    f"Malformed UDC Config: `{field_path}.optionsFromQuery.{k}` must be a string."
                )

209 

210 

def udc_mapping_validator(mapping_str):
    """
    Check whether the UDC mapping is valid.

    The mapping must be a JSON object with non-empty `namespaces` and
    `mappings` entries.

    Args:
        mapping_str: The UDC mapping as a JSON string.

    Returns:
        The original `mapping_str`, unchanged, when the mapping is valid.

    Raises:
        tk.Invalid: When the JSON cannot be parsed, is not an object, or
            lacks a truthy `namespaces` or `mappings` entry.
    """
    try:
        mapping = json.loads(mapping_str)
    except (TypeError, ValueError, RecursionError) as err:
        # Only parse failures are translated; KeyboardInterrupt and SystemExit
        # must propagate, which a bare `except:` would prevent.
        raise tk.Invalid("UDC Mapping: Malformed JSON Format.") from err

    if not isinstance(mapping, dict):
        raise tk.Invalid(
            f"UDC Mapping: Expecting a JSON Object but got `{mapping.__class__.__name__}`"
        )

    # Empty or null values are treated the same as a missing key.
    if not mapping.get("namespaces"):
        raise tk.Invalid("UDC Mapping: Missing namespaces field.")
    if not mapping.get("mappings"):
        raise tk.Invalid("UDC Mapping: Missing mappings field.")
    return mapping_str