]> git.openstreetmap.org Git - nominatim.git/blob - nominatim/api/v1/format.py
avoid lambda SQL in connection with alias tables
[nominatim.git] / nominatim / api / v1 / format.py
1 # SPDX-License-Identifier: GPL-2.0-only
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2023 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Output formatters for API version v1.
9 """
10 from typing import List, Dict, Mapping, Any
11 import collections
12 import datetime as dt
13
14 import nominatim.api as napi
15 from nominatim.api.result_formatting import FormatDispatcher
16 from nominatim.api.v1.classtypes import ICONS
17 from nominatim.api.v1 import format_json, format_xml
18 from nominatim.utils.json_writer import JsonWriter
19
20 class RawDataList(List[Dict[str, Any]]):
21     """ Data type for formatting raw data lists 'as is' in json.
22     """
23
24 dispatch = FormatDispatcher()
25
26 @dispatch.format_func(napi.StatusResult, 'text')
27 def _format_status_text(result: napi.StatusResult, _: Mapping[str, Any]) -> str:
28     if result.status:
29         return f"ERROR: {result.message}"
30
31     return 'OK'
32
33
34 @dispatch.format_func(napi.StatusResult, 'json')
35 def _format_status_json(result: napi.StatusResult, _: Mapping[str, Any]) -> str:
36     out = JsonWriter()
37
38     out.start_object()\
39          .keyval('status', result.status)\
40          .keyval('message', result.message)\
41          .keyval_not_none('data_updated', result.data_updated,
42                           lambda v: v.isoformat())\
43          .keyval('software_version', str(result.software_version))\
44          .keyval_not_none('database_version', result.database_version, str)\
45        .end_object()
46
47     return out()
48
49
50 def _add_address_row(writer: JsonWriter, row: napi.AddressLine,
51                      locales: napi.Locales) -> None:
52     writer.start_object()\
53             .keyval('localname', locales.display_name(row.names))\
54             .keyval_not_none('place_id', row.place_id)
55
56     if row.osm_object is not None:
57         writer.keyval('osm_id', row.osm_object[1])\
58               .keyval('osm_type', row.osm_object[0])
59
60     if row.extratags:
61         writer.keyval_not_none('place_type', row.extratags.get('place_type'))
62
63     writer.keyval('class', row.category[0])\
64           .keyval('type', row.category[1])\
65           .keyval_not_none('admin_level', row.admin_level)\
66           .keyval('rank_address', row.rank_address)\
67           .keyval('distance', row.distance)\
68           .keyval('isaddress', row.isaddress)\
69         .end_object()
70
71
72 def _add_address_rows(writer: JsonWriter, section: str, rows: napi.AddressLines,
73                       locales: napi.Locales) -> None:
74     writer.key(section).start_array()
75     for row in rows:
76         _add_address_row(writer, row, locales)
77         writer.next()
78     writer.end_array().next()
79
80
81 def _add_parent_rows_grouped(writer: JsonWriter, rows: napi.AddressLines,
82                              locales: napi.Locales) -> None:
83     # group by category type
84     data = collections.defaultdict(list)
85     for row in rows:
86         sub = JsonWriter()
87         _add_address_row(sub, row, locales)
88         data[row.category[1]].append(sub())
89
90     writer.key('hierarchy').start_object()
91     for group, grouped in data.items():
92         writer.key(group).start_array()
93         grouped.sort() # sorts alphabetically by local name
94         for line in grouped:
95             writer.raw(line).next()
96         writer.end_array().next()
97
98     writer.end_object().next()
99
100
101 @dispatch.format_func(napi.DetailedResult, 'json')
102 def _format_details_json(result: napi.DetailedResult, options: Mapping[str, Any]) -> str:
103     locales = options.get('locales', napi.Locales())
104     geom = result.geometry.get('geojson')
105     centroid = result.centroid.to_geojson()
106
107     out = JsonWriter()
108     out.start_object()\
109          .keyval_not_none('place_id', result.place_id)\
110          .keyval_not_none('parent_place_id', result.parent_place_id)
111
112     if result.osm_object is not None:
113         out.keyval('osm_type', result.osm_object[0])\
114            .keyval('osm_id', result.osm_object[1])
115
116     out.keyval('category', result.category[0])\
117          .keyval('type', result.category[1])\
118          .keyval('admin_level', result.admin_level)\
119          .keyval('localname', result.locale_name or '')\
120          .keyval('names', result.names or {})\
121          .keyval('addresstags', result.address or {})\
122          .keyval_not_none('housenumber', result.housenumber)\
123          .keyval_not_none('calculated_postcode', result.postcode)\
124          .keyval_not_none('country_code', result.country_code)\
125          .keyval_not_none('indexed_date', result.indexed_date, lambda v: v.isoformat())\
126          .keyval_not_none('importance', result.importance)\
127          .keyval('calculated_importance', result.calculated_importance())\
128          .keyval('extratags', result.extratags or {})\
129          .keyval_not_none('calculated_wikipedia', result.wikipedia)\
130          .keyval('rank_address', result.rank_address)\
131          .keyval('rank_search', result.rank_search)\
132          .keyval('isarea', 'Polygon' in (geom or result.geometry.get('type') or ''))\
133          .key('centroid').raw(centroid).next()\
134          .key('geometry').raw(geom or centroid).next()
135
136     if options.get('icon_base_url', None):
137         icon = ICONS.get(result.category)
138         if icon:
139             out.keyval('icon', f"{options['icon_base_url']}/{icon}.p.20.png")
140
141     if result.address_rows is not None:
142         _add_address_rows(out, 'address', result.address_rows, locales)
143
144     if result.linked_rows is not None:
145         _add_address_rows(out, 'linked_places', result.linked_rows, locales)
146
147     if result.name_keywords is not None or result.address_keywords is not None:
148         out.key('keywords').start_object()
149
150         for sec, klist in (('name', result.name_keywords), ('address', result.address_keywords)):
151             out.key(sec).start_array()
152             for word in (klist or []):
153                 out.start_object()\
154                      .keyval('id', word.word_id)\
155                      .keyval('token', word.word_token)\
156                    .end_object().next()
157             out.end_array().next()
158
159         out.end_object().next()
160
161     if result.parented_rows is not None:
162         if options.get('group_hierarchy', False):
163             _add_parent_rows_grouped(out, result.parented_rows, locales)
164         else:
165             _add_address_rows(out, 'hierarchy', result.parented_rows, locales)
166
167     out.end_object()
168
169     return out()
170
171
172 @dispatch.format_func(napi.ReverseResults, 'xml')
173 def _format_reverse_xml(results: napi.ReverseResults, options: Mapping[str, Any]) -> str:
174     return format_xml.format_base_xml(results,
175                                       options, True, 'reversegeocode',
176                                       {'querystring': options.get('query', '')})
177
178
179 @dispatch.format_func(napi.ReverseResults, 'geojson')
180 def _format_reverse_geojson(results: napi.ReverseResults,
181                             options: Mapping[str, Any]) -> str:
182     return format_json.format_base_geojson(results, options, True)
183
184
185 @dispatch.format_func(napi.ReverseResults, 'geocodejson')
186 def _format_reverse_geocodejson(results: napi.ReverseResults,
187                                 options: Mapping[str, Any]) -> str:
188     return format_json.format_base_geocodejson(results, options, True)
189
190
191 @dispatch.format_func(napi.ReverseResults, 'json')
192 def _format_reverse_json(results: napi.ReverseResults,
193                          options: Mapping[str, Any]) -> str:
194     return format_json.format_base_json(results, options, True,
195                                         class_label='class')
196
197
198 @dispatch.format_func(napi.ReverseResults, 'jsonv2')
199 def _format_reverse_jsonv2(results: napi.ReverseResults,
200                            options: Mapping[str, Any]) -> str:
201     return format_json.format_base_json(results, options, True,
202                                         class_label='category')
203
204
205 @dispatch.format_func(napi.SearchResults, 'xml')
206 def _format_search_xml(results: napi.SearchResults, options: Mapping[str, Any]) -> str:
207     extra = {'querystring': options.get('query', '')}
208     for attr in ('more_url', 'exclude_place_ids', 'viewbox'):
209         if options.get(attr):
210             extra[attr] = options[attr]
211     return format_xml.format_base_xml(results, options, False, 'searchresults',
212                                       extra)
213
214
215
216 @dispatch.format_func(napi.SearchResults, 'geojson')
217 def _format_search_geojson(results: napi.SearchResults,
218                             options: Mapping[str, Any]) -> str:
219     return format_json.format_base_geojson(results, options, False)
220
221
222 @dispatch.format_func(napi.SearchResults, 'geocodejson')
223 def _format_search_geocodejson(results: napi.SearchResults,
224                                 options: Mapping[str, Any]) -> str:
225     return format_json.format_base_geocodejson(results, options, False)
226
227
228 @dispatch.format_func(napi.SearchResults, 'json')
229 def _format_search_json(results: napi.SearchResults,
230                          options: Mapping[str, Any]) -> str:
231     return format_json.format_base_json(results, options, False,
232                                         class_label='class')
233
234
235 @dispatch.format_func(napi.SearchResults, 'jsonv2')
236 def _format_search_jsonv2(results: napi.SearchResults,
237                            options: Mapping[str, Any]) -> str:
238     return format_json.format_base_json(results, options, False,
239                                         class_label='category')
240
241 @dispatch.format_func(RawDataList, 'json')
242 def _format_raw_data_json(results: RawDataList,  _: Mapping[str, Any]) -> str:
243     out = JsonWriter()
244     out.start_array()
245     for res in results:
246         out.start_object()
247         for k, v in res.items():
248             if isinstance(v, dt.datetime):
249                 out.keyval(k, v.isoformat(sep= ' ', timespec='seconds'))
250             else:
251                 out.keyval(k, v)
252         out.end_object().next()
253
254     out.end_array()
255
256     return out()