Coverage for ckanext/udc/system/actions.py: 30%

30 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2026-01-19 23:48 +0000

1import subprocess 

2import ckan.authz as authz 

3from ckan.types import Context 

4import ckan.logic as logic 

5from ckan.types import Context 

6from ckan.common import _ 

7 

8 

9def reload_supervisord(context: Context, data: dict) -> str: 

10 # Check admin 

11 if not authz.is_sysadmin(context.get("user")): 

12 raise logic.NotAuthorized(_("You are not authorized to view this page")) 

13 

14 task = data.get("task") 

15 if task == "ckan": 

16 target = "ckan-uwsgi:" 

17 elif task == "worker": 

18 target = "ckan-worker:" 

19 elif task == "all": 

20 target = "all" 

21 else: 

22 raise logic.ValidationError(_("task must be one of 'ckan', 'worker', or 'all'")) 

23 

24 try: 

25 subprocess.run(["sudo", "supervisorctl", "restart", target], check=True) 

26 except subprocess.CalledProcessError as e: 

27 return {"success": False, "error": str(e)} 

28 

29 return {"success": True} 

30 

31 

32@logic.side_effect_free 

33def get_system_stats(context: Context, data: dict) -> dict: 

34 """ 

35 Remember to add `www-data ALL=(ALL) NOPASSWD: /usr/bin/supervisorctl` to the sudoers file. 

36 via `sudo visudo` 

37 """ 

38 # Check admin 

39 if not authz.is_sysadmin(context.get("user")): 

40 raise logic.NotAuthorized("You are not authorized to view this page") 

41 

42 # Get CPU, memory, and disk usage 

43 cpu_usage = subprocess.run(["top", "-bn1"], stdout=subprocess.PIPE).stdout.decode() 

44 memory_usage = subprocess.run( 

45 ["free", "-h"], stdout=subprocess.PIPE 

46 ).stdout.decode() 

47 disk_usage = subprocess.run(["df", "-h"], stdout=subprocess.PIPE).stdout.decode() 

48 

49 return { 

50 "cpu_usage": cpu_usage, 

51 "memory_usage": memory_usage, 

52 "disk_usage": disk_usage, 

53 }