Coverage for ckanext/udc/graph/sparql_client.py: 27%

114 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2026-01-19 23:48 +0000

1import re 

2import requests 

3import urllib 

4 

5from requests.adapters import HTTPAdapter, Retry 

6 

# HTTP method names that SPARQLWrapper.query() compares its configured
# method against.
POST, GET = 'POST', 'GET'

9 

10 

class SPARQLResponse:
    """Wrap the HTTP response returned for a SPARQL request.

    Building an instance raises ``ValueError`` when the HTTP status code is
    400 or above; the message is ``"<status>: <body text>"``.
    """

    def __init__(self, response: requests.Response, is_update=False, is_graph_query=False):
        """Store the response and its flags, then reject error statuses.

        Args:
            response: the HTTP response to wrap.
            is_update: True when the request was a SPARQL update.
            is_graph_query: True when the request was a CONSTRUCT/DESCRIBE query.

        Raises:
            ValueError: if ``response.status_code`` is 400 or above.
        """
        self.response, self.is_update, self.is_graph_query = response, is_update, is_graph_query
        failed = response.status_code >= 400
        if failed:
            raise ValueError(f'{response.status_code}: {response.text}')

    def json(self):
        """Return the decoded JSON body, or ``None`` for an update request."""
        return None if self.is_update else self.response.json()

    def text(self):
        """Return text response for CONSTRUCT/DESCRIBE queries"""
        body = self.response.text
        return body

27 

28 

class SPARQLWrapper:
    """Send a SPARQL query or update to one endpoint over HTTP.

    The HTTP session is created on the first call to :meth:`query` and
    reused afterwards. The two static helpers classify a SPARQL string by
    its first keyword after the prologue has been removed.
    """

    # https://www.w3.org/TR/rdf-sparql-query/#rPN_CHARS_BASE
    PN_CHARS_BASE = r"[A-Z]|[a-z]|[\u00C0-\u00D6]|[\u00D8-\u00F6]|[\u00F8-\u02FF]|[\u0370-\u037D]|[\u037F-\u1FFF]" \
                    r"|[\u200C-\u200D]|[\u2070-\u218F]|[\u2C00-\u2FEF]|[\u3001-\uD7FF]|[\uF900-\uFDCF]" \
                    r"|[\uFDF0-\uFFFD]|[\U00010000-\U000EFFFF]"
    PN_CHARS_U = PN_CHARS_BASE + r"|_"
    PN_CHARS = PN_CHARS_U + r"|-|[0-9]|\u00B7|[\u0300-\u036F]|[\u203F-\u2040]"
    PN_PREFIX = f'({PN_CHARS_BASE})(({PN_CHARS}|\\.)*({PN_CHARS}))?'
    PNAME_NS = f'({PN_PREFIX})?:'
    IRI_REF = r'<([^<>\"{}|^`\]\[\x00-\x20])*>'
    # Any run of whitespace may follow the keyword, and none is required
    # between the prefix name and the IRI, so `PREFIX  ex:  <...>` and
    # `PREFIX ex:<...>` are both recognised.
    PrefixDecl = re.compile(f'[Pp][Rr][Ee][Ff][Ii][Xx]\\s+({PNAME_NS})\\s*({IRI_REF})')
    # A BASE declaration is part of the prologue as well and must not be
    # mistaken for the query keyword.
    BaseDecl = re.compile(f'\\b[Bb][Aa][Ss][Ee]\\s*({IRI_REF})')

    @staticmethod
    def _strip_prologue(query_string):
        """Return the query without PREFIX/BASE declarations, full-line comments and outer whitespace."""
        query_string = SPARQLWrapper.PrefixDecl.sub('', query_string)
        query_string = SPARQLWrapper.BaseDecl.sub('', query_string)
        # Only lines consisting solely of a comment are removed.
        query_string = re.sub(r'^\s*#.*$', '', query_string, flags=re.MULTILINE)
        return query_string.strip()

    @staticmethod
    def is_update_request(query_string):
        """
        Tell whether the SPARQL string is an update rather than a query.

        Returns False when the first keyword after the prologue is SELECT,
        CONSTRUCT, DESCRIBE or ASK (case-insensitive), True otherwise.
        """
        body = SPARQLWrapper._strip_prologue(query_string)
        return re.match(r'^(select|construct|describe|ask)', body, re.IGNORECASE) is None

    @staticmethod
    def is_graph_query(query_string):
        """
        Check if the query is a CONSTRUCT or DESCRIBE query that returns RDF graph.
        """
        body = SPARQLWrapper._strip_prologue(query_string)
        return re.match(r'^(construct|describe)', body, re.IGNORECASE) is not None

    def __init__(self, endpoint, retry_attempts=3, is_update=False, is_graph_query=False):
        """
        endpoint: URL the requests are sent to
        retry_attempts: total number of retries configured on the HTTP adapter
        is_update: True if this wrapper sends SPARQL updates
        is_graph_query: True if query is CONSTRUCT/DESCRIBE (returns RDF graph)
        """
        self.endpoint = endpoint
        self.is_update = is_update
        self.is_graph_query = is_graph_query
        self.username = None
        self.password = None
        self.method = 'POST'
        self.session = None
        self._query = None
        self.retry_attempts = retry_attempts

    def set_method(self, method):
        """Set the HTTP method used by :meth:`query`."""
        self.method = method

    def set_query(self, query):
        """Set the SPARQL string sent by the next :meth:`query` call."""
        self._query = query

    def set_credentials(self, username, password=None):
        """Set the credentials assigned to the session's ``auth`` on :meth:`query`."""
        self.username = username
        self.password = password

    def _build_request_kwargs(self, infer):
        """Return the body/params and headers for the configured request.

        Raises:
            ValueError: if no query was set, if the method is neither POST
                nor GET, or if an update is attempted with GET.
        """
        # Local import: the file only does `import urllib`, which does not
        # guarantee that the `urllib.parse` submodule is loaded.
        from urllib.parse import quote

        if self._query is None:
            raise ValueError('no query set; call set_query() first')

        infer_flag = 'true' if infer else 'false'
        if self.is_graph_query:
            # CONSTRUCT/DESCRIBE queries return RDF graphs, not JSON results
            accept = 'text/turtle, application/rdf+xml, application/n-triples'
        else:
            accept = 'application/x-sparqlstar-results+json, application/sparql-results+json'

        if self.method == POST:
            if self.is_update:
                form_body = f'update={quote(self._query)}'
                accept = 'text/plain'
            else:
                form_body = f'query={quote(self._query)}&infer={infer_flag}'
            return {
                'data': form_body,
                'headers': {
                    'Accept': accept,
                    'Content-Type': 'application/x-www-form-urlencoded',
                },
            }

        if self.method == GET:
            if self.is_update:
                raise ValueError('update operations MUST be done by POST')
            # The raw query is passed on purpose: values given through
            # `params` are URL-encoded by requests, so quoting them here
            # would encode the query twice.
            return {
                'params': {'query': self._query, 'infer': infer_flag},
                'headers': {'Accept': accept},
            }

        raise ValueError(f'Illegal method: {self.method}')

    def query(self, infer=True):
        """Send the configured query and wrap the HTTP response.

        Args:
            infer: sent as the ``infer`` request parameter ("true"/"false")
                for queries; not sent for updates.

        Returns:
            A ``SPARQLResponse`` built from the HTTP response.

        Raises:
            ValueError: for an invalid configuration (see
                ``_build_request_kwargs``) or, via ``SPARQLResponse``, when
                the endpoint answers with status 400 or above.
        """
        # Validate first so a bad configuration fails before any session
        # is created.
        request_kwargs = self._build_request_kwargs(infer)

        # Init session
        if self.session is None:
            self.session = requests.Session()
            # Automatic retries
            # NOTE(review): urllib3's Retry re-sends only idempotent methods
            # by default, so the status-based retries presumably do not
            # apply to POST - confirm whether that is intended.
            retries = Retry(total=self.retry_attempts,
                            backoff_factor=0.1,
                            status_forcelist=[500, 502, 503, 504])
            self.session.mount(self.endpoint, HTTPAdapter(max_retries=retries))

        if self.username:
            self.session.auth = (self.username, self.password)

        response = self.session.request(self.method, self.endpoint, **request_kwargs)
        return SPARQLResponse(response, is_update=self.is_update, is_graph_query=self.is_graph_query)

154 

155 

class SparqlClient:
    """Route SPARQL strings to a query, update or graph wrapper.

    Three ``SPARQLWrapper`` instances are kept: one for SELECT/ASK style
    queries, one for updates (sent to ``endpoint + '/statements'``) and one
    for CONSTRUCT/DESCRIBE queries.
    """

    def __init__(self, endpoint, username=None, password=None):
        """Create the three wrappers and, if given, set credentials on each."""
        self.query_client = SPARQLWrapper(endpoint, is_update=False, is_graph_query=False)
        self.update_client = SPARQLWrapper(endpoint + '/statements', is_update=True, is_graph_query=False)
        self.graph_client = SPARQLWrapper(endpoint, is_update=False, is_graph_query=True)
        if username:
            for client in (self.query_client, self.update_client, self.graph_client):
                client.set_credentials(username, password)

    def _select_client(self, query_string, method):
        """Pick the wrapper for *method*, auto-detecting when it is not a known value."""
        # An explicit, recognised method always wins over detection.
        if method == 'construct':
            return self.graph_client
        if method == 'select':
            return self.query_client
        if method == 'update':
            return self.update_client
        # Graph detection only applies when no method was given at all.
        if method is None and SPARQLWrapper.is_graph_query(query_string):
            return self.graph_client
        if SPARQLWrapper.is_update_request(query_string):
            return self.update_client
        return self.query_client

    def execute_sparql(self, *query, infer=False, method=None):
        """
        Execute sparql query only without post processing

        query: one or more SPARQL strings; they are joined with ';'.
        infer: forwarded to the wrapper's query() call.
        method could be 'select', 'update', 'construct', or None.
        If method is None, SPARQLWrapper.is_graph_query and
        SPARQLWrapper.is_update_request are used to detect the query type.

        Returns the response text for the graph client, otherwise the
        result of the response's json() (None for updates).
        Any exception is re-raised after the failing query is printed.
        """
        query_string = ';'.join(query)

        # Check which client to use
        client = self._select_client(query_string, method)

        client.set_method(POST)
        client.set_query(query_string)
        try:
            response = client.query(infer=infer)

            # For graph queries (CONSTRUCT/DESCRIBE), return text (Turtle format)
            if client is self.graph_client:
                return response.text()
            return response.json()

        except Exception:
            if client is self.update_client:
                client_label = 'update client'
            elif client is self.graph_client:
                client_label = 'graph client'
            else:
                client_label = 'normal client'
            print('error with the below sparql query using ' + client_label)
            print(query_string.strip())
            raise

    def test_connection(self):
        """Return True if a one-row SELECT succeeds, otherwise print the error and return False."""
        self.query_client.set_query("SELECT * WHERE {?s ?p ?o.} LIMIT 1")
        try:
            self.query_client.query().json()
        except Exception as e:
            print(e)
            return False
        return True

    # Original (misspelled) name kept so existing callers keep working.
    test_connecetion = test_connection

208 return True