]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_api/v1/format.py
use custom result formatters in CLI commands
[nominatim.git] / src / nominatim_api / v1 / format.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Output formatters for API version v1.
9 """
10 from typing import List, Dict, Mapping, Any
11 import collections
12 import datetime as dt
13
14 from ..utils.json_writer import JsonWriter
15 from ..status import StatusResult
16 from ..results import DetailedResult, ReverseResults, SearchResults, \
17                       AddressLines, AddressLine
18 from ..localization import Locales
19 from ..result_formatting import FormatDispatcher
20 from .classtypes import ICONS
21 from . import format_json, format_xml
22 from .. import logging as loglib
23 from ..server import content_types as ct
24
class RawDataList(List[Dict[str, Any]]):
    """ Marker type for a list of raw result rows (plain dictionaries)
        that are to be output 'as is' in JSON.
    """
28
# Formatter registry for the v1 API. The mapping assigns a content type
# to the output formats named 'text', 'xml' and 'debug'.
dispatch = FormatDispatcher({
    'text': ct.CONTENT_TEXT,
    'xml': ct.CONTENT_XML,
    'debug': ct.CONTENT_HTML
})
32
@dispatch.error_format_func
def _format_error(content_type: str, msg: str, status: int) -> str:
    """ Format an error message for the given content type.

        XML and JSON responses carry the status code and the message in
        a small structured document. HTML responses are rendered through
        the debug log. Any other content type gets a single line of text.

        The message is escaped for XML and JSON, so that special
        characters in it cannot produce a malformed document.
    """
    # Imported locally so that the function brings its own dependencies.
    import json
    from xml.sax.saxutils import escape

    if content_type == ct.CONTENT_XML:
        # escape() replaces '&', '<' and '>' with their XML entities.
        return f"""<?xml version="1.0" encoding="UTF-8" ?>
                   <error>
                     <code>{status}</code>
                     <message>{escape(msg)}</message>
                   </error>
                """

    if content_type == ct.CONTENT_JSON:
        # json.dumps() adds the surrounding quotes and escapes quotes,
        # backslashes and control characters. ensure_ascii=False leaves
        # non-ASCII characters untouched, as the plain interpolation did.
        json_msg = json.dumps(msg, ensure_ascii=False)
        return f"""{{"error":{{"code":{status},"message":{json_msg}}}}}"""

    if content_type == ct.CONTENT_HTML:
        loglib.log().section('Execution error')
        loglib.log().var_dump('Status', status)
        loglib.log().var_dump('Message', msg)
        return loglib.get_and_disable()

    return f"ERROR {status}: {msg}"
53
54
@dispatch.format_func(StatusResult, 'text')
def _format_status_text(result: StatusResult, _: Mapping[str, Any]) -> str:
    """ Format the status as a single line of text: 'OK' when the status
        code is falsy, otherwise the status message prefixed with 'ERROR: '.
    """
    return f"ERROR: {result.message}" if result.status else 'OK'
61
62
@dispatch.format_func(StatusResult, 'json')
def _format_status_json(result: StatusResult, _: Mapping[str, Any]) -> str:
    """ Format the status as a JSON object.

        'status', 'message' and 'software_version' are always written.
        'data_updated' and 'database_version' are handed to
        keyval_not_none(), together with a conversion function.
    """
    writer = JsonWriter()

    (writer.start_object()
           .keyval('status', result.status)
           .keyval('message', result.message)
           .keyval_not_none('data_updated', result.data_updated,
                            lambda stamp: stamp.isoformat())
           .keyval('software_version', str(result.software_version))
           .keyval_not_none('database_version', result.database_version, str)
           .end_object())

    return writer()
77
78
def _add_address_row(writer: JsonWriter, row: AddressLine,
                     locales: Locales) -> None:
    """ Write a single address row as a JSON object to the given writer.

        The localised name comes first, followed by the identifiers of
        the place, the optional 'place_type' extra tag and the
        classification and ranking information.
    """
    (writer.start_object()
           .keyval('localname', locales.display_name(row.names))
           .keyval_not_none('place_id', row.place_id))

    osm_object = row.osm_object
    if osm_object is not None:
        # osm_object is indexed as (type, id); the id is written first.
        (writer.keyval('osm_id', osm_object[1])
               .keyval('osm_type', osm_object[0]))

    extratags = row.extratags
    if extratags:
        writer.keyval_not_none('place_type', extratags.get('place_type'))

    (writer.keyval('class', row.category[0])
           .keyval('type', row.category[1])
           .keyval_not_none('admin_level', row.admin_level)
           .keyval('rank_address', row.rank_address)
           .keyval('distance', row.distance)
           .keyval('isaddress', row.isaddress)
           .end_object())
99
100
def _add_address_rows(writer: JsonWriter, section: str, rows: AddressLines,
                      locales: Locales) -> None:
    """ Write the given address rows as a JSON array under the key
        `section`. Each row is written with _add_address_row().
    """
    array = writer.key(section).start_array()

    for address_row in rows:
        _add_address_row(writer, address_row, locales)
        writer.next()

    array.end_array().next()
108
109
def _add_parent_rows_grouped(writer: JsonWriter, rows: AddressLines,
                             locales: Locales) -> None:
    """ Write the given rows as a JSON object under the key 'hierarchy',
        where the rows are grouped by the second part of their category.

        Every group is an array of rows. Within a group the rows are
        ordered by their serialised JSON text.
    """
    # Serialise each row separately and collect the text per category type.
    rows_by_type = collections.defaultdict(list)
    for row in rows:
        row_writer = JsonWriter()
        _add_address_row(row_writer, row, locales)
        rows_by_type[row.category[1]].append(row_writer())

    writer.key('hierarchy').start_object()
    for type_name, serialised in rows_by_type.items():
        writer.key(type_name).start_array()
        # A serialised row starts with its local name, so ordering the
        # strings orders the rows alphabetically by local name.
        for line in sorted(serialised):
            writer.raw(line).next()
        writer.end_array().next()

    writer.end_object().next()
128
129
@dispatch.format_func(DetailedResult, 'json')
def _format_details_json(result: DetailedResult, options: Mapping[str, Any]) -> str:
    """ Format a detailed place result as a JSON object.

        Options read here:
          'locales' - used for the local names of address rows
          'icon_base_url' - when set, an 'icon' entry is added if an
                            icon is known for the category of the result
          'group_hierarchy' - when true, the hierarchy rows are grouped
                              by type instead of written as a flat list
    """
    locales = options.get('locales', Locales())
    geojson = result.geometry.get('geojson')
    centroid = result.centroid.to_geojson()

    writer = JsonWriter()
    (writer.start_object()
           .keyval_not_none('place_id', result.place_id)
           .keyval_not_none('parent_place_id', result.parent_place_id))

    if result.osm_object is not None:
        (writer.keyval('osm_type', result.osm_object[0])
               .keyval('osm_id', result.osm_object[1]))

    (writer.keyval('category', result.category[0])
           .keyval('type', result.category[1])
           .keyval('admin_level', result.admin_level)
           .keyval('localname', result.locale_name or '')
           .keyval('names', result.names or {})
           .keyval('addresstags', result.address or {})
           .keyval_not_none('housenumber', result.housenumber)
           .keyval_not_none('calculated_postcode', result.postcode)
           .keyval_not_none('country_code', result.country_code)
           .keyval_not_none('indexed_date', result.indexed_date,
                            lambda stamp: stamp.isoformat())
           .keyval_not_none('importance', result.importance)
           .keyval('calculated_importance', result.calculated_importance())
           .keyval('extratags', result.extratags or {})
           .keyval_not_none('calculated_wikipedia', result.wikipedia)
           .keyval('rank_address', result.rank_address)
           .keyval('rank_search', result.rank_search)
           # Substring test against the geojson text or, failing that,
           # the 'type' entry of the geometry.
           .keyval('isarea',
                   'Polygon' in (geojson or result.geometry.get('type') or ''))
           .key('centroid').raw(centroid).next()
           # Without a geojson geometry the centroid is written instead.
           .key('geometry').raw(geojson or centroid).next())

    icon_base_url = options.get('icon_base_url', None)
    if icon_base_url:
        icon = ICONS.get(result.category)
        if icon:
            writer.keyval('icon', f"{icon_base_url}/{icon}.p.20.png")

    if result.address_rows is not None:
        _add_address_rows(writer, 'address', result.address_rows, locales)

    if result.linked_rows:
        _add_address_rows(writer, 'linked_places', result.linked_rows, locales)

    name_words = result.name_keywords
    address_words = result.address_keywords
    if name_words is not None or address_words is not None:
        writer.key('keywords').start_object()

        # Both sections are always written; a missing list yields an
        # empty array.
        for section, words in (('name', name_words), ('address', address_words)):
            writer.key(section).start_array()
            for word in (words or []):
                (writer.start_object()
                       .keyval('id', word.word_id)
                       .keyval('token', word.word_token)
                       .end_object().next())
            writer.end_array().next()

        writer.end_object().next()

    if result.parented_rows is not None:
        if options.get('group_hierarchy', False):
            _add_parent_rows_grouped(writer, result.parented_rows, locales)
        else:
            _add_address_rows(writer, 'hierarchy', result.parented_rows, locales)

    writer.end_object()

    return writer()
199
200
@dispatch.format_func(ReverseResults, 'xml')
def _format_reverse_xml(results: ReverseResults, options: Mapping[str, Any]) -> str:
    """ Format reverse results as XML with the root element
        'reversegeocode'. The 'query' option is passed on as the
        'querystring' entry.
    """
    extra = {'querystring': options.get('query', '')}
    return format_xml.format_base_xml(results, options, True,
                                      'reversegeocode', extra)
206
207
@dispatch.format_func(ReverseResults, 'geojson')
def _format_reverse_geojson(results: ReverseResults,
                            options: Mapping[str, Any]) -> str:
    """ Format reverse results as GeoJSON using the shared base
        formatter, called with True as its third argument.
    """
    formatted = format_json.format_base_geojson(results, options, True)
    return formatted
212
213
@dispatch.format_func(ReverseResults, 'geocodejson')
def _format_reverse_geocodejson(results: ReverseResults,
                                options: Mapping[str, Any]) -> str:
    """ Format reverse results as GeocodeJSON using the shared base
        formatter, called with True as its third argument.
    """
    formatted = format_json.format_base_geocodejson(results, options, True)
    return formatted
218
219
@dispatch.format_func(ReverseResults, 'json')
def _format_reverse_json(results: ReverseResults,
                         options: Mapping[str, Any]) -> str:
    """ Format reverse results in the 'json' format, where the class
        label passed to the base formatter is 'class'.
    """
    formatted = format_json.format_base_json(results, options, True,
                                             class_label='class')
    return formatted
225
226
@dispatch.format_func(ReverseResults, 'jsonv2')
def _format_reverse_jsonv2(results: ReverseResults,
                           options: Mapping[str, Any]) -> str:
    """ Format reverse results in the 'jsonv2' format, where the class
        label passed to the base formatter is 'category'.
    """
    formatted = format_json.format_base_json(results, options, True,
                                             class_label='category')
    return formatted
232
233
@dispatch.format_func(SearchResults, 'xml')
def _format_search_xml(results: SearchResults, options: Mapping[str, Any]) -> str:
    """ Format search results as XML with the root element
        'searchresults'.

        The 'query' option is passed on as the 'querystring' entry. The
        options 'more_url', 'exclude_place_ids' and 'viewbox' are passed
        on under their own name when they have a truthy value.
    """
    extra = {'querystring': options.get('query', '')}
    extra.update((name, options[name])
                 for name in ('more_url', 'exclude_place_ids', 'viewbox')
                 if options.get(name))

    return format_xml.format_base_xml(results, options, False,
                                      'searchresults', extra)
242
243
244
@dispatch.format_func(SearchResults, 'geojson')
def _format_search_geojson(results: SearchResults,
                           options: Mapping[str, Any]) -> str:
    """ Format search results as GeoJSON using the shared base
        formatter, called with False as its third argument.
    """
    formatted = format_json.format_base_geojson(results, options, False)
    return formatted
249
250
@dispatch.format_func(SearchResults, 'geocodejson')
def _format_search_geocodejson(results: SearchResults,
                               options: Mapping[str, Any]) -> str:
    """ Format search results as GeocodeJSON using the shared base
        formatter, called with False as its third argument.
    """
    formatted = format_json.format_base_geocodejson(results, options, False)
    return formatted
255
256
@dispatch.format_func(SearchResults, 'json')
def _format_search_json(results: SearchResults,
                        options: Mapping[str, Any]) -> str:
    """ Format search results in the 'json' format, where the class
        label passed to the base formatter is 'class'.
    """
    formatted = format_json.format_base_json(results, options, False,
                                             class_label='class')
    return formatted
262
263
@dispatch.format_func(SearchResults, 'jsonv2')
def _format_search_jsonv2(results: SearchResults,
                          options: Mapping[str, Any]) -> str:
    """ Format search results in the 'jsonv2' format, where the class
        label passed to the base formatter is 'category'.
    """
    formatted = format_json.format_base_json(results, options, False,
                                             class_label='category')
    return formatted
269
@dispatch.format_func(RawDataList, 'json')
def _format_raw_data_json(results: RawDataList,  _: Mapping[str, Any]) -> str:
    """ Format a list of raw data rows as a JSON array of objects.

        Values are written unchanged, except for datetime values, which
        are converted to ISO format with a space as separator and a
        precision of seconds.
    """
    writer = JsonWriter()
    writer.start_array()

    for record in results:
        writer.start_object()
        for key, value in record.items():
            if isinstance(value, dt.datetime):
                value = value.isoformat(sep=' ', timespec='seconds')
            writer.keyval(key, value)
        writer.end_object().next()

    writer.end_array()

    return writer()