Coverage for ckanext/udc/organization/actions.py: 15%

182 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2026-03-30 22:15 +0000

1"""Organization management actions for UDC.""" 

2from __future__ import annotations 

3 

4from typing import Any 

5 

6from sqlalchemy import or_ 

7 

8import ckan.plugins.toolkit as tk 

9from ckan import model 

10from ckan.logic import NotFound 

11from ckan.types import Context 

12from ckanext.activity.model.activity import Activity 

13 

14 

15@tk.side_effect_free 

16def udc_organization_list(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]: 

17 """List organizations with pagination and filters (sysadmin only).""" 

18 tk.check_access("udc_organization_list", context, data_dict) 

19 

20 page = int(data_dict.get("page", 1) or 1) 

21 page_size = int(data_dict.get("page_size", 25) or 25) 

22 filters = data_dict.get("filters") or {} 

23 

24 query = model.Session.query(model.Group).filter(model.Group.is_organization.is_(True)) 

25 query = _apply_org_filters(query, filters) 

26 total = query.count() 

27 

28 orgs = ( 

29 query.order_by(model.Group.name.asc()) 

30 .offset((page - 1) * page_size) 

31 .limit(page_size) 

32 .all() 

33 ) 

34 

35 return { 

36 "results": [_org_to_dict(org) for org in orgs], 

37 "total": total, 

38 "page": page, 

39 "page_size": page_size, 

40 } 

41 

42 

43@tk.side_effect_free 

44def udc_deleted_organization_list(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]: 

45 """List deleted organizations with pagination and filters (sysadmin only).""" 

46 tk.check_access("udc_deleted_organization_list", context, data_dict) 

47 

48 page = int(data_dict.get("page", 1) or 1) 

49 page_size = int(data_dict.get("page_size", 25) or 25) 

50 filters = data_dict.get("filters") or {} 

51 filters = {**filters, "state": "deleted"} 

52 

53 query = model.Session.query(model.Group).filter(model.Group.is_organization.is_(True)) 

54 query = _apply_org_filters(query, filters) 

55 total = query.count() 

56 

57 orgs = ( 

58 query.order_by(model.Group.name.asc()) 

59 .offset((page - 1) * page_size) 

60 .limit(page_size) 

61 .all() 

62 ) 

63 

64 return { 

65 "results": [_org_to_dict(org) for org in orgs], 

66 "total": total, 

67 "page": page, 

68 "page_size": page_size, 

69 } 

70 

71 

72@tk.side_effect_free 

73def udc_organization_packages_list(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]: 

74 """List packages for an organization with pagination and filters (sysadmin only).""" 

75 tk.check_access("udc_organization_packages_list", context, data_dict) 

76 

77 org = _get_org(data_dict) 

78 page = int(data_dict.get("page", 1) or 1) 

79 page_size = int(data_dict.get("page_size", 25) or 25) 

80 filters = data_dict.get("filters") or {} 

81 

82 query = model.Session.query(model.Package).filter(model.Package.owner_org == org.id) 

83 query = _apply_package_filters(query, filters) 

84 total = query.count() 

85 

86 packages = ( 

87 query.order_by(model.Package.name.asc()) 

88 .offset((page - 1) * page_size) 

89 .limit(page_size) 

90 .all() 

91 ) 

92 

93 return { 

94 "organization": _org_to_dict(org), 

95 "results": [_package_to_dict(pkg) for pkg in packages], 

96 "total": total, 

97 "page": page, 

98 "page_size": page_size, 

99 } 

100 

101 

102@tk.side_effect_free 

103def udc_organization_packages_ids(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]: 

104 """Return package ids for an organization (sysadmin only).""" 

105 tk.check_access("udc_organization_packages_ids", context, data_dict) 

106 

107 org = _get_org(data_dict) 

108 filters = data_dict.get("filters") or {} 

109 

110 query = model.Session.query(model.Package.id).filter(model.Package.owner_org == org.id) 

111 query = _apply_package_filters(query, filters) 

112 

113 ids = [row[0] for row in query.all()] 

114 return {"organization": _org_to_dict(org), "ids": ids, "total": len(ids)} 

115 

116 

117def udc_organization_packages_delete(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]: 

118 """Delete (soft-delete) packages for an organization (sysadmin only).""" 

119 tk.check_access("udc_organization_packages_delete", context, data_dict) 

120 

121 org = _get_org(data_dict) 

122 package_ids = data_dict.get("ids") or [] 

123 

124 if not isinstance(package_ids, list): 

125 raise tk.ValidationError({"ids": ["Provide a list of package ids."]}) 

126 if not package_ids: 

127 raise tk.ValidationError({"ids": ["No packages selected."]}) 

128 

129 errors: list[dict[str, str]] = [] 

130 deleted = 0 

131 delete_action = tk.get_action("package_delete") 

132 

133 for pkg_id in package_ids: 

134 try: 

135 delete_action(context, {"id": pkg_id}) 

136 deleted += 1 

137 except Exception as exc: # pragma: no cover - forward exact error to UI 

138 errors.append({"id": str(pkg_id), "error": str(exc)}) 

139 

140 return {"success": not errors, "deleted": deleted, "errors": errors} 

141 

142 

143def udc_organization_delete(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]: 

144 """Soft-delete organizations (sysadmin only).""" 

145 tk.check_access("udc_organization_delete", context, data_dict) 

146 

147 org_ids = data_dict.get("ids") or [] 

148 if not isinstance(org_ids, list): 

149 raise tk.ValidationError({"ids": ["Provide a list of organization ids."]}) 

150 if not org_ids: 

151 raise tk.ValidationError({"ids": ["No organizations selected."]}) 

152 

153 errors: list[dict[str, str]] = [] 

154 deleted = 0 

155 delete_action = tk.get_action("organization_delete") 

156 

157 for org_id in org_ids: 

158 try: 

159 delete_action(context, {"id": org_id}) 

160 deleted += 1 

161 except Exception as exc: # pragma: no cover - pass exact backend error 

162 errors.append({"id": str(org_id), "error": str(exc)}) 

163 

164 return {"success": not errors, "deleted": deleted, "errors": errors} 

165 

166 

167def udc_purge_deleted_organizations(context: Context, data_dict: dict[str, Any]) -> dict[str, Any]: 

168 """Purge deleted organizations permanently (sysadmin only).""" 

169 tk.check_access("udc_purge_deleted_organizations", context, data_dict) 

170 

171 selected_ids = data_dict.get("ids") or [] 

172 if not isinstance(selected_ids, list): 

173 raise tk.ValidationError({"ids": ["Provide a list of organization ids."]}) 

174 

175 query = model.Session.query(model.Group).filter( 

176 model.Group.is_organization.is_(True), 

177 model.Group.state == "deleted", 

178 ) 

179 if selected_ids: 

180 query = query.filter(model.Group.id.in_(selected_ids)) 

181 

182 orgs = query.all() 

183 errors: list[dict[str, str]] = [] 

184 purged = 0 

185 purge_action = tk.get_action("organization_purge") 

186 

187 for org in orgs: 

188 try: 

189 purge_action(context, {"id": org.id}) 

190 purged += 1 

191 except Exception as exc: # pragma: no cover - pass exact backend error 

192 errors.append({"id": str(org.id), "error": str(exc)}) 

193 

194 return {"success": not errors, "purged": purged, "errors": errors} 

195 

196 

197def _get_org(data_dict: dict[str, Any]) -> model.Group: 

198 org_id = data_dict.get("org_id") or data_dict.get("id") or data_dict.get("name") 

199 if not org_id: 

200 raise tk.ValidationError({"org_id": ["Organization id or name is required."]}) 

201 org = model.Group.get(org_id) 

202 if not org or not org.is_organization: 

203 raise NotFound("Organization not found") 

204 return org 

205 

206 

207def _org_to_dict(org: model.Group) -> dict[str, Any]: 

208 title = getattr(org, "title", None) or getattr(org, "display_name", None) or org.name 

209 return { 

210 "id": org.id, 

211 "name": org.name, 

212 "title": title, 

213 "description": org.description, 

214 "state": org.state, 

215 "created": org.created.isoformat() if org.created else None, 

216 } 

217 

218 

219def _package_to_dict(package: model.Package) -> dict[str, Any]: 

220 return { 

221 "id": package.id, 

222 "name": package.name, 

223 "title": package.title, 

224 "state": package.state, 

225 "private": bool(package.private), 

226 "metadata_modified": package.metadata_modified.isoformat() if package.metadata_modified else None, 

227 } 

228 

229 

230def _apply_org_filters(query, filters: dict[str, Any]): 

231 search = (filters.get("q") or "").strip() 

232 if search: 

233 pattern = f"%{search}%" 

234 query = query.filter( 

235 or_( 

236 model.Group.name.ilike(pattern), 

237 model.Group.title.ilike(pattern), 

238 model.Group.description.ilike(pattern), 

239 ) 

240 ) 

241 

242 name = (filters.get("name") or "").strip() 

243 if name: 

244 query = query.filter(model.Group.name.ilike(f"%{name}%")) 

245 

246 title = (filters.get("title") or "").strip() 

247 if title: 

248 query = query.filter(model.Group.title.ilike(f"%{title}%")) 

249 

250 state = filters.get("state") 

251 if state: 

252 query = query.filter(model.Group.state == state) 

253 else: 

254 query = query.filter(model.Group.state != "deleted") 

255 

256 if _as_bool(filters.get("creator_deleted_or_purged")): 

257 query = _filter_orgs_by_creator_deleted_or_purged(query) 

258 

259 return query 

260 

261 

262def _apply_package_filters(query, filters: dict[str, Any]): 

263 search = (filters.get("q") or "").strip() 

264 if search: 

265 pattern = f"%{search}%" 

266 query = query.filter( 

267 or_( 

268 model.Package.name.ilike(pattern), 

269 model.Package.title.ilike(pattern), 

270 model.Package.notes.ilike(pattern), 

271 ) 

272 ) 

273 

274 name = (filters.get("name") or "").strip() 

275 if name: 

276 query = query.filter(model.Package.name.ilike(f"%{name}%")) 

277 

278 title = (filters.get("title") or "").strip() 

279 if title: 

280 query = query.filter(model.Package.title.ilike(f"%{title}%")) 

281 

282 state = filters.get("state") 

283 if state: 

284 query = query.filter(model.Package.state == state) 

285 else: 

286 query = query.filter(model.Package.state != "deleted") 

287 

288 return query 

289 

290 

291def _filter_orgs_by_creator_deleted_or_purged(query): 

292 org_ids = [row[0] for row in query.with_entities(model.Group.id).all()] 

293 if not org_ids: 

294 return query.filter(model.Group.id == "__none__") 

295 

296 creator_map = _org_creator_user_map(org_ids) 

297 creator_ids = {creator_id for creator_id in creator_map.values() if creator_id} 

298 if not creator_ids: 

299 return query.filter(model.Group.id == "__none__") 

300 

301 user_rows = ( 

302 model.Session.query(model.User.id, model.User.state) 

303 .filter(model.User.id.in_(creator_ids)) 

304 .all() 

305 ) 

306 user_state_map = {row[0]: row[1] for row in user_rows} 

307 

308 matching_org_ids = [] 

309 for org_id, creator_id in creator_map.items(): 

310 if not creator_id: 

311 continue 

312 creator_state = user_state_map.get(creator_id) 

313 if creator_state == "deleted" or creator_state is None: 

314 matching_org_ids.append(org_id) 

315 

316 if not matching_org_ids: 

317 return query.filter(model.Group.id == "__none__") 

318 

319 return query.filter(model.Group.id.in_(matching_org_ids)) 

320 

321 

322def _org_creator_user_map(org_ids: list[str]) -> dict[str, str | None]: 

323 rows = ( 

324 model.Session.query(Activity.object_id, Activity.user_id, Activity.timestamp) 

325 .filter( 

326 Activity.activity_type == "new organization", 

327 Activity.object_id.in_(org_ids), 

328 ) 

329 .order_by(Activity.object_id.asc(), Activity.timestamp.asc()) 

330 .all() 

331 ) 

332 

333 creator_map: dict[str, str | None] = {} 

334 for object_id, user_id, _timestamp in rows: 

335 if object_id not in creator_map: 

336 creator_map[object_id] = user_id 

337 return creator_map 

338 

339 

340def _as_bool(value: Any) -> bool: 

341 if isinstance(value, bool): 

342 return value 

343 if isinstance(value, str): 

344 return value.strip().lower() in {"1", "true", "yes", "on"} 

345 if isinstance(value, int): 

346 return value == 1 

347 return False