Coverage for ckanext/udc/file_format/logic.py: 23%
75 statements
« prev ^ index » next coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
1from ckan import model, authz, logic
2import ckan.plugins as plugins
3import logging
4import re
5import json
6from ckan.common import _
8from ckanext.udc.graph.queries import get_client
9from ckanext.udc.graph.preload import dropdown_reload
11log = logging.getLogger(__name__)
13GRAPH = "http://data.urbandatacentre.ca/custom-file-format"
14PREFIX = GRAPH + "#"
17def file_format_create(context, data_dict):
18 """
19 Any user can create a custom file format.
20 """
22 if plugins.get_plugin('udc').disable_graphdb:
23 raise logic.ValidationError(_("GraphDB integration is not enabled"))
25 user = context.get("user")
26 id = data_dict.get('id')
27 label = data_dict.get('label')
29 if not user:
30 raise logic.ValidationError(_("You are not logged in"))
32 if not label:
33 raise logic.ValidationError(_("file format label is required"))
35 if not id:
36 # Replace whitespaces
37 id = re.sub(r'\s', '-', label)
38 # Remove unsafe characters in URL
39 id = re.sub(r'[<>#%\{\}|\^~\[\]]/', '', id)
41 query = f"""
42 PREFIX cff: <{PREFIX}>
43 PREFIX term: <http://purl.org/dc/terms/>
44 PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
45 INSERT DATA {{
46 GRAPH <{GRAPH}> {{
47 cff:{id} a term:MediaType, owl:NamedIndividual;
48 rdfs:label {json.dumps(label)}
49 }}
50 }}
51 """
52 client = get_client()
53 try:
54 client.execute_sparql(query)
55 except Exception as e:
56 raise logic.ActionError("Error in KG:" + str(e))
58 dropdown_reload("file_format")
60 return {"success": True, "id": PREFIX + id}
63@logic.side_effect_free
64def file_formats_get(context):
65 """
66 Any user can get all custom file formats.
67 """
69 if plugins.get_plugin('udc').disable_graphdb:
70 raise logic.ValidationError("GraphDB integration is not enabled")
72 query = f"""
73 PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
74 SELECT ?uri ?label FROM <{GRAPH}> WHERE {{
75 ?uri a term:MediaType;
76 rdfs:label ?label.
77 }}
78 """
79 client = get_client()
80 try:
81 data = client.execute_sparql(query)
82 except Exception as e:
83 raise logic.ActionError(_("Error in KG:") + str(e))
85 result = []
86 for item in result["results"]["bindings"]:
87 result.append({
88 "id": item["uri"]["value"],
89 "label": item["label"]["value"]
90 })
91 return {"success": True, "data": data}
94def file_format_delete(context, data_dict):
95 """
96 Admin can delete a custom file format.
97 """
98 if plugins.get_plugin('udc').disable_graphdb:
99 raise logic.ValidationError(_("GraphDB integration is not enabled"))
101 user = context.get("user")
102 id = data_dict.get('id')
104 if not user:
105 raise logic.ValidationError(_("You are not logged in"))
107 if not id:
108 raise logic.ValidationError(_("file format id is required"))
110 if not authz.is_sysadmin(user):
111 raise logic.NotAuthorized(_("You are not authorized to delete this file format"))
113 query = f"""
114 PREFIX cff: <{PREFIX}>
115 PREFIX term: <http://purl.org/dc/terms/>
116 PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
117 DELETE WHERE {{
118 GRAPH <{GRAPH}> {{
119 <{id}> a term:MediaType, owl:NamedIndividual;
120 rdfs:label ?label.
122 }}
123 }}
124 """
125 client = get_client()
126 try:
127 client.execute_sparql(query)
128 except Exception as e:
129 raise logic.ActionError("Error in KG:" + str(e))
131 dropdown_reload("file_format")
133 return {"success": True}
136def before_package_update(context, data_dict):
137 # Pre-process custom file format
138 # If the provided file_format is not an URI (starts with http), create the custom file format
139 if data_dict.get('file_format'):
140 file_formats = data_dict.get('file_format').split(",")
141 file_formats_list = []
142 for file_format in file_formats:
143 if not file_format.startswith("http"):
144 file_format_result = file_format_create(context, {"label": file_format})
145 file_formats_list.append(file_format_result["id"])
146 else:
147 file_formats_list.append(file_format)
148 data_dict['file_format'] = ','.join(file_formats_list)