Coverage for ckanext/udc/user/actions.py: 68%
92 statements
« prev ^ index » next coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
1"""User management actions for UDC."""
2from __future__ import annotations
3from typing import Any
4from sqlalchemy import or_
5import ckan.plugins.toolkit as tk
6from ckan.types import Context
7from ckan import model
8from ckan.logic import NotFound
@tk.side_effect_free
def deleted_users_list(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]:
    """Get one page of deleted users.

    Only sysadmins are meant to access this endpoint; access is checked
    through the ``deleted_users_list`` auth function.

    :param context: CKAN action context, passed to the access check
    :param data_dict: optional keys ``page`` (default 1), ``page_size``
        (default 25) and ``filters`` (mapping handed to
        ``_apply_user_filters``)
    :returns: dictionary with ``results`` (list of deleted user
        dictionaries, ordered by name), ``total`` (number of matching users
        before pagination), ``page`` and ``page_size``
    :rtype: dictionary
    :raises ValidationError: if ``page`` or ``page_size`` is not a positive
        integer
    """
    tk.check_access('deleted_users_list', context, data_dict)

    # Missing or falsy values fall back to the defaults. Anything else must
    # be a positive integer: a zero or negative value would otherwise become
    # a negative OFFSET/LIMIT, and non-numeric input an unhandled ValueError.
    paging: dict[str, int] = {}
    for key, default in (("page", 1), ("page_size", 25)):
        try:
            value = int(data_dict.get(key, default) or default)
        except (TypeError, ValueError):
            value = None
        if value is None or value < 1:
            raise tk.ValidationError({key: ["Must be a positive integer."]})
        paging[key] = value
    page = paging["page"]
    page_size = paging["page_size"]

    filters = data_dict.get("filters") or {}

    query = _apply_user_filters(
        model.Session.query(model.User).filter(model.User.state == "deleted"),
        filters,
    )
    # Count before pagination so callers can compute the number of pages.
    total = query.count()
    deleted_users = (
        query.order_by(model.User.name.asc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )

    return {
        "results": [_user_to_dict(user) for user in deleted_users],
        "total": total,
        "page": page,
        "page_size": page_size,
    }
def purge_deleted_users(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]:
    """Purge deleted users from the database.

    .. warning:: This action cannot be undone! Users will be permanently removed.

    Only sysadmins are meant to purge deleted users; access is checked
    through the ``purge_deleted_users`` auth function.

    Every user whose state is ``"deleted"`` is purged. When ``ids`` is
    supplied, only the deleted users with those ids are purged. For each
    user, the ``Member`` rows whose ``table_id`` is the user's id and the
    ``PackageMember`` rows for that user are purged before the user itself.
    All changes are committed once at the end.

    :param context: CKAN action context, passed to the access check
    :param data_dict: optional key ``ids`` -- a list of user ids, or a
        single user id given as a string
    :returns: Dictionary with ``success``, ``count`` of purged users and a
        human-readable ``message``
    :rtype: dictionary
    """
    tk.check_access('purge_deleted_users', context, data_dict)

    selected_ids = data_dict.get("ids") or []
    if isinstance(selected_ids, str):
        # A bare string is one id, not a collection of ids; ``in_`` needs a
        # sequence of values.
        selected_ids = [selected_ids]

    query = model.Session.query(model.User).filter(model.User.state == "deleted")
    if selected_ids:
        query = query.filter(model.User.id.in_(selected_ids))
    deleted_users = query.all()

    count = 0
    for user_to_purge in deleted_users:
        # Remove user memberships
        user_memberships = model.Session.query(model.Member).filter(
            model.Member.table_id == user_to_purge.id
        ).all()
        for membership in user_memberships:
            membership.purge()

        # Remove package collaborations
        collaborations = model.Session.query(model.PackageMember).filter(
            model.PackageMember.user_id == user_to_purge.id
        ).all()
        for collab in collaborations:
            collab.purge()

        # Purge the user
        user_to_purge.purge()
        count += 1

    model.Session.commit()

    return {
        'success': True,
        'count': count,
        'message': f'{count} deleted user(s) have been purged'
    }
@tk.side_effect_free
def udc_user_list(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]:
    """List non-deleted users with pagination and column filters.

    Every user whose state is not ``"deleted"`` is eligible. Access is
    checked through the ``udc_user_list`` auth function.

    :param context: CKAN action context, passed to the access check
    :param data_dict: optional keys ``page`` (default 1), ``page_size``
        (default 25) and ``filters`` (mapping handed to
        ``_apply_user_filters``)
    :returns: dictionary with ``results`` (list of user dictionaries,
        ordered by name), ``total`` (number of matching users before
        pagination), ``page`` and ``page_size``
    :rtype: dictionary
    :raises ValidationError: if ``page`` or ``page_size`` is not a positive
        integer
    """
    tk.check_access("udc_user_list", context, data_dict)

    # Missing or falsy values fall back to the defaults. Anything else must
    # be a positive integer: a zero or negative value would otherwise become
    # a negative OFFSET/LIMIT, and non-numeric input an unhandled ValueError.
    paging: dict[str, int] = {}
    for key, default in (("page", 1), ("page_size", 25)):
        try:
            value = int(data_dict.get(key, default) or default)
        except (TypeError, ValueError):
            value = None
        if value is None or value < 1:
            raise tk.ValidationError({key: ["Must be a positive integer."]})
        paging[key] = value
    page = paging["page"]
    page_size = paging["page_size"]

    filters = data_dict.get("filters") or {}

    query = _apply_user_filters(
        model.Session.query(model.User).filter(model.User.state != "deleted"),
        filters,
    )
    # Count before pagination so callers can compute the number of pages.
    total = query.count()
    users = (
        query.order_by(model.User.name.asc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )

    return {
        "results": [_user_to_dict(user) for user in users],
        "total": total,
        "page": page,
        "page_size": page_size,
    }
def udc_user_reset_password(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]:
    """Reset a user's password (sysadmin only).

    Access is checked through the ``udc_user_reset_password`` auth function.

    :param context: CKAN action context, passed to the access check
    :param data_dict: must contain ``id`` or ``name`` to identify the user,
        and ``new_password`` with the password to set
    :returns: dictionary with ``success`` plus the ``id`` and ``name`` of
        the updated user
    :rtype: dictionary
    :raises ValidationError: if the user identifier or the new password is
        missing; each missing field is reported under its own key
    :raises NotFound: if no user matches the identifier
    """
    tk.check_access("udc_user_reset_password", context, data_dict)

    user_id = data_dict.get("id") or data_dict.get("name")
    new_password = data_dict.get("new_password")

    # Report each missing field under its own key so a missing identifier is
    # not mislabelled as a missing password.
    errors: dict[str, list[str]] = {}
    if not user_id:
        errors["id"] = ["User id or name is required."]
    if not new_password:
        errors["new_password"] = ["Password is required."]
    if errors:
        raise tk.ValidationError(errors)

    user_obj = model.User.get(user_id)
    if not user_obj:
        raise NotFound("User not found")

    user_obj.set_password(new_password)
    model.Session.commit()

    return {"success": True, "id": user_obj.id, "name": user_obj.name}
def udc_user_delete(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]:
    """Soft-delete a user by setting its state to ``"deleted"`` (sysadmin only).

    Access is checked through the ``udc_user_delete`` auth function. The
    user row itself is kept; only its ``state`` changes.

    :param context: CKAN action context, passed to the access check
    :param data_dict: must contain ``id`` or ``name`` to identify the user
    :returns: dictionary with ``success`` plus the ``id`` and ``name`` of
        the deleted user
    :rtype: dictionary
    :raises ValidationError: if neither ``id`` nor ``name`` is supplied
    :raises NotFound: if no user matches the identifier
    """
    tk.check_access("udc_user_delete", context, data_dict)

    # ``id`` takes precedence; ``name`` is only consulted when it is absent.
    identifier = data_dict.get("id")
    if not identifier:
        identifier = data_dict.get("name")
    if not identifier:
        raise tk.ValidationError({"id": ["User id or name is required."]})

    target = model.User.get(identifier)
    if not target:
        raise NotFound("User not found")

    target.state = "deleted"
    model.Session.commit()

    outcome: dict[str, Any] = {"success": True}
    outcome["id"] = target.id
    outcome["name"] = target.name
    return outcome
def _user_to_dict(user: model.User) -> dict[str, Any]:
    """Serialize a user object into a plain dictionary.

    :param user: object exposing ``id``, ``name``, ``fullname``, ``email``,
        ``created``, ``state``, ``sysadmin`` and ``about`` attributes
    :returns: dictionary with those fields; ``created`` is an ISO 8601
        string (``None`` when unset) and ``sysadmin`` is coerced to a bool
    :rtype: dictionary
    """
    # Attributes copied through unchanged, in output order.
    payload: dict[str, Any] = {
        field: getattr(user, field)
        for field in ("id", "name", "fullname", "email")
    }

    created_at = user.created
    payload["created"] = created_at.isoformat() if created_at else None
    payload["state"] = user.state
    payload["sysadmin"] = True if user.sysadmin else False
    payload["about"] = user.about
    return payload
def _apply_user_filters(query, filters: dict[str, Any]):
    """Narrow a user query with the optional search and column filters.

    Recognised keys in ``filters``:

    * ``q`` -- text matched case-insensitively as a substring of the name,
      fullname, email or about column (a match in any one is enough)
    * ``name``, ``fullname``, ``email``, ``about`` -- text matched
      case-insensitively as a substring of that single column
    * ``sysadmin`` -- ``True`` or ``False`` to match the sysadmin flag

    Missing, falsy or whitespace-only text values are ignored. Text is
    matched literally: ``%`` and ``_`` in the input are escaped so they do
    not act as LIKE wildcards.

    :param query: query over ``model.User`` to refine
    :param filters: mapping of filter name to value
    :returns: the query with every applicable filter added
    """

    def _term(key: str) -> str:
        # Coerce to text so a non-string value (e.g. a number in a JSON
        # body) does not fail on ``.strip()``.
        raw = filters.get(key)
        return str(raw).strip() if raw else ""

    def _contains(column, text: str):
        # Escape LIKE metacharacters so user input only matches literally.
        escaped = text.replace("/", "//").replace("%", "/%").replace("_", "/_")
        return column.ilike(f"%{escaped}%", escape="/")

    search = _term("q")
    if search:
        query = query.filter(
            or_(
                _contains(model.User.name, search),
                _contains(model.User.fullname, search),
                _contains(model.User.email, search),
                _contains(model.User.about, search),
            )
        )

    # Per-column filters are combined with AND.
    for key in ("name", "fullname", "email", "about"):
        text = _term(key)
        if text:
            query = query.filter(_contains(getattr(model.User, key), text))

    sysadmin = filters.get("sysadmin")
    if sysadmin in (True, False):
        query = query.filter(model.User.sysadmin == sysadmin)

    return query