Coverage for ckanext/udc/system/actions.py: 30%
30 statements
« prev ^ index » next coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
1import subprocess
2import ckan.authz as authz
3from ckan.types import Context
4import ckan.logic as logic
5from ckan.types import Context
6from ckan.common import _
def reload_supervisord(context: Context, data: dict) -> dict:
    """Restart supervisor-managed CKAN processes on behalf of a sysadmin.

    The restart is issued as ``sudo supervisorctl restart <target>``, so the
    account running CKAN needs a passwordless sudoers rule for supervisorctl,
    e.g. ``www-data ALL=(ALL) NOPASSWD: /usr/bin/supervisorctl`` (added via
    ``sudo visudo``).

    Args:
        context: Action context; ``context["user"]`` must be a sysadmin.
        data: Request payload. ``data["task"]`` selects what to restart and
            must be one of ``"ckan"``, ``"worker"`` or ``"all"``.

    Returns:
        ``{"success": True}`` when the command exits with status 0, otherwise
        ``{"success": False, "error": <message>}``.

    Raises:
        logic.NotAuthorized: If the calling user is not a sysadmin.
        logic.ValidationError: If ``task`` is missing or not a known value.
    """
    if not authz.is_sysadmin(context.get("user")):
        raise logic.NotAuthorized(_("You are not authorized to view this page"))

    # Maps the public task name to the argument handed to supervisorctl.
    # NOTE(review): the trailing ":" presumably addresses a supervisor process
    # group -- confirm against the deployed supervisor configuration.
    targets = {
        "ckan": "ckan-uwsgi:",
        "worker": "ckan-worker:",
        "all": "all",
    }

    task = data.get("task")
    # Only look up strings: an unhashable value (e.g. a list from JSON) must
    # end in a ValidationError, not a TypeError from the dict lookup.
    target = targets.get(task) if isinstance(task, str) else None
    if target is None:
        raise logic.ValidationError(_("task must be one of 'ckan', 'worker', or 'all'"))

    try:
        subprocess.run(["sudo", "supervisorctl", "restart", target], check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # CalledProcessError: the command ran but exited non-zero.
        # OSError: the command could not be launched at all (for example
        # sudo/supervisorctl is not installed or not executable).
        return {"success": False, "error": str(e)}

    return {"success": True}
def _capture_stdout(command: list) -> str:
    """Run *command* and return whatever it wrote to stdout as text."""
    completed = subprocess.run(command, stdout=subprocess.PIPE)
    # errors="replace" keeps an unexpected non-UTF-8 byte in the tool output
    # from turning the whole request into a UnicodeDecodeError.
    return completed.stdout.decode(errors="replace")


@logic.side_effect_free
def get_system_stats(context: Context, data: dict) -> dict:
    """Return raw CPU, memory and disk usage reports for sysadmins.

    The values are the unparsed text output of ``top -bn1``, ``free -h`` and
    ``df -h`` run on the host serving the request.

    Deployment note for the supervisor restart action in this module (this
    function itself does not call sudo): remember to add
    `www-data ALL=(ALL) NOPASSWD: /usr/bin/supervisorctl` to the sudoers file
    via `sudo visudo`.

    Args:
        context: Action context; ``context["user"]`` must be a sysadmin.
        data: Request payload; not read by this action.

    Returns:
        A dict with the keys ``cpu_usage``, ``memory_usage`` and
        ``disk_usage``, each holding the corresponding command's stdout.

    Raises:
        logic.NotAuthorized: If the calling user is not a sysadmin.
    """
    if not authz.is_sysadmin(context.get("user")):
        # Wrapped in _() so the message is translatable, matching the
        # identical check in reload_supervisord.
        raise logic.NotAuthorized(_("You are not authorized to view this page"))

    return {
        "cpu_usage": _capture_stdout(["top", "-bn1"]),
        "memory_usage": _capture_stdout(["free", "-h"]),
        "disk_usage": _capture_stdout(["df", "-h"]),
    }