Coverage for ckanext/udc/graph/sparql_client.py: 27%
114 statements
« prev ^ index » next coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
1import re
2import requests
3import urllib
5from requests.adapters import HTTPAdapter, Retry
# HTTP verbs understood by SPARQLWrapper.query(); any other value is rejected there.
POST, GET = 'POST', 'GET'
class SPARQLResponse:
    """Wrapper around the HTTP response returned by a SPARQL endpoint.

    Construction fails fast: an HTTP status of 400 or above raises
    ``ValueError`` whose message is ``"<status>: <body>"``.
    """

    def __init__(self, response: requests.Response, is_update=False, is_graph_query=False):
        self.response = response
        self.is_update = is_update
        self.is_graph_query = is_graph_query
        code = response.status_code
        if code >= 400:
            raise ValueError(f'{code}: {response.text}')

    def json(self):
        """Return the decoded JSON body, or None for update requests (no result set)."""
        return None if self.is_update else self.response.json()

    def text(self):
        """Return the raw response body, e.g. the RDF text of a CONSTRUCT/DESCRIBE query."""
        return self.response.text
class SPARQLWrapper:
    """Minimal SPARQL protocol client bound to a single endpoint.

    Each instance is configured for one kind of request: an update
    (``is_update``), a CONSTRUCT/DESCRIBE graph query (``is_graph_query``) or,
    when neither flag is set, a SELECT/ASK query with JSON results. The flags
    decide which form field is sent and which ``Accept`` header is used.
    """

    # https://www.w3.org/TR/rdf-sparql-query/#rPN_CHARS_BASE
    PN_CHARS_BASE = r"[A-Z]|[a-z]|[\u00C0-\u00D6]|[\u00D8-\u00F6]|[\u00F8-\u02FF]|[\u0370-\u037D]|[\u037F-\u1FFF]" \
                    r"|[\u200C-\u200D]|[\u2070-\u218F]|[\u2C00-\u2FEF]|[\u3001-\uD7FF]|[\uF900-\uFDCF]" \
                    r"|[\uFDF0-\uFFFD]|[\U00010000-\U000EFFFF]"
    PN_CHARS_U = PN_CHARS_BASE + r"|_"
    PN_CHARS = PN_CHARS_U + r"|-|[0-9]|\u00B7|[\u0300-\u036F]|[\u203F-\u2040]"
    PN_PREFIX = f'({PN_CHARS_BASE})(({PN_CHARS}|\\.)*({PN_CHARS}))?'
    PNAME_NS = f'({PN_PREFIX})?:'
    IRI_REF = r'<([^<>\"{}|^`\]\[\x00-\x20])*>'
    # "PREFIX name: <iri>". The keyword is matched case-insensitively; at least
    # one whitespace character must follow it, and any amount (including none)
    # may separate the prefix name from the IRI.
    PrefixDecl = re.compile(f'[Pp][Rr][Ee][Ff][Ii][Xx]\\s+({PNAME_NS})\\s*({IRI_REF})')
    # "BASE <iri>" may also appear in the prologue before the query keyword.
    BaseDecl = re.compile(f'[Bb][Aa][Ss][Ee]\\s*({IRI_REF})')

    # Accept headers for graph results (CONSTRUCT/DESCRIBE) and tabular results (SELECT/ASK).
    _GRAPH_ACCEPT = 'text/turtle, application/rdf+xml, application/n-triples'
    _RESULTS_ACCEPT = 'application/x-sparqlstar-results+json, application/sparql-results+json'

    @staticmethod
    def _strip_prologue(query_string):
        """Return *query_string* without PREFIX/BASE declarations and full-line comments, trimmed.

        Only used to inspect the leading query keyword; the result is never sent.
        """
        # Declarations are removed before comments so that a trailing
        # "# comment" after a PREFIX line ends up alone on its line and is then
        # dropped by the comment pass (and so '#' inside IRIs is never
        # mistaken for a comment).
        query_string = SPARQLWrapper.PrefixDecl.sub('', query_string)
        query_string = SPARQLWrapper.BaseDecl.sub('', query_string)
        query_string = re.sub(r'^\s*#.*$', '', query_string, flags=re.MULTILINE)
        return query_string.strip()

    @staticmethod
    def is_update_request(query_string):
        """Return True if *query_string* is a SPARQL update, False for a read query.

        Anything whose first keyword (after the prologue and comment lines) is
        not SELECT, CONSTRUCT, DESCRIBE or ASK is treated as an update. The
        distinction is needed because updates go to a different endpoint.
        """
        body = SPARQLWrapper._strip_prologue(query_string)
        return not re.match(r'^(select|construct|describe|ask)', body, re.IGNORECASE)

    @staticmethod
    def is_graph_query(query_string):
        """Return True if *query_string* is a CONSTRUCT or DESCRIBE query (RDF graph result)."""
        body = SPARQLWrapper._strip_prologue(query_string)
        return bool(re.match(r'^(construct|describe)', body, re.IGNORECASE))

    def __init__(self, endpoint, retry_attempts=3, is_update=False, is_graph_query=False):
        """
        endpoint: URL the requests are sent to.
        retry_attempts: total retries configured on the session's HTTPAdapter.
        is_update: True if this wrapper sends SPARQL updates (POST only).
        is_graph_query: True if query is CONSTRUCT/DESCRIBE (returns RDF graph).
        """
        self.endpoint = endpoint
        self.is_update = is_update
        # NOTE: this instance attribute shadows the static method of the same
        # name on instances; call SPARQLWrapper.is_graph_query(...) on the class.
        self.is_graph_query = is_graph_query
        self.username = None
        self.password = None
        self.method = 'POST'
        self.session = None
        self._query = None
        self.retry_attempts = retry_attempts

    def set_method(self, method):
        """Set the HTTP method used by query(); must be POST or GET."""
        self.method = method

    def set_query(self, query):
        """Set the SPARQL text sent by the next call to query()."""
        self._query = query

    def set_credentials(self, username, password=None):
        """Set HTTP basic-auth credentials applied to the session on each query()."""
        self.username = username
        self.password = password

    def _ensure_session(self):
        """Create the session (with automatic retries) on first use and apply credentials."""
        if self.session is None:
            self.session = requests.Session()
            # NOTE(review): urllib3's Retry only retries status codes for the
            # methods in its allowed_methods, which by default exclude POST —
            # confirm whether POST queries are expected to be retried.
            retries = Retry(total=self.retry_attempts,
                            backoff_factor=0.1,
                            status_forcelist=[500, 502, 503, 504])
            self.session.mount(self.endpoint, HTTPAdapter(max_retries=retries))
        if self.username:
            self.session.auth = (self.username, self.password)
        return self.session

    def query(self, infer=True):
        """Send the current query to the endpoint and wrap the reply.

        infer: value of the ``infer`` request parameter for read queries
            (not sent for updates).

        Returns a SPARQLResponse.
        Raises ValueError when no query has been set, for an unsupported HTTP
        method, for an update sent via GET, and (from SPARQLResponse) when the
        server answers with a status >= 400.
        """
        if self._query is None:
            raise ValueError('no SPARQL query set; call set_query() first')
        if self.method not in (POST, GET):
            raise ValueError(f'Illegal method: {self.method}')
        if self.is_update and self.method == GET:
            raise ValueError('update operations MUST be done by POST')

        session = self._ensure_session()

        if self.is_update:
            payload = {'update': self._query}
            accept = 'text/plain'
        else:
            payload = {'query': self._query, 'infer': 'true' if infer else 'false'}
            accept = self._GRAPH_ACCEPT if self.is_graph_query else self._RESULTS_ACCEPT

        # requests form-/URL-encodes dict payloads itself; quoting the query
        # beforehand (as was done for GET) would encode it twice.
        if self.method == POST:
            response = session.request(POST, self.endpoint,
                                       data=payload,
                                       headers={
                                           'Accept': accept,
                                           'Content-Type': 'application/x-www-form-urlencoded',
                                       })
        else:
            response = session.request(GET, self.endpoint,
                                       params=payload,
                                       headers={'Accept': accept})

        return SPARQLResponse(response, is_update=self.is_update, is_graph_query=self.is_graph_query)
156class SparqlClient:
158 def __init__(self, endpoint, username=None, password=None):
159 self.query_client = SPARQLWrapper(endpoint, is_update=False, is_graph_query=False)
160 self.update_client = SPARQLWrapper(endpoint + '/statements', is_update=True, is_graph_query=False)
161 self.graph_client = SPARQLWrapper(endpoint, is_update=False, is_graph_query=True)
162 if username:
163 self.query_client.set_credentials(username, password)
164 self.update_client.set_credentials(username, password)
165 self.graph_client.set_credentials(username, password)
167 def execute_sparql(self, *query, infer=False, method=None):
168 """
169 Execute sparql query only without post processing
170 method could be 'select', 'update', 'construct', or None.
171 If method is None, SparqlClient.parse_sparql_type is invoked to check SPARQL format.
172 """
173 query_string = ';'.join(query)
175 # Check which client to use
176 if method == 'construct' or (method is None and SPARQLWrapper.is_graph_query(query_string)):
177 client = self.graph_client
178 elif method == 'select' or not SPARQLWrapper.is_update_request(query_string):
179 client = self.query_client
180 else:
181 client = self.update_client
183 client.set_method(POST)
184 client.set_query(query_string)
185 try:
186 response = client.query(infer=infer)
188 # For graph queries (CONSTRUCT/DESCRIBE), return text (Turtle format)
189 if client == self.graph_client:
190 return response.text()
191 else:
192 return response.json()
194 except:
195 print('error with the below sparql query using ' + (
196 'update client' if client == self.update_client else
197 'graph client' if client == self.graph_client else 'normal client'))
198 print(query_string.strip())
199 raise
201 def test_connecetion(self):
202 self.query_client.set_query("SELECT * WHERE {?s ?p ?o.} LIMIT 1")
203 try:
204 self.query_client.query().json()
205 except Exception as e:
206 print(e)
207 return False
208 return True