Coverage for ckanext/udc/graph/template.py: 95%

82 statements  

« prev     ^ index     » next       coverage.py v7.7.1, created at 2026-01-19 23:48 +0000

1import sys 

2import re 

3from pyld import jsonld 

4from copy import deepcopy 

5 

6from ckan.plugins.core import get_plugin 

7from .mapping_helpers import generate_uuid 

8from .contants import EMPTY_FIELD 

9 

10 

11def is_all_attrs_starts_with_at(data: dict): 

12 for k in data: 

13 if not k.startswith('@'): 

14 return False 

15 return True 

16 

17def filter_out_empty_values(data_list: list): 

18 def is_not_empty(data_dict: dict): 

19 if len(data_dict) == 1 and data_dict.get("@id"): 

20 return True 

21 elif len(data_dict) == 0: 

22 return False 

23 elif is_all_attrs_starts_with_at(data_dict) and not data_dict.get("@value"): 

24 return False 

25 return True 

26 

27 return list(filter(is_not_empty, data_list)) 

28 

29 

30def compile_template(template, global_vars, local_vars, nested=False): 

31 """ 

32 Parse the mapping config into json-ld data. 

33 Empty fields are removed. 

34 """ 

35 udcPlugin = get_plugin('udc') 

36 result = deepcopy(template) 

37 if not isinstance(result, list): 

38 result = [result] 

39 

40 for idx, item in enumerate(result): 

41 if isinstance(item, str): 

42 val = eval(f'f"{item}"', global_vars, local_vars) 

43 if val == '' or val is None: 

44 result[idx] = EMPTY_FIELD 

45 else: 

46 result[idx] = val 

47 

48 else: 

49 attrs_to_del = [] 

50 for attr in item: 

51 if not isinstance(item[attr], str): 

52 item[attr] = filter_out_empty_values(compile_template( 

53 item[attr], global_vars, local_vars, nested=True)) 

54 

55 if len(item[attr]) == 0 or (len(item[attr]) == 1 and len(item[attr][0]) == 0): 

56 attrs_to_del.append(attr) 

57 

58 else: 

59 try: 

60 should_eval = item[attr].startswith('eval(') and item[attr].endswith(')') 

61 if should_eval: 

62 expression = item[attr][5:-1].strip() 

63 # If the expression within eval() is a string field name, 

64 # and that field is a localized text, we need to convert it to json-ld 

65 # All text fields defined in UDC plugin are localized text fields 

66 if expression in udcPlugin.text_fields: 

67 # If the expression is a text field, i.e. `eval(title)`, we need to map it to multiple languages 

68 val = eval(f'map_to_multiple_languages({expression})', global_vars, local_vars) 

69 else: 

70 # General eval() syntax 

71 val = eval(expression, global_vars, local_vars) 

72 

73 # Remove [{ "@value": '....' }] wrapping 

74 if len(result) == 1 and len(item) == 1 and attr == '@value': 

75 if val is None or isinstance(val, str) and len(val) == 0: 

76 return [] 

77 return val 

78 

79 else: 

80 # Literals 

81 val = eval(f'f"{item[attr]}"', global_vars, local_vars) 

82 if val == '' or val is None: 

83 item[attr] = EMPTY_FIELD 

84 else: 

85 item[attr] = val 

86 

87 if EMPTY_FIELD in item[attr]: 

88 attrs_to_del.append(attr) 

89 except Exception as e: 

90 if not "is not defined" in str(e): 

91 print(f'Unable to evaluate: {item[attr]}; {str(e)}', file=sys.stderr) 

92 attrs_to_del.append(attr) 

93 for attr in attrs_to_del: 

94 del item[attr] 

95 

96 result = list(filter(lambda x: x != EMPTY_FIELD, filter_out_empty_values(result))) 

97 

98 if not nested and len(result) == 1: 

99 return result[0] 

100 else: 

101 return result 

102 

103 

104def compile_with_temp_value(mappings, global_vars, local_vars, nested=False): 

105 """ 

106 Parse the mapping config into json-ld data. 

107 Empty fields are preserved and filled with some string "TEMP_VALUE" or EMPTY_FIELD. 

108 """ 

109 result = deepcopy(mappings) 

110 if not isinstance(result, list): 

111 result = [result] 

112 for idx, item in enumerate(result): 

113 if not isinstance(item, str): 

114 for attr in item: 

115 if not isinstance(item[attr], str): 

116 item[attr] = compile_with_temp_value(item[attr], global_vars, local_vars) 

117 else: 

118 try: 

119 item[attr] = eval(f'f"{item[attr]}"', global_vars, local_vars) 

120 except: 

121 item[attr] = re.sub(r"\{.*}", "TEMP_VALUE", item[attr], flags=re.M) 

122 

123 # Make sure the result is a dict 

124 if not nested and len(result) == 1: 

125 return result[0] 

126 else: 

127 return result