Coverage for ckanext/udc/solr/index.py: 55%

162 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2026-01-19 23:48 +0000

1from __future__ import annotations 

2from typing import Union, Any 

3import json 

4import logging 

5import ckan.plugins.toolkit as tk 

6import ckan.plugins as plugins 

7from .config import get_udc_langs 

8 

9log = logging.getLogger(__name__) 

10 

11 

12def _jsonish(v): 

13 if isinstance(v, dict): 

14 return v 

15 if isinstance(v, str): 

16 try: 

17 return json.loads(v) 

18 except Exception: 

19 return None 

20 return None 

21 

22 

23def _tag_names(core_tags): 

24 out = [] 

25 for t in core_tags or []: 

26 if isinstance(t, dict) and t.get("name"): 

27 out.append(t["name"]) 

28 elif isinstance(t, str): 

29 out.append(t) 

30 return out 

31 

32 

33def _safe_json_load(value: Any) -> Any: 

34 """Parse JSON from a string, or pass through dict / list / None. 

35 

36 If parsing fails, return None rather than raising. 

37 """ 

38 if value is None: 

39 return None 

40 if isinstance(value, (dict, list)): 

41 return value 

42 if isinstance(value, str): 

43 try: 

44 return json.loads(value) 

45 except Exception: 

46 return None 

47 return None 

48 

49 

50def _extract_version_single(raw: Any) -> dict[str, Any] | None: 

51 """Normalize a single version JSON object. 

52 

53 Expected shape (best-effort): {"url": str, "title": str, "description": str}. 

54 Accept bare strings (treated as url) and dicts with at least url or title. 

55 """ 

56 # Already a dict with something useful 

57 if isinstance(raw, dict): 

58 url = raw.get("url") or raw.get("href") or "" 

59 title = raw.get("title") or "" 

60 desc = raw.get("description") or raw.get("notes") or "" 

61 if not (url or title or desc): 

62 return None 

63 out: dict[str, Any] = {} 

64 if url: 

65 out["url"] = url 

66 if title: 

67 out["title"] = title 

68 if desc: 

69 out["description"] = desc 

70 return out or None 

71 

72 # Bare string -> treat as URL 

73 if isinstance(raw, str) and raw.strip(): 

74 return {"url": raw.strip()} 

75 

76 return None 

77 

78 

def _extract_version_list(raw: Any) -> list[dict[str, Any]]:
    """Turn a list-like version field into a list of normalized objects.

    Strings are first decoded with ``_safe_json_load``. A decoded list is
    normalized entry by entry; any other value is treated as a single entry.
    Entries that ``_extract_version_single`` rejects are dropped.
    """
    if raw is None:
        return []

    decoded = _safe_json_load(raw) if isinstance(raw, str) else raw
    candidates = decoded if isinstance(decoded, list) else [decoded]
    normalized = (_extract_version_single(entry) for entry in candidates)
    return [entry for entry in normalized if entry]

99 

100 

def _dump_for_log(doc: Any) -> str:
    """Return a pretty-printed JSON rendering of *doc* for log output.

    Falls back to ``repr`` when the document cannot be serialized, so a
    diagnostic log line can never abort indexing.
    """
    try:
        # default=str is deliberate: this text is only read by humans.
        return json.dumps(doc, indent=2, ensure_ascii=True, default=str)
    except (TypeError, ValueError, RecursionError):
        return repr(doc)


def _lang_dict(value: Any) -> dict[str, Any]:
    """Return a fresh ``{lang: value}`` dict parsed from *value*.

    The result is always a new dict, so callers may add keys without
    touching the object stored in the package dict. Anything that does not
    parse to a dict yields ``{}``.
    """
    parsed = _jsonish(value)
    return dict(parsed) if isinstance(parsed, dict) else {}


def _version_url_and_label(item: dict[str, Any]) -> tuple[Any, str]:
    """Return ``(url, label)`` for one normalized version object.

    The label is ``"<title or description> (<url>)"`` and is empty unless
    both a URL and a title/description are present.
    """
    url = item.get("url") or ""
    label_title = item.get("title") or item.get("description") or ""
    label = f"{label_title} ({url})" if url and label_title else ""
    return url, label


def before_dataset_index(pkg_dict: dict[str, Any]) -> dict[str, Any]:
    """Build the document to index from a package dict.

    Works on a shallow copy of *pkg_dict* and derives:

    * ``extras_<field>`` arrays for the plugin's multiple-select fields;
    * ``title_<lang>_txt`` / ``notes_<lang>_txt`` from the translated
      title and notes (seeded from ``title`` / ``notes`` for the default
      language);
    * ``tags_<lang>_f`` / ``tags_<lang>_txt`` from the translated tags
      (seeded from ``tags`` for the default language);
    * ``<name>_<lang>_txt`` / ``<name>_<lang>_f`` for the plugin's text
      fields, removing the raw ``extras_<name>`` entry;
    * ``version_dataset_url`` / ``version_dataset_title_url`` and
      ``dataset_versions_url`` / ``dataset_versions_title_url`` from the
      version relationship JSON.

    Args:
        pkg_dict: The package dict about to be indexed. Neither it nor its
            nested ``*_translated`` dicts are modified.

    Returns:
        The new dict to index.
    """
    log.info("Running before_dataset_index hook")
    # Serializing the whole document is costly; skip it when INFO is off.
    log_documents = log.isEnabledFor(logging.INFO)
    if log_documents:
        log.info("Original document: %s", _dump_for_log(pkg_dict))

    # Get the UDC plugin instance
    udc_plugin = plugins.get_plugin('udc')

    # Shallow copy: top-level keys can be added/removed without touching
    # the caller's dict. Nested dicts are copied separately before edits.
    index = dict(pkg_dict)

    # Do not index related packages
    index.pop("related_packages", None)

    # NOTE(review): assumes get_udc_langs() returns a non-empty sequence
    # whose first entry is the default language -- TODO confirm.
    langs = get_udc_langs()
    default_lang = langs[0]

    # multiple_select -> extras_<name> (array)
    for field in udc_plugin.multiple_select_fields or []:
        value = index.get(field)
        if isinstance(value, str):
            index["extras_" + field] = [v for v in value.split(",") if v.strip()]

    # CORE: title / notes translated
    title_t = _lang_dict(index.get("title_translated"))
    notes_t = _lang_dict(index.get("notes_translated"))
    if default_lang not in title_t and isinstance(index.get("title"), str):
        title_t[default_lang] = index["title"]
    if default_lang not in notes_t and isinstance(index.get("notes"), str):
        notes_t[default_lang] = index["notes"]

    for lang in langs:
        for prefix, translations in (("title", title_t), ("notes", notes_t)):
            text = translations.get(lang)
            if isinstance(text, str) and text.strip():
                index[f"{prefix}_{lang}_txt"] = text

    # tags translated -> facet (and partial text search)
    # Build tags_translated from provided JSON or seed from core tags for default_lang
    tags_t = _lang_dict(index.get("tags_translated"))
    if default_lang not in tags_t:
        core_tags = _tag_names(index.get("tags"))
        if core_tags:
            tags_t[default_lang] = core_tags

    for lang in langs:
        raw_tags = tags_t.get(lang) or []
        if isinstance(raw_tags, str):
            raw_tags = [raw_tags]
        elif not isinstance(raw_tags, (list, tuple, set, frozenset)):
            # Unexpected shape (number, object, ...): nothing to index.
            raw_tags = []
        tag_values = [t for t in raw_tags if isinstance(t, str) and t.strip()]
        if tag_values:
            index[f"tags_{lang}_f"] = tag_values
            # Make tags searchable per language:
            index[f"tags_{lang}_txt"] = " ".join(tag_values)

    # maturity model TEXT fields (multilingual JSON in-place)
    for name in udc_plugin.text_fields or []:
        extras_key = f"extras_{name}"
        raw = index.get(name)
        if raw is None and extras_key in index:
            raw = index.get(extras_key)
        translations = _lang_dict(raw)
        # Prevent JSON from going into 'extras_*' (default schema would copy to 'text')
        index.pop(extras_key, None)

        # Write language-aware text & facets
        for lang in langs:
            text = translations.get(lang)
            if isinstance(text, str) and text.strip():
                index[f"{name}_{lang}_txt"] = text
                index[f"{name}_{lang}_f"] = [text]

    # Version relationship fields
    # ----------------------------
    # These are stored canonically as JSON in extras `version_dataset`
    # and `dataset_versions`. At index-time we derive URL-only and
    # label fields suitable for filtering and display in facets/search.

    # Drop raw extras to avoid indexing schema conflicts.
    index.pop("extras_version_dataset", None)
    index.pop("extras_dataset_versions", None)

    # Single "is version of" target
    raw_version_dataset = index.get("version_dataset")
    vd_obj: dict[str, Any] | None = None
    if raw_version_dataset is not None:
        vd_obj = _extract_version_single(_safe_json_load(raw_version_dataset))

    if vd_obj:
        url, label = _version_url_and_label(vd_obj)
        if url:
            index["version_dataset_url"] = url
        if label:
            index["version_dataset_title_url"] = label

    # Multiple "has version" targets
    dv_items = _extract_version_list(_safe_json_load(index.get("dataset_versions")))

    urls: list[str] = []
    labels: list[str] = []
    for item in dv_items:
        url, label = _version_url_and_label(item)
        if url:
            urls.append(url)
        if label:
            labels.append(label)

    if urls:
        index["dataset_versions_url"] = urls
    if labels:
        index["dataset_versions_title_url"] = labels

    # Pretty-print the final indexed document for debugging
    if log_documents:
        log.info("Indexed document: %s", _dump_for_log(index))

    return index