Coverage for ckanext/udc/graph/logic.py: 9%
217 statements
« prev ^ index » next coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
1import ckan.plugins as plugins
2import ckan.plugins.toolkit as tk
4import rdflib
5from rdflib import Graph, term
6from rdflib.namespace import split_uri
7from rdflib.plugins.serializers.turtle import TurtleSerializer
8from rdflib.serializer import Serializer
9import pyld
10import logging
11import json
13from .serializer import *
14from .template import compile_template, compile_with_temp_value
15from .mapping_helpers import all_helpers
16from .ckan_field import prepare_data_dict
17from .queries import get_uri_as_object_usage, get_client, get_num_paths
def get_mappings():
    """Return the UDC plugin's catalogue mappings, expanded into full JSON-LD form."""
    udc_plugin = plugins.get_plugin('udc')
    return pyld.jsonld.expand(udc_plugin.mappings)
def find_existing_instance_uris(data_dict) -> list:
    """Return the list of instance URIs already stored in the KG for this catalogue entry.

    Compiles the mapping template with temporary values to discover the graph
    shape, then issues a SELECT query that walks from the catalogue URI through
    every linked subject and reads back the concrete URIs bound in the KG.

    Args:
        data_dict: The CKAN package data dict for the catalogue entry.

    Returns:
        list: URIs (str) of non-blank-node instances found in the KG.
              Empty list when the KG holds no matching bindings.

    Raises:
        ValueError: If a subject cannot be linked back to the catalogue entry,
            or if the KG returns more than one binding row (inconsistent data).
    """
    prepared_dict = prepare_data_dict(data_dict)
    g = Graph()
    compiled_template = compile_with_temp_value(get_mappings(), all_helpers,
                                                prepared_dict)
    catalogue_uri = compiled_template["@id"]

    # Ignore warnings from rdflib.term when parsing the template with temp values
    logging.getLogger("rdflib.term").setLevel(logging.ERROR)
    g.parse(data=compiled_template, format='json-ld')
    logging.getLogger("rdflib.term").setLevel(logging.WARNING)

    # Get all uris that are used as a subject (s, any, any)
    subjects = set(g.subjects(None, None))

    # Find all (s1, p, s2) triples between subjects: s -> [(s, p, o), ...]
    s2spo = {}
    for s1 in subjects:
        for s2 in subjects:
            if s1 == s2:
                continue
            for p in g.predicates(s1, s2):
                s2spo.setdefault(s1, []).append((s1, p, s2))

    # Assign a SPARQL variable name to every subject except the catalogue itself
    s2var = {}
    cnt = 0
    for s in subjects:
        if s != catalogue_uri:
            s2var[str(s)] = f'var{cnt}'
            cnt += 1

    # o -> (s, sparql triple pattern reaching o from s)
    query_parts = {}
    for spos in s2spo.values():
        for s, p, o in spos:
            if s == rdflib.term.URIRef(catalogue_uri):
                # First level: subject is the catalogue entry itself
                query_parts[str(o)] = (str(s), f"<{s}> <{p}> ?{s2var[str(o)]}")
            else:
                query_parts[str(o)] = (str(s), f"?{s2var[str(s)]} <{p}> ?{s2var[str(o)]}")

    # Generate the select query body; deeper objects need their whole path to
    # the catalogue wrapped in nested OPTIONALs so partial graphs still match.
    triples = ''
    for o, (s, part) in query_parts.items():
        if s == catalogue_uri:
            triples += f"\tOPTIONAL {{ {part} }}\n"
        else:
            path = [part]
            curr = s
            max_iteration = 100  # guard against cycles in the template graph
            i = 0
            while curr != catalogue_uri:
                upper_level = query_parts.get(curr)
                if not upper_level:
                    raise ValueError("Cannot find path to the catalogue entry")
                path.append(upper_level[1])
                curr = upper_level[0]
                i += 1
                if max_iteration < i:
                    raise ValueError("Reached max iteration to find the path to the catalogue entry")
            inner_query = ''
            for step in path:
                inner_query = f"\tOPTIONAL {{ {step} {inner_query} }}\n"
            triples += inner_query

    query = f'SELECT DISTINCT * WHERE {{\n{triples}}}'
    client = get_client()
    result = client.execute_sparql(query)
    bindings = result["results"]["bindings"]
    if len(bindings) == 0:
        # Fixed: previously returned {} here, contradicting the declared list
        # return type; callers expect an iterable of URIs.
        return []
    elif len(bindings) > 1:
        raise ValueError("KG may not be consistent.")

    # Map each template subject to the concrete URI bound in the KG
    s2uri = {}
    results = bindings[0]
    for s, var in s2var.items():
        if results.get(var) and results[var]["type"] != "bnode":  # Skip blank nodes
            s2uri[s] = results[var]["value"]
    return [*s2uri.values()]
def onUpdateCatalogue(context, data_dict):
    """Sync a catalogue update into the knowledge graph.

    Deletes the triples the previous version of this catalogue entry
    contributed (shared instances are only unlinked, exclusively-owned
    instances are removed entirely), then inserts the freshly compiled graph.

    Args:
        context: CKAN action context (unused).
        data_dict: The updated package data dict.
    """
    logger = logging.getLogger(__name__)

    # Remove empty fields so they are not compiled into the template
    for key in [*data_dict.keys()]:
        if data_dict[key] == '':
            del data_dict[key]

    uris_to_del = find_existing_instance_uris(data_dict)

    prepared_dict = prepare_data_dict(data_dict)
    compiled_template = compile_template(get_mappings(), all_helpers,
                                         prepared_dict)
    catalogue_uri = compiled_template["@id"]

    g = Graph()
    g.parse(data=compiled_template, format='json-ld')

    prefixes = {}

    def normalize_uri(uri):
        """Normalize a URI into a QName (short name) and store the prefix."""
        prefix, ns, val = g.compute_qname(uri)
        prefixes[prefix] = str(ns)
        # A local name ending with a dot cannot be written as a prefixed name;
        # fall back to the full <...> form (same guard as onDeleteCatalogue).
        if val.endswith('.'):
            return f"<{uri}>"
        return f"{prefix}:{val}"

    def generate_delete_sparql():
        """Build the DELETE WHERE statements removing this catalogue's old triples."""
        subjects = set(uris_to_del)
        subjects.add(catalogue_uri)

        delete_clause = []
        # Inspect each subject to decide whether it is exclusively ours
        for s in subjects:
            # If 's' is not used by the current catalogue, skip it
            paths_used_by_catalogue = get_num_paths(catalogue_uri, s)
            num_paths_used_by_catalogue = len(paths_used_by_catalogue)
            if num_paths_used_by_catalogue == 0:
                continue
            # Check if 's' is used by any other triples as an object
            uri_as_object_usage = get_uri_as_object_usage(s)
            if uri_as_object_usage == num_paths_used_by_catalogue:
                # Remove this instance entirely: only this catalogue uses it
                delete_clause.append(f'{normalize_uri(s)} ?p ?o')
                delete_clause.append(f'?s ?p {normalize_uri(s)}')
            elif uri_as_object_usage > num_paths_used_by_catalogue:
                # Shared instance: only unlink it from this catalogue
                for spos in paths_used_by_catalogue.values():
                    for _s, p, o in spos:
                        # Remove the last link only
                        if o == rdflib.term.URIRef(s):
                            delete_clause.append(f'{normalize_uri(_s)} {normalize_uri(p)} {normalize_uri(o)}')

        # Remove all triples directly linked to the catalogue
        delete_clause.append(f'{normalize_uri(catalogue_uri)} ?p ?o')

        return '\n'.join([f"PREFIX {prefix}: <{ns}>" for prefix, ns in prefixes.items()]) + '\n' \
            + '\n'.join([f"DELETE WHERE {{\n\t{triple}.\n}};" for triple in delete_clause])

    delete_query = generate_delete_sparql()
    # Serialize once and reuse (previously serialized twice: print + execute)
    insert_query = g.serialize(format="sparql-insert")
    logger.debug("delete_query:\n%s", delete_query)
    logger.debug("insert_query:\n%s", insert_query)

    client = get_client()
    client.execute_sparql(delete_query)
    client.execute_sparql(insert_query)
def onDeleteCatalogue(context, data_dict):
    """Remove a deleted catalogue entry's triples from the knowledge graph.

    Instances used only by this catalogue are removed entirely; instances
    shared with other catalogues have their path triples removed instead.

    Args:
        context: CKAN action context (unused).
        data_dict: The package data dict of the entry being deleted.
    """
    logger = logging.getLogger(__name__)

    uris_to_del = find_existing_instance_uris(data_dict)

    prepared_dict = prepare_data_dict(data_dict)
    compiled_template = compile_with_temp_value(get_mappings(), all_helpers,
                                                prepared_dict)
    catalogue_uri = compiled_template["@id"]

    g = Graph()
    # Ignore warnings from rdflib.term when parsing the template with temp values
    logging.getLogger("rdflib.term").setLevel(logging.ERROR)
    g.parse(data=compiled_template, format='json-ld')
    logging.getLogger("rdflib.term").setLevel(logging.WARNING)

    prefixes = {}

    def normalize_uri(uri):
        """Normalize a URI into a QName (short name) and store the prefix."""
        # Do NOT rebind `uri`: the old code assigned the namespace onto it,
        # so the dot-guard below returned "<namespace>" and dropped the
        # local name, producing a wrong triple pattern.
        prefix, ns, val = g.compute_qname(uri)
        prefixes[prefix] = str(ns)
        # Sometimes the uri ends with a dot; a prefixed name would parse
        # incorrectly, so use the full <...> form instead.
        if val.endswith('.'):
            return f"<{uri}>"
        return f"{prefix}:{val}"

    def generate_delete_sparql():
        """Build the DELETE WHERE statements removing this catalogue's triples."""
        subjects = set(uris_to_del)
        subjects.add(catalogue_uri)

        delete_clause = []
        # Inspect each subject to decide whether it is exclusively ours
        for s in subjects:
            # If 's' is not used by the current catalogue, skip it
            paths_used_by_catalogue = get_num_paths(catalogue_uri, s)
            num_paths_used_by_catalogue = len(paths_used_by_catalogue)
            if num_paths_used_by_catalogue == 0:
                continue
            # Check if 's' is used by any other triples as an object
            uri_as_object_usage = get_uri_as_object_usage(s)
            if uri_as_object_usage == num_paths_used_by_catalogue:
                # Remove this instance entirely: only this catalogue uses it
                delete_clause.append(f'{normalize_uri(s)} ?p ?o')
                delete_clause.append(f'?s ?p {normalize_uri(s)}')
            elif uri_as_object_usage > num_paths_used_by_catalogue:
                # Shared instance: remove only this catalogue's path triples.
                # `_s` avoids shadowing the subject variable `s` above.
                for _s, p, o in spos_iter(paths_used_by_catalogue):
                    delete_clause.append(f'{normalize_uri(_s)} {normalize_uri(p)} {normalize_uri(o)}')

        # Remove all triples directly linked to the catalogue
        delete_clause.append(f'{normalize_uri(catalogue_uri)} ?p ?o')

        return '\n'.join([f"PREFIX {prefix}: <{ns}>" for prefix, ns in prefixes.items()]) + '\n' \
            + '\n'.join([f"DELETE WHERE {{\n\t{triple}.\n}};" for triple in delete_clause])

    def spos_iter(paths):
        """Yield every (s, p, o) triple from a path mapping's values."""
        for spos in paths.values():
            for triple in spos:
                yield triple

    delete_query = generate_delete_sparql()
    logger.debug("delete_query:\n%s", delete_query)

    client = get_client()
    client.execute_sparql(delete_query)
def get_catalogue_graph(package_id_or_name: str, format: str = "turtle") -> str:
    """
    Retrieve the knowledge graph for a specific catalogue entry.

    Args:
        package_id_or_name: The package ID or name
        format: Output format - 'turtle', 'json-ld', 'xml', 'n3', 'nt',
            'pretty-xml' (default: 'turtle')

    Returns:
        str: Serialized RDF graph in the requested format

    Raises:
        ValueError: If the package is not found, the format is invalid, or the
            SPARQL query / serialization fails
    """
    logger = logging.getLogger(__name__)

    # Validate format first — it is a cheap check, so fail before the package
    # lookup and template compilation below (previously validated after them).
    valid_formats = {'turtle', 'json-ld', 'xml', 'n3', 'nt', 'pretty-xml'}
    if format not in valid_formats:
        raise ValueError(f"Invalid format '{format}'. Must be one of: {', '.join(valid_formats)}")

    # Get the package to ensure it exists and get its ID
    try:
        package = tk.get_action('package_show')({}, {'id': package_id_or_name})
    except tk.ObjectNotFound:
        raise ValueError(f"Package '{package_id_or_name}' not found")

    package_id = package.get('id')
    if not package_id:
        raise ValueError("Package ID not found")

    # Get the catalogue URI by compiling the template with the package data
    prepared_dict = prepare_data_dict(package)
    compiled_template = compile_with_temp_value(get_mappings(), all_helpers, prepared_dict)
    catalogue_uri = compiled_template["@id"]

    # Single namespace table, used both for the query's PREFIX declarations and
    # for binding prefixes on the result graph (previously duplicated verbatim).
    namespaces = {
        'xsd': 'http://www.w3.org/2001/XMLSchema#',
        'owl': 'http://www.w3.org/2002/07/owl#',
        'dcat': 'http://www.w3.org/ns/dcat#',
        'skos': 'http://www.w3.org/2004/02/skos/core#',
        'rdfs': 'http://www.w3.org/2000/01/rdf-schema#',
        'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
        'dct': 'http://purl.org/dc/terms/',
        'dcterms': 'http://purl.org/dc/terms/',
        'foaf': 'http://xmlns.com/foaf/0.1/',
        'cudr': 'http://data.urbandatacentre.ca/',
        'cudrc': 'http://data.urbandatacentre.ca/catalogue/',
        'fair': 'http://ontology.eil.utoronto.ca/fair#',
        'adms': 'http://www.w3.org/ns/adms#',
        'locn': 'http://www.w3.org/ns/locn#',
        'odrl': 'http://www.w3.org/ns/odrl/2/',
        'prov': 'http://www.w3.org/ns/prov#',
        'oa': 'http://www.w3.org/ns/oa#',
        'vcard': 'http://www.w3.org/2006/vcard/ns#',
        'dqv': 'http://www.w3.org/ns/dqv#',
        'sc': 'http://schema.org/',
    }
    prefix_lines = '\n'.join(f"PREFIX {p}: <{ns}>" for p, ns in namespaces.items())

    # CONSTRUCT query retrieving all triples related to this catalogue entry:
    # 1. All triples where the catalogue URI is the subject
    # 2. All triples from objects (both blank nodes and URIs)
    # 3. Nested objects up to 3 additional levels deep
    query = f"""
    {prefix_lines}

    CONSTRUCT {{
        <{catalogue_uri}> ?p ?o .
        ?o ?p2 ?o2 .
        ?o2 ?p3 ?o3 .
        ?o3 ?p4 ?o4 .
    }}
    WHERE {{
        {{
            # Level 1: Direct properties of catalogue
            <{catalogue_uri}> ?p ?o .
        }}
        UNION
        {{
            # Level 2: Properties of objects (expand both URIs and blank nodes)
            <{catalogue_uri}> ?p ?o .
            ?o ?p2 ?o2 .
        }}
        UNION
        {{
            # Level 3: Properties of nested objects
            <{catalogue_uri}> ?p ?o .
            ?o ?p2 ?o2 .
            ?o2 ?p3 ?o3 .
        }}
        UNION
        {{
            # Level 4: Properties of deeply nested objects
            <{catalogue_uri}> ?p ?o .
            ?o ?p2 ?o2 .
            ?o2 ?p3 ?o3 .
            ?o3 ?p4 ?o4 .
        }}
    }}
    """

    # Execute query - execute_sparql will automatically detect CONSTRUCT and
    # return Turtle text
    client = get_client()
    try:
        result = client.execute_sparql(query)
    except Exception as e:
        logger.error(f"Error executing SPARQL query for package {package_id}: {str(e)}")
        raise ValueError(f"Failed to retrieve graph: {str(e)}")

    # Parse the result into an RDF graph, binding prefixes for serialization
    g = Graph()
    for p, ns in namespaces.items():
        g.bind(p, ns)

    # Result is a Turtle string from the SPARQL endpoint
    if result and isinstance(result, str) and result.strip():
        try:
            # Parse as Turtle (the default format returned by GraphDB for CONSTRUCT)
            g.parse(data=result, format='turtle')
        except Exception as e:
            # Best-effort: log and fall through to serializing an empty graph
            logger.error(f"Error parsing SPARQL result: {str(e)}")
    else:
        logger.warning(f"No triples found for catalogue {package_id}")

    # Serialize to requested format
    try:
        return g.serialize(format=format)
    except Exception as e:
        logger.error(f"Error serializing graph to {format}: {str(e)}")
        raise ValueError(f"Failed to serialize graph to {format}: {str(e)}")