Coverage for ckanext/udc/graph/template.py: 95%
82 statements
« prev ^ index » next coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2026-01-19 23:48 +0000
1import sys
2import re
3from pyld import jsonld
4from copy import deepcopy
6from ckan.plugins.core import get_plugin
7from .mapping_helpers import generate_uuid
8from .contants import EMPTY_FIELD
11def is_all_attrs_starts_with_at(data: dict):
12 for k in data:
13 if not k.startswith('@'):
14 return False
15 return True
17def filter_out_empty_values(data_list: list):
18 def is_not_empty(data_dict: dict):
19 if len(data_dict) == 1 and data_dict.get("@id"):
20 return True
21 elif len(data_dict) == 0:
22 return False
23 elif is_all_attrs_starts_with_at(data_dict) and not data_dict.get("@value"):
24 return False
25 return True
27 return list(filter(is_not_empty, data_list))
30def compile_template(template, global_vars, local_vars, nested=False):
31 """
32 Parse the mapping config into json-ld data.
33 Empty fields are removed.
34 """
35 udcPlugin = get_plugin('udc')
36 result = deepcopy(template)
37 if not isinstance(result, list):
38 result = [result]
40 for idx, item in enumerate(result):
41 if isinstance(item, str):
42 val = eval(f'f"{item}"', global_vars, local_vars)
43 if val == '' or val is None:
44 result[idx] = EMPTY_FIELD
45 else:
46 result[idx] = val
48 else:
49 attrs_to_del = []
50 for attr in item:
51 if not isinstance(item[attr], str):
52 item[attr] = filter_out_empty_values(compile_template(
53 item[attr], global_vars, local_vars, nested=True))
55 if len(item[attr]) == 0 or (len(item[attr]) == 1 and len(item[attr][0]) == 0):
56 attrs_to_del.append(attr)
58 else:
59 try:
60 should_eval = item[attr].startswith('eval(') and item[attr].endswith(')')
61 if should_eval:
62 expression = item[attr][5:-1].strip()
63 # If the expression within eval() is a string field name,
64 # and that field is a localized text, we need to convert it to json-ld
65 # All text fields defined in UDC plugin are localized text fields
66 if expression in udcPlugin.text_fields:
67 # If the expression is a text field, i.e. `eval(title)`, we need to map it to multiple languages
68 val = eval(f'map_to_multiple_languages({expression})', global_vars, local_vars)
69 else:
70 # General eval() syntax
71 val = eval(expression, global_vars, local_vars)
73 # Remove [{ "@value": '....' }] wrapping
74 if len(result) == 1 and len(item) == 1 and attr == '@value':
75 if val is None or isinstance(val, str) and len(val) == 0:
76 return []
77 return val
79 else:
80 # Literals
81 val = eval(f'f"{item[attr]}"', global_vars, local_vars)
82 if val == '' or val is None:
83 item[attr] = EMPTY_FIELD
84 else:
85 item[attr] = val
87 if EMPTY_FIELD in item[attr]:
88 attrs_to_del.append(attr)
89 except Exception as e:
90 if not "is not defined" in str(e):
91 print(f'Unable to evaluate: {item[attr]}; {str(e)}', file=sys.stderr)
92 attrs_to_del.append(attr)
93 for attr in attrs_to_del:
94 del item[attr]
96 result = list(filter(lambda x: x != EMPTY_FIELD, filter_out_empty_values(result)))
98 if not nested and len(result) == 1:
99 return result[0]
100 else:
101 return result
104def compile_with_temp_value(mappings, global_vars, local_vars, nested=False):
105 """
106 Parse the mapping config into json-ld data.
107 Empty fields are preserved and filled with some string "TEMP_VALUE" or EMPTY_FIELD.
108 """
109 result = deepcopy(mappings)
110 if not isinstance(result, list):
111 result = [result]
112 for idx, item in enumerate(result):
113 if not isinstance(item, str):
114 for attr in item:
115 if not isinstance(item[attr], str):
116 item[attr] = compile_with_temp_value(item[attr], global_vars, local_vars)
117 else:
118 try:
119 item[attr] = eval(f'f"{item[attr]}"', global_vars, local_vars)
120 except:
121 item[attr] = re.sub(r"\{.*}", "TEMP_VALUE", item[attr], flags=re.M)
123 # Make sure the result is a dict
124 if not nested and len(result) == 1:
125 return result[0]
126 else:
127 return result