Coverage for ckanext/udc/solr/index.py: 55% (162 statements)
coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
from __future__ import annotations
from typing import Union, Any
import json
import logging
import ckan.plugins.toolkit as tk
import ckan.plugins as plugins
from .config import get_udc_langs

log = logging.getLogger(__name__)
def _jsonish(v):
    """Return dicts as-is; parse JSON strings (None on failure); otherwise None."""
    if isinstance(v, dict):
        return v
    if isinstance(v, str):
        try:
            return json.loads(v)
        except Exception:
            return None
    return None
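# Illustrative, hedged examples added for clarity (not part of the original module):
#   _jsonish('{"en": "Title"}') -> {"en": "Title"}
#   _jsonish({"en": "Title"})   -> {"en": "Title"}
#   _jsonish("not json")        -> None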
def _tag_names(core_tags):
    """Extract tag name strings from CKAN's core ``tags`` list (dicts or bare strings)."""
    out = []
    for t in core_tags or []:
        if isinstance(t, dict) and t.get("name"):
            out.append(t["name"])
        elif isinstance(t, str):
            out.append(t)
    return out
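# Illustrative, hedged examples added for clarity (not part of the original module):
#   _tag_names([{"name": "health"}, "covid"]) -> ["health", "covid"]
#   _tag_names(None)                          -> []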
def _safe_json_load(value: Any) -> Any:
    """Parse JSON from a string, or pass through dict / list / None.

    If parsing fails, return None rather than raising.
    """
    if value is None:
        return None
    if isinstance(value, (dict, list)):
        return value
    if isinstance(value, str):
        try:
            return json.loads(value)
        except Exception:
            return None
    return None
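# Illustrative, hedged examples added for clarity (not part of the original module):
#   _safe_json_load('["a", "b"]') -> ["a", "b"]
#   _safe_json_load({"k": 1})     -> {"k": 1}
#   _safe_json_load("{broken")    -> None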
def _extract_version_single(raw: Any) -> dict[str, Any] | None:
    """Normalize a single version JSON object.

    Expected shape (best-effort): {"url": str, "title": str, "description": str}.
    Accept bare strings (treated as a url) and dicts carrying at least one of
    url, title, or description.
    """
    # Already a dict with something useful
    if isinstance(raw, dict):
        url = raw.get("url") or raw.get("href") or ""
        title = raw.get("title") or ""
        desc = raw.get("description") or raw.get("notes") or ""
        if not (url or title or desc):
            return None
        out: dict[str, Any] = {}
        if url:
            out["url"] = url
        if title:
            out["title"] = title
        if desc:
            out["description"] = desc
        return out or None

    # Bare string -> treat as URL
    if isinstance(raw, str) and raw.strip():
        return {"url": raw.strip()}

    return None
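# Illustrative, hedged examples added for clarity (not part of the original module):
#   _extract_version_single({"url": "https://example.org/v1", "title": "v1"})
#       -> {"url": "https://example.org/v1", "title": "v1"}
#   _extract_version_single("https://example.org/v2") -> {"url": "https://example.org/v2"}
#   _extract_version_single({}) -> None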
def _extract_version_list(raw: Any) -> list[dict[str, Any]]:
    """Normalize list-like version field into a list of objects."""
    if raw is None:
        return []
    if isinstance(raw, str):
        parsed = _safe_json_load(raw)
    else:
        parsed = raw

    items: list[dict[str, Any]] = []
    if isinstance(parsed, list):
        for item in parsed:
            norm = _extract_version_single(item)
            if norm:
                items.append(norm)
    else:
        norm = _extract_version_single(parsed)
        if norm:
            items.append(norm)
    return items
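# Illustrative, hedged examples added for clarity (not part of the original module):
#   _extract_version_list('[{"url": "https://example.org/v1"}, "https://example.org/v2"]')
#       -> [{"url": "https://example.org/v1"}, {"url": "https://example.org/v2"}]
#   _extract_version_list(None) -> []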
def before_dataset_index(pkg_dict: dict[str, Any]) -> dict[str, Any]:
    log.info("Running before_dataset_index hook")
    log.info("Original document: %s", json.dumps(pkg_dict, indent=2, ensure_ascii=True))

    # Get the UDC plugin instance
    udcPlugin = plugins.get_plugin('udc')

    # Make a shallow copy so we don't mutate CKAN's original
    index = dict(pkg_dict)

    # Do not index related packages
    index.pop("related_packages", None)

    langs = get_udc_langs()
    default_lang = langs[0]

    # multiple_select -> extras_<name> (array)
    for field in udcPlugin.multiple_select_fields or []:
        if field in index and isinstance(index[field], str):
            index["extras_" + field] = [v for v in index[field].split(",") if v.strip()]

    # CORE: title / notes translated
    title_t = _jsonish(index.get("title_translated")) or {}
    notes_t = _jsonish(index.get("notes_translated")) or {}
    if default_lang not in title_t and isinstance(index.get("title"), str):
        title_t[default_lang] = index["title"]
    if default_lang not in notes_t and isinstance(index.get("notes"), str):
        notes_t[default_lang] = index["notes"]

    for L in langs:
        v = title_t.get(L)
        if isinstance(v, str) and v.strip():
            index[f"title_{L}_txt"] = v
        v = notes_t.get(L)
        if isinstance(v, str) and v.strip():
            index[f"notes_{L}_txt"] = v

    # tags translated -> facet (and partial text search)
    # Build tags_translated from provided JSON or seed from core tags for default_lang
    tags_t = _jsonish(index.get("tags_translated")) or {}
    if default_lang not in tags_t:
        core_tags = _tag_names(index.get("tags"))
        if core_tags:
            tags_t[default_lang] = core_tags

    for L in langs:
        arr = tags_t.get(L) or []
        if isinstance(arr, str):
            arr = [arr]
        arr = [a for a in arr if isinstance(a, str) and a.strip()]
        if arr:
            index[f"tags_{L}_f"] = arr
            # Make tags searchable per language:
            index[f"tags_{L}_txt"] = " ".join(arr)
    # maturity model TEXT fields (multilingual JSON in-place)
    for name in udcPlugin.text_fields:
        raw = index.get(name)
        if raw is None and f"extras_{name}" in index:
            raw = index.get(f"extras_{name}")
        obj = _jsonish(raw) or {}
        # Prevent JSON from going into 'extras_*' (default schema would copy to 'text')
        index.pop(f"extras_{name}", None)

        # Write language-aware text & facets
        for L in langs:
            v = obj.get(L)
            if isinstance(v, str) and v.strip():
                index[f"{name}_{L}_txt"] = v
                index[f"{name}_{L}_f"] = [v]
    # Version relationship fields
    # ----------------------------
    # These are stored canonically as JSON in extras `version_dataset`
    # and `dataset_versions`. At index-time we derive URL-only and
    # label fields suitable for filtering and display in facets/search.

    # Drop raw extras to avoid indexing schema conflicts.
    index.pop("extras_version_dataset", None)
    index.pop("extras_dataset_versions", None)

    # Single "is version of" target
    raw_version_dataset = index.get("version_dataset")
    vd_obj: dict[str, Any] | None = None
    if raw_version_dataset is not None:
        parsed = _safe_json_load(raw_version_dataset)
        vd_obj = _extract_version_single(parsed)

    version_dataset_url_val = None
    version_dataset_label_val = None
    if vd_obj:
        url = vd_obj.get("url") or ""
        title = vd_obj.get("title") or ""
        desc = vd_obj.get("description") or ""

        if url:
            version_dataset_url_val = url
        # Title for label: prefer explicit, otherwise fall back to description
        label_title = title or desc or ""
        if version_dataset_url_val and label_title:
            version_dataset_label_val = f"{label_title} ({version_dataset_url_val})"

    if version_dataset_url_val:
        index["version_dataset_url"] = version_dataset_url_val
    if version_dataset_label_val:
        index["version_dataset_title_url"] = version_dataset_label_val

    # Multiple "has version" targets
    raw_dataset_versions = index.get("dataset_versions")
    dv_list = _safe_json_load(raw_dataset_versions)
    dv_items = _extract_version_list(dv_list)

    urls: list[str] = []
    labels: list[str] = []
    for item in dv_items:
        url = item.get("url") or ""
        title = item.get("title") or ""
        desc = item.get("description") or ""
        if not (url or title or desc):
            continue
        if url:
            urls.append(url)
        label_title = title or desc or ""
        if url and label_title:
            labels.append(f"{label_title} ({url})")

    if urls:
        index["dataset_versions_url"] = urls
    if labels:
        index["dataset_versions_title_url"] = labels

    # Pretty-print the final indexed document for debugging
    log.info("Indexed document: %s", json.dumps(index, indent=2, ensure_ascii=True))

    return index
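A minimal sketch of how a CKAN plugin might wire this hook into Solr indexing, assuming a
CKAN version whose IPackageController interface exposes before_dataset_index; the plugin
class, its attributes, and the module layout below are illustrative, not the actual
ckanext-udc sources:

# plugin.py (illustrative sketch only)
import ckan.plugins as plugins

from ckanext.udc.solr.index import before_dataset_index


class UdcPlugin(plugins.SingletonPlugin):
    plugins.implements(plugins.IPackageController, inherit=True)

    # Hypothetical attributes read by the hook via plugins.get_plugin('udc');
    # the real plugin derives these from its maturity-model configuration.
    multiple_select_fields = []
    text_fields = []

    def before_dataset_index(self, pkg_dict):
        # Delegate to the module-level hook so the Solr document gains the
        # language-aware and version-relationship fields built above.
        return before_dataset_index(pkg_dict)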