Coverage for ckanext/udc/file_format/logic.py: 23%

75 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2026-01-19 23:48 +0000

1from ckan import model, authz, logic 

2import ckan.plugins as plugins 

3import logging 

4import re 

5import json 

6from ckan.common import _ 

7 

8from ckanext.udc.graph.queries import get_client 

9from ckanext.udc.graph.preload import dropdown_reload 

10 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# IRI of the named graph used by the file-format queries in this module.
GRAPH = "http://data.urbandatacentre.ca/custom-file-format"

# Namespace for individual custom file formats: the graph IRI plus "#".
PREFIX = f"{GRAPH}#"

15 

16 

def file_format_create(context, data_dict):
    """
    Create a custom file format in the knowledge graph.

    Any logged-in user can create a custom file format.

    :param context: action context; ``context["user"]`` must be set.
    :param data_dict: ``label`` (required) is the human-readable name of the
        format. ``id`` (optional) is the local identifier appended to
        ``PREFIX``; when omitted it is derived from ``label``.
    :returns: ``{"success": True, "id": <full IRI of the file format>}``
    :raises logic.ValidationError: if GraphDB is disabled, there is no user,
        the label is missing, or no usable identifier can be built.
    :raises logic.ActionError: if the SPARQL update fails.
    """
    if plugins.get_plugin('udc').disable_graphdb:
        raise logic.ValidationError(_("GraphDB integration is not enabled"))

    user = context.get("user")
    format_id = data_dict.get('id')
    label = data_dict.get('label')

    if not user:
        raise logic.ValidationError(_("You are not logged in"))

    if not label:
        raise logic.ValidationError(_("file format label is required"))

    if not format_id:
        # Replace whitespaces
        format_id = re.sub(r'\s', '-', label)
        # Remove characters that are unsafe in a URL or illegal inside a
        # SPARQL IRI reference. (The previous pattern ended with a stray "/"
        # and therefore only matched an unsafe character followed by a slash.)
        format_id = re.sub(r'[\x00-\x20<>"#%{}|\\^`~\[\]]', '', format_id)
        if not format_id:
            raise logic.ValidationError(
                _("file format label must contain at least one URL-safe character")
            )
    elif re.search(r'[\x00-\x20<>"{}|\\^`]', format_id):
        # A caller-supplied id is embedded in the query verbatim, so refuse
        # anything that could terminate the IRI reference early.
        raise logic.ValidationError(_("file format id contains invalid characters"))

    # Use the full IRI rather than a `cff:` prefixed name: prefixed local
    # names cannot contain characters such as "+", "/" or "(", which are
    # common in format labels (e.g. "C++", "text/csv").
    uri = PREFIX + format_id

    query = f"""
    PREFIX term: <http://purl.org/dc/terms/>
    PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
    PREFIX owl: <http://www.w3.org/2002/07/owl#>
    INSERT DATA {{
        GRAPH <{GRAPH}> {{
            <{uri}> a term:MediaType, owl:NamedIndividual;
                rdfs:label {json.dumps(label)}
        }}
    }}
    """
    client = get_client()
    try:
        client.execute_sparql(query)
    except Exception as e:
        raise logic.ActionError(_("Error in KG:") + str(e)) from e

    # Refresh the cached dropdown options so the new format is selectable.
    dropdown_reload("file_format")

    return {"success": True, "id": uri}

61 

62 

@logic.side_effect_free
def file_formats_get(context, data_dict=None):
    """
    Return all custom file formats stored in the knowledge graph.

    Any user can get all custom file formats.

    :param context: action context (not inspected here).
    :param data_dict: unused; accepted so the function can be called with the
        ``(context, data_dict)`` pair that CKAN passes to action functions.
    :returns: ``{"success": True, "data": [{"id": <IRI>, "label": <str>}, ...]}``
    :raises logic.ValidationError: if GraphDB integration is disabled.
    :raises logic.ActionError: if the SPARQL query fails.
    """
    if plugins.get_plugin('udc').disable_graphdb:
        raise logic.ValidationError(_("GraphDB integration is not enabled"))

    # `term:` must be declared here; the query previously used it undeclared.
    query = f"""
    PREFIX term: <http://purl.org/dc/terms/>
    PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
    SELECT ?uri ?label FROM <{GRAPH}> WHERE {{
        ?uri a term:MediaType;
            rdfs:label ?label.
    }}
    """
    client = get_client()
    try:
        data = client.execute_sparql(query)
    except Exception as e:
        raise logic.ActionError(_("Error in KG:") + str(e)) from e

    # Flatten the SPARQL bindings into plain id/label pairs. The previous
    # code iterated over the (empty) output list instead of the query
    # response and then returned the raw response.
    result = [
        {"id": item["uri"]["value"], "label": item["label"]["value"]}
        for item in data["results"]["bindings"]
    ]
    return {"success": True, "data": result}

92 

93 

def file_format_delete(context, data_dict):
    """
    Delete a custom file format from the knowledge graph.

    Only a sysadmin can delete a custom file format.

    :param context: action context; ``context["user"]`` must be a sysadmin.
    :param data_dict: ``id`` (required) is the full IRI of the file format,
        as returned by ``file_format_create``.
    :returns: ``{"success": True}``
    :raises logic.ValidationError: if GraphDB is disabled, there is no user,
        or the id is missing or not usable as an IRI.
    :raises logic.NotAuthorized: if the user is not a sysadmin.
    :raises logic.ActionError: if the SPARQL update fails.
    """
    if plugins.get_plugin('udc').disable_graphdb:
        raise logic.ValidationError(_("GraphDB integration is not enabled"))

    user = context.get("user")
    format_uri = data_dict.get('id')

    if not user:
        raise logic.ValidationError(_("You are not logged in"))

    if not format_uri:
        raise logic.ValidationError(_("file format id is required"))

    if not authz.is_sysadmin(user):
        raise logic.NotAuthorized(_("You are not authorized to delete this file format"))

    # The id is embedded between "<" and ">" in the query, so refuse any
    # character that is illegal inside a SPARQL IRI reference; otherwise the
    # value could break out of the IRI and alter the update.
    if re.search(r'[\x00-\x20<>"{}|\\^`]', format_uri):
        raise logic.ValidationError(_("file format id contains invalid characters"))

    # `owl:` must be declared here; the query previously used it undeclared.
    query = f"""
    PREFIX term: <http://purl.org/dc/terms/>
    PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
    PREFIX owl: <http://www.w3.org/2002/07/owl#>
    DELETE WHERE {{
        GRAPH <{GRAPH}> {{
            <{format_uri}> a term:MediaType, owl:NamedIndividual;
                rdfs:label ?label.
        }}
    }}
    """
    client = get_client()
    try:
        client.execute_sparql(query)
    except Exception as e:
        raise logic.ActionError(_("Error in KG:") + str(e)) from e

    # Refresh the cached dropdown options so the removed format disappears.
    dropdown_reload("file_format")

    return {"success": True}

134 

135 

def before_package_update(context, data_dict):
    """
    Pre-process the ``file_format`` field of a package before it is saved.

    ``data_dict["file_format"]`` is a comma-separated string. Entries that
    start with ``http`` are kept as they are; every other entry is treated as
    the label of a new custom file format, which is created through
    ``file_format_create`` and replaced by the returned id. The field is
    rewritten in place; nothing is returned.

    Surrounding whitespace is stripped from each entry and empty entries
    (e.g. from a trailing or doubled comma) are skipped, so they no longer
    reach ``file_format_create`` with an empty or padded label.

    :param context: action context, forwarded to ``file_format_create``.
    :param data_dict: package data; modified in place when ``file_format``
        is present and non-empty.
    """
    raw_value = data_dict.get('file_format')
    if not raw_value:
        return

    resolved = []
    for entry in raw_value.split(","):
        entry = entry.strip()
        if not entry:
            # Nothing between two commas: ignore rather than fail the update.
            continue
        if entry.startswith("http"):
            # Already a URI: keep it untouched.
            resolved.append(entry)
        else:
            # Plain label: register it as a custom file format.
            created = file_format_create(context, {"label": entry})
            resolved.append(created["id"])

    data_dict['file_format'] = ','.join(resolved)