Coverage for ckanext/udc/graph/logic.py: 9%

217 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2026-01-19 23:48 +0000

1import ckan.plugins as plugins 

2import ckan.plugins.toolkit as tk 

3 

4import rdflib 

5from rdflib import Graph, term 

6from rdflib.namespace import split_uri 

7from rdflib.plugins.serializers.turtle import TurtleSerializer 

8from rdflib.serializer import Serializer 

9import pyld 

10import logging 

11import json 

12 

13from .serializer import * 

14from .template import compile_template, compile_with_temp_value 

15from .mapping_helpers import all_helpers 

16from .ckan_field import prepare_data_dict 

17from .queries import get_uri_as_object_usage, get_client, get_num_paths 

18 

19 

def get_mappings():
    """Return the UDC plugin's configured JSON-LD mappings, expanded.

    Fetches the raw mappings attached to the registered ``udc`` plugin and
    runs them through pyld's JSON-LD expansion algorithm.
    """
    raw_mappings = plugins.get_plugin('udc').mappings
    return pyld.jsonld.expand(raw_mappings)

23 

24 

def find_existing_instance_uris(data_dict) -> list:
    """Find URIs of instances already stored in the KG for this catalogue entry.

    Compiles the JSON-LD mapping template with temporary placeholder values,
    walks the resulting graph to discover every subject linked (directly or
    transitively) to the catalogue entry, builds a SPARQL SELECT with nested
    OPTIONAL clauses following those links, and returns the URIs bound in
    the (single) result row.

    Args:
        data_dict: CKAN package data dict.

    Returns:
        list: URIs (str) of existing non-blank-node instances linked to the
        catalogue entry; empty list when nothing matches.

    Raises:
        ValueError: If a path back to the catalogue entry cannot be resolved,
            or the KG returns more than one binding row (inconsistent KG).
    """
    prepared_dict = prepare_data_dict(data_dict)
    g = Graph()
    compiled_template = compile_with_temp_value(get_mappings(), all_helpers,
                                                prepared_dict)
    catalogue_uri = compiled_template["@id"]

    # Temporarily silence rdflib.term warnings triggered by the temporary
    # placeholder values; restore the previous level even if parsing fails
    # (original code hard-reset to WARNING, clobbering any configured level).
    term_logger = logging.getLogger("rdflib.term")
    prev_level = term_logger.level
    term_logger.setLevel(logging.ERROR)
    try:
        g.parse(data=compiled_template, format='json-ld')
    finally:
        term_logger.setLevel(prev_level)

    # All URIs used as a subject anywhere in the compiled graph
    subjects = set(g.subjects(None, None))

    # Find all (s1, p, s2) triples linking two subjects: s -> [(s, p, o), ...]
    s2spo = {}
    for s1 in subjects:
        for s2 in subjects:
            if s1 == s2:
                continue
            for p in g.predicates(s1, s2):
                s2spo.setdefault(s1, []).append((s1, p, s2))

    # Assign a SPARQL variable name to every subject except the catalogue entry
    s2var = {}
    cnt = 0
    for s in subjects:
        if s != catalogue_uri:
            s2var[str(s)] = f'var{cnt}'
            cnt += 1

    # o -> (s, sparql triple pattern)
    query_parts = {}

    for spos in s2spo.values():
        for s, p, o in spos:
            if s == rdflib.term.URIRef(catalogue_uri):
                # First level: subject is the catalogue entry itself
                query_parts[str(o)] = (str(s), f"<{s}> <{p}> ?{s2var[str(o)]}")
            else:
                query_parts[str(o)] = (
                    str(s), f"?{s2var[str(s)]} <{p}> ?{s2var[str(o)]}")

    # Build nested OPTIONAL clauses so that every pattern is anchored on a
    # chain of links starting at the catalogue entry.
    triples = ''
    for o, (s, part) in query_parts.items():
        if s == catalogue_uri:
            triples += f"\tOPTIONAL {{ {part} }}\n"
        else:
            # Walk up the parent chain until we reach the catalogue entry
            path = [part]
            curr = s
            max_iteration = 100
            i = 0
            while curr != catalogue_uri:
                upper_level = query_parts.get(curr)
                if not upper_level:
                    raise ValueError("Cannot find path to the catalogue entry")
                path.append(upper_level[1])
                curr = upper_level[0]
                i += 1
                if max_iteration < i:
                    raise ValueError(
                        "Reached max iteration to find the path to the catalogue entry")
            inner_query = ''
            for step in path:
                inner_query = f"\tOPTIONAL {{ {step} {inner_query} }}\n"
            triples += inner_query

    query = f'SELECT DISTINCT * WHERE {{\n{triples}}}'
    client = get_client()
    result = client.execute_sparql(query)
    bindings = result["results"]["bindings"]
    if len(bindings) == 0:
        # Bug fix: previously returned {} which contradicts the declared
        # `-> list` return type (callers iterate/`set()` the result).
        return []
    elif len(bindings) > 1:
        raise ValueError("KG may not be consistent.")

    s2uri = {}
    row = bindings[0]
    for s, var in s2var.items():
        if row.get(var) and row[var]["type"] != "bnode":  # Skip blank nodes
            s2uri[s] = row[var]["value"]
    return list(s2uri.values())

118 

119 

def onUpdateCatalogue(context, data_dict):
    """Sync an updated catalogue entry into the knowledge graph.

    Deletes the stale triples belonging to this catalogue entry (instances
    shared with other catalogues are only unlinked, not deleted), then
    inserts the freshly compiled graph.

    Args:
        context: CKAN action context (unused here).
        data_dict: CKAN package data dict of the updated entry.
    """
    log = logging.getLogger(__name__)

    # Remove empty fields so they are not compiled into the template
    for key in list(data_dict.keys()):
        if data_dict[key] == '':
            del data_dict[key]

    uris_to_del = find_existing_instance_uris(data_dict)

    prepared_dict = prepare_data_dict(data_dict)
    compiled_template = compile_template(get_mappings(), all_helpers,
                                         prepared_dict)
    catalogue_uri = compiled_template["@id"]

    g = Graph()
    g.parse(data=compiled_template, format='json-ld')

    # Prefixes collected while normalizing URIs; emitted as PREFIX headers
    prefixes = {}

    def normalize_uri(uri):
        """Normalize a URI into a QName (short name) and store the prefix."""
        prefix, namespace, val = g.compute_qname(uri)
        prefixes[prefix] = str(namespace)

        # Consistency fix (matches onDeleteCatalogue's normalize_uri): a local
        # name ending with a dot would produce an invalid prefixed name, so
        # fall back to the full IRI form.
        if val.endswith('.'):
            return f"<{uri}>"

        return f"{prefix}:{val}"

    def generate_delete_sparql():
        """Build DELETE WHERE queries removing this entry's stale triples."""
        subjects = set(uris_to_del)
        subjects.add(catalogue_uri)

        delete_clause = []

        # Find the occurrences of `s` used as an object
        for s in subjects:
            # If 's' is not reachable from the current catalogue, skip it
            paths_used_by_catalogue = get_num_paths(catalogue_uri, s)
            num_paths_used_by_catalogue = len(paths_used_by_catalogue)
            if num_paths_used_by_catalogue == 0:
                continue
            # Check if 's' is used by any other triples as an object
            uri_as_object_usage = get_uri_as_object_usage(s)

            if uri_as_object_usage == num_paths_used_by_catalogue:
                # Only this catalogue uses the instance: remove it entirely
                delete_clause.append(f'{normalize_uri(s)} ?p ?o')
                delete_clause.append(f'?s ?p {normalize_uri(s)}')
            elif uri_as_object_usage > num_paths_used_by_catalogue:
                # Shared instance: unlink it from this catalogue only
                for spos in paths_used_by_catalogue.values():
                    for _s, p, o in spos:
                        # Remove the last link only
                        if o == rdflib.term.URIRef(s):
                            delete_clause.append(
                                f'{normalize_uri(_s)} {normalize_uri(p)} {normalize_uri(o)}')

        # Remove all triples directly linked to the catalogue
        delete_clause.append(f'{normalize_uri(catalogue_uri)} ?p ?o')

        return '\n'.join([f"PREFIX {prefix}: <{ns}>" for prefix, ns in prefixes.items()]) + '\n' \
            + '\n'.join([f"DELETE WHERE {{\n\t{triple}.\n}};" for triple in delete_clause])

    delete_query = generate_delete_sparql()
    # Serialize once (previously serialized twice: once for print, once to run)
    insert_query = g.serialize(format="sparql-insert")
    log.debug("delete_query:\n%s", delete_query)
    log.debug("insert_query:\n%s", insert_query)

    client = get_client()
    client.execute_sparql(delete_query)
    client.execute_sparql(insert_query)

196 

197 

def onDeleteCatalogue(context, data_dict):
    """Remove a deleted catalogue entry's triples from the knowledge graph.

    Instances used exclusively by this catalogue are deleted entirely;
    instances shared with other catalogues have only this catalogue's
    linking paths removed.

    Args:
        context: CKAN action context (unused here).
        data_dict: CKAN package data dict of the deleted entry.
    """
    log = logging.getLogger(__name__)

    uris_to_del = find_existing_instance_uris(data_dict)

    prepared_dict = prepare_data_dict(data_dict)
    compiled_template = compile_with_temp_value(get_mappings(), all_helpers,
                                                prepared_dict)
    catalogue_uri = compiled_template["@id"]

    g = Graph()
    # Silence rdflib.term warnings caused by temp placeholder values; restore
    # the previous level even on parse failure (original hard-reset to WARNING).
    term_logger = logging.getLogger("rdflib.term")
    prev_level = term_logger.level
    term_logger.setLevel(logging.ERROR)
    try:
        g.parse(data=compiled_template, format='json-ld')
    finally:
        term_logger.setLevel(prev_level)

    # Prefixes collected while normalizing URIs; emitted as PREFIX headers
    prefixes = {}

    def normalize_uri(uri):
        """Normalize a URI into a QName (short name) and store the prefix."""
        prefix, namespace, val = g.compute_qname(uri)
        prefixes[prefix] = str(namespace)

        # Sometimes the URI ends with a dot; that would produce an invalid
        # prefixed name, so fall back to the full IRI form.
        if val.endswith('.'):
            return f"<{uri}>"

        return f"{prefix}:{val}"

    def generate_delete_sparql():
        """Build DELETE WHERE queries removing this entry's triples."""
        subjects = set(uris_to_del)
        subjects.add(catalogue_uri)

        delete_clause = []
        # Find the occurrences of `s` used as an object
        for s in subjects:
            # If 's' is not reachable from the current catalogue, skip it
            paths_used_by_catalogue = get_num_paths(catalogue_uri, s)
            num_paths_used_by_catalogue = len(paths_used_by_catalogue)
            if num_paths_used_by_catalogue == 0:
                continue
            # Check if 's' is used by any other triples as an object
            uri_as_object_usage = get_uri_as_object_usage(s)
            if uri_as_object_usage == num_paths_used_by_catalogue:
                # Only this catalogue uses the instance: remove it entirely
                delete_clause.append(f'{normalize_uri(s)} ?p ?o')
                delete_clause.append(f'?s ?p {normalize_uri(s)}')
            elif uri_as_object_usage > num_paths_used_by_catalogue:
                # Shared instance: remove only this catalogue's linking paths.
                # Fix: use '_s' so the outer subject variable 's' is not
                # shadowed (matches onUpdateCatalogue's convention).
                for spos in paths_used_by_catalogue.values():
                    for _s, p, o in spos:
                        delete_clause.append(
                            f'{normalize_uri(_s)} {normalize_uri(p)} {normalize_uri(o)}')

        # Remove all triples directly linked to the catalogue
        delete_clause.append(f'{normalize_uri(catalogue_uri)} ?p ?o')

        return '\n'.join([f"PREFIX {prefix}: <{ns}>" for prefix, ns in prefixes.items()]) + '\n' \
            + '\n'.join([f"DELETE WHERE {{\n\t{triple}.\n}};" for triple in delete_clause])

    delete_query = generate_delete_sparql()
    log.debug("delete_query:\n%s", delete_query)

    client = get_client()
    client.execute_sparql(delete_query)

266 

267 

def get_catalogue_graph(package_id_or_name: str, format: str = "turtle") -> str:
    """
    Retrieve the knowledge graph for a specific catalogue entry.

    Args:
        package_id_or_name: The package ID or name
        format: Output format - 'turtle', 'json-ld', 'xml', 'n3', 'nt',
            'pretty-xml' (default: 'turtle')

    Returns:
        str: Serialized RDF graph in the requested format

    Raises:
        ValueError: If package not found, invalid format, the SPARQL query
            fails, or serialization fails
    """
    log = logging.getLogger(__name__)

    # Single source of truth for namespace prefixes: used both for the SPARQL
    # query header and for binding prefixes on the result graph (previously
    # duplicated as ~20 PREFIX lines plus ~20 g.bind calls).
    namespaces = {
        'xsd': 'http://www.w3.org/2001/XMLSchema#',
        'owl': 'http://www.w3.org/2002/07/owl#',
        'dcat': 'http://www.w3.org/ns/dcat#',
        'skos': 'http://www.w3.org/2004/02/skos/core#',
        'rdfs': 'http://www.w3.org/2000/01/rdf-schema#',
        'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
        'dct': 'http://purl.org/dc/terms/',
        'dcterms': 'http://purl.org/dc/terms/',
        'foaf': 'http://xmlns.com/foaf/0.1/',
        'cudr': 'http://data.urbandatacentre.ca/',
        'cudrc': 'http://data.urbandatacentre.ca/catalogue/',
        'fair': 'http://ontology.eil.utoronto.ca/fair#',
        'adms': 'http://www.w3.org/ns/adms#',
        'locn': 'http://www.w3.org/ns/locn#',
        'odrl': 'http://www.w3.org/ns/odrl/2/',
        'prov': 'http://www.w3.org/ns/prov#',
        'oa': 'http://www.w3.org/ns/oa#',
        'vcard': 'http://www.w3.org/2006/vcard/ns#',
        'dqv': 'http://www.w3.org/ns/dqv#',
        'sc': 'http://schema.org/',
    }

    # Get the package to ensure it exists and get its ID
    try:
        package = tk.get_action('package_show')({}, {'id': package_id_or_name})
    except tk.ObjectNotFound:
        raise ValueError(f"Package '{package_id_or_name}' not found")

    package_id = package.get('id')
    if not package_id:
        raise ValueError("Package ID not found")

    # Get the catalogue URI by compiling the template with the package data
    prepared_dict = prepare_data_dict(package)
    compiled_template = compile_with_temp_value(get_mappings(), all_helpers, prepared_dict)
    catalogue_uri = compiled_template["@id"]

    # Validate format before doing any KG work
    valid_formats = {'turtle', 'json-ld', 'xml', 'n3', 'nt', 'pretty-xml'}
    if format not in valid_formats:
        raise ValueError(f"Invalid format '{format}'. Must be one of: {', '.join(valid_formats)}")

    prefix_header = '\n'.join(f"PREFIX {p}: <{ns}>" for p, ns in namespaces.items())

    # Build SPARQL CONSTRUCT query to retrieve all triples related to this
    # catalogue entry:
    #   1. All triples where the catalogue URI is the subject
    #   2. All triples from objects (both blank nodes and URIs)
    #   3. Nested objects up to 3 further levels deep
    query = f"""
    {prefix_header}

    CONSTRUCT {{
      <{catalogue_uri}> ?p ?o .
      ?o ?p2 ?o2 .
      ?o2 ?p3 ?o3 .
      ?o3 ?p4 ?o4 .
    }}
    WHERE {{
      {{
        # Level 1: Direct properties of catalogue
        <{catalogue_uri}> ?p ?o .
      }}
      UNION
      {{
        # Level 2: Properties of objects (expand both URIs and blank nodes)
        <{catalogue_uri}> ?p ?o .
        ?o ?p2 ?o2 .
      }}
      UNION
      {{
        # Level 3: Properties of nested objects
        <{catalogue_uri}> ?p ?o .
        ?o ?p2 ?o2 .
        ?o2 ?p3 ?o3 .
      }}
      UNION
      {{
        # Level 4: Properties of deeply nested objects
        <{catalogue_uri}> ?p ?o .
        ?o ?p2 ?o2 .
        ?o2 ?p3 ?o3 .
        ?o3 ?p4 ?o4 .
      }}
    }}
    """

    # Execute query - execute_sparql detects CONSTRUCT and returns Turtle text
    client = get_client()
    try:
        result = client.execute_sparql(query)
    except Exception as e:
        log.error("Error executing SPARQL query for package %s: %s", package_id, e)
        raise ValueError(f"Failed to retrieve graph: {str(e)}")

    # Parse the result into an RDF graph, with prefixes bound for nicer output
    g = Graph()
    for p, ns in namespaces.items():
        g.bind(p, ns)

    # Result is a Turtle string from the SPARQL endpoint
    if result and isinstance(result, str) and result.strip():
        try:
            # Parse as Turtle (the default format returned for CONSTRUCT)
            g.parse(data=result, format='turtle')
        except Exception as e:
            # Best effort: log and fall through to serialize an empty graph
            log.error("Error parsing SPARQL result: %s", e)
    else:
        log.warning("No triples found for catalogue %s", package_id)

    # Serialize to requested format
    try:
        return g.serialize(format=format)
    except Exception as e:
        log.error("Error serializing graph to %s: %s", format, e)
        raise ValueError(f"Failed to serialize graph to {format}: {str(e)}")