Coverage for ckanext/udc/solr/index.py: 56%
167 statements
« prev ^ index » next coverage.py v7.7.1, created at 2026-03-30 22:15 +0000
1from __future__ import annotations
2from typing import Union, Any
3import json
4import logging
5import ckan.plugins.toolkit as tk
6import ckan.plugins as plugins
7from .config import get_udc_langs
9log = logging.getLogger(__name__)
12def _jsonish(v):
13 if isinstance(v, dict):
14 return v
15 if isinstance(v, str):
16 try:
17 return json.loads(v)
18 except Exception:
19 return None
20 return None
23def _tag_names(core_tags):
24 out = []
25 for t in core_tags or []:
26 if isinstance(t, dict) and t.get("name"):
27 out.append(t["name"])
28 elif isinstance(t, str):
29 out.append(t)
30 return out
33def _safe_json_load(value: Any) -> Any:
34 """Parse JSON from a string, or pass through dict / list / None.
36 If parsing fails, return None rather than raising.
37 """
38 if value is None:
39 return None
40 if isinstance(value, (dict, list)):
41 return value
42 if isinstance(value, str):
43 try:
44 return json.loads(value)
45 except Exception:
46 return None
47 return None
50def _extract_version_single(raw: Any) -> dict[str, Any] | None:
51 """Normalize a single version JSON object.
53 Expected shape (best-effort): {"url": str, "title": str, "description": str}.
54 Accept bare strings (treated as url) and dicts with at least url or title.
55 """
56 # Already a dict with something useful
57 if isinstance(raw, dict):
58 url = raw.get("url") or raw.get("href") or ""
59 title = raw.get("title") or ""
60 desc = raw.get("description") or raw.get("notes") or ""
61 if not (url or title or desc):
62 return None
63 out: dict[str, Any] = {}
64 if url:
65 out["url"] = url
66 if title:
67 out["title"] = title
68 if desc:
69 out["description"] = desc
70 return out or None
72 # Bare string -> treat as URL
73 if isinstance(raw, str) and raw.strip():
74 return {"url": raw.strip()}
76 return None
def _extract_version_list(raw: Any) -> list[dict[str, Any]]:
    """Normalize a list-like version field into a list of version objects.

    Accepts a JSON string, a list, or a single object; entries that fail to
    normalize via ``_extract_version_single`` are dropped.
    """
    if raw is None:
        return []

    parsed = _safe_json_load(raw) if isinstance(raw, str) else raw
    candidates = parsed if isinstance(parsed, list) else [parsed]
    return [item for item in map(_extract_version_single, candidates) if item]
def before_dataset_index(pkg_dict: dict[str, Any]) -> dict[str, Any]:
    """CKAN ``before_dataset_index`` hook: shape the package dict for Solr.

    - drops fields that must not be indexed (related packages, raw version
      extras),
    - splits multiple-select string values into ``extras_<name>`` arrays,
    - expands multilingual ``*_translated`` JSON blobs into per-language
      ``<field>_<lang>_txt`` text fields and ``_f`` facet fields,
    - flattens the version-relationship JSON into URL and "title (url)"
      label fields.

    Returns a new dict; ``pkg_dict`` itself is not mutated.
    """
    log.info("Running before_dataset_index hook")
    # Serializing the whole document with indent=2 is expensive; only pay
    # for it when the message would actually be emitted.
    if log.isEnabledFor(logging.INFO):
        log.info("Original document: %s",
                 json.dumps(pkg_dict, indent=2, ensure_ascii=True))

    # The UDC plugin instance carries the maturity-model field lists.
    udc_plugin = plugins.get_plugin('udc')

    # Make a shallow copy so we don't mutate CKAN's original
    index = dict(pkg_dict)

    # Do not index related packages
    index.pop("related_packages", None)

    langs = get_udc_langs()
    default_lang = langs[0]  # assumes config declares at least one language

    # multiple_select -> extras_<name> (array of trimmed values)
    for field in udc_plugin.multiple_select_fields or []:
        if field in index and isinstance(index[field], str):
            # Strip each value so facets don't get entries like " foo"
            # (the old code filtered on v.strip() but appended v unstripped).
            index["extras_" + field] = [
                v.strip() for v in index[field].split(",") if v.strip()
            ]

    # CORE: title / notes translated.  Seed the default language from the
    # plain title/notes when the translated blob doesn't cover it.
    title_t = _jsonish(index.get("title_translated")) or {}
    notes_t = _jsonish(index.get("notes_translated")) or {}
    if default_lang not in title_t and isinstance(index.get("title"), str):
        title_t[default_lang] = index["title"]
    if default_lang not in notes_t and isinstance(index.get("notes"), str):
        notes_t[default_lang] = index["notes"]

    for lang in langs:
        v = title_t.get(lang)
        if isinstance(v, str) and v.strip():
            index[f"title_{lang}_txt"] = v
        v = notes_t.get(lang)
        if isinstance(v, str) and v.strip():
            index[f"notes_{lang}_txt"] = v

    # tags translated -> facet (and partial text search)
    # Build tags_translated from provided JSON or seed from core tags for default_lang
    tags_t = _jsonish(index.get("tags_translated")) or {}
    if default_lang not in tags_t:
        core_tags = _tag_names(index.get("tags"))
        if core_tags:
            tags_t[default_lang] = core_tags

    for lang in langs:
        arr = tags_t.get(lang) or []
        if isinstance(arr, str):
            arr = [arr]
        arr = [a for a in arr if isinstance(a, str) and a.strip()]
        if arr:
            index[f"tags_{lang}_f"] = arr
            # Make tags searchable per language:
            index[f"tags_{lang}_txt"] = " ".join(arr)

    # maturity model TEXT fields (multilingual JSON in-place)
    # (``or []`` guard added for consistency with multiple_select_fields)
    for name in udc_plugin.text_fields or []:
        raw = index.get(name)
        if raw is None and f"extras_{name}" in index:
            raw = index.get(f"extras_{name}")
        parsed = _jsonish(raw)
        if isinstance(parsed, dict):
            obj = parsed
        elif isinstance(raw, str) and raw.strip():
            # A plain string is treated as the default-language value.
            obj = {default_lang: raw}
        else:
            obj = {}
        # Prevent JSON from going into 'extras_*' (default schema would copy to 'text')
        index.pop(f"extras_{name}", None)

        # Write language-aware text & facets
        for lang in langs:
            v = obj.get(lang)
            if isinstance(v, str) and v.strip():
                index[f"{name}_{lang}_txt"] = v
                index[f"{name}_{lang}_f"] = [v]

    # ----------------------------
    # Version relationship fields
    # ----------------------------
    # These are stored canonically as JSON in extras `version_dataset`
    # and `dataset_versions`. At index-time we derive URL-only and
    # label fields suitable for filtering and display in facets/search.

    # Drop raw extras to avoid indexing schema conflicts.
    index.pop("extras_version_dataset", None)
    index.pop("extras_dataset_versions", None)

    # Single "is version of" target
    raw_version_dataset = index.get("version_dataset")
    vd_obj: dict[str, Any] | None = None
    if raw_version_dataset is not None:
        vd_obj = _extract_version_single(_safe_json_load(raw_version_dataset))

    if vd_obj:
        url = vd_obj.get("url") or ""
        # Label title: prefer explicit title, otherwise fall back to description
        label_title = vd_obj.get("title") or vd_obj.get("description") or ""
        if url:
            index["version_dataset_url"] = url
            if label_title:
                index["version_dataset_title_url"] = f"{label_title} ({url})"

    # Multiple "has version" targets
    dv_items = _extract_version_list(_safe_json_load(index.get("dataset_versions")))

    urls: list[str] = []
    labels: list[str] = []
    for item in dv_items:
        url = item.get("url") or ""
        label_title = item.get("title") or item.get("description") or ""
        # _extract_version_single guarantees each item has at least one
        # non-empty field, so no extra emptiness check is needed here.
        if url:
            urls.append(url)
            if label_title:
                labels.append(f"{label_title} ({url})")

    if urls:
        index["dataset_versions_url"] = urls
    if labels:
        index["dataset_versions_title_url"] = labels

    # Pretty-print the final indexed document for debugging (guarded, as above).
    if log.isEnabledFor(logging.INFO):
        log.info("Indexed document: %s",
                 json.dumps(index, indent=2, ensure_ascii=True))

    return index