Coverage for ckanext/udc/organization/actions.py: 15%
182 statements
« prev ^ index » next coverage.py v7.7.1, created at 2026-03-30 22:15 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2026-03-30 22:15 +0000
1"""Organization management actions for UDC."""
2from __future__ import annotations
4from typing import Any
6from sqlalchemy import or_
8import ckan.plugins.toolkit as tk
9from ckan import model
10from ckan.logic import NotFound
11from ckan.types import Context
12from ckanext.activity.model.activity import Activity
15@tk.side_effect_free
16def udc_organization_list(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]:
17 """List organizations with pagination and filters (sysadmin only)."""
18 tk.check_access("udc_organization_list", context, data_dict)
20 page = int(data_dict.get("page", 1) or 1)
21 page_size = int(data_dict.get("page_size", 25) or 25)
22 filters = data_dict.get("filters") or {}
24 query = model.Session.query(model.Group).filter(model.Group.is_organization.is_(True))
25 query = _apply_org_filters(query, filters)
26 total = query.count()
28 orgs = (
29 query.order_by(model.Group.name.asc())
30 .offset((page - 1) * page_size)
31 .limit(page_size)
32 .all()
33 )
35 return {
36 "results": [_org_to_dict(org) for org in orgs],
37 "total": total,
38 "page": page,
39 "page_size": page_size,
40 }
43@tk.side_effect_free
44def udc_deleted_organization_list(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]:
45 """List deleted organizations with pagination and filters (sysadmin only)."""
46 tk.check_access("udc_deleted_organization_list", context, data_dict)
48 page = int(data_dict.get("page", 1) or 1)
49 page_size = int(data_dict.get("page_size", 25) or 25)
50 filters = data_dict.get("filters") or {}
51 filters = {**filters, "state": "deleted"}
53 query = model.Session.query(model.Group).filter(model.Group.is_organization.is_(True))
54 query = _apply_org_filters(query, filters)
55 total = query.count()
57 orgs = (
58 query.order_by(model.Group.name.asc())
59 .offset((page - 1) * page_size)
60 .limit(page_size)
61 .all()
62 )
64 return {
65 "results": [_org_to_dict(org) for org in orgs],
66 "total": total,
67 "page": page,
68 "page_size": page_size,
69 }
72@tk.side_effect_free
73def udc_organization_packages_list(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]:
74 """List packages for an organization with pagination and filters (sysadmin only)."""
75 tk.check_access("udc_organization_packages_list", context, data_dict)
77 org = _get_org(data_dict)
78 page = int(data_dict.get("page", 1) or 1)
79 page_size = int(data_dict.get("page_size", 25) or 25)
80 filters = data_dict.get("filters") or {}
82 query = model.Session.query(model.Package).filter(model.Package.owner_org == org.id)
83 query = _apply_package_filters(query, filters)
84 total = query.count()
86 packages = (
87 query.order_by(model.Package.name.asc())
88 .offset((page - 1) * page_size)
89 .limit(page_size)
90 .all()
91 )
93 return {
94 "organization": _org_to_dict(org),
95 "results": [_package_to_dict(pkg) for pkg in packages],
96 "total": total,
97 "page": page,
98 "page_size": page_size,
99 }
102@tk.side_effect_free
103def udc_organization_packages_ids(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]:
104 """Return package ids for an organization (sysadmin only)."""
105 tk.check_access("udc_organization_packages_ids", context, data_dict)
107 org = _get_org(data_dict)
108 filters = data_dict.get("filters") or {}
110 query = model.Session.query(model.Package.id).filter(model.Package.owner_org == org.id)
111 query = _apply_package_filters(query, filters)
113 ids = [row[0] for row in query.all()]
114 return {"organization": _org_to_dict(org), "ids": ids, "total": len(ids)}
117def udc_organization_packages_delete(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]:
118 """Delete (soft-delete) packages for an organization (sysadmin only)."""
119 tk.check_access("udc_organization_packages_delete", context, data_dict)
121 org = _get_org(data_dict)
122 package_ids = data_dict.get("ids") or []
124 if not isinstance(package_ids, list):
125 raise tk.ValidationError({"ids": ["Provide a list of package ids."]})
126 if not package_ids:
127 raise tk.ValidationError({"ids": ["No packages selected."]})
129 errors: list[dict[str, str]] = []
130 deleted = 0
131 delete_action = tk.get_action("package_delete")
133 for pkg_id in package_ids:
134 try:
135 delete_action(context, {"id": pkg_id})
136 deleted += 1
137 except Exception as exc: # pragma: no cover - forward exact error to UI
138 errors.append({"id": str(pkg_id), "error": str(exc)})
140 return {"success": not errors, "deleted": deleted, "errors": errors}
143def udc_organization_delete(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]:
144 """Soft-delete organizations (sysadmin only)."""
145 tk.check_access("udc_organization_delete", context, data_dict)
147 org_ids = data_dict.get("ids") or []
148 if not isinstance(org_ids, list):
149 raise tk.ValidationError({"ids": ["Provide a list of organization ids."]})
150 if not org_ids:
151 raise tk.ValidationError({"ids": ["No organizations selected."]})
153 errors: list[dict[str, str]] = []
154 deleted = 0
155 delete_action = tk.get_action("organization_delete")
157 for org_id in org_ids:
158 try:
159 delete_action(context, {"id": org_id})
160 deleted += 1
161 except Exception as exc: # pragma: no cover - pass exact backend error
162 errors.append({"id": str(org_id), "error": str(exc)})
164 return {"success": not errors, "deleted": deleted, "errors": errors}
167def udc_purge_deleted_organizations(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]:
168 """Purge deleted organizations permanently (sysadmin only)."""
169 tk.check_access("udc_purge_deleted_organizations", context, data_dict)
171 selected_ids = data_dict.get("ids") or []
172 if not isinstance(selected_ids, list):
173 raise tk.ValidationError({"ids": ["Provide a list of organization ids."]})
175 query = model.Session.query(model.Group).filter(
176 model.Group.is_organization.is_(True),
177 model.Group.state == "deleted",
178 )
179 if selected_ids:
180 query = query.filter(model.Group.id.in_(selected_ids))
182 orgs = query.all()
183 errors: list[dict[str, str]] = []
184 purged = 0
185 purge_action = tk.get_action("organization_purge")
187 for org in orgs:
188 try:
189 purge_action(context, {"id": org.id})
190 purged += 1
191 except Exception as exc: # pragma: no cover - pass exact backend error
192 errors.append({"id": str(org.id), "error": str(exc)})
194 return {"success": not errors, "purged": purged, "errors": errors}
197def _get_org(data_dict: dict[str, Any]) -> model.Group:
198 org_id = data_dict.get("org_id") or data_dict.get("id") or data_dict.get("name")
199 if not org_id:
200 raise tk.ValidationError({"org_id": ["Organization id or name is required."]})
201 org = model.Group.get(org_id)
202 if not org or not org.is_organization:
203 raise NotFound("Organization not found")
204 return org
207def _org_to_dict(org: model.Group) -> dict[str, Any]:
208 title = getattr(org, "title", None) or getattr(org, "display_name", None) or org.name
209 return {
210 "id": org.id,
211 "name": org.name,
212 "title": title,
213 "description": org.description,
214 "state": org.state,
215 "created": org.created.isoformat() if org.created else None,
216 }
219def _package_to_dict(package: model.Package) -> dict[str, Any]:
220 return {
221 "id": package.id,
222 "name": package.name,
223 "title": package.title,
224 "state": package.state,
225 "private": bool(package.private),
226 "metadata_modified": package.metadata_modified.isoformat() if package.metadata_modified else None,
227 }
230def _apply_org_filters(query, filters: dict[str, Any]):
231 search = (filters.get("q") or "").strip()
232 if search:
233 pattern = f"%{search}%"
234 query = query.filter(
235 or_(
236 model.Group.name.ilike(pattern),
237 model.Group.title.ilike(pattern),
238 model.Group.description.ilike(pattern),
239 )
240 )
242 name = (filters.get("name") or "").strip()
243 if name:
244 query = query.filter(model.Group.name.ilike(f"%{name}%"))
246 title = (filters.get("title") or "").strip()
247 if title:
248 query = query.filter(model.Group.title.ilike(f"%{title}%"))
250 state = filters.get("state")
251 if state:
252 query = query.filter(model.Group.state == state)
253 else:
254 query = query.filter(model.Group.state != "deleted")
256 if _as_bool(filters.get("creator_deleted_or_purged")):
257 query = _filter_orgs_by_creator_deleted_or_purged(query)
259 return query
262def _apply_package_filters(query, filters: dict[str, Any]):
263 search = (filters.get("q") or "").strip()
264 if search:
265 pattern = f"%{search}%"
266 query = query.filter(
267 or_(
268 model.Package.name.ilike(pattern),
269 model.Package.title.ilike(pattern),
270 model.Package.notes.ilike(pattern),
271 )
272 )
274 name = (filters.get("name") or "").strip()
275 if name:
276 query = query.filter(model.Package.name.ilike(f"%{name}%"))
278 title = (filters.get("title") or "").strip()
279 if title:
280 query = query.filter(model.Package.title.ilike(f"%{title}%"))
282 state = filters.get("state")
283 if state:
284 query = query.filter(model.Package.state == state)
285 else:
286 query = query.filter(model.Package.state != "deleted")
288 return query
291def _filter_orgs_by_creator_deleted_or_purged(query):
292 org_ids = [row[0] for row in query.with_entities(model.Group.id).all()]
293 if not org_ids:
294 return query.filter(model.Group.id == "__none__")
296 creator_map = _org_creator_user_map(org_ids)
297 creator_ids = {creator_id for creator_id in creator_map.values() if creator_id}
298 if not creator_ids:
299 return query.filter(model.Group.id == "__none__")
301 user_rows = (
302 model.Session.query(model.User.id, model.User.state)
303 .filter(model.User.id.in_(creator_ids))
304 .all()
305 )
306 user_state_map = {row[0]: row[1] for row in user_rows}
308 matching_org_ids = []
309 for org_id, creator_id in creator_map.items():
310 if not creator_id:
311 continue
312 creator_state = user_state_map.get(creator_id)
313 if creator_state == "deleted" or creator_state is None:
314 matching_org_ids.append(org_id)
316 if not matching_org_ids:
317 return query.filter(model.Group.id == "__none__")
319 return query.filter(model.Group.id.in_(matching_org_ids))
322def _org_creator_user_map(org_ids: list[str]) -> dict[str, str | None]:
323 rows = (
324 model.Session.query(Activity.object_id, Activity.user_id, Activity.timestamp)
325 .filter(
326 Activity.activity_type == "new organization",
327 Activity.object_id.in_(org_ids),
328 )
329 .order_by(Activity.object_id.asc(), Activity.timestamp.asc())
330 .all()
331 )
333 creator_map: dict[str, str | None] = {}
334 for object_id, user_id, _timestamp in rows:
335 if object_id not in creator_map:
336 creator_map[object_id] = user_id
337 return creator_map
340def _as_bool(value: Any) -> bool:
341 if isinstance(value, bool):
342 return value
343 if isinstance(value, str):
344 return value.strip().lower() in {"1", "true", "yes", "on"}
345 if isinstance(value, int):
346 return value == 1
347 return False