]> git.openstreetmap.org Git - nominatim.git/blob - src/nominatim_api/v1/format.py
switch instruction to use wikimedia CSV file
[nominatim.git] / src / nominatim_api / v1 / format.py
1 # SPDX-License-Identifier: GPL-3.0-or-later
2 #
3 # This file is part of Nominatim. (https://nominatim.org)
4 #
5 # Copyright (C) 2024 by the Nominatim developer community.
6 # For a full list of authors see the git log.
7 """
8 Output formatters for API version v1.
9 """
10 from typing import List, Dict, Mapping, Any
11 import collections
12 import datetime as dt
13
14 from ..utils.json_writer import JsonWriter
15 from ..status import StatusResult
16 from ..results import DetailedResult, ReverseResults, SearchResults, \
17                       AddressLines, AddressLine
18 from ..localization import Locales
19 from ..result_formatting import FormatDispatcher
20 from .classtypes import ICONS
21 from . import format_json, format_xml
22
class RawDataList(List[Dict[str, Any]]):
    """ List of string-keyed dictionaries. The distinct type allows
        registering a formatter that writes such raw data 'as is' to json.
    """
26
# Dispatcher for the output formatters of this module. The functions below
# attach themselves to it through the format_func() decorator, each for a
# combination of result type and format name.
dispatch = FormatDispatcher()
28
@dispatch.format_func(StatusResult, 'text')
def _format_status_text(result: StatusResult, _: Mapping[str, Any]) -> str:
    """ Format the status result as a single line of text.

        A truthy status produces 'ERROR: ' followed by the status message,
        anything else produces 'OK'. The options are not used.
    """
    return f"ERROR: {result.message}" if result.status else 'OK'
35
36
@dispatch.format_func(StatusResult, 'json')
def _format_status_json(result: StatusResult, _: Mapping[str, Any]) -> str:
    """ Format the status result as a JSON object.

        The object has the keys 'status', 'message' and 'software_version'.
        'data_updated' (as ISO timestamp) and 'database_version' are
        written via keyval_not_none() and thus presumably left out when
        they are not set. The options are not used.
    """
    writer = JsonWriter()

    (writer.start_object()
           .keyval('status', result.status)
           .keyval('message', result.message)
           .keyval_not_none('data_updated', result.data_updated,
                            lambda v: v.isoformat())
           .keyval('software_version', str(result.software_version))
           .keyval_not_none('database_version', result.database_version, str)
           .end_object())

    return writer()
51
52
def _add_address_row(writer: JsonWriter, row: AddressLine,
                     locales: Locales) -> None:
    """ Write a single address line as a JSON object into the given writer.

        The name is localized with the given locales. OSM type and id are
        only written when the row refers to an OSM object, 'place_type'
        only when the row has extratags (and then via keyval_not_none()).
        No separator is written after the object, that is left to the caller.
    """
    (writer.start_object()
           .keyval('localname', locales.display_name(row.names))
           .keyval_not_none('place_id', row.place_id))

    osm_object = row.osm_object
    if osm_object is not None:
        # The tuple is (type, id) but the id is written first.
        (writer.keyval('osm_id', osm_object[1])
               .keyval('osm_type', osm_object[0]))

    extratags = row.extratags
    if extratags:
        writer.keyval_not_none('place_type', extratags.get('place_type'))

    category = row.category
    (writer.keyval('class', category[0])
           .keyval('type', category[1])
           .keyval_not_none('admin_level', row.admin_level)
           .keyval('rank_address', row.rank_address)
           .keyval('distance', row.distance)
           .keyval('isaddress', row.isaddress)
           .end_object())
73
74
def _add_address_rows(writer: JsonWriter, section: str, rows: AddressLines,
                      locales: Locales) -> None:
    """ Write the given address lines as a JSON array under the key `section`.

        Each row is formatted with _add_address_row() and followed by a
        call to next() on the writer, as is the finished array.
    """
    writer.key(section).start_array()

    for address_line in rows:
        _add_address_row(writer, address_line, locales)
        writer.next()

    writer.end_array().next()
82
83
def _add_parent_rows_grouped(writer: JsonWriter, rows: AddressLines,
                             locales: Locales) -> None:
    """ Write the given rows under the key 'hierarchy' as a JSON object
        that maps each category type to the array of rows with that type.

        Every row is pre-rendered into its own JSON string with
        _add_address_row(). Within a group the rendered strings are sorted.
    """
    # Pre-render each row and collect the output by category type.
    by_type = collections.defaultdict(list)
    for row in rows:
        row_writer = JsonWriter()
        _add_address_row(row_writer, row, locales)
        by_type[row.category[1]].append(row_writer())

    writer.key('hierarchy').start_object()

    for category_type, rendered in by_type.items():
        writer.key(category_type).start_array()
        # Sorting the rendered JSON strings orders them alphabetically
        # by local name, which is the first entry of each object.
        for json_row in sorted(rendered):
            writer.raw(json_row).next()
        writer.end_array().next()

    writer.end_object().next()
102
103
@dispatch.format_func(DetailedResult, 'json')
def _format_details_json(result: DetailedResult, options: Mapping[str, Any]) -> str:
    """ Format a detailed result as a JSON object.

        The following options are read:

        * 'locales' - Locales used for the names in address rows
          (a default Locales() object when missing)
        * 'icon_base_url' - when set and an icon is known for the category
          of the result, an 'icon' URL is added
        * 'group_hierarchy' - when true, the parented rows are written
          grouped by category type instead of as a flat list

        The sections 'address', 'linked_places', 'keywords' and 'hierarchy'
        only appear when the result carries the corresponding data.
    """
    locales = options.get('locales', Locales())
    geojson = result.geometry.get('geojson')
    centroid_json = result.centroid.to_geojson()

    writer = JsonWriter()
    (writer.start_object()
           .keyval_not_none('place_id', result.place_id)
           .keyval_not_none('parent_place_id', result.parent_place_id))

    osm_object = result.osm_object
    if osm_object is not None:
        (writer.keyval('osm_type', osm_object[0])
               .keyval('osm_id', osm_object[1]))

    # 'isarea' is a substring test for 'Polygon' on the geojson string or,
    # without one, on the geometry type. The centroid stands in for a
    # missing geojson geometry.
    (writer.keyval('category', result.category[0])
           .keyval('type', result.category[1])
           .keyval('admin_level', result.admin_level)
           .keyval('localname', result.locale_name or '')
           .keyval('names', result.names or {})
           .keyval('addresstags', result.address or {})
           .keyval_not_none('housenumber', result.housenumber)
           .keyval_not_none('calculated_postcode', result.postcode)
           .keyval_not_none('country_code', result.country_code)
           .keyval_not_none('indexed_date', result.indexed_date,
                            lambda v: v.isoformat())
           .keyval_not_none('importance', result.importance)
           .keyval('calculated_importance', result.calculated_importance())
           .keyval('extratags', result.extratags or {})
           .keyval_not_none('calculated_wikipedia', result.wikipedia)
           .keyval('rank_address', result.rank_address)
           .keyval('rank_search', result.rank_search)
           .keyval('isarea',
                   'Polygon' in (geojson or result.geometry.get('type') or ''))
           .key('centroid').raw(centroid_json).next()
           .key('geometry').raw(geojson or centroid_json).next())

    icon_base_url = options.get('icon_base_url', None)
    if icon_base_url:
        icon = ICONS.get(result.category)
        if icon:
            writer.keyval('icon', f"{icon_base_url}/{icon}.p.20.png")

    address_rows = result.address_rows
    if address_rows is not None:
        _add_address_rows(writer, 'address', address_rows, locales)

    linked_rows = result.linked_rows
    if linked_rows:
        _add_address_rows(writer, 'linked_places', linked_rows, locales)

    name_keywords = result.name_keywords
    address_keywords = result.address_keywords
    if not (name_keywords is None and address_keywords is None):
        writer.key('keywords').start_object()

        # Both arrays are always written; a missing list becomes empty.
        for label, words in (('name', name_keywords), ('address', address_keywords)):
            writer.key(label).start_array()
            for word in (words or []):
                (writer.start_object()
                       .keyval('id', word.word_id)
                       .keyval('token', word.word_token)
                       .end_object()
                       .next())
            writer.end_array().next()

        writer.end_object().next()

    parented_rows = result.parented_rows
    if parented_rows is not None:
        if options.get('group_hierarchy', False):
            _add_parent_rows_grouped(writer, parented_rows, locales)
        else:
            _add_address_rows(writer, 'hierarchy', parented_rows, locales)

    writer.end_object()

    return writer()
173
174
@dispatch.format_func(ReverseResults, 'xml')
def _format_reverse_xml(results: ReverseResults, options: Mapping[str, Any]) -> str:
    """ Format reverse results as XML.

        Hands over to format_xml.format_base_xml() with the name
        'reversegeocode' and the 'query' option (empty string when missing)
        as additional 'querystring' entry.
    """
    extra = {'querystring': options.get('query', '')}

    return format_xml.format_base_xml(results, options, True, 'reversegeocode', extra)
180
181
@dispatch.format_func(ReverseResults, 'geojson')
def _format_reverse_geojson(results: ReverseResults, options: Mapping[str, Any]) -> str:
    """ Format reverse results as GeoJSON by handing them over to
        format_json.format_base_geojson() with the third argument set to True.
    """
    formatted = format_json.format_base_geojson(results, options, True)

    return formatted
186
187
@dispatch.format_func(ReverseResults, 'geocodejson')
def _format_reverse_geocodejson(results: ReverseResults, options: Mapping[str, Any]) -> str:
    """ Format reverse results as GeocodeJSON by handing them over to
        format_json.format_base_geocodejson() with the third argument
        set to True.
    """
    formatted = format_json.format_base_geocodejson(results, options, True)

    return formatted
192
193
@dispatch.format_func(ReverseResults, 'json')
def _format_reverse_json(results: ReverseResults, options: Mapping[str, Any]) -> str:
    """ Format reverse results in the 'json' format.

        Uses format_json.format_base_json() with 'class' as class label.
    """
    label = 'class'

    return format_json.format_base_json(results, options, True, class_label=label)
199
200
@dispatch.format_func(ReverseResults, 'jsonv2')
def _format_reverse_jsonv2(results: ReverseResults, options: Mapping[str, Any]) -> str:
    """ Format reverse results in the 'jsonv2' format.

        Uses format_json.format_base_json() with 'category' as class label.
    """
    label = 'category'

    return format_json.format_base_json(results, options, True, class_label=label)
206
207
@dispatch.format_func(SearchResults, 'xml')
def _format_search_xml(results: SearchResults, options: Mapping[str, Any]) -> str:
    """ Format search results as XML.

        Hands over to format_xml.format_base_xml() with the name
        'searchresults'. The additional entries always contain the
        'querystring' (from the 'query' option, empty string when missing).
        The options 'more_url', 'exclude_place_ids' and 'viewbox' are
        added as well when they have a truthy value.
    """
    extra = {'querystring': options.get('query', '')}
    extra.update((name, options[name])
                 for name in ('more_url', 'exclude_place_ids', 'viewbox')
                 if options.get(name))

    return format_xml.format_base_xml(results, options, False, 'searchresults', extra)
216
217
218
@dispatch.format_func(SearchResults, 'geojson')
def _format_search_geojson(results: SearchResults, options: Mapping[str, Any]) -> str:
    """ Format search results as GeoJSON by handing them over to
        format_json.format_base_geojson() with the third argument set to False.
    """
    formatted = format_json.format_base_geojson(results, options, False)

    return formatted
223
224
@dispatch.format_func(SearchResults, 'geocodejson')
def _format_search_geocodejson(results: SearchResults, options: Mapping[str, Any]) -> str:
    """ Format search results as GeocodeJSON by handing them over to
        format_json.format_base_geocodejson() with the third argument
        set to False.
    """
    formatted = format_json.format_base_geocodejson(results, options, False)

    return formatted
229
230
@dispatch.format_func(SearchResults, 'json')
def _format_search_json(results: SearchResults, options: Mapping[str, Any]) -> str:
    """ Format search results in the 'json' format.

        Uses format_json.format_base_json() with 'class' as class label.
    """
    label = 'class'

    return format_json.format_base_json(results, options, False, class_label=label)
236
237
@dispatch.format_func(SearchResults, 'jsonv2')
def _format_search_jsonv2(results: SearchResults, options: Mapping[str, Any]) -> str:
    """ Format search results in the 'jsonv2' format.

        Uses format_json.format_base_json() with 'category' as class label.
    """
    label = 'category'

    return format_json.format_base_json(results, options, False, class_label=label)
243
@dispatch.format_func(RawDataList, 'json')
def _format_raw_data_json(results: RawDataList,  _: Mapping[str, Any]) -> str:
    """ Format a list of raw data rows as a JSON array of objects.

        Each dictionary becomes one object with its keys and values
        written unchanged, except for datetime values, which are written
        as ISO strings with a space as separator and a precision of seconds.
        The options are not used.
    """
    writer = JsonWriter()
    writer.start_array()

    for row in results:
        writer.start_object()
        for key, raw in row.items():
            value = (raw.isoformat(sep=' ', timespec='seconds')
                     if isinstance(raw, dt.datetime) else raw)
            writer.keyval(key, value)
        writer.end_object().next()

    writer.end_array()

    return writer()
259     return out()