]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/v1/format.py
Merge branch 'osm-search:master' into check-database-on-frozen-database
[nominatim.git] / nominatim / api / v1 / format.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Output formatters for API version v1.
9 """
10 from typing import Mapping, Any
11 import collections
12
13 import nominatim.api as napi
14 from nominatim.api.result_formatting import FormatDispatcher
15 from nominatim.api.v1.classtypes import ICONS
16 from nominatim.api.v1 import format_json, format_xml
17 from nominatim.utils.json_writer import JsonWriter
18
# Dispatcher instance whose `format_func` decorator is applied to the
# formatter functions of this module, once per (result type, format name)
# pair.
dispatch = FormatDispatcher()
20
@dispatch.format_func(napi.StatusResult, 'text')
def _format_status_text(result: napi.StatusResult, _: Mapping[str, Any]) -> str:
    """ Render a status result as a single line of plain text.

        A falsy `result.status` produces 'OK'. Any other status produces
        the result's message prefixed with 'ERROR: '. The options mapping
        is not used.
    """
    return f"ERROR: {result.message}" if result.status else 'OK'
27
28
@dispatch.format_func(napi.StatusResult, 'json')
def _format_status_json(result: napi.StatusResult, _: Mapping[str, Any]) -> str:
    """ Render a status result as a JSON object.

        The object carries 'status', 'message' and 'software_version'
        (converted with `str`). 'data_updated' (as ISO format string) and
        'database_version' (converted with `str`) are handed to
        `keyval_not_none`. The options mapping is not used.
    """
    writer = JsonWriter()

    (writer.start_object()
           .keyval('status', result.status)
           .keyval('message', result.message)
           .keyval_not_none('data_updated', result.data_updated,
                            lambda v: v.isoformat())
           .keyval('software_version', str(result.software_version))
           .keyval_not_none('database_version', result.database_version, str)
           .end_object())

    return writer()
43
44
def _add_address_row(writer: JsonWriter, row: napi.AddressLine,
                     locales: napi.Locales) -> None:
    """ Write a single address line into `writer` as one JSON object.

        The object opens with the localised name of the row (as chosen
        by `locales`), followed by the identifying fields and finally
        the classification, rank, distance and 'isaddress' fields.
        The OSM fields are only written when the row has an OSM object,
        'place_type' is only looked up when the row has extra tags.
    """
    (writer.start_object()
           .keyval('localname', locales.display_name(row.names))
           .keyval_not_none('place_id', row.place_id))

    if row.osm_object is not None:
        # osm_object is indexed as (type, id); the id is written first
        (writer.keyval('osm_id', row.osm_object[1])
               .keyval('osm_type', row.osm_object[0]))

    if row.extratags:
        writer.keyval_not_none('place_type', row.extratags.get('place_type'))

    (writer.keyval('class', row.category[0])
           .keyval('type', row.category[1])
           .keyval_not_none('admin_level', row.admin_level)
           .keyval('rank_address', row.rank_address)
           .keyval('distance', row.distance)
           .keyval('isaddress', row.isaddress)
           .end_object())
65
66
def _add_address_rows(writer: JsonWriter, section: str, rows: napi.AddressLines,
                      locales: napi.Locales) -> None:
    """ Write `rows` into `writer` as a JSON array stored under the
        key `section`.

        Each row is serialised with `_add_address_row()` using `locales`
        for the localised name.
    """
    writer.key(section).start_array()
    for address_line in rows:
        _add_address_row(writer, address_line, locales)
        writer.next()
    writer.end_array().next()
74
75
def _add_parent_rows_grouped(writer: JsonWriter, rows: napi.AddressLines,
                             locales: napi.Locales) -> None:
    """ Write `rows` into `writer` under the key 'hierarchy' as a JSON
        object that maps each category type to an array of its rows.

        Groups appear in the order in which their type is first seen
        in `rows`. Within a group the rows are ordered by their
        serialised form.
    """
    # Render every row into a JSON snippet of its own and bucket the
    # snippets by the type part of the row's category.
    by_type = collections.defaultdict(list)
    for row in rows:
        snippet = JsonWriter()
        _add_address_row(snippet, row, locales)
        by_type[row.category[1]].append(snippet())

    writer.key('hierarchy').start_object()
    for type_name, snippets in by_type.items():
        writer.key(type_name).start_array()
        # Every snippet starts with the local name, so sorting the raw
        # strings orders the group alphabetically by local name.
        for snippet_json in sorted(snippets):
            writer.raw(snippet_json).next()
        writer.end_array().next()

    writer.end_object().next()
94
95
@dispatch.format_func(napi.DetailedResult, 'json')
def _format_details_json(result: napi.DetailedResult, options: Mapping[str, Any]) -> str:
    """ Render a detailed place result as a JSON object.

        Options read from `options`:
          * 'locales' - locales used for the names of address rows
            (a fresh `napi.Locales()` when not given)
          * 'icon_base_url' - when set and an icon is registered for the
            result's category, an 'icon' URL is added
          * 'group_hierarchy' - when true, the parented rows are grouped
            by type instead of being written as a flat array

        The sections 'address', 'linked_places', 'keywords' and
        'hierarchy' are only written when the result carries the
        corresponding data.
    """
    locales = options.get('locales', napi.Locales())
    geojson = result.geometry.get('geojson')
    centroid_json = result.centroid.to_geojson()

    writer = JsonWriter()
    (writer.start_object()
           .keyval_not_none('place_id', result.place_id)
           .keyval_not_none('parent_place_id', result.parent_place_id))

    if result.osm_object is not None:
        (writer.keyval('osm_type', result.osm_object[0])
               .keyval('osm_id', result.osm_object[1]))

    # The geometry falls back to the centroid when no GeoJSON geometry
    # is available. 'isarea' is derived from the GeoJSON text or, failing
    # that, from the geometry's 'type' entry.
    (writer.keyval('category', result.category[0])
           .keyval('type', result.category[1])
           .keyval('admin_level', result.admin_level)
           .keyval('localname', result.locale_name or '')
           .keyval('names', result.names or {})
           .keyval('addresstags', result.address or {})
           .keyval_not_none('housenumber', result.housenumber)
           .keyval_not_none('calculated_postcode', result.postcode)
           .keyval_not_none('country_code', result.country_code)
           .keyval_not_none('indexed_date', result.indexed_date, lambda v: v.isoformat())
           .keyval_not_none('importance', result.importance)
           .keyval('calculated_importance', result.calculated_importance())
           .keyval('extratags', result.extratags or {})
           .keyval_not_none('calculated_wikipedia', result.wikipedia)
           .keyval('rank_address', result.rank_address)
           .keyval('rank_search', result.rank_search)
           .keyval('isarea', 'Polygon' in (geojson or result.geometry.get('type') or ''))
           .key('centroid').raw(centroid_json).next()
           .key('geometry').raw(geojson or centroid_json).next())

    if options.get('icon_base_url'):
        icon = ICONS.get(result.category)
        if icon:
            writer.keyval('icon', f"{options['icon_base_url']}/{icon}.p.20.png")

    if result.address_rows is not None:
        _add_address_rows(writer, 'address', result.address_rows, locales)

    if result.linked_rows is not None:
        _add_address_rows(writer, 'linked_places', result.linked_rows, locales)

    if not (result.name_keywords is None and result.address_keywords is None):
        writer.key('keywords').start_object()

        # Both arrays are always written once the section exists;
        # a missing keyword list results in an empty array.
        keyword_sections = (('name', result.name_keywords),
                            ('address', result.address_keywords))
        for section, words in keyword_sections:
            writer.key(section).start_array()
            for word in (words or []):
                (writer.start_object()
                       .keyval('id', word.word_id)
                       .keyval('token', word.word_token)
                       .end_object().next())
            writer.end_array().next()

        writer.end_object().next()

    if result.parented_rows is not None:
        if options.get('group_hierarchy', False):
            _add_parent_rows_grouped(writer, result.parented_rows, locales)
        else:
            _add_address_rows(writer, 'hierarchy', result.parented_rows, locales)

    writer.end_object()

    return writer()
165
166
@dispatch.format_func(napi.ReverseResults, 'xml')
def _format_reverse_xml(results: napi.ReverseResults, options: Mapping[str, Any]) -> str:
    """ Render reverse geocoding results as XML.

        Delegates to `format_xml.format_base_xml()` with the label
        'reversegeocode' and the 'query' option (empty string when
        absent) passed along as 'querystring'.
    """
    extra = {'querystring': options.get('query', '')}

    return format_xml.format_base_xml(results, options, True, 'reversegeocode', extra)
172
173
@dispatch.format_func(napi.ReverseResults, 'geojson')
def _format_reverse_geojson(results: napi.ReverseResults, options: Mapping[str, Any]) -> str:
    """ Render reverse geocoding results as GeoJSON.

        Delegates to `format_json.format_base_geojson()`, passing True
        as third argument.
    """
    formatted = format_json.format_base_geojson(results, options, True)
    return formatted
178
179
@dispatch.format_func(napi.ReverseResults, 'geocodejson')
def _format_reverse_geocodejson(results: napi.ReverseResults, options: Mapping[str, Any]) -> str:
    """ Render reverse geocoding results as GeocodeJSON.

        Delegates to `format_json.format_base_geocodejson()`, passing
        True as third argument.
    """
    formatted = format_json.format_base_geocodejson(results, options, True)
    return formatted
184
185
@dispatch.format_func(napi.ReverseResults, 'json')
def _format_reverse_json(results: napi.ReverseResults, options: Mapping[str, Any]) -> str:
    """ Render reverse geocoding results in the 'json' format.

        Delegates to `format_json.format_base_json()`, passing True as
        third argument and 'class' as the class label.
    """
    formatted = format_json.format_base_json(results, options, True, class_label='class')
    return formatted
191
192
@dispatch.format_func(napi.ReverseResults, 'jsonv2')
def _format_reverse_jsonv2(results: napi.ReverseResults, options: Mapping[str, Any]) -> str:
    """ Render reverse geocoding results in the 'jsonv2' format.

        Delegates to `format_json.format_base_json()`, passing True as
        third argument and 'category' as the class label.
    """
    formatted = format_json.format_base_json(results, options, True, class_label='category')
    return formatted
198
199
@dispatch.format_func(napi.SearchResults, 'xml')
def _format_search_xml(results: napi.SearchResults, options: Mapping[str, Any]) -> str:
    """ Render search results as XML.

        Delegates to `format_xml.format_base_xml()` with the label
        'searchresults'. The 'query' option is always passed along as
        'querystring' (empty string when absent); 'more_url',
        'exclude_place_ids' and 'viewbox' are passed along only when
        they are set to a truthy value.
    """
    extra = {'querystring': options.get('query', '')}
    extra.update((attr, options[attr])
                 for attr in ('more_url', 'exclude_place_ids', 'viewbox')
                 if options.get(attr))

    return format_xml.format_base_xml(results, options, False, 'searchresults', extra)
208
209
210
@dispatch.format_func(napi.SearchResults, 'geojson')
def _format_search_geojson(results: napi.SearchResults, options: Mapping[str, Any]) -> str:
    """ Render search results as GeoJSON.

        Delegates to `format_json.format_base_geojson()`, passing False
        as third argument.
    """
    formatted = format_json.format_base_geojson(results, options, False)
    return formatted
215
216
@dispatch.format_func(napi.SearchResults, 'geocodejson')
def _format_search_geocodejson(results: napi.SearchResults, options: Mapping[str, Any]) -> str:
    """ Render search results as GeocodeJSON.

        Delegates to `format_json.format_base_geocodejson()`, passing
        False as third argument.
    """
    formatted = format_json.format_base_geocodejson(results, options, False)
    return formatted
221
222
@dispatch.format_func(napi.SearchResults, 'json')
def _format_search_json(results: napi.SearchResults, options: Mapping[str, Any]) -> str:
    """ Render search results in the 'json' format.

        Delegates to `format_json.format_base_json()`, passing False as
        third argument and 'class' as the class label.
    """
    formatted = format_json.format_base_json(results, options, False, class_label='class')
    return formatted
228
229
@dispatch.format_func(napi.SearchResults, 'jsonv2')
def _format_search_jsonv2(results: napi.SearchResults, options: Mapping[str, Any]) -> str:
    """ Render search results in the 'jsonv2' format.

        Delegates to `format_json.format_base_json()`, passing False as
        third argument and 'category' as the class label.
    """
    formatted = format_json.format_base_json(results, options, False, class_label='category')
    return formatted