1 # SPDX-License-Identifier: GPL-3.0-or-later
3 # This file is part of Nominatim. (https://nominatim.org)
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
8 Output formatters for API version v1.
10 from typing import List, Dict, Mapping, Any
14 from ..utils.json_writer import JsonWriter
15 from ..status import StatusResult
16 from ..results import DetailedResult, ReverseResults, SearchResults, \
17 AddressLines, AddressLine
18 from ..localization import Locales
19 from ..result_formatting import FormatDispatcher
20 from .classtypes import ICONS
21 from . import format_json, format_xml
22 from .. import logging as loglib
23 from ..server import content_types as ct
class RawDataList(List[Dict[str, Any]]):
    """ Data type for formatting raw data lists 'as is' in json.

        The class is a plain list of dictionaries without any behaviour
        of its own. It exists as a separate type so that a formatting
        function can be registered for it with the dispatcher.
    """
# Dispatcher for the v1 output formats. The formatting functions in this
# module register themselves on it through its decorators. The mapping
# handed in pairs the format names 'text', 'xml' and 'debug' with a
# content type.
dispatch = FormatDispatcher({'text': ct.CONTENT_TEXT,
                             'xml': ct.CONTENT_XML,
                             'debug': ct.CONTENT_HTML})
@dispatch.error_format_func
def _format_error(content_type: str, msg: str, status: int) -> str:
    """ Format the error message `msg` with status code `status` for
        the given content type.

        XML and JSON content get a small error document with the code
        and the message. For HTML content, status and message are added
        to the debug log and the result of `loglib.get_and_disable()` is
        returned. All other content types get a one-line text message.

        The message is escaped before it is put into the XML or JSON
        document. Without that a message containing characters like
        '<', '&', '"', a backslash or a line break would result in a
        malformed document.
    """
    # Local imports: the escaping helpers are only needed in this function.
    import json
    from xml.sax.saxutils import escape as xml_escape

    if content_type == ct.CONTENT_XML:
        return f"""<?xml version="1.0" encoding="UTF-8" ?>
                   <error>
                     <code>{status}</code>
                     <message>{xml_escape(msg)}</message>
                   </error>
                """

    if content_type == ct.CONTENT_JSON:
        # json.dumps() adds the surrounding quotes itself. Non-ASCII
        # characters are kept as they are, so that messages which need no
        # escaping are written out exactly as before.
        message = json.dumps(msg, ensure_ascii=False)
        return f"""{{"error":{{"code":{status},"message":{message}}}}}"""

    if content_type == ct.CONTENT_HTML:
        loglib.log().section('Execution error')
        loglib.log().var_dump('Status', status)
        loglib.log().var_dump('Message', msg)
        return loglib.get_and_disable()

    return f"ERROR {status}: {msg}"
@dispatch.format_func(StatusResult, 'text')
def _format_status_text(result: StatusResult, _: Mapping[str, Any]) -> str:
    """ Format the status result as plain text.

        Returns 'OK' when the status of the result is falsy and the
        error message with a leading 'ERROR: ' otherwise.
    """
    return f"ERROR: {result.message}" if result.status else 'OK'
@dispatch.format_func(StatusResult, 'json')
def _format_status_json(result: StatusResult, _: Mapping[str, Any]) -> str:
    """ Format the status result as a single JSON object.

        'status', 'message' and 'software_version' are always written.
        'data_updated' (in ISO format) and 'database_version' are
        written with keyval_not_none().
    """
    writer = JsonWriter()

    writer.start_object()
    writer.keyval('status', result.status)
    writer.keyval('message', result.message)
    writer.keyval_not_none('data_updated', result.data_updated,
                           lambda stamp: stamp.isoformat())
    writer.keyval('software_version', str(result.software_version))
    writer.keyval_not_none('database_version', result.database_version, str)
    writer.end_object()

    return writer()
def _add_address_row(writer: JsonWriter, row: AddressLine,
                     locales: Locales) -> None:
    """ Write the address line `row` as a JSON object to `writer`.

        The local name is computed from the names of the row with the
        given locales. The OSM type and id are only added when the row
        has an OSM object, the place type only when the row has
        extratags.
    """
    writer.start_object()
    writer.keyval('localname', locales.display_name(row.names))
    writer.keyval_not_none('place_id', row.place_id)

    if row.osm_object is not None:
        writer.keyval('osm_id', row.osm_object[1])
        writer.keyval('osm_type', row.osm_object[0])

    if row.extratags:
        writer.keyval_not_none('place_type', row.extratags.get('place_type'))

    writer.keyval('class', row.category[0])
    writer.keyval('type', row.category[1])
    writer.keyval_not_none('admin_level', row.admin_level)
    writer.keyval('rank_address', row.rank_address)
    writer.keyval('distance', row.distance)
    writer.keyval('isaddress', row.isaddress)
    writer.end_object()
def _add_address_rows(writer: JsonWriter, section: str, rows: AddressLines,
                      locales: Locales) -> None:
    """ Write all address lines in `rows` as a JSON array under the
        key `section`.
    """
    writer.key(section)
    writer.start_array()

    for line in rows:
        _add_address_row(writer, line, locales)
        writer.next()

    writer.end_array()
    writer.next()
def _add_parent_rows_grouped(writer: JsonWriter, rows: AddressLines,
                             locales: Locales) -> None:
    """ Write the address lines in `rows` under the key 'hierarchy' as
        an object with one array per category type.
    """
    # Collect the serialised rows per category type. Groups keep the
    # order in which their type is first seen.
    by_type: Dict[str, List[str]] = {}
    for row in rows:
        single = JsonWriter()
        _add_address_row(single, row, locales)
        by_type.setdefault(row.category[1], []).append(single())

    writer.key('hierarchy').start_object()
    for group, entries in by_type.items():
        writer.key(group).start_array()
        # Every serialised row starts with its local name, so sorting
        # the strings orders the group alphabetically by local name.
        for entry in sorted(entries):
            writer.raw(entry).next()
        writer.end_array().next()

    writer.end_object().next()
@dispatch.format_func(DetailedResult, 'json')
def _format_details_json(result: DetailedResult, options: Mapping[str, Any]) -> str:
    """ Format a detailed result as a JSON object.

        The following options are read:
          locales - Locales used for the local names of address rows
          icon_base_url - when set, an 'icon' URL is added for results
                          whose category has an entry in ICONS
          group_hierarchy - when true, the parented rows are written
                            grouped by category type
    """
    locales = options.get('locales', Locales())
    geom = result.geometry.get('geojson')
    centroid = result.centroid.to_geojson()

    out = JsonWriter()
    out.start_object()
    out.keyval_not_none('place_id', result.place_id)
    out.keyval_not_none('parent_place_id', result.parent_place_id)

    if result.osm_object is not None:
        out.keyval('osm_type', result.osm_object[0])
        out.keyval('osm_id', result.osm_object[1])

    out.keyval('category', result.category[0])
    out.keyval('type', result.category[1])
    out.keyval('admin_level', result.admin_level)
    out.keyval('localname', result.locale_name or '')
    out.keyval('names', result.names or {})
    out.keyval('addresstags', result.address or {})
    out.keyval_not_none('housenumber', result.housenumber)
    out.keyval_not_none('calculated_postcode', result.postcode)
    out.keyval_not_none('country_code', result.country_code)
    out.keyval_not_none('indexed_date', result.indexed_date,
                        lambda stamp: stamp.isoformat())
    out.keyval_not_none('importance', result.importance)
    out.keyval('calculated_importance', result.calculated_importance())
    out.keyval('extratags', result.extratags or {})
    out.keyval_not_none('calculated_wikipedia', result.wikipedia)
    out.keyval('rank_address', result.rank_address)
    out.keyval('rank_search', result.rank_search)
    # Looks for 'Polygon' in the geojson geometry or, when there is none,
    # in the 'type' entry of the geometry.
    out.keyval('isarea', 'Polygon' in (geom or result.geometry.get('type') or ''))
    out.key('centroid').raw(centroid).next()
    # Without a geojson geometry the centroid is used as geometry.
    out.key('geometry').raw(geom or centroid).next()

    icon_base_url = options.get('icon_base_url', None)
    if icon_base_url:
        icon = ICONS.get(result.category)
        if icon:
            out.keyval('icon', f"{icon_base_url}/{icon}.p.20.png")

    if result.address_rows is not None:
        _add_address_rows(out, 'address', result.address_rows, locales)

    if result.linked_rows:
        _add_address_rows(out, 'linked_places', result.linked_rows, locales)

    if not (result.name_keywords is None and result.address_keywords is None):
        out.key('keywords').start_object()

        # Both sections are always written. A missing list becomes an
        # empty array.
        for sec, klist in (('name', result.name_keywords),
                           ('address', result.address_keywords)):
            out.key(sec).start_array()
            for word in (klist or []):
                out.start_object()
                out.keyval('id', word.word_id)
                out.keyval('token', word.word_token)
                out.end_object().next()
            out.end_array().next()

        out.end_object().next()

    if result.parented_rows is not None:
        if options.get('group_hierarchy', False):
            _add_parent_rows_grouped(out, result.parented_rows, locales)
        else:
            _add_address_rows(out, 'hierarchy', result.parented_rows, locales)

    out.end_object()

    return out()
@dispatch.format_func(ReverseResults, 'xml')
def _format_reverse_xml(results: ReverseResults, options: Mapping[str, Any]) -> str:
    """ Format reverse results as XML.

        Delegates to format_xml.format_base_xml() with the name
        'reversegeocode' and the 'query' option (or an empty string)
        as 'querystring'.
    """
    extra = {'querystring': options.get('query', '')}

    return format_xml.format_base_xml(results, options, True, 'reversegeocode', extra)
@dispatch.format_func(ReverseResults, 'geojson')
def _format_reverse_geojson(results: ReverseResults, options: Mapping[str, Any]) -> str:
    """ Format reverse results in 'geojson' format.

        Delegates to format_json.format_base_geojson() with the third
        argument set to True (the search variant passes False).
    """
    return format_json.format_base_geojson(results, options, True)
@dispatch.format_func(ReverseResults, 'geocodejson')
def _format_reverse_geocodejson(results: ReverseResults, options: Mapping[str, Any]) -> str:
    """ Format reverse results in 'geocodejson' format.

        Delegates to format_json.format_base_geocodejson() with the
        third argument set to True (the search variant passes False).
    """
    return format_json.format_base_geocodejson(results, options, True)
@dispatch.format_func(ReverseResults, 'json')
def _format_reverse_json(results: ReverseResults, options: Mapping[str, Any]) -> str:
    """ Format reverse results in 'json' format.

        Delegates to format_json.format_base_json() using 'class' as
        class label.
    """
    return format_json.format_base_json(results, options, True, class_label='class')
@dispatch.format_func(ReverseResults, 'jsonv2')
def _format_reverse_jsonv2(results: ReverseResults, options: Mapping[str, Any]) -> str:
    """ Format reverse results in 'jsonv2' format.

        Delegates to format_json.format_base_json() using 'category'
        as class label.
    """
    return format_json.format_base_json(results, options, True, class_label='category')
@dispatch.format_func(SearchResults, 'xml')
def _format_search_xml(results: SearchResults, options: Mapping[str, Any]) -> str:
    """ Format search results as XML.

        Delegates to format_xml.format_base_xml() with the name
        'searchresults'. The extra information handed in consists of
        the 'query' option as 'querystring' and those of the options
        'more_url', 'exclude_place_ids' and 'viewbox' that have a
        truthy value.
    """
    extra = {'querystring': options.get('query', '')}
    extra.update((attr, options[attr])
                 for attr in ('more_url', 'exclude_place_ids', 'viewbox')
                 if options.get(attr))

    return format_xml.format_base_xml(results, options, False, 'searchresults', extra)
@dispatch.format_func(SearchResults, 'geojson')
def _format_search_geojson(results: SearchResults, options: Mapping[str, Any]) -> str:
    """ Format search results in 'geojson' format.

        Delegates to format_json.format_base_geojson() with the third
        argument set to False (the reverse variant passes True).
    """
    return format_json.format_base_geojson(results, options, False)
@dispatch.format_func(SearchResults, 'geocodejson')
def _format_search_geocodejson(results: SearchResults, options: Mapping[str, Any]) -> str:
    """ Format search results in 'geocodejson' format.

        Delegates to format_json.format_base_geocodejson() with the
        third argument set to False (the reverse variant passes True).
    """
    return format_json.format_base_geocodejson(results, options, False)
@dispatch.format_func(SearchResults, 'json')
def _format_search_json(results: SearchResults, options: Mapping[str, Any]) -> str:
    """ Format search results in 'json' format.

        Delegates to format_json.format_base_json() using 'class' as
        class label.
    """
    return format_json.format_base_json(results, options, False, class_label='class')
@dispatch.format_func(SearchResults, 'jsonv2')
def _format_search_jsonv2(results: SearchResults, options: Mapping[str, Any]) -> str:
    """ Format search results in 'jsonv2' format.

        Delegates to format_json.format_base_json() using 'category'
        as class label.
    """
    return format_json.format_base_json(results, options, False, class_label='category')
@dispatch.format_func(RawDataList, 'json')
def _format_raw_data_json(results: RawDataList, _: Mapping[str, Any]) -> str:
    """ Format a list of raw data rows as a JSON array of objects.

        Values are written unchanged except for datetime values, which
        are converted to ISO format with a space as separator and a
        precision of seconds.
    """
    out = JsonWriter()
    out.start_array()

    for entry in results:
        out.start_object()
        for key, value in entry.items():
            if isinstance(value, dt.datetime):
                value = value.isoformat(sep=' ', timespec='seconds')
            out.keyval(key, value)
        out.end_object().next()

    out.end_array()

    return out()