Coverage for ckanext/udc/user/actions.py: 68%

92 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2026-01-19 23:48 +0000

1"""User management actions for UDC.""" 

2from __future__ import annotations 

3from typing import Any 

4from sqlalchemy import or_ 

5import ckan.plugins.toolkit as tk 

6from ckan.types import Context 

7from ckan import model 

8from ckan.logic import NotFound 

9 

10 

11@tk.side_effect_free 

12def deleted_users_list(context: Context, data_dict: dict[str, Any]) -> list[dict[str, Any]]: 

13 """Get a list of deleted users. 

14  

15 Only sysadmins can access this endpoint. 

16  

17 :returns: List of deleted user dictionaries 

18 :rtype: list of dictionaries 

19 """ 

20 tk.check_access('deleted_users_list', context, data_dict) 

21 

22 page = int(data_dict.get("page", 1) or 1) 

23 page_size = int(data_dict.get("page_size", 25) or 25) 

24 filters = data_dict.get("filters") or {} 

25 

26 query = _apply_user_filters( 

27 model.Session.query(model.User).filter(model.User.state == "deleted"), 

28 filters, 

29 ) 

30 total = query.count() 

31 deleted_users = ( 

32 query.order_by(model.User.name.asc()) 

33 .offset((page - 1) * page_size) 

34 .limit(page_size) 

35 .all() 

36 ) 

37 

38 return { 

39 "results": [_user_to_dict(user) for user in deleted_users], 

40 "total": total, 

41 "page": page, 

42 "page_size": page_size, 

43 } 

44 

45 

46def purge_deleted_users(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]: 

47 """Purge all deleted users from the database. 

48  

49 .. warning:: This action cannot be undone! Users will be permanently removed. 

50  

51 Only sysadmins can purge deleted users. 

52  

53 :returns: Dictionary with count of purged users 

54 :rtype: dictionary 

55 """ 

56 tk.check_access('purge_deleted_users', context, data_dict) 

57 

58 selected_ids = data_dict.get("ids") or [] 

59 query = model.Session.query(model.User).filter(model.User.state == "deleted") 

60 if selected_ids: 

61 query = query.filter(model.User.id.in_(selected_ids)) 

62 deleted_users = query.all() 

63 

64 count = 0 

65 for user_to_purge in deleted_users: 

66 # Remove user memberships 

67 user_memberships = model.Session.query(model.Member).filter( 

68 model.Member.table_id == user_to_purge.id 

69 ).all() 

70 for membership in user_memberships: 

71 membership.purge() 

72 

73 # Remove package collaborations 

74 collaborations = model.Session.query(model.PackageMember).filter( 

75 model.PackageMember.user_id == user_to_purge.id 

76 ).all() 

77 for collab in collaborations: 

78 collab.purge() 

79 

80 # Purge the user 

81 user_to_purge.purge() 

82 count += 1 

83 

84 model.Session.commit() 

85 

86 return { 

87 'success': True, 

88 'count': count, 

89 'message': f'{count} deleted user(s) have been purged' 

90 } 

91 

92 

93@tk.side_effect_free 

94def udc_user_list(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]: 

95 """List active users with pagination and column filters.""" 

96 tk.check_access("udc_user_list", context, data_dict) 

97 

98 page = int(data_dict.get("page", 1) or 1) 

99 page_size = int(data_dict.get("page_size", 25) or 25) 

100 filters = data_dict.get("filters") or {} 

101 

102 query = _apply_user_filters( 

103 model.Session.query(model.User).filter(model.User.state != "deleted"), 

104 filters, 

105 ) 

106 total = query.count() 

107 users = ( 

108 query.order_by(model.User.name.asc()) 

109 .offset((page - 1) * page_size) 

110 .limit(page_size) 

111 .all() 

112 ) 

113 

114 return { 

115 "results": [_user_to_dict(user) for user in users], 

116 "total": total, 

117 "page": page, 

118 "page_size": page_size, 

119 } 

120 

121 

122def udc_user_reset_password(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]: 

123 """Reset a user's password (sysadmin only).""" 

124 tk.check_access("udc_user_reset_password", context, data_dict) 

125 

126 user_id = data_dict.get("id") or data_dict.get("name") 

127 new_password = data_dict.get("new_password") 

128 if not user_id or not new_password: 

129 raise tk.ValidationError({"new_password": ["Password is required."]}) 

130 

131 user_obj = model.User.get(user_id) 

132 if not user_obj: 

133 raise NotFound("User not found") 

134 

135 user_obj.set_password(new_password) 

136 model.Session.commit() 

137 

138 return {"success": True, "id": user_obj.id, "name": user_obj.name} 

139 

140 

141def udc_user_delete(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]: 

142 """Soft-delete a user (sysadmin only).""" 

143 tk.check_access("udc_user_delete", context, data_dict) 

144 

145 user_id = data_dict.get("id") or data_dict.get("name") 

146 if not user_id: 

147 raise tk.ValidationError({"id": ["User id or name is required."]}) 

148 

149 user_obj = model.User.get(user_id) 

150 if not user_obj: 

151 raise NotFound("User not found") 

152 

153 user_obj.state = "deleted" 

154 model.Session.commit() 

155 

156 return {"success": True, "id": user_obj.id, "name": user_obj.name} 

157 

158 

159def _user_to_dict(user: model.User) -> dict[str, Any]: 

160 return { 

161 "id": user.id, 

162 "name": user.name, 

163 "fullname": user.fullname, 

164 "email": user.email, 

165 "created": user.created.isoformat() if user.created else None, 

166 "state": user.state, 

167 "sysadmin": bool(user.sysadmin), 

168 "about": user.about, 

169 } 

170 

171 

172def _apply_user_filters(query, filters: dict[str, Any]): 

173 search = (filters.get("q") or "").strip() 

174 if search: 

175 pattern = f"%{search}%" 

176 query = query.filter( 

177 or_( 

178 model.User.name.ilike(pattern), 

179 model.User.fullname.ilike(pattern), 

180 model.User.email.ilike(pattern), 

181 model.User.about.ilike(pattern), 

182 ) 

183 ) 

184 

185 name = (filters.get("name") or "").strip() 

186 if name: 

187 query = query.filter(model.User.name.ilike(f"%{name}%")) 

188 

189 fullname = (filters.get("fullname") or "").strip() 

190 if fullname: 

191 query = query.filter(model.User.fullname.ilike(f"%{fullname}%")) 

192 

193 email = (filters.get("email") or "").strip() 

194 if email: 

195 query = query.filter(model.User.email.ilike(f"%{email}%")) 

196 

197 about = (filters.get("about") or "").strip() 

198 if about: 

199 query = query.filter(model.User.about.ilike(f"%{about}%")) 

200 

201 sysadmin = filters.get("sysadmin") 

202 if sysadmin in (True, False): 

203 query = query.filter(model.User.sysadmin == sysadmin) 

204 

205 return query